mailtrain/services/report-processor.js
Tomas Bures 8237dd5d77 The "Reports" feature seems functional.
Some small refactoring (column widths) of rendering tables in Lists, Templates, and Campaigns so that it is the same as Reports.
2017-04-20 19:42:01 -04:00

154 lines
4.8 KiB
JavaScript

'use strict';
const log = require('npmlog');
const db = require('../lib/db');
const reports = require('../lib/models/reports');
const _ = require('../lib/translate')._;
const path = require('path');
const tools = require('../lib/tools');
const fs = require('fs');
const fork = require('child_process').fork;
let runningWorkersCount = 0;
let maxWorkersCount = 1;
let workers = {};
function getFileName(report, suffix) {
return path.join(__dirname, '..', 'protected', 'reports', report.id + '-' + tools.nameToFileName(report.name) + '.' + suffix);
}
module.exports.getFileName = getFileName;
function spawnWorker(report) {
fs.open(getFileName(report, 'output'), 'w', (err, outFd) => {
if (err) {
log.error('ReportProcessor', err);
return;
}
runningWorkersCount++;
const options = {
stdio: ['ignore', outFd, outFd, 'ipc'],
cwd: path.join(__dirname, '..')
};
let child = fork(path.join(__dirname, 'report-processor-worker.js'), [report.id], options);
let pid = child.pid;
workers[report.id] = child;
log.info('ReportProcessor', 'Worker process for "%s" started with pid %s. Current worker count is %s.', report.name, pid, runningWorkersCount);
child.on('close', (code, signal) => {
runningWorkersCount--;
delete workers[report.id];
log.info('ReportProcessor', 'Worker process for "%s" (pid %s) exited with code %s signal %s. Current worker count is %s.', report.name, pid, code, signal, runningWorkersCount);
fs.close(outFd, (err) => {
if (err) {
log.error('ReportProcessor', err);
}
const fields = {};
if (code ===0 ) {
fields.state = reports.ReportState.FINISHED;
fields.lastRun = new Date();
} else {
fields.state = reports.ReportState.FAILED;
}
reports.updateFields(report.id, fields, (err) => {
if (err) {
log.error('ReportProcessor', err);
}
setImmediate(worker);
});
});
});
});
};
function worker() {
reports.listWithState(reports.ReportState.SCHEDULED, 0, maxWorkersCount - runningWorkersCount, (err, reportList) => {
if (err) {
log.error('ReportProcessor', err);
return;
}
for (let report of reportList) {
reports.updateFields(report.id, { state: reports.ReportState.PROCESSING }, (err) => {
if (err) {
log.error('ReportProcessor', err);
return;
}
spawnWorker(report);
});
}
});
}
module.exports.start = (reportId, callback) => {
if (!workers[reportId]) {
log.info('ReportProcessor', 'Scheduling report id: %s', reportId);
reports.updateFields(reportId, { state: reports.ReportState.SCHEDULED, lastRun: null}, (err) => {
if (err) {
return callback(err);
}
if (runningWorkersCount < maxWorkersCount) {
log.info('ReportProcessor', 'Starting worker because runningWorkersCount=%s maxWorkersCount=%s', runningWorkersCount, maxWorkersCount);
worker();
} else {
log.info('ReportProcessor', 'Not starting worker because runningWorkersCount=%s maxWorkersCount=%s', runningWorkersCount, maxWorkersCount);
}
callback(null);
});
} else {
log.info('ReportProcessor', 'Worker for report id: %s is already running.', reportId);
}
};
module.exports.stop = (reportId, callback) => {
const child = workers[reportId];
if (child) {
log.info('ReportProcessor', 'Killing worker for report id: %s', reportId);
child.kill();
reports.updateFields(reportId, { state: reports.ReportState.FAILED}, callback);
} else {
log.info('ReportProcessor', 'No running worker found for report id: %s', reportId);
}
};
module.exports.init = (callback) => {
reports.listWithState(reports.ReportState.PROCESSING, 0, 0, (err, reportList) => {
if (err) {
log.error('ReportProcessor', err);
}
function scheduleReport() {
if (reportList.length > 0) {
const report = reportList.shift();
reports.updateFields(report.id, { state: reports.ReportState.SCHEDULED}, (err) => {
if (err) {
log.error('ReportProcessor', err);
}
scheduleReport();
});
}
worker();
callback();
}
scheduleReport();
});
};