Report processor worker refactored to run under another user (nobody) and have its own mysql credentials.
This commit is contained in:
parent
c3edf42ada
commit
2ac89f3365
13 changed files with 159 additions and 204 deletions
|
@ -13,50 +13,71 @@ const privilegeHelpers = require('../lib/privilege-helpers');
|
|||
|
||||
let processes = {};
|
||||
|
||||
function spawnProcess(tid, executable, args, outputFile, cwd) {
|
||||
function spawnProcess(tid, executable, args, outFile, errFile, cwd, uid, gid) {
|
||||
|
||||
fs.open(outputFile, 'w', (err, outFd) => {
|
||||
fs.open(outFile, 'w', (err, outFd) => {
|
||||
if (err) {
|
||||
log.error('Executor', err);
|
||||
return;
|
||||
}
|
||||
|
||||
privilegeHelpers.ensureMailtrainOwner(outputFile, (err) => {
|
||||
fs.open(errFile, 'w', (err, errFd) => {
|
||||
if (err) {
|
||||
log.info('Executor', 'Cannot change owner of output file of process tid:%s.', tid)
|
||||
log.error('Executor', err);
|
||||
return;
|
||||
}
|
||||
|
||||
const options = {
|
||||
stdio: ['ignore', outFd, outFd, 'ipc'],
|
||||
cwd: cwd,
|
||||
env: {NODE_ENV: process.env.NODE_ENV}
|
||||
};
|
||||
privilegeHelpers.ensureMailtrainOwner(outFile, (err) => {
|
||||
if (err) {
|
||||
log.info('Executor', 'Cannot change owner of output file of process tid:%s.', tid)
|
||||
}
|
||||
|
||||
const child = fork(executable, args, options);
|
||||
const pid = child.pid;
|
||||
processes[tid] = child;
|
||||
|
||||
log.info('Executor', 'Process started with tid:%s pid:%s.', tid, pid);
|
||||
process.send({
|
||||
type: 'process-started',
|
||||
tid
|
||||
});
|
||||
|
||||
child.on('close', (code, signal) => {
|
||||
|
||||
delete processes[tid];
|
||||
log.info('Executor', 'Process tid:%s pid:%s exited with code %s signal %s.', tid, pid, code, signal);
|
||||
|
||||
fs.close(outFd, (err) => {
|
||||
privilegeHelpers.ensureMailtrainOwner(errFile, (err) => {
|
||||
if (err) {
|
||||
log.error('Executor', err);
|
||||
log.info('Executor', 'Cannot change owner of error output file of process tid:%s.', tid)
|
||||
}
|
||||
|
||||
const options = {
|
||||
stdio: ['ignore', outFd, errFd, 'ipc'],
|
||||
cwd,
|
||||
env: {NODE_ENV: process.env.NODE_ENV},
|
||||
uid,
|
||||
gid
|
||||
};
|
||||
|
||||
const child = fork(executable, args, options);
|
||||
const pid = child.pid;
|
||||
processes[tid] = child;
|
||||
|
||||
log.info('Executor', 'Process started with tid:%s pid:%s.', tid, pid);
|
||||
process.send({
|
||||
type: 'process-finished',
|
||||
tid,
|
||||
code,
|
||||
signal
|
||||
type: 'process-started',
|
||||
tid
|
||||
});
|
||||
|
||||
child.on('close', (code, signal) => {
|
||||
|
||||
delete processes[tid];
|
||||
log.info('Executor', 'Process tid:%s pid:%s exited with code %s signal %s.', tid, pid, code, signal);
|
||||
|
||||
fs.close(outFd, (err) => {
|
||||
if (err) {
|
||||
log.error('Executor', err);
|
||||
}
|
||||
|
||||
fs.close(errFd, (err) => {
|
||||
if (err) {
|
||||
log.error('Executor', err);
|
||||
}
|
||||
|
||||
process.send({
|
||||
type: 'process-finished',
|
||||
tid,
|
||||
code,
|
||||
signal
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
@ -69,7 +90,9 @@ process.on('message', msg => {
|
|||
const type = msg.type;
|
||||
|
||||
if (type === 'start-report-processor-worker') {
|
||||
spawnProcess(msg.tid, path.join(__dirname, 'report-processor.js'), [msg.data.id], fileHelpers.getReportOutputFile(msg.data), path.join(__dirname, '..'));
|
||||
|
||||
const ids = privilegeHelpers.getConfigROUidGid();
|
||||
spawnProcess(msg.tid, path.join(__dirname, '..', 'workers', 'reports', 'report-processor.js'), [msg.data.id], fileHelpers.getReportContentFile(msg.data), fileHelpers.getReportOutputFile(msg.data), path.join(__dirname, '..', 'workers', 'reports'), ids.uid, ids.gid);
|
||||
|
||||
} else if (type === 'stop-process') {
|
||||
const child = processes[msg.tid];
|
||||
|
|
|
@ -1,191 +0,0 @@
|
|||
'use strict';
|
||||
|
||||
const reports = require('../lib/models/reports');
|
||||
const reportTemplates = require('../lib/models/report-templates');
|
||||
const lists = require('../lib/models/lists');
|
||||
const subscriptions = require('../lib/models/subscriptions');
|
||||
const campaigns = require('../lib/models/campaigns');
|
||||
const handlebars = require('handlebars');
|
||||
const handlebarsHelpers = require('../lib/handlebars-helpers');
|
||||
const _ = require('../lib/translate')._;
|
||||
const hbs = require('hbs');
|
||||
const vm = require('vm');
|
||||
const log = require('npmlog');
|
||||
const fs = require('fs');
|
||||
const fileHelpers = require('../lib/file-helpers');
|
||||
const path = require('path');
|
||||
const privilegeHelpers = require('../lib/privilege-helpers');
|
||||
|
||||
handlebarsHelpers.registerHelpers(handlebars);
|
||||
|
||||
let reportId = Number(process.argv[2]);
|
||||
let reportDir;
|
||||
|
||||
function resolveEntities(getter, ids, callback) {
|
||||
const idsRemaining = ids.slice();
|
||||
const resolved = [];
|
||||
|
||||
function doWork() {
|
||||
if (idsRemaining.length == 0) {
|
||||
return callback(null, resolved);
|
||||
}
|
||||
|
||||
getter(idsRemaining.shift(), (err, entity) => {
|
||||
if (err) {
|
||||
return callback(err);
|
||||
}
|
||||
|
||||
resolved.push(entity);
|
||||
return doWork();
|
||||
});
|
||||
}
|
||||
|
||||
setImmediate(doWork);
|
||||
}
|
||||
|
||||
const userFieldTypeToGetter = {
|
||||
'campaign': (id, callback) => campaigns.get(id, false, callback),
|
||||
'list': lists.get
|
||||
};
|
||||
|
||||
function resolveUserFields(userFields, params, callback) {
|
||||
const userFieldsRemaining = userFields.slice();
|
||||
const resolved = {};
|
||||
|
||||
function doWork() {
|
||||
if (userFieldsRemaining.length == 0) {
|
||||
return callback(null, resolved);
|
||||
}
|
||||
|
||||
const spec = userFieldsRemaining.shift();
|
||||
const getter = userFieldTypeToGetter[spec.type];
|
||||
|
||||
if (getter) {
|
||||
return resolveEntities(getter, params[spec.id], (err, entities) => {
|
||||
if (spec.minOccurences == 1 && spec.maxOccurences == 1) {
|
||||
resolved[spec.id] = entities[0];
|
||||
} else {
|
||||
resolved[spec.id] = entities;
|
||||
}
|
||||
|
||||
doWork();
|
||||
});
|
||||
} else {
|
||||
return callback(new Error(_('Unknown user field type "' + spec.type + '".')));
|
||||
}
|
||||
}
|
||||
|
||||
setImmediate(doWork);
|
||||
}
|
||||
|
||||
function tearDownChrootDir(callback) {
|
||||
if (reportDir) {
|
||||
privilegeHelpers.tearDownChrootDir(reportDir, callback);
|
||||
} else {
|
||||
callback();
|
||||
}
|
||||
}
|
||||
|
||||
function doneSuccess() {
|
||||
tearDownChrootDir((err) => {
|
||||
if (err)
|
||||
process.exit(1)
|
||||
else
|
||||
process.exit(0);
|
||||
});
|
||||
}
|
||||
|
||||
function doneFail() {
|
||||
tearDownChrootDir((err) => {
|
||||
process.exit(1)
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
|
||||
reports.get(reportId, (err, report) => {
|
||||
if (err || !report) {
|
||||
log.error('reports', err && err.message || err || _('Could not find report with specified ID'));
|
||||
doneFail();
|
||||
return;
|
||||
}
|
||||
|
||||
reportTemplates.get(report.reportTemplate, (err, reportTemplate) => {
|
||||
if (err) {
|
||||
log.error('reports', err && err.message || err || _('Could not find report template'));
|
||||
doneFail();
|
||||
return;
|
||||
}
|
||||
|
||||
resolveUserFields(reportTemplate.userFieldsObject, report.paramsObject, (err, inputs) => {
|
||||
if (err) {
|
||||
log.error('reports', err.message || err);
|
||||
doneFail();
|
||||
return;
|
||||
}
|
||||
|
||||
const campaignsProxy = {
|
||||
results: reports.getCampaignResults,
|
||||
list: campaigns.list,
|
||||
get: campaigns.get
|
||||
};
|
||||
|
||||
const subscriptionsProxy = {
|
||||
list: subscriptions.list
|
||||
};
|
||||
|
||||
const reportFile = fileHelpers.getReportContentFile(report);
|
||||
|
||||
const sandbox = {
|
||||
console,
|
||||
campaigns: campaignsProxy,
|
||||
subscriptions: subscriptionsProxy,
|
||||
inputs,
|
||||
|
||||
callback: (err, outputs) => {
|
||||
if (err) {
|
||||
log.error('reports', err.message || err);
|
||||
doneFail();
|
||||
return;
|
||||
}
|
||||
|
||||
const hbsTmpl = handlebars.compile(reportTemplate.hbs);
|
||||
const reportText = hbsTmpl(outputs);
|
||||
|
||||
fs.writeFile(path.basename(reportFile), reportText, (err, reportContent) => {
|
||||
if (err) {
|
||||
log.error('reports', err && err.message || err || _('Could not find report with specified ID'));
|
||||
doneFail();
|
||||
return;
|
||||
}
|
||||
|
||||
doneSuccess();
|
||||
return;
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
const script = new vm.Script(reportTemplate.js);
|
||||
|
||||
reportDir = fileHelpers.getReportDir(report);
|
||||
privilegeHelpers.setupChrootDir(reportDir, (err) => {
|
||||
if (err) {
|
||||
doneFail();
|
||||
return;
|
||||
}
|
||||
|
||||
privilegeHelpers.chrootAndDropRootPrivileges(reportDir);
|
||||
|
||||
try {
|
||||
script.runInNewContext(sandbox, {displayErrors: true, timeout: 120000});
|
||||
} catch (err) {
|
||||
console.log(err);
|
||||
|
||||
doneFail();
|
||||
return;
|
||||
}
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
Loading…
Add table
Add a link
Reference in a new issue