Work in progress on securing reports.

This commit is contained in:
Tomas Bures 2017-04-25 22:49:31 +00:00
parent 3072632d8d
commit 418dba7b9f
14 changed files with 709 additions and 331 deletions

7
app.js
View file

@ -213,8 +213,11 @@ app.use('/api', api);
app.use('/editorapi', editorapi);
app.use('/grapejs', grapejs);
app.use('/mosaico', mosaico);
app.use('/reports', reports);
app.use('/report-templates', reportsTemplates);
if (config.reports && config.reports.enabled === true) {
app.use('/reports', reports);
app.use('/report-templates', reportsTemplates);
}
// catch 404 and forward to error handler
app.use((req, res, next) => {

View file

@ -74,6 +74,11 @@ postsize="2MB"
host="localhost"
user="mailtrain"
password="mailtrain"
# If more security is desired when running reports (which use user-defined JS scripts located in DB),
# one can specify a DB user with read-only permissions. If these are not specified, Mailtrain uses the
# regular DB user (which has also write permissions).
# userRO="mailtrain-ro"
# passwordRO="mailtrain-ro"
database="mailtrain"
# Some installations, eg. MAMP can use a different port (8889)
# MAMP users should also turn on "Allow network access to MySQL" otherwise MySQL might not be accessible
@ -150,3 +155,18 @@ templates=[["versafix-1", "Versafix One"]]
[grapejs]
# Installed templates
templates=[["demo", "Demo Template"]]
[reports]
# The whole reporting functionality can be disabled below if the they are not needed and the DB cannot be
# properly protected.
# Reports rely on custom user defined Javascript snippets defined in the report template. The snippets are run on the
# server when generating a report. As these snippets are stored in the DB, they pose a security risk because they can
# help gaining access to the server if the DB cannot
# be properly protected (e.g. if it is shared with another application with security weaknesses).
# Mailtrain mitigates this problem by running the custom Javascript snippets in a chrooted environment and under a
# DB user that cannot modify the database (see userRO in [mysql] above). However the chrooted environment is available
# only if Mailtrain is started as root. The chrooted environment still does not prevent the custom JS script in
# performing network operations and in generating XSS attacks as part of the report.
# The bottom line is that if people who are creating report templates or have write access to the DB cannot be trusted,
# then it's safer to switch off the reporting functionality below.
enabled=false

View file

@ -4,21 +4,23 @@
* Module dependencies.
*/
let config = require('config');
let log = require('npmlog');
let app = require('./app');
let http = require('http');
let fork = require('child_process').fork;
let triggers = require('./services/triggers');
let importer = require('./services/importer');
let verpServer = require('./services/verp-server');
let testServer = require('./services/test-server');
let postfixBounceServer = require('./services/postfix-bounce-server');
let tzupdate = require('./services/tzupdate');
let feedcheck = require('./services/feedcheck');
let dbcheck = require('./lib/dbcheck');
let tools = require('./lib/tools');
let reportProcessor = require('./services/report-processor');
const config = require('config');
const log = require('npmlog');
const app = require('./app');
const http = require('http');
const fork = require('child_process').fork;
const triggers = require('./services/triggers');
const importer = require('./services/importer');
const verpServer = require('./services/verp-server');
const testServer = require('./services/test-server');
const postfixBounceServer = require('./services/postfix-bounce-server');
const tzupdate = require('./services/tzupdate');
const feedcheck = require('./services/feedcheck');
const dbcheck = require('./lib/dbcheck');
const tools = require('./lib/tools');
const reportProcessor = require('./lib/report-processor');
const executor = require('./lib/executor');
const privilegeHelpers = require('./lib/privilege-helpers');
let port = config.www.port;
let host = config.www.host;
@ -113,32 +115,21 @@ server.on('listening', () => {
log.info('Express', 'WWW server listening on %s', bind);
// start additional services
testServer(() => {
verpServer(() => {
tzupdate(() => {
importer(() => {
triggers(() => {
spawnSenders(() => {
feedcheck(() => {
postfixBounceServer(() => {
reportProcessor.init(() => {
log.info('Service', 'All services started');
if (config.group) {
try {
process.setgid(config.group);
log.info('Service', 'Changed group to "%s" (%s)', config.group, process.getgid());
} catch (E) {
log.info('Service', 'Failed to change group to "%s" (%s)', config.group, E.message);
}
}
if (config.user) {
try {
process.setuid(config.user);
log.info('Service', 'Changed user to "%s" (%s)', config.user, process.getuid());
} catch (E) {
log.info('Service', 'Failed to change user to "%s" (%s)', config.user, E.message);
}
}
function startNextServices() {
testServer(() => {
verpServer(() => {
privilegeHelpers.dropRootPrivileges();
tzupdate(() => {
importer(() => {
triggers(() => {
spawnSenders(() => {
feedcheck(() => {
postfixBounceServer(() => {
reportProcessor.init(() => {
log.info('Service', 'All services started');
});
});
});
});
@ -147,5 +138,11 @@ server.on('listening', () => {
});
});
});
});
}
if (config.reports && config.reports.enabled === true) {
executor.spawn(() => startNextServices);
} else {
startNextServices();
}
});

74
lib/executor.js Normal file
View file

@ -0,0 +1,74 @@
'use strict';
const fork = require('child_process').fork;
const log = require('npmlog');
const path = require('path');
const requestCallbacks = {};
let messageTid = 0;
let executorProcess;
module.exports = {
spawn,
start,
stop
};
function spawn(callback) {
log.info('Executor', 'Spawning executor process.');
executorProcess = fork(path.join(__dirname, '..', 'services', 'executor.js'), [], {
cwd: path.join(__dirname, '..'),
env: {NODE_ENV: process.env.NODE_ENV}
});
executorProcess.on('message', msg => {
if (msg) {
if (msg.type === 'process-started') {
let requestCallback = requestCallbacks[msg.tid];
if (requestCallback && requestCallback.startedCallback) {
requestCallback.startedCallback(msg.tid);
}
} else if (msg.type === 'process-finished') {
let requestCallback = requestCallbacks[msg.tid];
if (requestCallback && requestCallback.startedCallback) {
requestCallback.finishedCallback(msg.code, msg.signal);
}
delete requestCallbacks[msg.tid];
} else if (msg.type === 'executor-started') {
log.info('Executor', 'Executor process started.');
return callback();
}
}
});
executorProcess.on('close', (code, signal) => {
log.info('Executor', 'Executor process exited with code %s signal %s.', code, signal);
});
}
function start(type, data, startedCallback, finishedCallback) {
requestCallbacks[messageTid] = {
startedCallback,
finishedCallback
};
executorProcess.send({
type: 'start-' + type,
data,
tid: messageTid
});
messageTid++;
}
function stop(tid) {
executorProcess.send({
type: 'stop-process',
tid
});
}

33
lib/file-helpers.js Normal file
View file

@ -0,0 +1,33 @@
'use strict';
const path = require('path');
function nameToFileName(name) {
return name.
trim().
toLowerCase().
replace(/[ .+/]/g, '-').
replace(/[^a-z0-9\-_]/gi, '').
replace(/--*/g, '-');
}
function getReportDir(report) {
return path.join(__dirname, '..', 'protected', 'reports', report.id + '-' + nameToFileName(report.name));
}
function getReportContentFile(report) {
return path.join(getReportDir(report), 'report');
}
function getReportOutputFile(report) {
return getReportDir(report) + '.output';
}
module.exports = {
getReportContentFile,
getReportDir,
getReportOutputFile,
nameToFileName
};

131
lib/privilege-helpers.js Normal file
View file

@ -0,0 +1,131 @@
'use strict';
const log = require('npmlog');
const config = require('config');
const path = require('path');
const promise = require('bluebird');
const fsExtra = promise.promisifyAll(require('fs-extra'));
const fs = promise.promisifyAll(require('fs'));
const walk = require('walk');
const tryRequire = require('try-require');
const posix = tryRequire('posix');
function ensureMailtrainOwner(file, callback) {
try {
const uid = config.user ? posix.getpwnam(config.user).uid : 0;
const gid = config.group ? posix.getgrnam(config.group).gid : 0;
fs.chown(file, uid, gid, callback);
} catch (err) {
return callback(err);
}
}
function ensureMailtrainOwnerRecursive(dir, callback) {
try {
const uid = config.user ? posix.getpwnam(config.user).uid : 0;
const gid = config.group ? posix.getgrnam(config.group).gid : 0;
fs.chown(dir, uid, gid, err => {
if (err) {
return callback(err);
}
walk.walk(dir)
.on('node', (root, stat, next) => {
fs.chown(path.join(root, stat.name), uid, gid, next);
})
.on('end', callback);
});
} catch (err) {
return callback(err);
}
}
const ensureMailtrainOwnerRecursiveAsync = promise.promisify(ensureMailtrainOwnerRecursive);
function dropRootPrivileges() {
if (config.group) {
try {
process.setgid(config.group);
log.info('PrivilegeHelpers', 'Changed group to "%s" (%s)', config.group, process.getgid());
} catch (E) {
log.info('PrivilegeHelpers', 'Failed to change group to "%s" (%s)', config.group, E.message);
}
}
if (config.user) {
try {
process.setuid(config.user);
log.info('PrivilegeHelpers', 'Changed user to "%s" (%s)', config.user, process.getuid());
} catch (E) {
log.info('PrivilegeHelpers', 'Failed to change user to "%s" (%s)', config.user, E.message);
}
}
}
function setupChrootDir(newRoot, callback) {
try {
fsExtra.emptyDirAsync(newRoot)
.then(() => fsExtra.ensureDirAsync(path.join(newRoot, 'etc')))
.then(() => fsExtra.copyAsync('/etc/hosts', path.join(newRoot, 'etc', 'hosts')))
.then(() => ensureMailtrainOwnerRecursiveAsync(newRoot))
.then(() => {
log.info('PrivilegeHelpers', 'Chroot directory "%s" set up', newRoot);
callback();
})
.catch(err => {
log.info('PrivilegeHelpers', 'Failed to setup chroot directory "%s"', newRoot);
callback(err);
});
} catch(err) {
log.info('PrivilegeHelpers', 'Failed to setup chroot directory "%s"', newRoot);
}
}
function tearDownChrootDir(root, callback) {
if (posix) {
fsExtra.removeAsync(path.join('/', 'etc'))
.then(() => {
log.info('PrivilegeHelpers', 'Chroot directory "%s" torn down', root);
callback();
})
.catch(err => {
log.info('PrivilegeHelpers', 'Failed to tear down chroot directory "%s"', root);
callback(err);
});
}
}
function chrootAndDropRootPrivileges(newRoot) {
try {
const uid = config.user ? posix.getpwnam(config.user).uid : 0;
const gid = config.group ? posix.getgrnam(config.group).gid : 0;
posix.chroot(newRoot);
process.chdir('/');
process.setgid(gid);
process.setuid(uid);
log.info('PrivilegeHelpers', 'Changed root to "%s" and privileges to %s.%s', newRoot, uid, gid);
} catch(err) {
log.info('PrivilegeHelpers', 'Failed to change root to "%s" and set privileges', newRoot);
}
}
module.exports = {
dropRootPrivileges,
chrootAndDropRootPrivileges,
setupChrootDir,
tearDownChrootDir,
ensureMailtrainOwner,
ensureMailtrainOwnerRecursive
};

129
lib/report-processor.js Normal file
View file

@ -0,0 +1,129 @@
'use strict';
const log = require('npmlog');
const reports = require('./models/reports');
const executor = require('./executor');
let runningWorkersCount = 0;
let maxWorkersCount = 1;
let workers = {};
function startWorker(report) {
function onStarted(tid) {
log.info('ReportProcessor', 'Worker process for "%s" started with tid %s. Current worker count is %s.', report.name, tid, runningWorkersCount);
workers[report.id] = tid;
}
function onFinished(code, signal) {
runningWorkersCount--;
log.info('ReportProcessor', 'Worker process for "%s" (tid %s) exited with code %s signal %s. Current worker count is %s.', report.name, workers[report.id], code, signal, runningWorkersCount);
delete workers[report.id];
const fields = {};
if (code === 0) {
fields.state = reports.ReportState.FINISHED;
fields.lastRun = new Date();
} else {
fields.state = reports.ReportState.FAILED;
}
reports.updateFields(report.id, fields, err => {
if (err) {
log.error('ReportProcessor', err);
}
setImmediate(startWorkers);
});
}
const reportData = {
id: report.id,
name: report.name
};
runningWorkersCount++;
executor.start('report-processor-worker', reportData, onStarted, onFinished);
}
function startWorkers() {
reports.listWithState(reports.ReportState.SCHEDULED, 0, maxWorkersCount - runningWorkersCount, (err, reportList) => {
if (err) {
log.error('ReportProcessor', err);
return;
}
for (let report of reportList) {
reports.updateFields(report.id, { state: reports.ReportState.PROCESSING }, err => {
if (err) {
log.error('ReportProcessor', err);
return;
}
startWorker(report);
});
}
});
}
module.exports.start = (reportId, callback) => {
if (!workers[reportId]) {
log.info('ReportProcessor', 'Scheduling report id: %s', reportId);
reports.updateFields(reportId, { state: reports.ReportState.SCHEDULED, lastRun: null}, err => {
if (err) {
return callback(err);
}
if (runningWorkersCount < maxWorkersCount) {
log.info('ReportProcessor', 'Starting worker because runningWorkersCount=%s maxWorkersCount=%s', runningWorkersCount, maxWorkersCount);
startWorkers();
} else {
log.info('ReportProcessor', 'Not starting worker because runningWorkersCount=%s maxWorkersCount=%s', runningWorkersCount, maxWorkersCount);
}
callback(null);
});
} else {
log.info('ReportProcessor', 'Worker for report id: %s is already running.', reportId);
}
};
module.exports.stop = (reportId, callback) => {
const tid = workers[reportId];
if (tid) {
log.info('ReportProcessor', 'Killing worker for report id: %s', reportId);
executor.stop(tid);
reports.updateFields(reportId, { state: reports.ReportState.FAILED}, callback);
} else {
log.info('ReportProcessor', 'No running worker found for report id: %s', reportId);
}
};
module.exports.init = callback => {
reports.listWithState(reports.ReportState.PROCESSING, 0, 0, (err, reportList) => {
if (err) {
log.error('ReportProcessor', err);
}
function scheduleReport() {
if (reportList.length > 0) {
const report = reportList.shift();
reports.updateFields(report.id, { state: reports.ReportState.SCHEDULED}, err => {
if (err) {
log.error('ReportProcessor', err);
}
scheduleReport();
});
}
startWorkers();
return callback();
}
scheduleReport();
});
};

View file

@ -1,5 +1,6 @@
'use strict';
const config = require('config');
let fs = require('fs');
let path = require('path');
let db = require('./db');
@ -28,7 +29,6 @@ module.exports = {
prepareHtml,
purifyHTML,
mergeTemplateIntoLayout,
nameToFileName,
workers: new Set()
};
@ -130,11 +130,15 @@ function updateMenu(res) {
title: _('Automation'),
url: '/triggers',
key: 'triggers'
}, {
title: _('Reports'),
url: '/reports',
key: 'reports'
});
if (config.reports && config.reports.enabled === true) {
res.locals.menu.push({
title: _('Reports'),
url: '/reports',
key: 'reports'
});
}
}
function validateEmail(address, checkBlocked, callback) {
@ -302,11 +306,3 @@ function mergeTemplateIntoLayout(template, layout, callback) {
}
}
function nameToFileName(name) {
return name.
trim().
toLowerCase().
replace(/[ .+/]/g, '-').
replace(/[^a-z0-9\-_]/gi, '').
replace(/--*/g, '-');
}

View file

@ -33,10 +33,14 @@
"grunt-eslint": "^19.0.0",
"jsxgettext-andris": "^0.9.0-patch.1"
},
"optionalDependencies": {
"posix": "^4.1.1"
},
"dependencies": {
"async": "^2.3.0",
"aws-sdk": "^2.37.0",
"bcrypt-nodejs": "0.0.3",
"bluebird": "^3.5.0",
"body-parser": "^1.17.1",
"bounce-handler": "^7.3.2-fork.2",
"compression": "^1.6.2",
@ -56,6 +60,7 @@
"faker": "^4.1.0",
"feedparser": "^2.1.0",
"file-type": "^4.1.0",
"fs-extra": "^2.1.2",
"geoip-ultralight": "^0.1.5",
"gettext-parser": "^1.2.2",
"gm": "^1.23.0",
@ -97,6 +102,8 @@
"slugify": "^1.1.0",
"smtp-server": "^2.0.3",
"striptags": "^3.0.1",
"toml": "^2.3.2"
"toml": "^2.3.2",
"try-require": "^1.2.1",
"walk": "^2.3.9"
}
}

View file

@ -75,9 +75,7 @@ router.get('/create', passport.csrfProtection, (req, res) => {
']';
if (!('js' in data)) data.js =
'const reports = require("../lib/models/reports");\n' +
'\n' +
'reports.getCampaignResults(inputs.campaign, ["*"], "", (err, results) => {\n' +
'campaigns.results(inputs.campaign, ["*"], "", (err, results) => {\n' +
' if (err) {\n' +
' return callback(err);\n' +
' }\n' +
@ -136,9 +134,7 @@ router.get('/create', passport.csrfProtection, (req, res) => {
']';
if (!('js' in data)) data.js =
'const reports = require("../lib/models/reports");\n' +
'\n' +
'reports.getCampaignResults(inputs.campaign, ["custom_country", "count(*) AS count_all", "SUM(IF(tracker.count IS NULL, 0, 1)) AS count_opened"], "GROUP BY custom_country", (err, results) => {\n' +
'campaigns.results(inputs.campaign, ["custom_country", "count(*) AS count_all", "SUM(IF(tracker.count IS NULL, 0, 1)) AS count_opened"], "GROUP BY custom_country", (err, results) => {\n' +
' if (err) {\n' +
' return callback(err);\n' +
' }\n' +
@ -213,8 +209,6 @@ router.get('/create', passport.csrfProtection, (req, res) => {
']';
if (!('js' in data)) data.js =
'const subscriptions = require("../lib/models/subscriptions");\n' +
'\n' +
'subscriptions.list(inputs.list.id,0,0, (err, results) => {\n' +
' if (err) {\n' +
' return callback(err);\n' +

View file

@ -6,10 +6,11 @@ const router = new express.Router();
const _ = require('../lib/translate')._;
const reportTemplates = require('../lib/models/report-templates');
const reports = require('../lib/models/reports');
const reportProcessor = require('../services/report-processor');
const reportProcessor = require('../lib/report-processor');
const campaigns = require('../lib/models/campaigns');
const lists = require('../lib/models/lists');
const tools = require('../lib/tools');
const fileHelpers = require('../lib/file-helpers');
const util = require('util');
const htmlescape = require('escape-html');
const striptags = require('striptags');
@ -233,14 +234,13 @@ router.get('/view/:id', (req, res) => {
if (report.state == reports.ReportState.FINISHED) {
if (reportTemplate.mimeType == 'text/html') {
fs.readFile(reportProcessor.getFileName(report, 'report'), (err, reportContent) => {
fs.readFile(fileHelpers.getReportContentFile(report), (err, reportContent) => {
if (err) {
req.flash('danger', err && err.message || err || _('Could not find report with specified ID'));
return res.redirect('/reports');
}
const data = {
csrfToken: req.csrfToken(),
report: new hbs.handlebars.SafeString(reportContent),
title: report.name
};
@ -250,11 +250,11 @@ router.get('/view/:id', (req, res) => {
} else if (reportTemplate.mimeType == 'text/csv') {
const headers = {
'Content-Disposition': 'attachment;filename=' + tools.nameToFileName(report.name) + '.csv',
'Content-Disposition': 'attachment;filename=' + fileHelpers.nameToFileName(report.name) + '.csv',
'Content-Type': 'text/csv'
};
res.sendFile(reportProcessor.getFileName(report, 'report'), {headers: headers});
res.sendFile(fileHelpers.getReportContentFile(report), {headers: headers});
} else {
req.flash('danger', _('Unknown type of template'));
@ -276,9 +276,8 @@ router.get('/output/:id', (req, res) => {
return res.redirect('/reports');
}
fs.readFile(reportProcessor.getFileName(report, 'output'), (err, output) => {
fs.readFile(fileHelpers.getReportOutputFile(report), (err, output) => {
let data = {
csrfToken: req.csrfToken(),
title: 'Output for report ' + report.name
};
@ -298,6 +297,8 @@ function getRowLastRun(row) {
}
function getRowActions(row) {
/* FIXME: add csrf protection to stop and refresh actions */
let requestRefresh = false;
let view, startStop;
let topic = 'data-topic-id="' + row.id + '"';

89
services/executor.js Normal file
View file

@ -0,0 +1,89 @@
'use strict';
/* Privileged executor. If Mailtrain is started as root, this process keeps the root privilege to be able to spawn workers
that can chroot.
*/
const fileHelpers = require('../lib/file-helpers');
const fork = require('child_process').fork;
const path = require('path');
const log = require('npmlog');
const fs = require('fs');
const privilegeHelpers = require('../lib/privilege-helpers');
let processes = {};
function spawnProcess(tid, executable, args, outputFile, cwd) {
fs.open(outputFile, 'w', (err, outFd) => {
if (err) {
log.error('Executor', err);
return;
}
privilegeHelpers.ensureMailtrainOwner(outputFile, (err) => {
if (err) {
log.info('Executor', 'Cannot change owner of output file of process tid:%s.', tid)
}
const options = {
stdio: ['ignore', outFd, outFd, 'ipc'],
cwd: cwd,
env: {NODE_ENV: process.env.NODE_ENV}
};
const child = fork(executable, args, options);
const pid = child.pid;
processes[tid] = child;
log.info('Executor', 'Process started with tid:%s pid:%s.', tid, pid);
process.send({
type: 'process-started',
tid
});
child.on('close', (code, signal) => {
delete processes[tid];
log.info('Executor', 'Process tid:%s pid:%s exited with code %s signal %s.', tid, pid, code, signal);
fs.close(outFd, (err) => {
if (err) {
log.error('Executor', err);
}
process.send({
type: 'process-finished',
tid,
code,
signal
});
});
});
});
});
}
process.on('message', msg => {
if (msg) {
const type = msg.type;
if (type === 'start-report-processor-worker') {
spawnProcess(msg.tid, path.join(__dirname, 'report-processor.js'), [msg.data.id], fileHelpers.getReportOutputFile(msg.data), path.join(__dirname, '..'));
} else if (type === 'stop-process') {
const child = processes[msg.tid];
if (child) {
log.info('Executor', 'Killing process tid:%s pid:%s', msg.tid, child.pid);
child.kill();
} else {
log.info('Executor', 'No running process found with tid:%s pid:%s', msg.tid, child.pid);
}
}
}
});
process.send({
type: 'executor-started'
});

View file

@ -1,133 +0,0 @@
'use strict';
const reports = require('../lib/models/reports');
const reportTemplates = require('../lib/models/report-templates');
const lists = require('../lib/models/lists');
const campaigns = require('../lib/models/campaigns');
const handlebars = require('handlebars');
const handlebarsHelpers = require('../lib/handlebars-helpers');
const _ = require('../lib/translate')._;
const hbs = require('hbs');
const vm = require('vm');
const log = require('npmlog');
const fs = require('fs');
const reportProcessor = require('./report-processor');
handlebarsHelpers.registerHelpers(handlebars);
function resolveEntities(getter, ids, callback) {
const idsRemaining = ids.slice();
const resolved = [];
function doWork() {
if (idsRemaining.length == 0) {
return callback(null, resolved);
}
getter(idsRemaining.shift(), (err, entity) => {
if (err) {
return callback(err);
}
resolved.push(entity);
return doWork();
});
}
setImmediate(doWork);
}
const userFieldTypeToGetter = {
'campaign': (id, callback) => campaigns.get(id, false, callback),
'list': lists.get
};
function resolveUserFields(userFields, params, callback) {
const userFieldsRemaining = userFields.slice();
const resolved = {};
function doWork() {
if (userFieldsRemaining.length == 0) {
return callback(null, resolved);
}
const spec = userFieldsRemaining.shift();
const getter = userFieldTypeToGetter[spec.type];
if (getter) {
return resolveEntities(getter, params[spec.id], (err, entities) => {
if (spec.minOccurences == 1 && spec.maxOccurences == 1) {
resolved[spec.id] = entities[0];
} else {
resolved[spec.id] = entities;
}
doWork();
});
} else {
return callback(new Error(_('Unknown user field type "' + spec.type + '".')));
}
}
setImmediate(doWork);
}
function doneSuccess(id) {
process.exit(0);
}
function doneFail(id) {
process.exit(1);
}
function processReport(reportId) {
reports.get(reportId, (err, report) => {
if (err || !report) {
log.error('reports', err && err.message || err || _('Could not find report with specified ID'));
doneFail();
}
reportTemplates.get(report.reportTemplate, (err, reportTemplate) => {
if (err) {
log.error('reports', err && err.message || err || _('Could not find report template'));
doneFail();
}
resolveUserFields(reportTemplate.userFieldsObject, report.paramsObject, (err, inputs) => {
if (err) {
log.error('reports', err.message || err);
doneFail();
}
const sandbox = {
require: require,
inputs: inputs,
console: console,
callback: (err, outputs) => {
if (err) {
log.error('reports', err.message || err);
doneFail();
}
const hbsTmpl = handlebars.compile(reportTemplate.hbs);
const reportText = hbsTmpl(outputs);
fs.writeFile(reportProcessor.getFileName(report, 'report'), reportText, (err, reportContent) => {
if (err) {
log.error('reports', err && err.message || err || _('Could not find report with specified ID'));
doneFail();
}
doneSuccess();
});
}
};
const script = new vm.Script(reportTemplate.js);
script.runInNewContext(sandbox, {displayErrors: true, timeout: 120000});
});
});
});
}
processReport(Number(process.argv[2]));

View file

@ -1,154 +1,191 @@
'use strict';
const log = require('npmlog');
const db = require('../lib/db');
const reports = require('../lib/models/reports');
const reportTemplates = require('../lib/models/report-templates');
const lists = require('../lib/models/lists');
const subscriptions = require('../lib/models/subscriptions');
const campaigns = require('../lib/models/campaigns');
const handlebars = require('handlebars');
const handlebarsHelpers = require('../lib/handlebars-helpers');
const _ = require('../lib/translate')._;
const path = require('path');
const tools = require('../lib/tools');
const hbs = require('hbs');
const vm = require('vm');
const log = require('npmlog');
const fs = require('fs');
const fork = require('child_process').fork;
const fileHelpers = require('../lib/file-helpers');
const path = require('path');
const privilegeHelpers = require('../lib/privilege-helpers');
let runningWorkersCount = 0;
let maxWorkersCount = 1;
handlebarsHelpers.registerHelpers(handlebars);
let workers = {};
let reportId = Number(process.argv[2]);
let reportDir;
function getFileName(report, suffix) {
return path.join(__dirname, '..', 'protected', 'reports', report.id + '-' + tools.nameToFileName(report.name) + '.' + suffix);
}
function resolveEntities(getter, ids, callback) {
const idsRemaining = ids.slice();
const resolved = [];
module.exports.getFileName = getFileName;
function spawnWorker(report) {
fs.open(getFileName(report, 'output'), 'w', (err, outFd) => {
if (err) {
log.error('ReportProcessor', err);
return;
function doWork() {
if (idsRemaining.length == 0) {
return callback(null, resolved);
}
runningWorkersCount++;
const options = {
stdio: ['ignore', outFd, outFd, 'ipc'],
cwd: path.join(__dirname, '..')
};
let child = fork(path.join(__dirname, 'report-processor-worker.js'), [report.id], options);
let pid = child.pid;
workers[report.id] = child;
log.info('ReportProcessor', 'Worker process for "%s" started with pid %s. Current worker count is %s.', report.name, pid, runningWorkersCount);
child.on('close', (code, signal) => {
runningWorkersCount--;
delete workers[report.id];
log.info('ReportProcessor', 'Worker process for "%s" (pid %s) exited with code %s signal %s. Current worker count is %s.', report.name, pid, code, signal, runningWorkersCount);
fs.close(outFd, (err) => {
if (err) {
log.error('ReportProcessor', err);
}
const fields = {};
if (code ===0 ) {
fields.state = reports.ReportState.FINISHED;
fields.lastRun = new Date();
} else {
fields.state = reports.ReportState.FAILED;
}
reports.updateFields(report.id, fields, (err) => {
if (err) {
log.error('ReportProcessor', err);
}
setImmediate(worker);
});
});
});
});
};
function worker() {
reports.listWithState(reports.ReportState.SCHEDULED, 0, maxWorkersCount - runningWorkersCount, (err, reportList) => {
if (err) {
log.error('ReportProcessor', err);
return;
}
for (let report of reportList) {
reports.updateFields(report.id, { state: reports.ReportState.PROCESSING }, (err) => {
if (err) {
log.error('ReportProcessor', err);
return;
}
spawnWorker(report);
});
}
});
}
module.exports.start = (reportId, callback) => {
if (!workers[reportId]) {
log.info('ReportProcessor', 'Scheduling report id: %s', reportId);
reports.updateFields(reportId, { state: reports.ReportState.SCHEDULED, lastRun: null}, (err) => {
getter(idsRemaining.shift(), (err, entity) => {
if (err) {
return callback(err);
}
if (runningWorkersCount < maxWorkersCount) {
log.info('ReportProcessor', 'Starting worker because runningWorkersCount=%s maxWorkersCount=%s', runningWorkersCount, maxWorkersCount);
worker();
} else {
log.info('ReportProcessor', 'Not starting worker because runningWorkersCount=%s maxWorkersCount=%s', runningWorkersCount, maxWorkersCount);
}
callback(null);
resolved.push(entity);
return doWork();
});
} else {
log.info('ReportProcessor', 'Worker for report id: %s is already running.', reportId);
}
setImmediate(doWork);
}
const userFieldTypeToGetter = {
'campaign': (id, callback) => campaigns.get(id, false, callback),
'list': lists.get
};
module.exports.stop = (reportId, callback) => {
const child = workers[reportId];
if (child) {
log.info('ReportProcessor', 'Killing worker for report id: %s', reportId);
child.kill();
reports.updateFields(reportId, { state: reports.ReportState.FAILED}, callback);
} else {
log.info('ReportProcessor', 'No running worker found for report id: %s', reportId);
}
};
function resolveUserFields(userFields, params, callback) {
const userFieldsRemaining = userFields.slice();
const resolved = {};
module.exports.init = (callback) => {
reports.listWithState(reports.ReportState.PROCESSING, 0, 0, (err, reportList) => {
if (err) {
log.error('ReportProcessor', err);
function doWork() {
if (userFieldsRemaining.length == 0) {
return callback(null, resolved);
}
function scheduleReport() {
if (reportList.length > 0) {
const report = reportList.shift();
const spec = userFieldsRemaining.shift();
const getter = userFieldTypeToGetter[spec.type];
reports.updateFields(report.id, { state: reports.ReportState.SCHEDULED}, (err) => {
if (getter) {
return resolveEntities(getter, params[spec.id], (err, entities) => {
if (spec.minOccurences == 1 && spec.maxOccurences == 1) {
resolved[spec.id] = entities[0];
} else {
resolved[spec.id] = entities;
}
doWork();
});
} else {
return callback(new Error(_('Unknown user field type "' + spec.type + '".')));
}
}
setImmediate(doWork);
}
function tearDownChrootDir(callback) {
if (reportDir) {
privilegeHelpers.tearDownChrootDir(reportDir, callback);
} else {
callback();
}
}
function doneSuccess() {
tearDownChrootDir((err) => {
if (err)
process.exit(1)
else
process.exit(0);
});
}
function doneFail() {
tearDownChrootDir((err) => {
process.exit(1)
});
}
reports.get(reportId, (err, report) => {
if (err || !report) {
log.error('reports', err && err.message || err || _('Could not find report with specified ID'));
doneFail();
return;
}
reportTemplates.get(report.reportTemplate, (err, reportTemplate) => {
if (err) {
log.error('reports', err && err.message || err || _('Could not find report template'));
doneFail();
return;
}
resolveUserFields(reportTemplate.userFieldsObject, report.paramsObject, (err, inputs) => {
if (err) {
log.error('reports', err.message || err);
doneFail();
return;
}
const campaignsProxy = {
results: reports.getCampaignResults,
list: campaigns.list,
get: campaigns.get
};
const subscriptionsProxy = {
list: subscriptions.list
};
const reportFile = fileHelpers.getReportContentFile(report);
const sandbox = {
console,
campaigns: campaignsProxy,
subscriptions: subscriptionsProxy,
inputs,
callback: (err, outputs) => {
if (err) {
log.error('ReportProcessor', err);
log.error('reports', err.message || err);
doneFail();
return;
}
scheduleReport();
});
}
const hbsTmpl = handlebars.compile(reportTemplate.hbs);
const reportText = hbsTmpl(outputs);
worker();
callback();
}
fs.writeFile(path.basename(reportFile), reportText, (err, reportContent) => {
if (err) {
log.error('reports', err && err.message || err || _('Could not find report with specified ID'));
doneFail();
return;
}
scheduleReport();
doneSuccess();
return;
});
}
};
const script = new vm.Script(reportTemplate.js);
reportDir = fileHelpers.getReportDir(report);
privilegeHelpers.setupChrootDir(reportDir, (err) => {
if (err) {
doneFail();
return;
}
privilegeHelpers.chrootAndDropRootPrivileges(reportDir);
try {
script.runInNewContext(sandbox, {displayErrors: true, timeout: 120000});
} catch (err) {
console.log(err);
doneFail();
return;
}
});
});
});
};
});