2017-04-25 22:49:31 +00:00
'use strict' ;
2018-09-27 19:32:35 +00:00
const log = require ( './log' ) ;
2017-07-09 21:16:47 +00:00
const reports = require ( '../models/reports' ) ;
2017-04-25 22:49:31 +00:00
const executor = require ( './executor' ) ;
2018-11-18 14:38:52 +00:00
const contextHelpers = require ( './context-helpers' ) ;
2017-04-25 22:49:31 +00:00
// Number of workers currently counted as running. Incremented in startWorker right before
// the executor is asked to start a worker, decremented in the finished/failed callbacks.
let runningWorkersCount = 0 ;
// Upper bound on concurrently running workers; tryStartWorkers only starts a new worker
// while runningWorkersCount is below this value.
let maxWorkersCount = 1 ;
2018-04-29 16:13:40 +00:00
// Maps report id -> tid of the worker processing it. The entry is written once the executor
// reports the worker as started and removed when the worker finishes or fails.
const workers = { } ;
2017-04-25 22:49:31 +00:00
/**
 * Starts a worker for the given report via the executor and wires up the callbacks that keep
 * the module bookkeeping (running worker count, report id -> tid map) and the persisted
 * report state in sync with the worker's lifecycle.
 *
 * @param {{id: *, name: string}} report - Report to process; only `id` and `name` are read.
 */
function startWorker(report) {
    /**
     * Common tail of the "finished" and "failed" paths: frees the worker slot, forgets the
     * tid, persists the resulting report fields and schedules another start attempt.
     *
     * @param {Function} logFn - Logger method (log.info / log.error) used for the exit message.
     * @param {Array} logArgs - Format string and leading arguments; the current worker count is appended.
     * @param {Function} buildFields - Returns the fields to persist; invoked after the bookkeeping.
     */
    async function releaseWorker(logFn, logArgs, buildFields) {
        runningWorkersCount--;
        logFn.call(log, 'ReportProcessor', ...logArgs, runningWorkersCount);
        delete workers[report.id];

        const fields = buildFields();
        try {
            await reports.updateFields(report.id, fields);
        } catch (err) {
            log.error('ReportProcessor', err);
        }

        // Scheduled regardless of whether the state update succeeded: the worker slot is
        // free either way, so pending reports must not be left waiting.
        setImmediate(tryStartWorkers);
    }

    // Invoked by the executor once the worker is running; remembers its tid.
    async function onStarted(tid) {
        log.info('ReportProcessor', 'Worker process for "%s" started with tid %s. Current worker count is %s.', report.name, tid, runningWorkersCount);
        workers[report.id] = tid;
    }

    // Invoked by the executor when the worker exits; exit code 0 means success.
    async function onFinished(code, signal) {
        await releaseWorker(
            log.info,
            ['Worker process for "%s" (tid %s) exited with code %s signal %s. Current worker count is %s.', report.name, workers[report.id], code, signal],
            () => (code === 0
                ? { state: reports.ReportState.FINISHED, last_run: new Date() }
                : { state: reports.ReportState.FAILED })
        );
    }

    // Invoked by the executor when the worker could not be executed.
    async function onFailed(msg) {
        await releaseWorker(
            log.error,
            ['Executing worker process for "%s" (tid %s) failed with message "%s". Current worker count is %s.', report.name, workers[report.id], msg],
            () => ({ state: reports.ReportState.FAILED })
        );
    }

    const reportData = {
        id: report.id,
        name: report.name
    };

    // Count the worker before handing over to the executor so that the "started" log line
    // already reflects it.
    runningWorkersCount++;
    executor.start('report-processor-worker', reportData, onStarted, onFinished, onFailed);
}
2017-07-09 21:16:47 +00:00
// Re-entrancy guard for tryStartWorkers: true while a scheduling pass is in progress.
let isStartingWorkers = false ;
2017-04-25 22:49:31 +00:00
2017-07-09 21:16:47 +00:00
/**
 * Starts workers for scheduled reports until the worker limit is reached or no scheduled
 * report is left. Each picked report is switched to the PROCESSING state before its worker
 * is started. Errors are logged, not propagated.
 *
 * Overlapping invocations are ignored while a pass is already in progress.
 */
async function tryStartWorkers() {
    if (isStartingWorkers) {
        // Generally it is possible that this function is invoked simultaneously multiple times. This is to prevent it.
        return;
    }
    isStartingWorkers = true;

    try {
        while (runningWorkersCount < maxWorkersCount) {
            log.info('ReportProcessor', 'Trying to start worker because runningWorkersCount=%s maxWorkersCount=%s', runningWorkersCount, maxWorkersCount);

            // Fetch at most one scheduled report per iteration.
            const reportList = await reports.listByState(reports.ReportState.SCHEDULED, 1);
            if (!(reportList.length > 0)) {
                log.info('ReportProcessor', 'No more reports to start a worker for');
                break;
            }

            log.info('ReportProcessor', 'Starting worker');
            const report = reportList[0];
            await reports.updateFields(report.id, { state: reports.ReportState.PROCESSING });
            startWorker(report);
        }
    } catch (err) {
        log.error('ReportProcessor', err);
    } finally {
        // Always release the guard; otherwise a failure inside the error handler would
        // leave the flag set and block every later scheduling pass.
        isStartingWorkers = false;
    }
}
2017-04-25 22:49:31 +00:00
2017-07-26 19:42:05 +00:00
/**
 * Schedules a report for processing unless a worker for it is already registered.
 * The report is put into the SCHEDULED state with `last_run` cleared, and then an attempt
 * to start workers is made.
 *
 * @param {*} reportId - Id of the report to schedule.
 * @returns {Promise<void>}
 */
module.exports.start = async (reportId) => {
    if (workers[reportId]) {
        log.info('ReportProcessor', 'Worker for report id: %s is already running.', reportId);
        return;
    }

    log.info('ReportProcessor', 'Scheduling report id: %s', reportId);
    const scheduledFields = { state: reports.ReportState.SCHEDULED, last_run: null };
    await reports.updateFields(reportId, scheduledFields);
    await tryStartWorkers();
};
2017-07-09 21:16:47 +00:00
/**
 * Stops the worker registered for the given report, if any, and marks the report as FAILED.
 * When no worker is registered for the report, only a log message is emitted.
 *
 * @param {*} reportId - Id of the report whose worker should be stopped.
 * @returns {Promise<void>}
 */
module.exports.stop = async reportId => {
    const tid = workers[reportId];

    if (!tid) {
        log.info('ReportProcessor', 'No running worker found for report id: %s', reportId);
        return;
    }

    log.info('ReportProcessor', 'Killing worker for report id: %s', reportId);
    executor.stop(tid);
    const failedFields = { state: reports.ReportState.FAILED };
    await reports.updateFields(reportId, failedFields);
};
2017-07-09 21:16:47 +00:00
/**
 * Initializes the report processor: asks the reports model to bulk-change report state
 * from PROCESSING to SCHEDULED (presumably re-queuing reports whose processing was
 * interrupted - confirm against reports.bulkChangeState) and then tries to start workers.
 * Errors are logged, not propagated.
 *
 * @returns {Promise<void>}
 */
module.exports.init = async () => {
    try {
        const states = reports.ReportState;
        await reports.bulkChangeState(states.PROCESSING, states.SCHEDULED);
        await tryStartWorkers();
    } catch (err) {
        log.error('ReportProcessor', err);
    }
};