fix: removing repacks from worker threads to fix race condition

This commit is contained in:
Chubby Granny Chaser 2024-06-03 23:09:47 +01:00
parent 4559e23610
commit ea923d5082
No known key found for this signature in database
13 changed files with 71 additions and 105 deletions

View file

@ -2,8 +2,7 @@ import { getSteamAppAsset } from "@main/helpers";
import type { CatalogueEntry, GameShop } from "@types";
import { registerEvent } from "../register-event";
import { requestSteam250 } from "@main/services";
import { repacksWorker } from "@main/workers";
import { RepacksManager, requestSteam250 } from "@main/services";
import { formatName } from "@shared";
const resultSize = 12;
@ -19,10 +18,7 @@ const getCatalogue = async (_event: Electron.IpcMainInvokeEvent) => {
}
const { title, objectID } = trendingGames[i]!;
const repacks = await repacksWorker.run(
{ query: formatName(title) },
{ name: "search" }
);
const repacks = RepacksManager.search({ query: formatName(title) });
const catalogueEntry = {
objectID,

View file

@ -1,8 +1,9 @@
import type { CatalogueEntry } from "@types";
import { registerEvent } from "../register-event";
import { repacksWorker, steamGamesWorker } from "@main/workers";
import { steamGamesWorker } from "@main/workers";
import { convertSteamGameToCatalogueEntry } from "../helpers/search-games";
import { RepacksManager } from "@main/services";
const getGames = async (
_event: Electron.IpcMainInvokeEvent,
@ -14,11 +15,8 @@ const getGames = async (
{ name: "list" }
);
const entries = await repacksWorker.run(
steamGames.map((game) => convertSteamGameToCatalogueEntry(game)),
{
name: "findRepacksForCatalogueEntries",
}
const entries = RepacksManager.findRepacksForCatalogueEntries(
steamGames.map((game) => convertSteamGameToCatalogueEntry(game))
);
return {

View file

@ -1,9 +1,9 @@
import { RepacksManager } from "@main/services";
import { registerEvent } from "../register-event";
import { repacksWorker } from "@main/workers";
const searchGameRepacks = (
_event: Electron.IpcMainInvokeEvent,
query: string
) => repacksWorker.run({ query }, { name: "search" });
) => RepacksManager.search({ query });
registerEvent("searchGameRepacks", searchGameRepacks);

View file

@ -3,9 +3,8 @@ import { dataSource } from "@main/data-source";
import { DownloadSource } from "@main/entity";
import axios from "axios";
import { downloadSourceSchema } from "../helpers/validators";
import { repackRepository } from "@main/repository";
import { repacksWorker } from "@main/workers";
import { insertDownloadsFromSource } from "@main/helpers";
import { RepacksManager } from "@main/services";
const addDownloadSource = async (
_event: Electron.IpcMainInvokeEvent,
@ -31,15 +30,7 @@ const addDownloadSource = async (
}
);
repackRepository
.find({
order: {
createdAt: "DESC",
},
})
.then((repacks) => {
repacksWorker.run(repacks, { name: "setRepacks" });
});
await RepacksManager.updateRepacks();
return downloadSource;
};

View file

@ -1,22 +1,13 @@
import { downloadSourceRepository, repackRepository } from "@main/repository";
import { downloadSourceRepository } from "@main/repository";
import { registerEvent } from "../register-event";
import { repacksWorker } from "@main/workers";
import { RepacksManager } from "@main/services";
const removeDownloadSource = async (
_event: Electron.IpcMainInvokeEvent,
id: number
) => {
await downloadSourceRepository.delete(id);
repackRepository
.find({
order: {
createdAt: "DESC",
},
})
.then((repacks) => {
repacksWorker.run(repacks, { name: "setRepacks" });
});
await RepacksManager.updateRepacks();
};
registerEvent("removeDownloadSource", removeDownloadSource);

View file

@ -2,8 +2,7 @@ import { registerEvent } from "../register-event";
import axios from "axios";
import { downloadSourceRepository } from "@main/repository";
import { downloadSourceSchema } from "../helpers/validators";
import { repacksWorker } from "@main/workers";
import { GameRepack } from "@types";
import { RepacksManager } from "@main/services";
const validateDownloadSource = async (
_event: Electron.IpcMainInvokeEvent,
@ -20,9 +19,7 @@ const validateDownloadSource = async (
if (existingSource)
throw new Error("Source with the same url already exists");
const repacks = (await repacksWorker.run(undefined, {
name: "list",
})) as GameRepack[];
const repacks = RepacksManager.repacks;
const existingUris = source.downloads
.flatMap((download) => download.uris)

View file

@ -4,7 +4,8 @@ import flexSearch from "flexsearch";
import type { GameShop, CatalogueEntry, SteamGame } from "@types";
import { getSteamAppAsset } from "@main/helpers";
import { repacksWorker, steamGamesWorker } from "@main/workers";
import { steamGamesWorker } from "@main/workers";
import { RepacksManager } from "@main/services";
export interface SearchGamesArgs {
query?: string;
@ -29,11 +30,8 @@ export const searchSteamGames = async (
name: "search",
})) as SteamGame[];
const result = await repacksWorker.run(
steamGames.map((game) => convertSteamGameToCatalogueEntry(game)),
{
name: "findRepacksForCatalogueEntries",
}
const result = RepacksManager.findRepacksForCatalogueEntries(
steamGames.map((game) => convertSteamGameToCatalogueEntry(game))
);
return orderBy(

View file

@ -2,6 +2,7 @@ import { dataSource } from "@main/data-source";
import { DownloadSource, Repack } from "@main/entity";
import { downloadSourceSchema } from "@main/events/helpers/validators";
import { downloadSourceRepository } from "@main/repository";
import { RepacksManager } from "@main/services";
import { downloadSourceWorker } from "@main/workers";
import { chunk } from "lodash-es";
import type { EntityManager } from "typeorm";
@ -65,5 +66,7 @@ export const fetchDownloadSourcesAndUpdate = async () => {
result.downloads
);
}
await RepacksManager.updateRepacks();
});
};

View file

@ -1,18 +1,15 @@
import { DownloadManager, startMainLoop } from "./services";
import {
gameRepository,
repackRepository,
userPreferencesRepository,
} from "./repository";
import { DownloadManager, RepacksManager, startMainLoop } from "./services";
import { gameRepository, userPreferencesRepository } from "./repository";
import { UserPreferences } from "./entity";
import { RealDebridClient } from "./services/real-debrid";
import { Not } from "typeorm";
import { repacksWorker } from "./workers";
import { fetchDownloadSourcesAndUpdate } from "./helpers";
startMainLoop();
const loadState = async (userPreferences: UserPreferences | null) => {
await RepacksManager.updateRepacks();
import("./events");
if (userPreferences?.realDebridApiToken)
@ -28,14 +25,6 @@ const loadState = async (userPreferences: UserPreferences | null) => {
if (game) DownloadManager.startDownload(game);
const repacks = await repackRepository.find({
order: {
createdAt: "DESC",
},
});
repacksWorker.run(repacks, { name: "setRepacks" });
fetchDownloadSourcesAndUpdate();
};

View file

@ -7,3 +7,4 @@ export * from "./download-manager";
export * from "./how-long-to-beat";
export * from "./process-watcher";
export * from "./main-loop";
export * from "./repacks-manager";

View file

@ -0,0 +1,44 @@
import { repackRepository } from "@main/repository";
import { formatName } from "@shared";
import { CatalogueEntry, GameRepack } from "@types";
import flexSearch from "flexsearch";
export class RepacksManager {
public static repacks: GameRepack[] = [];
private static repacksIndex = new flexSearch.Index();
public static async updateRepacks() {
this.repacks = await repackRepository.find({
order: {
createdAt: "DESC",
},
});
for (let i = 0; i < this.repacks.length; i++) {
this.repacksIndex.remove(i);
}
this.repacksIndex = new flexSearch.Index();
for (let i = 0; i < this.repacks.length; i++) {
const repack = this.repacks[i];
const formattedTitle = formatName(repack.title);
this.repacksIndex.add(i, formattedTitle);
}
}
public static search(options: flexSearch.SearchOptions) {
return this.repacksIndex
.search({ ...options, query: formatName(options.query ?? "") })
.map((index) => this.repacks[index]);
}
public static findRepacksForCatalogueEntries(entries: CatalogueEntry[]) {
return entries.map((entry) => {
const repacks = this.search({ query: formatName(entry.title) });
return { ...entry, repacks };
});
}
}

View file

@ -1,6 +1,5 @@
import path from "node:path";
import steamGamesWorkerPath from "./steam-games.worker?modulePath";
import repacksWorkerPath from "./repacks.worker?modulePath";
import downloadSourceWorkerPath from "./download-source.worker?modulePath";
import Piscina from "piscina";
@ -14,10 +13,6 @@ export const steamGamesWorker = new Piscina({
},
});
export const repacksWorker = new Piscina({
filename: repacksWorkerPath,
});
export const downloadSourceWorker = new Piscina({
filename: downloadSourceWorkerPath,
});

View file

@ -1,37 +0,0 @@
import { formatName } from "@shared";
import { CatalogueEntry, GameRepack } from "@types";
import flexSearch from "flexsearch";
const repacksIndex = new flexSearch.Index();
const state: { repacks: GameRepack[] } = { repacks: [] };
export const setRepacks = (repacks: GameRepack[]) => {
for (let i = 0; i < state.repacks.length; i++) {
repacksIndex.remove(i);
}
state.repacks = repacks;
for (let i = 0; i < repacks.length; i++) {
const repack = repacks[i];
const formattedTitle = formatName(repack.title);
repacksIndex.add(i, formattedTitle);
}
};
export const search = (options: flexSearch.SearchOptions) =>
repacksIndex
.search({ ...options, query: formatName(options.query ?? "") })
.map((index) => state.repacks[index]);
export const list = () => state.repacks;
export const findRepacksForCatalogueEntries = (entries: CatalogueEntry[]) => {
return entries.map((entry) => {
const repacks = search({ query: formatName(entry.title) });
return { ...entry, repacks };
});
};