65 lines
No EOL
2.4 KiB
JavaScript
65 lines
No EOL
2.4 KiB
JavaScript
'use strict';
|
|
|
|
const knex = require('./knex');
|
|
const fork = require('child_process').fork;
|
|
const log = require('./log');
|
|
const path = require('path');
|
|
const {ImportStatus, RunStatus} = require('../../shared/imports');
|
|
const {ListActivityType} = require('../../shared/activity-log');
|
|
const activityLog = require('./activity-log');
|
|
const bluebird = require('bluebird');
|
|
|
|
let messageTid = 0;
|
|
let importerProcess;
|
|
|
|
function spawn(callback) {
|
|
log.verbose('Importer', 'Spawning importer process');
|
|
|
|
knex.transaction(async tx => {
|
|
const updateStatus = async (fromStatus, toStatus) => {
|
|
for (const impt of await tx('imports').where('status', fromStatus).select(['id', 'list'])) {
|
|
await tx('imports').where('id', impt.id).update({status: toStatus});
|
|
await activityLog.logEntityActivity('list', ListActivityType.IMPORT_STATUS_CHANGE, impt.list, {importId: impt.id, importStatus: toStatus});
|
|
}
|
|
}
|
|
|
|
await updateStatus(ImportStatus.PREP_RUNNING, ImportStatus.PREP_SCHEDULED);
|
|
await updateStatus(ImportStatus.PREP_STOPPING, ImportStatus.PREP_FAILED);
|
|
await updateStatus(ImportStatus.RUN_RUNNING, ImportStatus.RUN_SCHEDULED);
|
|
await updateStatus(ImportStatus.RUN_STOPPING, ImportStatus.RUN_FAILED);
|
|
|
|
await tx('import_runs').where('status', RunStatus.RUNNING).update({status: RunStatus.SCHEDULED});
|
|
await tx('import_runs').where('status', RunStatus.STOPPING).update({status: RunStatus.FAILED});
|
|
|
|
}).then(() => {
|
|
importerProcess = fork(path.join(__dirname, '..', 'services', 'importer.js'), [], {
|
|
cwd: path.join(__dirname, '..'),
|
|
env: {NODE_ENV: process.env.NODE_ENV}
|
|
});
|
|
|
|
importerProcess.on('message', msg => {
|
|
if (msg) {
|
|
if (msg.type === 'importer-started') {
|
|
log.info('Importer', 'Importer process started');
|
|
return callback();
|
|
}
|
|
}
|
|
});
|
|
|
|
importerProcess.on('close', (code, signal) => {
|
|
log.error('Importer', 'Importer process exited with code %s signal %s', code, signal);
|
|
});
|
|
});
|
|
}
|
|
|
|
function scheduleCheck() {
|
|
importerProcess.send({
|
|
type: 'scheduleCheck',
|
|
tid: messageTid
|
|
});
|
|
|
|
messageTid++;
|
|
}
|
|
|
|
module.exports.spawn = bluebird.promisify(spawn);
|
|
module.exports.scheduleCheck = scheduleCheck; |