Spawn sender into separate process
This commit is contained in:
parent
37b0ac9aec
commit
88fe24a709
3 changed files with 39 additions and 6 deletions
|
@ -109,3 +109,8 @@ host="127.0.0.1"
|
||||||
# extra options for nodemailer
|
# extra options for nodemailer
|
||||||
[nodemailer]
|
[nodemailer]
|
||||||
#textEncoding="base64"
|
#textEncoding="base64"
|
||||||
|
|
||||||
|
[queue]
|
||||||
|
# How many parallel sender processes to spawn
|
||||||
|
# Do not use more than 1 for now as it would create race conditions
|
||||||
|
processes=1
|
||||||
|
|
35
index.js
35
index.js
|
@ -8,7 +8,7 @@ let config = require('config');
|
||||||
let log = require('npmlog');
|
let log = require('npmlog');
|
||||||
let app = require('./app');
|
let app = require('./app');
|
||||||
let http = require('http');
|
let http = require('http');
|
||||||
let sender = require('./services/sender');
|
let fork = require('child_process').fork;
|
||||||
let triggers = require('./services/triggers');
|
let triggers = require('./services/triggers');
|
||||||
let importer = require('./services/importer');
|
let importer = require('./services/importer');
|
||||||
let verpServer = require('./services/verp-server');
|
let verpServer = require('./services/verp-server');
|
||||||
|
@ -67,6 +67,37 @@ server.on('error', err => {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
function spawnSenders(callback) {
|
||||||
|
let processes = Math.max(Number(config.queue.processes) || 1, 1);
|
||||||
|
let spawned = 0;
|
||||||
|
let returned = false;
|
||||||
|
|
||||||
|
let spawnSender = function () {
|
||||||
|
if (spawned >= processes) {
|
||||||
|
if (!returned) {
|
||||||
|
returned = true;
|
||||||
|
return callback();
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
let child = fork(__dirname + '/services/sender.js', []);
|
||||||
|
let pid = child.pid;
|
||||||
|
|
||||||
|
child.on('close', (code, signal) => {
|
||||||
|
spawned--;
|
||||||
|
log.error('Child', 'Sender process %s exited with %s', pid, code || signal);
|
||||||
|
// Respawn after 5 seconds
|
||||||
|
setTimeout(() => spawnSender(), 5 * 1000).unref();
|
||||||
|
});
|
||||||
|
|
||||||
|
spawned++;
|
||||||
|
setImmediate(spawnSender);
|
||||||
|
};
|
||||||
|
|
||||||
|
spawnSender();
|
||||||
|
}
|
||||||
|
|
||||||
server.on('listening', () => {
|
server.on('listening', () => {
|
||||||
let addr = server.address();
|
let addr = server.address();
|
||||||
let bind = typeof addr === 'string' ? 'pipe ' + addr : 'port ' + addr.port;
|
let bind = typeof addr === 'string' ? 'pipe ' + addr : 'port ' + addr.port;
|
||||||
|
@ -78,7 +109,7 @@ server.on('listening', () => {
|
||||||
tzupdate(() => {
|
tzupdate(() => {
|
||||||
importer(() => {
|
importer(() => {
|
||||||
triggers(() => {
|
triggers(() => {
|
||||||
sender(() => {
|
spawnSenders(() => {
|
||||||
feedcheck(() => {
|
feedcheck(() => {
|
||||||
postfixBounceServer(() => {
|
postfixBounceServer(() => {
|
||||||
log.info('Service', 'All services started');
|
log.info('Service', 'All services started');
|
||||||
|
|
|
@ -497,7 +497,4 @@ let sendLoop = () => {
|
||||||
});
|
});
|
||||||
};
|
};
|
||||||
|
|
||||||
module.exports = callback => {
|
|
||||||
sendLoop();
|
sendLoop();
|
||||||
setImmediate(callback);
|
|
||||||
};
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue