feat: using rpc to communicate

This commit is contained in:
Chubby Granny Chaser 2024-06-27 17:18:48 +01:00
parent 05cfdefc84
commit 328b7cb137
No known key found for this signature in database
15 changed files with 332 additions and 298 deletions

View file

@ -1,5 +1,5 @@
import { registerEvent } from "../register-event"; import { registerEvent } from "../register-event";
import { DownloadManager, HydraApi, gamesPlaytime } from "@main/services"; import { HydraApi, TorrentDownloader, gamesPlaytime } from "@main/services";
import { dataSource } from "@main/data-source"; import { dataSource } from "@main/data-source";
import { DownloadQueue, Game, UserAuth } from "@main/entity"; import { DownloadQueue, Game, UserAuth } from "@main/entity";
@ -19,8 +19,8 @@ const signOut = async (_event: Electron.IpcMainInvokeEvent) => {
gamesPlaytime.clear(); gamesPlaytime.clear();
}); });
/* Disconnects aria2 */ /* Disconnects libtorrent */
DownloadManager.kill(); TorrentDownloader.kill();
await Promise.all([ await Promise.all([
databaseOperations, databaseOperations,

View file

@ -4,7 +4,7 @@ import i18n from "i18next";
import path from "node:path"; import path from "node:path";
import url from "node:url"; import url from "node:url";
import { electronApp, optimizer } from "@electron-toolkit/utils"; import { electronApp, optimizer } from "@electron-toolkit/utils";
import { DownloadManager, logger, WindowManager } from "@main/services"; import { logger, TorrentDownloader, WindowManager } from "@main/services";
import { dataSource } from "@main/data-source"; import { dataSource } from "@main/data-source";
import * as resources from "@locales"; import * as resources from "@locales";
import { userPreferencesRepository } from "@main/repository"; import { userPreferencesRepository } from "@main/repository";
@ -108,7 +108,8 @@ app.on("window-all-closed", () => {
}); });
app.on("before-quit", () => { app.on("before-quit", () => {
DownloadManager.kill(); /* Disconnects libtorrent */
TorrentDownloader.kill();
}); });
app.on("activate", () => { app.on("activate", () => {

View file

@ -1,187 +0,0 @@
import cp from "node:child_process";
import { WindowManager } from "./window-manager";
import { Game } from "@main/entity";
import { startTorrentClient } from "./torrent-client";
import { readPipe, writePipe } from "./fifo";
import { downloadQueueRepository, gameRepository } from "@main/repository";
import { publishDownloadCompleteNotification } from "./notifications";
import { DownloadProgress } from "@types";
import { QueryDeepPartialEntity } from "typeorm/query-builder/QueryPartialEntity";
enum LibtorrentStatus {
CheckingFiles = 1,
DownloadingMetadata = 2,
Downloading = 3,
Finished = 4,
Seeding = 5,
}
const getETA = (
totalLength: number,
completedLength: number,
speed: number
) => {
const remainingBytes = totalLength - completedLength;
if (remainingBytes >= 0 && speed > 0) {
return (remainingBytes / speed) * 1000;
}
return -1;
};
export class DownloadManager {
private static torrentClient: cp.ChildProcess | null = null;
private static downloadingGameId = -1;
private static async spawn() {
this.torrentClient = await startTorrentClient();
}
public static kill() {
if (this.torrentClient) {
this.torrentClient.kill();
this.torrentClient = null;
}
}
public static async watchDownloads() {
if (!this.downloadingGameId) return;
const buf = readPipe.socket?.read(1024 * 2);
if (buf === null) return;
const message = Buffer.from(buf.slice(0, buf.indexOf(0x00))).toString(
"utf-8"
);
try {
const {
progress,
numPeers,
numSeeds,
downloadSpeed,
bytesDownloaded,
fileSize,
folderName,
status,
} = JSON.parse(message) as {
progress: number;
numPeers: number;
numSeeds: number;
downloadSpeed: number;
bytesDownloaded: number;
fileSize: number;
folderName: string;
status: number;
};
// TODO: Checking files as metadata is a workaround
const isDownloadingMetadata =
status === LibtorrentStatus.DownloadingMetadata ||
status === LibtorrentStatus.CheckingFiles;
if (!isDownloadingMetadata) {
const update: QueryDeepPartialEntity<Game> = {
bytesDownloaded,
fileSize,
progress,
};
await gameRepository.update(
{ id: this.downloadingGameId },
{
...update,
folderName,
}
);
}
const game = await gameRepository.findOne({
where: { id: this.downloadingGameId, isDeleted: false },
});
if (WindowManager.mainWindow && game) {
if (!isNaN(progress))
WindowManager.mainWindow.setProgressBar(
progress === 1 ? -1 : progress
);
const payload = {
numPeers,
numSeeds,
downloadSpeed,
timeRemaining: getETA(fileSize, bytesDownloaded, downloadSpeed),
isDownloadingMetadata,
game,
} as DownloadProgress;
WindowManager.mainWindow.webContents.send(
"on-download-progress",
JSON.parse(JSON.stringify(payload))
);
}
if (progress === 1 && game) {
publishDownloadCompleteNotification(game);
await downloadQueueRepository.delete({ game });
// Clear download
this.downloadingGameId = -1;
const [nextQueueItem] = await downloadQueueRepository.find({
order: {
id: "DESC",
},
relations: {
game: true,
},
});
if (nextQueueItem) {
this.resumeDownload(nextQueueItem.game);
}
}
} catch (err) {
return;
}
}
static async pauseDownload() {
writePipe.write({
action: "pause",
game_id: this.downloadingGameId,
});
this.downloadingGameId = -1;
WindowManager.mainWindow?.setProgressBar(-1);
}
static async resumeDownload(game: Game) {
this.startDownload(game);
}
static async startDownload(game: Game) {
if (!this.torrentClient) await this.spawn();
writePipe.write({
action: "start",
game_id: game.id,
magnet: game.uri,
save_path: game.downloadPath,
});
this.downloadingGameId = game.id;
}
static async cancelDownload(gameId: number) {
writePipe.write({ action: "cancel", game_id: gameId });
WindowManager.mainWindow?.setProgressBar(-1);
}
}

View file

@ -0,0 +1,101 @@
import { Game } from "@main/entity";
import { Downloader } from "@shared";
import { TorrentDownloader } from "./torrent-downloader";
import { WindowManager } from "../window-manager";
import { downloadQueueRepository, gameRepository } from "@main/repository";
import { publishDownloadCompleteNotification } from "../notifications";
export class DownloadManager {
private static currentDownloader: Downloader | null = null;
public static async watchDownloads() {
if (this.currentDownloader === Downloader.RealDebrid) {
throw new Error();
} else {
const status = await TorrentDownloader.getStatus();
if (status) {
const { gameId, progress } = status;
const game = await gameRepository.findOne({
where: { id: gameId, isDeleted: false },
});
if (WindowManager.mainWindow && game) {
WindowManager.mainWindow.setProgressBar(
progress === 1 ? -1 : progress
);
WindowManager.mainWindow.webContents.send(
"on-download-progress",
JSON.parse(
JSON.stringify({
...status,
game,
})
)
);
}
if (status.progress === 1 && game) {
publishDownloadCompleteNotification(game);
await downloadQueueRepository.delete({ game });
const [nextQueueItem] = await downloadQueueRepository.find({
order: {
id: "DESC",
},
relations: {
game: true,
},
});
if (nextQueueItem) {
this.resumeDownload(nextQueueItem.game);
}
}
}
}
}
static async pauseDownload() {
if (this.currentDownloader === Downloader.RealDebrid) {
throw new Error();
} else {
await TorrentDownloader.pauseDownload();
}
WindowManager.mainWindow?.setProgressBar(-1);
this.currentDownloader = null;
}
static async resumeDownload(game: Game) {
if (game.downloader === Downloader.RealDebrid) {
throw new Error();
} else {
TorrentDownloader.resumeDownload(game);
this.currentDownloader = Downloader.Torrent;
}
}
static async cancelDownload(gameId: number) {
if (this.currentDownloader === Downloader.RealDebrid) {
throw new Error();
} else {
TorrentDownloader.cancelDownload(gameId);
}
WindowManager.mainWindow?.setProgressBar(-1);
this.currentDownloader = null;
}
static async startDownload(game: Game) {
if (game.downloader === Downloader.RealDebrid) {
throw new Error();
} else {
TorrentDownloader.startDownload(game);
this.currentDownloader = Downloader.Torrent;
}
}
}

View file

@ -0,0 +1,13 @@
export const calculateETA = (
totalLength: number,
completedLength: number,
speed: number
) => {
const remainingBytes = totalLength - completedLength;
if (remainingBytes >= 0 && speed > 0) {
return (remainingBytes / speed) * 1000;
}
return -1;
};

View file

@ -0,0 +1,2 @@
export * from "./download-manager";
export * from "./torrent-downloader";

View file

@ -2,7 +2,6 @@ import path from "node:path";
import cp from "node:child_process"; import cp from "node:child_process";
import fs from "node:fs"; import fs from "node:fs";
import { app, dialog } from "electron"; import { app, dialog } from "electron";
import { readPipe, writePipe } from "./fifo";
const binaryNameByPlatform: Partial<Record<NodeJS.Platform, string>> = { const binaryNameByPlatform: Partial<Record<NodeJS.Platform, string>> = {
darwin: "hydra-download-manager", darwin: "hydra-download-manager",
@ -11,10 +10,12 @@ const binaryNameByPlatform: Partial<Record<NodeJS.Platform, string>> = {
}; };
export const BITTORRENT_PORT = "5881"; export const BITTORRENT_PORT = "5881";
export const RPC_PORT = "8084";
const commonArgs = [BITTORRENT_PORT, writePipe.socketPath, readPipe.socketPath]; const commonArgs = [BITTORRENT_PORT, RPC_PORT];
export const startTorrentClient = async (): Promise<cp.ChildProcess> => { export const startTorrentClient = () => {
console.log("CALLED");
if (app.isPackaged) { if (app.isPackaged) {
const binaryName = binaryNameByPlatform[process.platform]!; const binaryName = binaryNameByPlatform[process.platform]!;
const binaryPath = path.join( const binaryPath = path.join(
@ -32,14 +33,10 @@ export const startTorrentClient = async (): Promise<cp.ChildProcess> => {
app.quit(); app.quit();
} }
const torrentClient = cp.spawn(binaryPath, commonArgs, { return cp.spawn(binaryPath, commonArgs, {
stdio: "inherit", stdio: "inherit",
windowsHide: true, windowsHide: true,
}); });
await Promise.all([writePipe.createPipe(), readPipe.createPipe()]);
return torrentClient;
} else { } else {
const scriptPath = path.join( const scriptPath = path.join(
__dirname, __dirname,
@ -49,12 +46,8 @@ export const startTorrentClient = async (): Promise<cp.ChildProcess> => {
"main.py" "main.py"
); );
const torrentClient = cp.spawn("python3", [scriptPath, ...commonArgs], { return cp.spawn("python3", [scriptPath, ...commonArgs], {
stdio: "inherit", stdio: "inherit",
}); });
await Promise.all([writePipe.createPipe(), readPipe.createPipe()]);
return torrentClient;
} }
}; };

View file

@ -0,0 +1,153 @@
import cp from "node:child_process";
import { Game } from "@main/entity";
import { RPC_PORT, startTorrentClient } from "./torrent-client";
import { gameRepository } from "@main/repository";
import { DownloadProgress } from "@types";
import { QueryDeepPartialEntity } from "typeorm/query-builder/QueryPartialEntity";
import { calculateETA } from "./helpers";
import axios from "axios";
enum LibtorrentStatus {
CheckingFiles = 1,
DownloadingMetadata = 2,
Downloading = 3,
Finished = 4,
Seeding = 5,
}
interface LibtorrentPayload {
progress: number;
numPeers: number;
numSeeds: number;
downloadSpeed: number;
bytesDownloaded: number;
fileSize: number;
folderName: string;
status: LibtorrentStatus;
gameId: number;
}
export class TorrentDownloader {
private static torrentClient: cp.ChildProcess | null = null;
private static downloadingGameId = -1;
private static rpc = axios.create({
baseURL: `http://localhost:${RPC_PORT}`,
});
private static spawn() {
this.torrentClient = startTorrentClient();
}
public static kill() {
if (this.torrentClient) {
this.torrentClient.kill();
this.torrentClient = null;
this.downloadingGameId = -1;
}
}
public static async getStatus() {
if (!this.torrentClient) this.spawn();
if (this.downloadingGameId === -1) return null;
const response = await this.rpc.get<LibtorrentPayload | null>("/status");
if (response.data === null) return null;
try {
const {
progress,
numPeers,
numSeeds,
downloadSpeed,
bytesDownloaded,
fileSize,
folderName,
status,
gameId,
} = response.data;
this.downloadingGameId = gameId;
const isDownloadingMetadata =
status === LibtorrentStatus.DownloadingMetadata;
const isCheckingFiles = status === LibtorrentStatus.CheckingFiles;
if (!isDownloadingMetadata) {
const update: QueryDeepPartialEntity<Game> = {
bytesDownloaded,
fileSize,
progress,
};
await gameRepository.update(
{ id: gameId },
{
...update,
folderName,
}
);
}
if (progress === 1 && !isCheckingFiles) {
this.downloadingGameId = -1;
}
return {
numPeers,
numSeeds,
downloadSpeed,
timeRemaining: calculateETA(fileSize, bytesDownloaded, downloadSpeed),
isDownloadingMetadata,
isCheckingFiles,
progress,
gameId,
} as DownloadProgress;
} catch (err) {
return null;
}
}
static async pauseDownload() {
if (!this.torrentClient) this.spawn();
await this.rpc
.post("/action", {
action: "pause",
game_id: this.downloadingGameId,
})
.catch(() => {});
this.downloadingGameId = -1;
}
static resumeDownload(game: Game) {
this.startDownload(game);
}
static async startDownload(game: Game) {
if (!this.torrentClient) this.spawn();
await this.rpc.post("/action", {
action: "start",
game_id: game.id,
magnet: game.uri,
save_path: game.downloadPath,
});
this.downloadingGameId = game.id;
}
static async cancelDownload(gameId: number) {
if (!this.torrentClient) this.spawn();
await this.rpc.post("/action", {
action: "cancel",
game_id: gameId,
});
this.downloadingGameId = -1;
}
}

View file

@ -1,38 +0,0 @@
import path from "node:path";
import net from "node:net";
import crypto from "node:crypto";
import os from "node:os";
export class FIFO {
public socket: null | net.Socket = null;
public socketPath = this.generateSocketFilename();
private generateSocketFilename() {
const hash = crypto.randomBytes(16).toString("hex");
if (process.platform === "win32") {
return "\\\\.\\pipe\\" + hash;
}
return path.join(os.tmpdir(), hash);
}
public write(data: any) {
if (!this.socket) return;
this.socket.write(Buffer.from(JSON.stringify(data)));
}
public createPipe() {
return new Promise((resolve) => {
const server = net.createServer((socket) => {
this.socket = socket;
resolve(null);
});
server.listen(this.socketPath);
});
}
}
export const writePipe = new FIFO();
export const readPipe = new FIFO();

View file

@ -3,7 +3,7 @@ export * from "./steam";
export * from "./steam-250"; export * from "./steam-250";
export * from "./steam-grid"; export * from "./steam-grid";
export * from "./window-manager"; export * from "./window-manager";
export * from "./download-manager"; export * from "./download";
export * from "./how-long-to-beat"; export * from "./how-long-to-beat";
export * from "./process-watcher"; export * from "./process-watcher";
export * from "./main-loop"; export * from "./main-loop";

View file

@ -1,5 +1,5 @@
import { sleep } from "@main/helpers"; import { sleep } from "@main/helpers";
import { DownloadManager } from "./download-manager"; import { DownloadManager } from "./download";
import { watchProcesses } from "./process-watcher"; import { watchProcesses } from "./process-watcher";
export const startMainLoop = async () => { export const startMainLoop = async () => {

View file

@ -32,6 +32,12 @@ export function BottomPanel() {
const status = useMemo(() => { const status = useMemo(() => {
if (isGameDownloading) { if (isGameDownloading) {
if (lastPacket?.isCheckingFiles)
return t("checking_files", {
title: lastPacket?.game.title,
percentage: progress,
});
if (lastPacket?.isDownloadingMetadata) if (lastPacket?.isDownloadingMetadata)
return t("downloading_metadata", { title: lastPacket?.game.title }); return t("downloading_metadata", { title: lastPacket?.game.title });
@ -56,6 +62,7 @@ export function BottomPanel() {
isGameDownloading, isGameDownloading,
lastPacket?.game, lastPacket?.game,
lastPacket?.isDownloadingMetadata, lastPacket?.isDownloadingMetadata,
lastPacket?.isCheckingFiles,
progress, progress,
eta, eta,
downloadSpeed, downloadSpeed,

View file

@ -149,6 +149,9 @@ export interface DownloadProgress {
numPeers: number; numPeers: number;
numSeeds: number; numSeeds: number;
isDownloadingMetadata: boolean; isDownloadingMetadata: boolean;
isCheckingFiles: boolean;
progress: number;
gameId: number;
game: LibraryGame; game: LibraryGame;
} }

View file

@ -1,35 +0,0 @@
import platform
class Fifo:
socket_handle = None
def __init__(self, path: str):
if platform.system() == "Windows":
import win32file
self.socket_handle = win32file.CreateFile(path, win32file.GENERIC_READ | win32file.GENERIC_WRITE,
0, None, win32file.OPEN_EXISTING, win32file.FILE_ATTRIBUTE_NORMAL, None)
else:
import socket
self.socket_handle = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.socket_handle.connect(path)
def recv(self, bufSize: int):
if platform.system() == "Windows":
import win32file
result, data = win32file.ReadFile(self.socket_handle, bufSize)
return data
else:
return self.socket_handle.recv(bufSize)
def send_message(self, msg: str):
buffer = bytearray(1024 * 2)
buffer[:len(msg)] = bytes(msg, "utf-8")
if platform.system() == "Windows":
import win32file
win32file.WriteFile(self.socket_handle, buffer)
else:
self.socket_handle.send(buffer)

View file

@ -1,17 +1,16 @@
import libtorrent as lt import libtorrent as lt
import sys import sys
from fifo import Fifo from http.server import HTTPServer, BaseHTTPRequestHandler
import json import json
import threading import threading
import time import time
torrent_port = sys.argv[1] torrent_port = sys.argv[1]
read_sock_path = sys.argv[2] http_port = sys.argv[2]
write_sock_path = sys.argv[3]
print(http_port)
session = lt.session({'listen_interfaces': '0.0.0.0:{port}'.format(port=torrent_port)}) session = lt.session({'listen_interfaces': '0.0.0.0:{port}'.format(port=torrent_port)})
read_fifo = Fifo(read_sock_path)
write_fifo = Fifo(write_sock_path)
torrent_handles = {} torrent_handles = {}
downloading_game_id = -1 downloading_game_id = -1
@ -37,6 +36,7 @@ def pause_download(game_id: int):
torrent_handle.pause() torrent_handle.pause()
torrent_handle.unset_flags(lt.torrent_flags.auto_managed) torrent_handle.unset_flags(lt.torrent_flags.auto_managed)
downloading_game_id = -1 downloading_game_id = -1
Handler.current_status = None
def cancel_download(game_id: int): def cancel_download(game_id: int):
global torrent_handles global torrent_handles
@ -48,6 +48,7 @@ def cancel_download(game_id: int):
session.remove_torrent(torrent_handle) session.remove_torrent(torrent_handle)
torrent_handles[game_id] = None torrent_handles[game_id] = None
downloading_game_id =-1 downloading_game_id =-1
Handler.current_status = None
def get_download_updates(): def get_download_updates():
global torrent_handles global torrent_handles
@ -63,7 +64,7 @@ def get_download_updates():
status = torrent_handle.status() status = torrent_handle.status()
info = torrent_handle.get_torrent_info() info = torrent_handle.get_torrent_info()
write_fifo.send_message(json.dumps({ Handler.current_status = {
'folderName': info.name() if info else "", 'folderName': info.name() if info else "",
'fileSize': info.total_size() if info else 0, 'fileSize': info.total_size() if info else 0,
'gameId': downloading_game_id, 'gameId': downloading_game_id,
@ -73,29 +74,49 @@ def get_download_updates():
'numSeeds': status.num_seeds, 'numSeeds': status.num_seeds,
'status': status.state, 'status': status.state,
'bytesDownloaded': status.progress * info.total_size() if info else status.all_time_download, 'bytesDownloaded': status.progress * info.total_size() if info else status.all_time_download,
})) }
if status.progress == 1: if status.progress == 1:
cancel_download(downloading_game_id) cancel_download(downloading_game_id)
downloading_game_id = -1 downloading_game_id = -1
Handler.current_status = None
time.sleep(0.5) time.sleep(0.5)
def listen_to_socket():
while True:
msg = read_fifo.recv(1024 * 2)
payload = json.loads(msg.decode("utf-8"))
if payload['action'] == "start": class Handler(BaseHTTPRequestHandler):
start_download(payload['game_id'], payload['magnet'], payload['save_path']) current_status = None
elif payload['action'] == "pause":
pause_download(payload['game_id']) def do_GET(self):
elif payload['action'] == "cancel": if self.path == "/status":
cancel_download(payload['game_id']) self.send_response(200)
self.send_header("Content-type", "application/json")
self.end_headers()
self.wfile.write(json.dumps(self.current_status).encode('utf-8'))
def do_POST(self):
if self.path == "/action":
content_length = int(self.headers['Content-Length'])
post_data = self.rfile.read(content_length)
data = json.loads(post_data.decode('utf-8'))
if data['action'] == 'start':
start_download(data['game_id'], data['magnet'], data['save_path'])
elif data['action'] == 'pause':
pause_download(data['game_id'])
elif data['action'] == 'cancel':
cancel_download(data['game_id'])
self.send_response(200)
self.end_headers()
if __name__ == "__main__": if __name__ == "__main__":
p1 = threading.Thread(target=get_download_updates) p1 = threading.Thread(target=get_download_updates)
p2 = threading.Thread(target=listen_to_socket)
httpd = HTTPServer(("", int(http_port)), Handler)
p2 = threading.Thread(target=httpd.serve_forever)
p1.start() p1.start()
p2.start() p2.start()