diff --git a/app.js b/app.js index 91af1fc0..f8bad1bf 100644 --- a/app.js +++ b/app.js @@ -213,8 +213,11 @@ app.use('/api', api); app.use('/editorapi', editorapi); app.use('/grapejs', grapejs); app.use('/mosaico', mosaico); -app.use('/reports', reports); -app.use('/report-templates', reportsTemplates); + +if (config.reports && config.reports.enabled === true) { + app.use('/reports', reports); + app.use('/report-templates', reportsTemplates); +} // catch 404 and forward to error handler app.use((req, res, next) => { diff --git a/config/default.toml b/config/default.toml index 6293d407..54fe11c0 100644 --- a/config/default.toml +++ b/config/default.toml @@ -74,6 +74,11 @@ postsize="2MB" host="localhost" user="mailtrain" password="mailtrain" +# If more security is desired when running reports (which use user-defined JS scripts located in DB), +# one can specify a DB user with read-only permissions. If these are not specified, Mailtrain uses the +# regular DB user (which has also write permissions). +# userRO="mailtrain-ro" +# passwordRO="mailtrain-ro" database="mailtrain" # Some installations, eg. MAMP can use a different port (8889) # MAMP users should also turn on "Allow network access to MySQL" otherwise MySQL might not be accessible @@ -150,3 +155,18 @@ templates=[["versafix-1", "Versafix One"]] [grapejs] # Installed templates templates=[["demo", "Demo Template"]] + +[reports] +# The whole reporting functionality can be disabled below if the they are not needed and the DB cannot be +# properly protected. +# Reports rely on custom user defined Javascript snippets defined in the report template. The snippets are run on the +# server when generating a report. As these snippets are stored in the DB, they pose a security risk because they can +# help gaining access to the server if the DB cannot +# be properly protected (e.g. if it is shared with another application with security weaknesses). +# Mailtrain mitigates this problem by running the custom Javascript snippets in a chrooted environment and under a +# DB user that cannot modify the database (see userRO in [mysql] above). However the chrooted environment is available +# only if Mailtrain is started as root. The chrooted environment still does not prevent the custom JS script in +# performing network operations and in generating XSS attacks as part of the report. +# The bottom line is that if people who are creating report templates or have write access to the DB cannot be trusted, +# then it's safer to switch off the reporting functionality below. +enabled=false diff --git a/index.js b/index.js index 57b9d482..6893ff3e 100644 --- a/index.js +++ b/index.js @@ -4,21 +4,23 @@ * Module dependencies. */ -let config = require('config'); -let log = require('npmlog'); -let app = require('./app'); -let http = require('http'); -let fork = require('child_process').fork; -let triggers = require('./services/triggers'); -let importer = require('./services/importer'); -let verpServer = require('./services/verp-server'); -let testServer = require('./services/test-server'); -let postfixBounceServer = require('./services/postfix-bounce-server'); -let tzupdate = require('./services/tzupdate'); -let feedcheck = require('./services/feedcheck'); -let dbcheck = require('./lib/dbcheck'); -let tools = require('./lib/tools'); -let reportProcessor = require('./services/report-processor'); +const config = require('config'); +const log = require('npmlog'); +const app = require('./app'); +const http = require('http'); +const fork = require('child_process').fork; +const triggers = require('./services/triggers'); +const importer = require('./services/importer'); +const verpServer = require('./services/verp-server'); +const testServer = require('./services/test-server'); +const postfixBounceServer = require('./services/postfix-bounce-server'); +const tzupdate = require('./services/tzupdate'); +const feedcheck = require('./services/feedcheck'); +const dbcheck = require('./lib/dbcheck'); +const tools = require('./lib/tools'); +const reportProcessor = require('./lib/report-processor'); +const executor = require('./lib/executor'); +const privilegeHelpers = require('./lib/privilege-helpers'); let port = config.www.port; let host = config.www.host; @@ -113,32 +115,21 @@ server.on('listening', () => { log.info('Express', 'WWW server listening on %s', bind); // start additional services - testServer(() => { - verpServer(() => { - tzupdate(() => { - importer(() => { - triggers(() => { - spawnSenders(() => { - feedcheck(() => { - postfixBounceServer(() => { - reportProcessor.init(() => { - log.info('Service', 'All services started'); - if (config.group) { - try { - process.setgid(config.group); - log.info('Service', 'Changed group to "%s" (%s)', config.group, process.getgid()); - } catch (E) { - log.info('Service', 'Failed to change group to "%s" (%s)', config.group, E.message); - } - } - if (config.user) { - try { - process.setuid(config.user); - log.info('Service', 'Changed user to "%s" (%s)', config.user, process.getuid()); - } catch (E) { - log.info('Service', 'Failed to change user to "%s" (%s)', config.user, E.message); - } - } + function startNextServices() { + testServer(() => { + verpServer(() => { + + privilegeHelpers.dropRootPrivileges(); + + tzupdate(() => { + importer(() => { + triggers(() => { + spawnSenders(() => { + feedcheck(() => { + postfixBounceServer(() => { + reportProcessor.init(() => { + log.info('Service', 'All services started'); + }); }); }); }); @@ -147,5 +138,11 @@ server.on('listening', () => { }); }); }); - }); + } + + if (config.reports && config.reports.enabled === true) { + executor.spawn(() => startNextServices); + } else { + startNextServices(); + } }); diff --git a/lib/executor.js b/lib/executor.js new file mode 100644 index 00000000..80db9b6b --- /dev/null +++ b/lib/executor.js @@ -0,0 +1,74 @@ +'use strict'; + +const fork = require('child_process').fork; +const log = require('npmlog'); +const path = require('path'); + +const requestCallbacks = {}; +let messageTid = 0; +let executorProcess; + +module.exports = { + spawn, + start, + stop +}; + +function spawn(callback) { + log.info('Executor', 'Spawning executor process.'); + + executorProcess = fork(path.join(__dirname, '..', 'services', 'executor.js'), [], { + cwd: path.join(__dirname, '..'), + env: {NODE_ENV: process.env.NODE_ENV} + }); + + executorProcess.on('message', msg => { + if (msg) { + if (msg.type === 'process-started') { + let requestCallback = requestCallbacks[msg.tid]; + if (requestCallback && requestCallback.startedCallback) { + requestCallback.startedCallback(msg.tid); + } + + } else if (msg.type === 'process-finished') { + let requestCallback = requestCallbacks[msg.tid]; + if (requestCallback && requestCallback.startedCallback) { + requestCallback.finishedCallback(msg.code, msg.signal); + } + + delete requestCallbacks[msg.tid]; + + } else if (msg.type === 'executor-started') { + log.info('Executor', 'Executor process started.'); + return callback(); + } + } + }); + + executorProcess.on('close', (code, signal) => { + log.info('Executor', 'Executor process exited with code %s signal %s.', code, signal); + }); +} + +function start(type, data, startedCallback, finishedCallback) { + requestCallbacks[messageTid] = { + startedCallback, + finishedCallback + }; + + executorProcess.send({ + type: 'start-' + type, + data, + tid: messageTid + }); + + messageTid++; +} + +function stop(tid) { + executorProcess.send({ + type: 'stop-process', + tid + }); +} + diff --git a/lib/file-helpers.js b/lib/file-helpers.js new file mode 100644 index 00000000..aca70e02 --- /dev/null +++ b/lib/file-helpers.js @@ -0,0 +1,33 @@ +'use strict'; + +const path = require('path'); + +function nameToFileName(name) { + return name. + trim(). + toLowerCase(). + replace(/[ .+/]/g, '-'). + replace(/[^a-z0-9\-_]/gi, ''). + replace(/--*/g, '-'); +} + + +function getReportDir(report) { + return path.join(__dirname, '..', 'protected', 'reports', report.id + '-' + nameToFileName(report.name)); +} + +function getReportContentFile(report) { + return path.join(getReportDir(report), 'report'); +} + +function getReportOutputFile(report) { + return getReportDir(report) + '.output'; +} + + +module.exports = { + getReportContentFile, + getReportDir, + getReportOutputFile, + nameToFileName +}; diff --git a/lib/privilege-helpers.js b/lib/privilege-helpers.js new file mode 100644 index 00000000..ce648d70 --- /dev/null +++ b/lib/privilege-helpers.js @@ -0,0 +1,131 @@ +'use strict'; + +const log = require('npmlog'); +const config = require('config'); +const path = require('path'); + +const promise = require('bluebird'); +const fsExtra = promise.promisifyAll(require('fs-extra')); +const fs = promise.promisifyAll(require('fs')); +const walk = require('walk'); + +const tryRequire = require('try-require'); +const posix = tryRequire('posix'); + + +function ensureMailtrainOwner(file, callback) { + try { + const uid = config.user ? posix.getpwnam(config.user).uid : 0; + const gid = config.group ? posix.getgrnam(config.group).gid : 0; + + fs.chown(file, uid, gid, callback); + + } catch (err) { + return callback(err); + } +} + +function ensureMailtrainOwnerRecursive(dir, callback) { + try { + const uid = config.user ? posix.getpwnam(config.user).uid : 0; + const gid = config.group ? posix.getgrnam(config.group).gid : 0; + + fs.chown(dir, uid, gid, err => { + if (err) { + return callback(err); + } + + walk.walk(dir) + .on('node', (root, stat, next) => { + fs.chown(path.join(root, stat.name), uid, gid, next); + }) + .on('end', callback); + }); + } catch (err) { + return callback(err); + } +} + +const ensureMailtrainOwnerRecursiveAsync = promise.promisify(ensureMailtrainOwnerRecursive); + +function dropRootPrivileges() { + if (config.group) { + try { + process.setgid(config.group); + log.info('PrivilegeHelpers', 'Changed group to "%s" (%s)', config.group, process.getgid()); + } catch (E) { + log.info('PrivilegeHelpers', 'Failed to change group to "%s" (%s)', config.group, E.message); + } + } + + if (config.user) { + try { + process.setuid(config.user); + log.info('PrivilegeHelpers', 'Changed user to "%s" (%s)', config.user, process.getuid()); + } catch (E) { + log.info('PrivilegeHelpers', 'Failed to change user to "%s" (%s)', config.user, E.message); + } + } +} + +function setupChrootDir(newRoot, callback) { + try { + fsExtra.emptyDirAsync(newRoot) + .then(() => fsExtra.ensureDirAsync(path.join(newRoot, 'etc'))) + .then(() => fsExtra.copyAsync('/etc/hosts', path.join(newRoot, 'etc', 'hosts'))) + .then(() => ensureMailtrainOwnerRecursiveAsync(newRoot)) + .then(() => { + log.info('PrivilegeHelpers', 'Chroot directory "%s" set up', newRoot); + callback(); + }) + .catch(err => { + log.info('PrivilegeHelpers', 'Failed to setup chroot directory "%s"', newRoot); + callback(err); + }); + + } catch(err) { + log.info('PrivilegeHelpers', 'Failed to setup chroot directory "%s"', newRoot); + } +} + +function tearDownChrootDir(root, callback) { + if (posix) { + fsExtra.removeAsync(path.join('/', 'etc')) + .then(() => { + log.info('PrivilegeHelpers', 'Chroot directory "%s" torn down', root); + callback(); + }) + .catch(err => { + log.info('PrivilegeHelpers', 'Failed to tear down chroot directory "%s"', root); + callback(err); + }); + } +} + +function chrootAndDropRootPrivileges(newRoot) { + + try { + const uid = config.user ? posix.getpwnam(config.user).uid : 0; + const gid = config.group ? posix.getgrnam(config.group).gid : 0; + + posix.chroot(newRoot); + process.chdir('/'); + + process.setgid(gid); + process.setuid(uid); + + log.info('PrivilegeHelpers', 'Changed root to "%s" and privileges to %s.%s', newRoot, uid, gid); + } catch(err) { + log.info('PrivilegeHelpers', 'Failed to change root to "%s" and set privileges', newRoot); + } + +} + +module.exports = { + dropRootPrivileges, + chrootAndDropRootPrivileges, + setupChrootDir, + tearDownChrootDir, + ensureMailtrainOwner, + ensureMailtrainOwnerRecursive +}; diff --git a/lib/report-processor.js b/lib/report-processor.js new file mode 100644 index 00000000..09308d37 --- /dev/null +++ b/lib/report-processor.js @@ -0,0 +1,129 @@ +'use strict'; + +const log = require('npmlog'); +const reports = require('./models/reports'); +const executor = require('./executor'); + +let runningWorkersCount = 0; +let maxWorkersCount = 1; + +let workers = {}; + +function startWorker(report) { + + function onStarted(tid) { + log.info('ReportProcessor', 'Worker process for "%s" started with tid %s. Current worker count is %s.', report.name, tid, runningWorkersCount); + workers[report.id] = tid; + } + + function onFinished(code, signal) { + runningWorkersCount--; + log.info('ReportProcessor', 'Worker process for "%s" (tid %s) exited with code %s signal %s. Current worker count is %s.', report.name, workers[report.id], code, signal, runningWorkersCount); + delete workers[report.id]; + + const fields = {}; + if (code === 0) { + fields.state = reports.ReportState.FINISHED; + fields.lastRun = new Date(); + } else { + fields.state = reports.ReportState.FAILED; + } + + reports.updateFields(report.id, fields, err => { + if (err) { + log.error('ReportProcessor', err); + } + + setImmediate(startWorkers); + }); + } + + const reportData = { + id: report.id, + name: report.name + }; + + runningWorkersCount++; + executor.start('report-processor-worker', reportData, onStarted, onFinished); +} + +function startWorkers() { + reports.listWithState(reports.ReportState.SCHEDULED, 0, maxWorkersCount - runningWorkersCount, (err, reportList) => { + if (err) { + log.error('ReportProcessor', err); + return; + } + + for (let report of reportList) { + reports.updateFields(report.id, { state: reports.ReportState.PROCESSING }, err => { + if (err) { + log.error('ReportProcessor', err); + return; + } + + startWorker(report); + }); + } + }); +} + +module.exports.start = (reportId, callback) => { + if (!workers[reportId]) { + log.info('ReportProcessor', 'Scheduling report id: %s', reportId); + reports.updateFields(reportId, { state: reports.ReportState.SCHEDULED, lastRun: null}, err => { + if (err) { + return callback(err); + } + + if (runningWorkersCount < maxWorkersCount) { + log.info('ReportProcessor', 'Starting worker because runningWorkersCount=%s maxWorkersCount=%s', runningWorkersCount, maxWorkersCount); + + startWorkers(); + } else { + log.info('ReportProcessor', 'Not starting worker because runningWorkersCount=%s maxWorkersCount=%s', runningWorkersCount, maxWorkersCount); + } + + callback(null); + }); + } else { + log.info('ReportProcessor', 'Worker for report id: %s is already running.', reportId); + } +}; + +module.exports.stop = (reportId, callback) => { + const tid = workers[reportId]; + if (tid) { + log.info('ReportProcessor', 'Killing worker for report id: %s', reportId); + executor.stop(tid); + reports.updateFields(reportId, { state: reports.ReportState.FAILED}, callback); + } else { + log.info('ReportProcessor', 'No running worker found for report id: %s', reportId); + } +}; + +module.exports.init = callback => { + reports.listWithState(reports.ReportState.PROCESSING, 0, 0, (err, reportList) => { + if (err) { + log.error('ReportProcessor', err); + } + + function scheduleReport() { + if (reportList.length > 0) { + const report = reportList.shift(); + + reports.updateFields(report.id, { state: reports.ReportState.SCHEDULED}, err => { + if (err) { + log.error('ReportProcessor', err); + } + + scheduleReport(); + }); + } + + startWorkers(); + return callback(); + } + + scheduleReport(); + }); +}; diff --git a/lib/tools.js b/lib/tools.js index 7b3074a2..2c508917 100644 --- a/lib/tools.js +++ b/lib/tools.js @@ -1,5 +1,6 @@ 'use strict'; +const config = require('config'); let fs = require('fs'); let path = require('path'); let db = require('./db'); @@ -28,7 +29,6 @@ module.exports = { prepareHtml, purifyHTML, mergeTemplateIntoLayout, - nameToFileName, workers: new Set() }; @@ -130,11 +130,15 @@ function updateMenu(res) { title: _('Automation'), url: '/triggers', key: 'triggers' - }, { - title: _('Reports'), - url: '/reports', - key: 'reports' }); + + if (config.reports && config.reports.enabled === true) { + res.locals.menu.push({ + title: _('Reports'), + url: '/reports', + key: 'reports' + }); + } } function validateEmail(address, checkBlocked, callback) { @@ -302,11 +306,3 @@ function mergeTemplateIntoLayout(template, layout, callback) { } } -function nameToFileName(name) { - return name. - trim(). - toLowerCase(). - replace(/[ .+/]/g, '-'). - replace(/[^a-z0-9\-_]/gi, ''). - replace(/--*/g, '-'); -} diff --git a/package.json b/package.json index 4d95835d..80a97843 100644 --- a/package.json +++ b/package.json @@ -33,10 +33,14 @@ "grunt-eslint": "^19.0.0", "jsxgettext-andris": "^0.9.0-patch.1" }, + "optionalDependencies": { + "posix": "^4.1.1" + }, "dependencies": { "async": "^2.3.0", "aws-sdk": "^2.37.0", "bcrypt-nodejs": "0.0.3", + "bluebird": "^3.5.0", "body-parser": "^1.17.1", "bounce-handler": "^7.3.2-fork.2", "compression": "^1.6.2", @@ -56,6 +60,7 @@ "faker": "^4.1.0", "feedparser": "^2.1.0", "file-type": "^4.1.0", + "fs-extra": "^2.1.2", "geoip-ultralight": "^0.1.5", "gettext-parser": "^1.2.2", "gm": "^1.23.0", @@ -97,6 +102,8 @@ "slugify": "^1.1.0", "smtp-server": "^2.0.3", "striptags": "^3.0.1", - "toml": "^2.3.2" + "toml": "^2.3.2", + "try-require": "^1.2.1", + "walk": "^2.3.9" } } diff --git a/routes/report-templates.js b/routes/report-templates.js index 5393ccb4..f411dd47 100644 --- a/routes/report-templates.js +++ b/routes/report-templates.js @@ -75,9 +75,7 @@ router.get('/create', passport.csrfProtection, (req, res) => { ']'; if (!('js' in data)) data.js = - 'const reports = require("../lib/models/reports");\n' + - '\n' + - 'reports.getCampaignResults(inputs.campaign, ["*"], "", (err, results) => {\n' + + 'campaigns.results(inputs.campaign, ["*"], "", (err, results) => {\n' + ' if (err) {\n' + ' return callback(err);\n' + ' }\n' + @@ -136,9 +134,7 @@ router.get('/create', passport.csrfProtection, (req, res) => { ']'; if (!('js' in data)) data.js = - 'const reports = require("../lib/models/reports");\n' + - '\n' + - 'reports.getCampaignResults(inputs.campaign, ["custom_country", "count(*) AS count_all", "SUM(IF(tracker.count IS NULL, 0, 1)) AS count_opened"], "GROUP BY custom_country", (err, results) => {\n' + + 'campaigns.results(inputs.campaign, ["custom_country", "count(*) AS count_all", "SUM(IF(tracker.count IS NULL, 0, 1)) AS count_opened"], "GROUP BY custom_country", (err, results) => {\n' + ' if (err) {\n' + ' return callback(err);\n' + ' }\n' + @@ -213,8 +209,6 @@ router.get('/create', passport.csrfProtection, (req, res) => { ']'; if (!('js' in data)) data.js = - 'const subscriptions = require("../lib/models/subscriptions");\n' + - '\n' + 'subscriptions.list(inputs.list.id,0,0, (err, results) => {\n' + ' if (err) {\n' + ' return callback(err);\n' + diff --git a/routes/reports.js b/routes/reports.js index ecb2db0a..f1d52e9f 100644 --- a/routes/reports.js +++ b/routes/reports.js @@ -6,10 +6,11 @@ const router = new express.Router(); const _ = require('../lib/translate')._; const reportTemplates = require('../lib/models/report-templates'); const reports = require('../lib/models/reports'); -const reportProcessor = require('../services/report-processor'); +const reportProcessor = require('../lib/report-processor'); const campaigns = require('../lib/models/campaigns'); const lists = require('../lib/models/lists'); const tools = require('../lib/tools'); +const fileHelpers = require('../lib/file-helpers'); const util = require('util'); const htmlescape = require('escape-html'); const striptags = require('striptags'); @@ -233,14 +234,13 @@ router.get('/view/:id', (req, res) => { if (report.state == reports.ReportState.FINISHED) { if (reportTemplate.mimeType == 'text/html') { - fs.readFile(reportProcessor.getFileName(report, 'report'), (err, reportContent) => { + fs.readFile(fileHelpers.getReportContentFile(report), (err, reportContent) => { if (err) { req.flash('danger', err && err.message || err || _('Could not find report with specified ID')); return res.redirect('/reports'); } const data = { - csrfToken: req.csrfToken(), report: new hbs.handlebars.SafeString(reportContent), title: report.name }; @@ -250,11 +250,11 @@ router.get('/view/:id', (req, res) => { } else if (reportTemplate.mimeType == 'text/csv') { const headers = { - 'Content-Disposition': 'attachment;filename=' + tools.nameToFileName(report.name) + '.csv', + 'Content-Disposition': 'attachment;filename=' + fileHelpers.nameToFileName(report.name) + '.csv', 'Content-Type': 'text/csv' }; - res.sendFile(reportProcessor.getFileName(report, 'report'), {headers: headers}); + res.sendFile(fileHelpers.getReportContentFile(report), {headers: headers}); } else { req.flash('danger', _('Unknown type of template')); @@ -276,9 +276,8 @@ router.get('/output/:id', (req, res) => { return res.redirect('/reports'); } - fs.readFile(reportProcessor.getFileName(report, 'output'), (err, output) => { + fs.readFile(fileHelpers.getReportOutputFile(report), (err, output) => { let data = { - csrfToken: req.csrfToken(), title: 'Output for report ' + report.name }; @@ -298,6 +297,8 @@ function getRowLastRun(row) { } function getRowActions(row) { + /* FIXME: add csrf protection to stop and refresh actions */ + let requestRefresh = false; let view, startStop; let topic = 'data-topic-id="' + row.id + '"'; diff --git a/services/executor.js b/services/executor.js new file mode 100644 index 00000000..8873a66b --- /dev/null +++ b/services/executor.js @@ -0,0 +1,89 @@ +'use strict'; + +/* Privileged executor. If Mailtrain is started as root, this process keeps the root privilege to be able to spawn workers + that can chroot. + */ + +const fileHelpers = require('../lib/file-helpers'); +const fork = require('child_process').fork; +const path = require('path'); +const log = require('npmlog'); +const fs = require('fs'); +const privilegeHelpers = require('../lib/privilege-helpers'); + +let processes = {}; + +function spawnProcess(tid, executable, args, outputFile, cwd) { + + fs.open(outputFile, 'w', (err, outFd) => { + if (err) { + log.error('Executor', err); + return; + } + + privilegeHelpers.ensureMailtrainOwner(outputFile, (err) => { + if (err) { + log.info('Executor', 'Cannot change owner of output file of process tid:%s.', tid) + } + + const options = { + stdio: ['ignore', outFd, outFd, 'ipc'], + cwd: cwd, + env: {NODE_ENV: process.env.NODE_ENV} + }; + + const child = fork(executable, args, options); + const pid = child.pid; + processes[tid] = child; + + log.info('Executor', 'Process started with tid:%s pid:%s.', tid, pid); + process.send({ + type: 'process-started', + tid + }); + + child.on('close', (code, signal) => { + + delete processes[tid]; + log.info('Executor', 'Process tid:%s pid:%s exited with code %s signal %s.', tid, pid, code, signal); + + fs.close(outFd, (err) => { + if (err) { + log.error('Executor', err); + } + + process.send({ + type: 'process-finished', + tid, + code, + signal + }); + }); + }); + }); + }); +} + +process.on('message', msg => { + if (msg) { + const type = msg.type; + + if (type === 'start-report-processor-worker') { + spawnProcess(msg.tid, path.join(__dirname, 'report-processor.js'), [msg.data.id], fileHelpers.getReportOutputFile(msg.data), path.join(__dirname, '..')); + + } else if (type === 'stop-process') { + const child = processes[msg.tid]; + + if (child) { + log.info('Executor', 'Killing process tid:%s pid:%s', msg.tid, child.pid); + child.kill(); + } else { + log.info('Executor', 'No running process found with tid:%s pid:%s', msg.tid, child.pid); + } + } + } +}); + +process.send({ + type: 'executor-started' +}); diff --git a/services/report-processor-worker.js b/services/report-processor-worker.js deleted file mode 100644 index d4141bd6..00000000 --- a/services/report-processor-worker.js +++ /dev/null @@ -1,133 +0,0 @@ -'use strict'; - -const reports = require('../lib/models/reports'); -const reportTemplates = require('../lib/models/report-templates'); -const lists = require('../lib/models/lists'); -const campaigns = require('../lib/models/campaigns'); -const handlebars = require('handlebars'); -const handlebarsHelpers = require('../lib/handlebars-helpers'); -const _ = require('../lib/translate')._; -const hbs = require('hbs'); -const vm = require('vm'); -const log = require('npmlog'); -const fs = require('fs'); -const reportProcessor = require('./report-processor'); - -handlebarsHelpers.registerHelpers(handlebars); - -function resolveEntities(getter, ids, callback) { - const idsRemaining = ids.slice(); - const resolved = []; - - function doWork() { - if (idsRemaining.length == 0) { - return callback(null, resolved); - } - - getter(idsRemaining.shift(), (err, entity) => { - if (err) { - return callback(err); - } - - resolved.push(entity); - return doWork(); - }); - } - - setImmediate(doWork); -} - -const userFieldTypeToGetter = { - 'campaign': (id, callback) => campaigns.get(id, false, callback), - 'list': lists.get -}; - -function resolveUserFields(userFields, params, callback) { - const userFieldsRemaining = userFields.slice(); - const resolved = {}; - - function doWork() { - if (userFieldsRemaining.length == 0) { - return callback(null, resolved); - } - - const spec = userFieldsRemaining.shift(); - const getter = userFieldTypeToGetter[spec.type]; - - if (getter) { - return resolveEntities(getter, params[spec.id], (err, entities) => { - if (spec.minOccurences == 1 && spec.maxOccurences == 1) { - resolved[spec.id] = entities[0]; - } else { - resolved[spec.id] = entities; - } - - doWork(); - }); - } else { - return callback(new Error(_('Unknown user field type "' + spec.type + '".'))); - } - } - - setImmediate(doWork); -} - -function doneSuccess(id) { - process.exit(0); -} - -function doneFail(id) { - process.exit(1); -} - -function processReport(reportId) { - reports.get(reportId, (err, report) => { - if (err || !report) { - log.error('reports', err && err.message || err || _('Could not find report with specified ID')); - doneFail(); - } - - reportTemplates.get(report.reportTemplate, (err, reportTemplate) => { - if (err) { - log.error('reports', err && err.message || err || _('Could not find report template')); - doneFail(); - } - - resolveUserFields(reportTemplate.userFieldsObject, report.paramsObject, (err, inputs) => { - if (err) { - log.error('reports', err.message || err); - doneFail(); - } - - const sandbox = { - require: require, - inputs: inputs, - console: console, - callback: (err, outputs) => { - if (err) { - log.error('reports', err.message || err); - doneFail(); - } - - const hbsTmpl = handlebars.compile(reportTemplate.hbs); - const reportText = hbsTmpl(outputs); - - fs.writeFile(reportProcessor.getFileName(report, 'report'), reportText, (err, reportContent) => { - if (err) { - log.error('reports', err && err.message || err || _('Could not find report with specified ID')); - doneFail(); - } - - doneSuccess(); - }); - } - }; - - const script = new vm.Script(reportTemplate.js); - script.runInNewContext(sandbox, {displayErrors: true, timeout: 120000}); - }); - }); - }); -} - -processReport(Number(process.argv[2])); diff --git a/services/report-processor.js b/services/report-processor.js index be6cadf1..8318008a 100644 --- a/services/report-processor.js +++ b/services/report-processor.js @@ -1,154 +1,191 @@ 'use strict'; -const log = require('npmlog'); -const db = require('../lib/db'); const reports = require('../lib/models/reports'); +const reportTemplates = require('../lib/models/report-templates'); +const lists = require('../lib/models/lists'); +const subscriptions = require('../lib/models/subscriptions'); +const campaigns = require('../lib/models/campaigns'); +const handlebars = require('handlebars'); +const handlebarsHelpers = require('../lib/handlebars-helpers'); const _ = require('../lib/translate')._; -const path = require('path'); -const tools = require('../lib/tools'); +const hbs = require('hbs'); +const vm = require('vm'); +const log = require('npmlog'); const fs = require('fs'); -const fork = require('child_process').fork; +const fileHelpers = require('../lib/file-helpers'); +const path = require('path'); +const privilegeHelpers = require('../lib/privilege-helpers'); -let runningWorkersCount = 0; -let maxWorkersCount = 1; +handlebarsHelpers.registerHelpers(handlebars); -let workers = {}; +let reportId = Number(process.argv[2]); +let reportDir; -function getFileName(report, suffix) { - return path.join(__dirname, '..', 'protected', 'reports', report.id + '-' + tools.nameToFileName(report.name) + '.' + suffix); -} +function resolveEntities(getter, ids, callback) { + const idsRemaining = ids.slice(); + const resolved = []; -module.exports.getFileName = getFileName; - -function spawnWorker(report) { - - fs.open(getFileName(report, 'output'), 'w', (err, outFd) => { - if (err) { - log.error('ReportProcessor', err); - return; + function doWork() { + if (idsRemaining.length == 0) { + return callback(null, resolved); } - runningWorkersCount++; - - const options = { - stdio: ['ignore', outFd, outFd, 'ipc'], - cwd: path.join(__dirname, '..') - }; - - let child = fork(path.join(__dirname, 'report-processor-worker.js'), [report.id], options); - let pid = child.pid; - workers[report.id] = child; - - log.info('ReportProcessor', 'Worker process for "%s" started with pid %s. Current worker count is %s.', report.name, pid, runningWorkersCount); - - child.on('close', (code, signal) => { - runningWorkersCount--; - - delete workers[report.id]; - log.info('ReportProcessor', 'Worker process for "%s" (pid %s) exited with code %s signal %s. Current worker count is %s.', report.name, pid, code, signal, runningWorkersCount); - - fs.close(outFd, (err) => { - if (err) { - log.error('ReportProcessor', err); - } - - const fields = {}; - if (code ===0 ) { - fields.state = reports.ReportState.FINISHED; - fields.lastRun = new Date(); - } else { - fields.state = reports.ReportState.FAILED; - } - - reports.updateFields(report.id, fields, (err) => { - if (err) { - log.error('ReportProcessor', err); - } - - setImmediate(worker); - }); - }); - }); - }); -}; - -function worker() { - reports.listWithState(reports.ReportState.SCHEDULED, 0, maxWorkersCount - runningWorkersCount, (err, reportList) => { - if (err) { - log.error('ReportProcessor', err); - return; - } - - for (let report of reportList) { - reports.updateFields(report.id, { state: reports.ReportState.PROCESSING }, (err) => { - if (err) { - log.error('ReportProcessor', err); - return; - } - - spawnWorker(report); - }); - } - }); -} - -module.exports.start = (reportId, callback) => { - if (!workers[reportId]) { - log.info('ReportProcessor', 'Scheduling report id: %s', reportId); - reports.updateFields(reportId, { state: reports.ReportState.SCHEDULED, lastRun: null}, (err) => { + getter(idsRemaining.shift(), (err, entity) => { if (err) { return callback(err); } - if (runningWorkersCount < maxWorkersCount) { - log.info('ReportProcessor', 'Starting worker because runningWorkersCount=%s maxWorkersCount=%s', runningWorkersCount, maxWorkersCount); - - worker(); - } else { - log.info('ReportProcessor', 'Not starting worker because runningWorkersCount=%s maxWorkersCount=%s', runningWorkersCount, maxWorkersCount); - } - - callback(null); + resolved.push(entity); + return doWork(); }); - } else { - log.info('ReportProcessor', 'Worker for report id: %s is already running.', reportId); } + + setImmediate(doWork); +} + +const userFieldTypeToGetter = { + 'campaign': (id, callback) => campaigns.get(id, false, callback), + 'list': lists.get }; -module.exports.stop = (reportId, callback) => { - const child = workers[reportId]; - if (child) { - log.info('ReportProcessor', 'Killing worker for report id: %s', reportId); - child.kill(); - reports.updateFields(reportId, { state: reports.ReportState.FAILED}, callback); - } else { - log.info('ReportProcessor', 'No running worker found for report id: %s', reportId); - } -}; +function resolveUserFields(userFields, params, callback) { + const userFieldsRemaining = userFields.slice(); + const resolved = {}; -module.exports.init = (callback) => { - reports.listWithState(reports.ReportState.PROCESSING, 0, 0, (err, reportList) => { - if (err) { - log.error('ReportProcessor', err); + function doWork() { + if (userFieldsRemaining.length == 0) { + return callback(null, resolved); } - function scheduleReport() { - if (reportList.length > 0) { - const report = reportList.shift(); + const spec = userFieldsRemaining.shift(); + const getter = userFieldTypeToGetter[spec.type]; - reports.updateFields(report.id, { state: reports.ReportState.SCHEDULED}, (err) => { + if (getter) { + return resolveEntities(getter, params[spec.id], (err, entities) => { + if (spec.minOccurences == 1 && spec.maxOccurences == 1) { + resolved[spec.id] = entities[0]; + } else { + resolved[spec.id] = entities; + } + + doWork(); + }); + } else { + return callback(new Error(_('Unknown user field type "' + spec.type + '".'))); + } + } + + setImmediate(doWork); +} + +function tearDownChrootDir(callback) { + if (reportDir) { + privilegeHelpers.tearDownChrootDir(reportDir, callback); + } else { + callback(); + } +} + +function doneSuccess() { + tearDownChrootDir((err) => { + if (err) + process.exit(1) + else + process.exit(0); + }); +} + +function doneFail() { + tearDownChrootDir((err) => { + process.exit(1) + }); +} + + + +reports.get(reportId, (err, report) => { + if (err || !report) { + log.error('reports', err && err.message || err || _('Could not find report with specified ID')); + doneFail(); + return; + } + + reportTemplates.get(report.reportTemplate, (err, reportTemplate) => { + if (err) { + log.error('reports', err && err.message || err || _('Could not find report template')); + doneFail(); + return; + } + + resolveUserFields(reportTemplate.userFieldsObject, report.paramsObject, (err, inputs) => { + if (err) { + log.error('reports', err.message || err); + doneFail(); + return; + } + + const campaignsProxy = { + results: reports.getCampaignResults, + list: campaigns.list, + get: campaigns.get + }; + + const subscriptionsProxy = { + list: subscriptions.list + }; + + const reportFile = fileHelpers.getReportContentFile(report); + + const sandbox = { + console, + campaigns: campaignsProxy, + subscriptions: subscriptionsProxy, + inputs, + + callback: (err, outputs) => { if (err) { - log.error('ReportProcessor', err); + log.error('reports', err.message || err); + doneFail(); + return; } - scheduleReport(); - }); - } + const hbsTmpl = handlebars.compile(reportTemplate.hbs); + const reportText = hbsTmpl(outputs); - worker(); - callback(); - } + fs.writeFile(path.basename(reportFile), reportText, (err, reportContent) => { + if (err) { + log.error('reports', err && err.message || err || _('Could not find report with specified ID')); + doneFail(); + return; + } - scheduleReport(); + doneSuccess(); + return; + }); + } + }; + + const script = new vm.Script(reportTemplate.js); + + reportDir = fileHelpers.getReportDir(report); + privilegeHelpers.setupChrootDir(reportDir, (err) => { + if (err) { + doneFail(); + return; + } + + privilegeHelpers.chrootAndDropRootPrivileges(reportDir); + + try { + script.runInNewContext(sandbox, {displayErrors: true, timeout: 120000}); + } catch (err) { + console.log(err); + + doneFail(); + return; + } + }); + }); }); -}; +}); +