From 328b7cb1372fa1140b90ae1a51f92d717b727580 Mon Sep 17 00:00:00 2001 From: Chubby Granny Chaser Date: Thu, 27 Jun 2024 17:18:48 +0100 Subject: [PATCH] feat: using rpc to communicate --- src/main/events/auth/sign-out.ts | 6 +- src/main/index.ts | 5 +- src/main/services/download-manager.ts | 187 ------------------ .../services/download/download-manager.ts | 101 ++++++++++ src/main/services/download/helpers.ts | 13 ++ src/main/services/download/index.ts | 2 + .../services/{ => download}/torrent-client.ts | 19 +- .../services/download/torrent-downloader.ts | 153 ++++++++++++++ src/main/services/fifo.ts | 38 ---- src/main/services/index.ts | 2 +- src/main/services/main-loop.ts | 2 +- .../components/bottom-panel/bottom-panel.tsx | 7 + src/types/index.ts | 3 + torrent-client/fifo.py | 35 ---- torrent-client/main.py | 57 ++++-- 15 files changed, 332 insertions(+), 298 deletions(-) delete mode 100644 src/main/services/download-manager.ts create mode 100644 src/main/services/download/download-manager.ts create mode 100644 src/main/services/download/helpers.ts create mode 100644 src/main/services/download/index.ts rename src/main/services/{ => download}/torrent-client.ts (65%) create mode 100644 src/main/services/download/torrent-downloader.ts delete mode 100644 src/main/services/fifo.ts delete mode 100644 torrent-client/fifo.py diff --git a/src/main/events/auth/sign-out.ts b/src/main/events/auth/sign-out.ts index 838d5a8d..1316d49d 100644 --- a/src/main/events/auth/sign-out.ts +++ b/src/main/events/auth/sign-out.ts @@ -1,5 +1,5 @@ import { registerEvent } from "../register-event"; -import { DownloadManager, HydraApi, gamesPlaytime } from "@main/services"; +import { HydraApi, TorrentDownloader, gamesPlaytime } from "@main/services"; import { dataSource } from "@main/data-source"; import { DownloadQueue, Game, UserAuth } from "@main/entity"; @@ -19,8 +19,8 @@ const signOut = async (_event: Electron.IpcMainInvokeEvent) => { gamesPlaytime.clear(); }); - /* Disconnects aria2 */ - DownloadManager.kill(); + /* Disconnects libtorrent */ + TorrentDownloader.kill(); await Promise.all([ databaseOperations, diff --git a/src/main/index.ts b/src/main/index.ts index 81ad2172..06d354da 100644 --- a/src/main/index.ts +++ b/src/main/index.ts @@ -4,7 +4,7 @@ import i18n from "i18next"; import path from "node:path"; import url from "node:url"; import { electronApp, optimizer } from "@electron-toolkit/utils"; -import { DownloadManager, logger, WindowManager } from "@main/services"; +import { logger, TorrentDownloader, WindowManager } from "@main/services"; import { dataSource } from "@main/data-source"; import * as resources from "@locales"; import { userPreferencesRepository } from "@main/repository"; @@ -108,7 +108,8 @@ app.on("window-all-closed", () => { }); app.on("before-quit", () => { - DownloadManager.kill(); + /* Disconnects libtorrent */ + TorrentDownloader.kill(); }); app.on("activate", () => { diff --git a/src/main/services/download-manager.ts b/src/main/services/download-manager.ts deleted file mode 100644 index 64e94bc8..00000000 --- a/src/main/services/download-manager.ts +++ /dev/null @@ -1,187 +0,0 @@ -import cp from "node:child_process"; - -import { WindowManager } from "./window-manager"; - -import { Game } from "@main/entity"; -import { startTorrentClient } from "./torrent-client"; -import { readPipe, writePipe } from "./fifo"; -import { downloadQueueRepository, gameRepository } from "@main/repository"; -import { publishDownloadCompleteNotification } from "./notifications"; -import { DownloadProgress } from "@types"; -import { QueryDeepPartialEntity } from "typeorm/query-builder/QueryPartialEntity"; - -enum LibtorrentStatus { - CheckingFiles = 1, - DownloadingMetadata = 2, - Downloading = 3, - Finished = 4, - Seeding = 5, -} - -const getETA = ( - totalLength: number, - completedLength: number, - speed: number -) => { - const remainingBytes = totalLength - completedLength; - - if (remainingBytes >= 0 && speed > 0) { - return (remainingBytes / speed) * 1000; - } - - return -1; -}; - -export class DownloadManager { - private static torrentClient: cp.ChildProcess | null = null; - private static downloadingGameId = -1; - - private static async spawn() { - this.torrentClient = await startTorrentClient(); - } - - public static kill() { - if (this.torrentClient) { - this.torrentClient.kill(); - this.torrentClient = null; - } - } - - public static async watchDownloads() { - if (!this.downloadingGameId) return; - - const buf = readPipe.socket?.read(1024 * 2); - - if (buf === null) return; - - const message = Buffer.from(buf.slice(0, buf.indexOf(0x00))).toString( - "utf-8" - ); - - try { - const { - progress, - numPeers, - numSeeds, - downloadSpeed, - bytesDownloaded, - fileSize, - folderName, - status, - } = JSON.parse(message) as { - progress: number; - numPeers: number; - numSeeds: number; - downloadSpeed: number; - bytesDownloaded: number; - fileSize: number; - folderName: string; - status: number; - }; - - // TODO: Checking files as metadata is a workaround - const isDownloadingMetadata = - status === LibtorrentStatus.DownloadingMetadata || - status === LibtorrentStatus.CheckingFiles; - - if (!isDownloadingMetadata) { - const update: QueryDeepPartialEntity = { - bytesDownloaded, - fileSize, - progress, - }; - - await gameRepository.update( - { id: this.downloadingGameId }, - { - ...update, - folderName, - } - ); - } - - const game = await gameRepository.findOne({ - where: { id: this.downloadingGameId, isDeleted: false }, - }); - - if (WindowManager.mainWindow && game) { - if (!isNaN(progress)) - WindowManager.mainWindow.setProgressBar( - progress === 1 ? -1 : progress - ); - - const payload = { - numPeers, - numSeeds, - downloadSpeed, - timeRemaining: getETA(fileSize, bytesDownloaded, downloadSpeed), - isDownloadingMetadata, - game, - } as DownloadProgress; - - WindowManager.mainWindow.webContents.send( - "on-download-progress", - JSON.parse(JSON.stringify(payload)) - ); - } - - if (progress === 1 && game) { - publishDownloadCompleteNotification(game); - - await downloadQueueRepository.delete({ game }); - - // Clear download - this.downloadingGameId = -1; - - const [nextQueueItem] = await downloadQueueRepository.find({ - order: { - id: "DESC", - }, - relations: { - game: true, - }, - }); - - if (nextQueueItem) { - this.resumeDownload(nextQueueItem.game); - } - } - } catch (err) { - return; - } - } - - static async pauseDownload() { - writePipe.write({ - action: "pause", - game_id: this.downloadingGameId, - }); - - this.downloadingGameId = -1; - - WindowManager.mainWindow?.setProgressBar(-1); - } - - static async resumeDownload(game: Game) { - this.startDownload(game); - } - - static async startDownload(game: Game) { - if (!this.torrentClient) await this.spawn(); - - writePipe.write({ - action: "start", - game_id: game.id, - magnet: game.uri, - save_path: game.downloadPath, - }); - - this.downloadingGameId = game.id; - } - - static async cancelDownload(gameId: number) { - writePipe.write({ action: "cancel", game_id: gameId }); - - WindowManager.mainWindow?.setProgressBar(-1); - } -} diff --git a/src/main/services/download/download-manager.ts b/src/main/services/download/download-manager.ts new file mode 100644 index 00000000..2230919b --- /dev/null +++ b/src/main/services/download/download-manager.ts @@ -0,0 +1,101 @@ +import { Game } from "@main/entity"; +import { Downloader } from "@shared"; +import { TorrentDownloader } from "./torrent-downloader"; +import { WindowManager } from "../window-manager"; +import { downloadQueueRepository, gameRepository } from "@main/repository"; +import { publishDownloadCompleteNotification } from "../notifications"; + +export class DownloadManager { + private static currentDownloader: Downloader | null = null; + + public static async watchDownloads() { + if (this.currentDownloader === Downloader.RealDebrid) { + throw new Error(); + } else { + const status = await TorrentDownloader.getStatus(); + + if (status) { + const { gameId, progress } = status; + + const game = await gameRepository.findOne({ + where: { id: gameId, isDeleted: false }, + }); + + if (WindowManager.mainWindow && game) { + WindowManager.mainWindow.setProgressBar( + progress === 1 ? -1 : progress + ); + + WindowManager.mainWindow.webContents.send( + "on-download-progress", + JSON.parse( + JSON.stringify({ + ...status, + game, + }) + ) + ); + } + + if (status.progress === 1 && game) { + publishDownloadCompleteNotification(game); + + await downloadQueueRepository.delete({ game }); + + const [nextQueueItem] = await downloadQueueRepository.find({ + order: { + id: "DESC", + }, + relations: { + game: true, + }, + }); + + if (nextQueueItem) { + this.resumeDownload(nextQueueItem.game); + } + } + } + } + } + + static async pauseDownload() { + if (this.currentDownloader === Downloader.RealDebrid) { + throw new Error(); + } else { + await TorrentDownloader.pauseDownload(); + } + + WindowManager.mainWindow?.setProgressBar(-1); + this.currentDownloader = null; + } + + static async resumeDownload(game: Game) { + if (game.downloader === Downloader.RealDebrid) { + throw new Error(); + } else { + TorrentDownloader.resumeDownload(game); + this.currentDownloader = Downloader.Torrent; + } + } + + static async cancelDownload(gameId: number) { + if (this.currentDownloader === Downloader.RealDebrid) { + throw new Error(); + } else { + TorrentDownloader.cancelDownload(gameId); + } + + WindowManager.mainWindow?.setProgressBar(-1); + this.currentDownloader = null; + } + + static async startDownload(game: Game) { + if (game.downloader === Downloader.RealDebrid) { + throw new Error(); + } else { + TorrentDownloader.startDownload(game); + this.currentDownloader = Downloader.Torrent; + } + } +} diff --git a/src/main/services/download/helpers.ts b/src/main/services/download/helpers.ts new file mode 100644 index 00000000..07c962bc --- /dev/null +++ b/src/main/services/download/helpers.ts @@ -0,0 +1,13 @@ +export const calculateETA = ( + totalLength: number, + completedLength: number, + speed: number +) => { + const remainingBytes = totalLength - completedLength; + + if (remainingBytes >= 0 && speed > 0) { + return (remainingBytes / speed) * 1000; + } + + return -1; +}; diff --git a/src/main/services/download/index.ts b/src/main/services/download/index.ts new file mode 100644 index 00000000..6244b81a --- /dev/null +++ b/src/main/services/download/index.ts @@ -0,0 +1,2 @@ +export * from "./download-manager"; +export * from "./torrent-downloader"; diff --git a/src/main/services/torrent-client.ts b/src/main/services/download/torrent-client.ts similarity index 65% rename from src/main/services/torrent-client.ts rename to src/main/services/download/torrent-client.ts index b506ae1e..3adc1841 100644 --- a/src/main/services/torrent-client.ts +++ b/src/main/services/download/torrent-client.ts @@ -2,7 +2,6 @@ import path from "node:path"; import cp from "node:child_process"; import fs from "node:fs"; import { app, dialog } from "electron"; -import { readPipe, writePipe } from "./fifo"; const binaryNameByPlatform: Partial> = { darwin: "hydra-download-manager", @@ -11,10 +10,12 @@ const binaryNameByPlatform: Partial> = { }; export const BITTORRENT_PORT = "5881"; +export const RPC_PORT = "8084"; -const commonArgs = [BITTORRENT_PORT, writePipe.socketPath, readPipe.socketPath]; +const commonArgs = [BITTORRENT_PORT, RPC_PORT]; -export const startTorrentClient = async (): Promise => { +export const startTorrentClient = () => { + console.log("CALLED"); if (app.isPackaged) { const binaryName = binaryNameByPlatform[process.platform]!; const binaryPath = path.join( @@ -32,14 +33,10 @@ export const startTorrentClient = async (): Promise => { app.quit(); } - const torrentClient = cp.spawn(binaryPath, commonArgs, { + return cp.spawn(binaryPath, commonArgs, { stdio: "inherit", windowsHide: true, }); - - await Promise.all([writePipe.createPipe(), readPipe.createPipe()]); - - return torrentClient; } else { const scriptPath = path.join( __dirname, @@ -49,12 +46,8 @@ export const startTorrentClient = async (): Promise => { "main.py" ); - const torrentClient = cp.spawn("python3", [scriptPath, ...commonArgs], { + return cp.spawn("python3", [scriptPath, ...commonArgs], { stdio: "inherit", }); - - await Promise.all([writePipe.createPipe(), readPipe.createPipe()]); - - return torrentClient; } }; diff --git a/src/main/services/download/torrent-downloader.ts b/src/main/services/download/torrent-downloader.ts new file mode 100644 index 00000000..e9e9cff6 --- /dev/null +++ b/src/main/services/download/torrent-downloader.ts @@ -0,0 +1,153 @@ +import cp from "node:child_process"; + +import { Game } from "@main/entity"; +import { RPC_PORT, startTorrentClient } from "./torrent-client"; +import { gameRepository } from "@main/repository"; +import { DownloadProgress } from "@types"; +import { QueryDeepPartialEntity } from "typeorm/query-builder/QueryPartialEntity"; +import { calculateETA } from "./helpers"; +import axios from "axios"; + +enum LibtorrentStatus { + CheckingFiles = 1, + DownloadingMetadata = 2, + Downloading = 3, + Finished = 4, + Seeding = 5, +} + +interface LibtorrentPayload { + progress: number; + numPeers: number; + numSeeds: number; + downloadSpeed: number; + bytesDownloaded: number; + fileSize: number; + folderName: string; + status: LibtorrentStatus; + gameId: number; +} + +export class TorrentDownloader { + private static torrentClient: cp.ChildProcess | null = null; + private static downloadingGameId = -1; + private static rpc = axios.create({ + baseURL: `http://localhost:${RPC_PORT}`, + }); + + private static spawn() { + this.torrentClient = startTorrentClient(); + } + + public static kill() { + if (this.torrentClient) { + this.torrentClient.kill(); + this.torrentClient = null; + this.downloadingGameId = -1; + } + } + + public static async getStatus() { + if (!this.torrentClient) this.spawn(); + if (this.downloadingGameId === -1) return null; + + const response = await this.rpc.get("/status"); + + if (response.data === null) return null; + + try { + const { + progress, + numPeers, + numSeeds, + downloadSpeed, + bytesDownloaded, + fileSize, + folderName, + status, + gameId, + } = response.data; + + this.downloadingGameId = gameId; + + const isDownloadingMetadata = + status === LibtorrentStatus.DownloadingMetadata; + + const isCheckingFiles = status === LibtorrentStatus.CheckingFiles; + + if (!isDownloadingMetadata) { + const update: QueryDeepPartialEntity = { + bytesDownloaded, + fileSize, + progress, + }; + + await gameRepository.update( + { id: gameId }, + { + ...update, + folderName, + } + ); + } + + if (progress === 1 && !isCheckingFiles) { + this.downloadingGameId = -1; + } + + return { + numPeers, + numSeeds, + downloadSpeed, + timeRemaining: calculateETA(fileSize, bytesDownloaded, downloadSpeed), + isDownloadingMetadata, + isCheckingFiles, + progress, + gameId, + } as DownloadProgress; + } catch (err) { + return null; + } + } + + static async pauseDownload() { + if (!this.torrentClient) this.spawn(); + + await this.rpc + .post("/action", { + action: "pause", + game_id: this.downloadingGameId, + }) + .catch(() => {}); + + this.downloadingGameId = -1; + } + + static resumeDownload(game: Game) { + this.startDownload(game); + } + + static async startDownload(game: Game) { + if (!this.torrentClient) this.spawn(); + + await this.rpc.post("/action", { + action: "start", + game_id: game.id, + magnet: game.uri, + save_path: game.downloadPath, + }); + + this.downloadingGameId = game.id; + } + + static async cancelDownload(gameId: number) { + if (!this.torrentClient) this.spawn(); + + await this.rpc.post("/action", { + action: "cancel", + game_id: gameId, + }); + + this.downloadingGameId = -1; + } +} diff --git a/src/main/services/fifo.ts b/src/main/services/fifo.ts deleted file mode 100644 index 866232cc..00000000 --- a/src/main/services/fifo.ts +++ /dev/null @@ -1,38 +0,0 @@ -import path from "node:path"; -import net from "node:net"; -import crypto from "node:crypto"; -import os from "node:os"; - -export class FIFO { - public socket: null | net.Socket = null; - public socketPath = this.generateSocketFilename(); - - private generateSocketFilename() { - const hash = crypto.randomBytes(16).toString("hex"); - - if (process.platform === "win32") { - return "\\\\.\\pipe\\" + hash; - } - - return path.join(os.tmpdir(), hash); - } - - public write(data: any) { - if (!this.socket) return; - this.socket.write(Buffer.from(JSON.stringify(data))); - } - - public createPipe() { - return new Promise((resolve) => { - const server = net.createServer((socket) => { - this.socket = socket; - resolve(null); - }); - - server.listen(this.socketPath); - }); - } -} - -export const writePipe = new FIFO(); -export const readPipe = new FIFO(); diff --git a/src/main/services/index.ts b/src/main/services/index.ts index 42017774..255b3871 100644 --- a/src/main/services/index.ts +++ b/src/main/services/index.ts @@ -3,7 +3,7 @@ export * from "./steam"; export * from "./steam-250"; export * from "./steam-grid"; export * from "./window-manager"; -export * from "./download-manager"; +export * from "./download"; export * from "./how-long-to-beat"; export * from "./process-watcher"; export * from "./main-loop"; diff --git a/src/main/services/main-loop.ts b/src/main/services/main-loop.ts index dfae5eec..ca72707f 100644 --- a/src/main/services/main-loop.ts +++ b/src/main/services/main-loop.ts @@ -1,5 +1,5 @@ import { sleep } from "@main/helpers"; -import { DownloadManager } from "./download-manager"; +import { DownloadManager } from "./download"; import { watchProcesses } from "./process-watcher"; export const startMainLoop = async () => { diff --git a/src/renderer/src/components/bottom-panel/bottom-panel.tsx b/src/renderer/src/components/bottom-panel/bottom-panel.tsx index 008358c7..980c528e 100644 --- a/src/renderer/src/components/bottom-panel/bottom-panel.tsx +++ b/src/renderer/src/components/bottom-panel/bottom-panel.tsx @@ -32,6 +32,12 @@ export function BottomPanel() { const status = useMemo(() => { if (isGameDownloading) { + if (lastPacket?.isCheckingFiles) + return t("checking_files", { + title: lastPacket?.game.title, + percentage: progress, + }); + if (lastPacket?.isDownloadingMetadata) return t("downloading_metadata", { title: lastPacket?.game.title }); @@ -56,6 +62,7 @@ export function BottomPanel() { isGameDownloading, lastPacket?.game, lastPacket?.isDownloadingMetadata, + lastPacket?.isCheckingFiles, progress, eta, downloadSpeed, diff --git a/src/types/index.ts b/src/types/index.ts index ff3ad898..836e376c 100644 --- a/src/types/index.ts +++ b/src/types/index.ts @@ -149,6 +149,9 @@ export interface DownloadProgress { numPeers: number; numSeeds: number; isDownloadingMetadata: boolean; + isCheckingFiles: boolean; + progress: number; + gameId: number; game: LibraryGame; } diff --git a/torrent-client/fifo.py b/torrent-client/fifo.py deleted file mode 100644 index 730bc81c..00000000 --- a/torrent-client/fifo.py +++ /dev/null @@ -1,35 +0,0 @@ -import platform - -class Fifo: - socket_handle = None - - def __init__(self, path: str): - if platform.system() == "Windows": - import win32file - - self.socket_handle = win32file.CreateFile(path, win32file.GENERIC_READ | win32file.GENERIC_WRITE, - 0, None, win32file.OPEN_EXISTING, win32file.FILE_ATTRIBUTE_NORMAL, None) - else: - import socket - self.socket_handle = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - self.socket_handle.connect(path) - - def recv(self, bufSize: int): - if platform.system() == "Windows": - import win32file - - result, data = win32file.ReadFile(self.socket_handle, bufSize) - return data - else: - return self.socket_handle.recv(bufSize) - - def send_message(self, msg: str): - buffer = bytearray(1024 * 2) - buffer[:len(msg)] = bytes(msg, "utf-8") - - if platform.system() == "Windows": - import win32file - - win32file.WriteFile(self.socket_handle, buffer) - else: - self.socket_handle.send(buffer) diff --git a/torrent-client/main.py b/torrent-client/main.py index 982bc6ef..37f646e7 100644 --- a/torrent-client/main.py +++ b/torrent-client/main.py @@ -1,17 +1,16 @@ import libtorrent as lt import sys -from fifo import Fifo +from http.server import HTTPServer, BaseHTTPRequestHandler import json import threading import time torrent_port = sys.argv[1] -read_sock_path = sys.argv[2] -write_sock_path = sys.argv[3] +http_port = sys.argv[2] + +print(http_port) session = lt.session({'listen_interfaces': '0.0.0.0:{port}'.format(port=torrent_port)}) -read_fifo = Fifo(read_sock_path) -write_fifo = Fifo(write_sock_path) torrent_handles = {} downloading_game_id = -1 @@ -37,6 +36,7 @@ def pause_download(game_id: int): torrent_handle.pause() torrent_handle.unset_flags(lt.torrent_flags.auto_managed) downloading_game_id = -1 + Handler.current_status = None def cancel_download(game_id: int): global torrent_handles @@ -48,6 +48,7 @@ def cancel_download(game_id: int): session.remove_torrent(torrent_handle) torrent_handles[game_id] = None downloading_game_id =-1 + Handler.current_status = None def get_download_updates(): global torrent_handles @@ -63,7 +64,7 @@ def get_download_updates(): status = torrent_handle.status() info = torrent_handle.get_torrent_info() - write_fifo.send_message(json.dumps({ + Handler.current_status = { 'folderName': info.name() if info else "", 'fileSize': info.total_size() if info else 0, 'gameId': downloading_game_id, @@ -73,29 +74,49 @@ def get_download_updates(): 'numSeeds': status.num_seeds, 'status': status.state, 'bytesDownloaded': status.progress * info.total_size() if info else status.all_time_download, - })) + } if status.progress == 1: cancel_download(downloading_game_id) downloading_game_id = -1 + Handler.current_status = None time.sleep(0.5) -def listen_to_socket(): - while True: - msg = read_fifo.recv(1024 * 2) - payload = json.loads(msg.decode("utf-8")) - if payload['action'] == "start": - start_download(payload['game_id'], payload['magnet'], payload['save_path']) - elif payload['action'] == "pause": - pause_download(payload['game_id']) - elif payload['action'] == "cancel": - cancel_download(payload['game_id']) +class Handler(BaseHTTPRequestHandler): + current_status = None + + def do_GET(self): + if self.path == "/status": + self.send_response(200) + self.send_header("Content-type", "application/json") + self.end_headers() + + self.wfile.write(json.dumps(self.current_status).encode('utf-8')) + + def do_POST(self): + if self.path == "/action": + content_length = int(self.headers['Content-Length']) + post_data = self.rfile.read(content_length) + data = json.loads(post_data.decode('utf-8')) + + if data['action'] == 'start': + start_download(data['game_id'], data['magnet'], data['save_path']) + elif data['action'] == 'pause': + pause_download(data['game_id']) + elif data['action'] == 'cancel': + cancel_download(data['game_id']) + + self.send_response(200) + self.end_headers() + if __name__ == "__main__": p1 = threading.Thread(target=get_download_updates) - p2 = threading.Thread(target=listen_to_socket) + + httpd = HTTPServer(("", int(http_port)), Handler) + p2 = threading.Thread(target=httpd.serve_forever) p1.start() p2.start()