Reports halfway through
Datatable now correctly handles the situation when user is not logged in and access protected resources
This commit is contained in:
parent
aba42d94ac
commit
3f7b428546
28 changed files with 421 additions and 471 deletions
|
@ -1,7 +1,7 @@
|
|||
'use strict';
|
||||
|
||||
const log = require('npmlog');
|
||||
const reports = require('./models/reports');
|
||||
const reports = require('../models/reports');
|
||||
const executor = require('./executor');
|
||||
|
||||
let runningWorkersCount = 0;
|
||||
|
@ -11,12 +11,12 @@ let workers = {};
|
|||
|
||||
function startWorker(report) {
|
||||
|
||||
function onStarted(tid) {
|
||||
async function onStarted(tid) {
|
||||
log.info('ReportProcessor', 'Worker process for "%s" started with tid %s. Current worker count is %s.', report.name, tid, runningWorkersCount);
|
||||
workers[report.id] = tid;
|
||||
}
|
||||
|
||||
function onFinished(code, signal) {
|
||||
async function onFinished(code, signal) {
|
||||
runningWorkersCount--;
|
||||
log.info('ReportProcessor', 'Worker process for "%s" (tid %s) exited with code %s signal %s. Current worker count is %s.', report.name, workers[report.id], code, signal, runningWorkersCount);
|
||||
delete workers[report.id];
|
||||
|
@ -24,21 +24,20 @@ function startWorker(report) {
|
|||
const fields = {};
|
||||
if (code === 0) {
|
||||
fields.state = reports.ReportState.FINISHED;
|
||||
fields.lastRun = new Date();
|
||||
fields.last_run = new Date();
|
||||
} else {
|
||||
fields.state = reports.ReportState.FAILED;
|
||||
}
|
||||
|
||||
reports.updateFields(report.id, fields, err => {
|
||||
if (err) {
|
||||
log.error('ReportProcessor', err);
|
||||
}
|
||||
|
||||
setImmediate(startWorkers);
|
||||
});
|
||||
try {
|
||||
await reports.update(report.id, fields);
|
||||
setImmediate(tryStartWorkers);
|
||||
} catch (err) {
|
||||
log.error('ReportProcessor', err);
|
||||
}
|
||||
}
|
||||
|
||||
function onFailed(msg) {
|
||||
async function onFailed(msg) {
|
||||
runningWorkersCount--;
|
||||
log.error('ReportProcessor', 'Executing worker process for "%s" (tid %s) failed with message "%s". Current worker count is %s.', report.name, workers[report.id], msg, runningWorkersCount);
|
||||
delete workers[report.id];
|
||||
|
@ -47,13 +46,12 @@ function startWorker(report) {
|
|||
state: reports.ReportState.FAILED
|
||||
};
|
||||
|
||||
reports.updateFields(report.id, fields, err => {
|
||||
if (err) {
|
||||
log.error('ReportProcessor', err);
|
||||
}
|
||||
|
||||
setImmediate(startWorkers);
|
||||
});
|
||||
try {
|
||||
await reports.update(report.id, fields);
|
||||
setImmediate(tryStartWorkers);
|
||||
} catch (err) {
|
||||
log.error('ReportProcessor', err);
|
||||
}
|
||||
}
|
||||
|
||||
const reportData = {
|
||||
|
@ -65,83 +63,68 @@ function startWorker(report) {
|
|||
executor.start('report-processor-worker', reportData, onStarted, onFinished, onFailed);
|
||||
}
|
||||
|
||||
function startWorkers() {
|
||||
reports.listWithState(reports.ReportState.SCHEDULED, 0, maxWorkersCount - runningWorkersCount, (err, reportList) => {
|
||||
if (err) {
|
||||
log.error('ReportProcessor', err);
|
||||
return;
|
||||
}
|
||||
let isStartingWorkers = false;
|
||||
|
||||
for (let report of reportList) {
|
||||
reports.updateFields(report.id, { state: reports.ReportState.PROCESSING }, err => {
|
||||
if (err) {
|
||||
log.error('ReportProcessor', err);
|
||||
return;
|
||||
}
|
||||
async function tryStartWorkers() {
|
||||
|
||||
if (isStartingWorkers) {
|
||||
// Generally it is possible that this function is invoked simultaneously multiple times. This is to prevent it.
|
||||
return;
|
||||
}
|
||||
isStartingWorkers = true;
|
||||
|
||||
try {
|
||||
while (runningWorkersCount < maxWorkersCount) {
|
||||
log.info('ReportProcessor', 'Trying to start worker because runningWorkersCount=%s maxWorkersCount=%s', runningWorkersCount, maxWorkersCount);
|
||||
|
||||
const reportList = await reports.listByState(reports.ReportState.SCHEDULED, 1);
|
||||
|
||||
if (reportList.length > 0) {
|
||||
log.info('ReportProcessor', 'Starting worker');
|
||||
|
||||
const report = reportList[0];
|
||||
await report.updateFields(report.id, {state: reports.ReportState.PROCESSING});
|
||||
startWorker(report);
|
||||
});
|
||||
|
||||
} else {
|
||||
log.info('ReportProcessor', 'No more report to start a worker for');
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
} catch (err) {
|
||||
log.error('ReportProcessor', err);
|
||||
}
|
||||
|
||||
isStartingWorkers = false;
|
||||
}
|
||||
|
||||
module.exports.start = (reportId, callback) => {
|
||||
module.exports.start = async reportId => {
|
||||
if (!workers[reportId]) {
|
||||
log.info('ReportProcessor', 'Scheduling report id: %s', reportId);
|
||||
reports.updateFields(reportId, { state: reports.ReportState.SCHEDULED, lastRun: null}, err => {
|
||||
if (err) {
|
||||
return callback(err);
|
||||
}
|
||||
|
||||
if (runningWorkersCount < maxWorkersCount) {
|
||||
log.info('ReportProcessor', 'Starting worker because runningWorkersCount=%s maxWorkersCount=%s', runningWorkersCount, maxWorkersCount);
|
||||
|
||||
startWorkers();
|
||||
} else {
|
||||
log.info('ReportProcessor', 'Not starting worker because runningWorkersCount=%s maxWorkersCount=%s', runningWorkersCount, maxWorkersCount);
|
||||
}
|
||||
|
||||
callback(null);
|
||||
});
|
||||
await reports.updateFields(reportId, { state: reports.ReportState.SCHEDULED, lastRun: null});
|
||||
tryStartWorkers();
|
||||
} else {
|
||||
log.info('ReportProcessor', 'Worker for report id: %s is already running.', reportId);
|
||||
}
|
||||
};
|
||||
|
||||
module.exports.stop = (reportId, callback) => {
|
||||
module.exports.stop = async reportId => {
|
||||
const tid = workers[reportId];
|
||||
if (tid) {
|
||||
log.info('ReportProcessor', 'Killing worker for report id: %s', reportId);
|
||||
executor.stop(tid);
|
||||
reports.updateFields(reportId, { state: reports.ReportState.FAILED}, callback);
|
||||
|
||||
await reports.updateFields(reportId, { state: reports.ReportState.FAILED });
|
||||
} else {
|
||||
log.info('ReportProcessor', 'No running worker found for report id: %s', reportId);
|
||||
}
|
||||
};
|
||||
|
||||
module.exports.init = callback => {
|
||||
reports.listWithState(reports.ReportState.PROCESSING, 0, 0, (err, reportList) => {
|
||||
if (err) {
|
||||
log.error('ReportProcessor', err);
|
||||
}
|
||||
|
||||
function scheduleReport() {
|
||||
if (reportList.length > 0) {
|
||||
const report = reportList.shift();
|
||||
|
||||
reports.updateFields(report.id, { state: reports.ReportState.SCHEDULED}, err => {
|
||||
if (err) {
|
||||
log.error('ReportProcessor', err);
|
||||
}
|
||||
|
||||
scheduleReport();
|
||||
});
|
||||
}
|
||||
|
||||
startWorkers();
|
||||
return callback();
|
||||
}
|
||||
|
||||
scheduleReport();
|
||||
});
|
||||
module.exports.init = async () => {
|
||||
try {
|
||||
await reports.bulkChangeState(reports.ReportState.PROCESSING, reports.ReportState.SCHEDULED);
|
||||
} catch (err) {
|
||||
log.error('ReportProcessor', err);
|
||||
}
|
||||
};
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue