New project structure

Beta of extract.js for extracting english locale
Tomas Bures 2018-11-18 15:38:52 +01:00
parent e18d2b2f84
commit 2edbd67205
247 changed files with 6405 additions and 4237 deletions

server/services/executor.js Normal file
@@ -0,0 +1,132 @@
'use strict';
/* Privileged executor. If Mailtrain is started as root, this process keeps the root privilege to be able to spawn workers
that can chroot.
*/
const reportHelpers = require('../lib/report-helpers');
const fork = require('child_process').fork;
const path = require('path');
const log = require('../lib/log');
const fs = require('fs');
const privilegeHelpers = require('../lib/privilege-helpers');
let processes = {};
function spawnProcess(tid, executable, args, outFile, errFile, cwd, uid, gid) {
function reportFail(msg) {
process.send({
type: 'process-failed',
msg,
tid
});
}
fs.open(outFile, 'w', (err, outFd) => {
if (err) {
log.error('Executor', err);
reportFail('Cannot create standard output file');
return;
}
fs.open(errFile, 'w', (err, errFd) => {
if (err) {
log.error('Executor', err);
reportFail('Cannot create standard error file');
return;
}
privilegeHelpers.ensureMailtrainOwner(outFile, err => {
if (err) {
log.warn('Executor', 'Cannot change owner of output file of process tid:%s', tid);
}
privilegeHelpers.ensureMailtrainOwner(errFile, err => {
if (err) {
log.warn('Executor', 'Cannot change owner of error output file of process tid:%s', tid);
}
const options = {
stdio: ['ignore', outFd, errFd, 'ipc'],
cwd,
env: {NODE_ENV: process.env.NODE_ENV},
uid,
gid
};
let child;
try {
child = fork(executable, args, options);
} catch (err) {
log.error('Executor', 'Cannot start process with tid:%s', tid);
reportFail('Cannot start process');
return;
}
const pid = child.pid;
processes[tid] = child;
log.info('Executor', 'Process started with tid:%s pid:%s', tid, pid);
process.send({
type: 'process-started',
tid
});
child.on('close', (code, signal) => {
delete processes[tid];
log.info('Executor', 'Process tid:%s pid:%s exited with code %s signal %s', tid, pid, code, signal);
fs.close(outFd, err => {
if (err) {
log.error('Executor', err);
}
fs.close(errFd, err => {
if (err) {
log.error('Executor', err);
}
process.send({
type: 'process-finished',
tid,
code,
signal
});
});
});
});
});
});
});
});
}
process.on('message', msg => {
if (msg) {
const type = msg.type;
if (type === 'start-report-processor-worker') {
const ids = privilegeHelpers.getConfigROUidGid();
spawnProcess(msg.tid, path.join(__dirname, '..', 'workers', 'reports', 'report-processor.js'), [msg.data.id], reportHelpers.getReportContentFile(msg.data), reportHelpers.getReportOutputFile(msg.data), path.join(__dirname, '..', 'workers', 'reports'), ids.uid, ids.gid);
} else if (type === 'stop-process') {
const child = processes[msg.tid];
if (child) {
log.info('Executor', 'Killing process tid:%s pid:%s', msg.tid, child.pid);
child.kill();
} else {
log.info('Executor', 'No running process found with tid:%s', msg.tid);
}
}
}
});
process.send({
type: 'executor-started'
});
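
A minimal sketch of the parent side of the executor's IPC protocol (the message types come from the file above; the fork path, the tid value and the report data shape are illustrative assumptions):

'use strict';

// Hypothetical parent-side driver for the executor above.
const fork = require('child_process').fork;
const path = require('path');

const executor = fork(path.join(__dirname, 'services', 'executor.js'));

executor.on('message', msg => {
    if (!msg) return;
    if (msg.type === 'executor-started') {
        // The executor is ready; ask it to spawn a report processor.
        executor.send({
            type: 'start-report-processor-worker',
            tid: 1, // caller-chosen task id, echoed back in every reply
            data: {id: 123} // report entity; its shape is an assumption here
        });
    } else if (msg.type === 'process-finished') {
        console.log(`task ${msg.tid} exited with code ${msg.code}`);
    } else if (msg.type === 'process-failed') {
        console.error(`task ${msg.tid} failed: ${msg.msg}`);
    }
});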

@@ -0,0 +1,165 @@
'use strict';
const log = require('../lib/log');
const knex = require('../lib/knex');
const feedparser = require('feedparser-promised');
const { CampaignType, CampaignStatus, CampaignSource } = require('../../shared/campaigns');
const util = require('util');
const campaigns = require('../models/campaigns');
const contextHelpers = require('../lib/context-helpers');
const { tLog } = require('../lib/translate');
const feedCheckInterval = 10 * 60 * 1000;
const dbCheckInterval = 60 * 1000;
let running = false;
async function fetch(url) {
const httpOptions = {
uri: url,
headers: {
'user-agent': 'Mailtrain',
'accept': 'text/html,application/xhtml+xml'
}
};
const items = await feedparser.parse(httpOptions);
const entries = [];
for (const item of items) {
const entry = {
title: item.title,
date: item.date || item.pubdate || item.pubDate || new Date(),
guid: item.guid || item.link,
link: item.link,
content: item.description || item.summary,
summary: item.summary || item.description,
image_url: item.image && item.image.url
};
entries.push(entry);
}
return entries;
}
async function run() {
if (running) {
return;
}
running = true;
let rssCampaignIdRow;
while (rssCampaignIdRow = await knex('campaigns')
.where('type', CampaignType.RSS)
.where('status', CampaignStatus.ACTIVE)
.where(qry => qry.whereNull('last_check').orWhere('last_check', '<', new Date(Date.now() - feedCheckInterval)))
.select('id')
.first()) {
const rssCampaign = await campaigns.getById(contextHelpers.getAdminContext(), rssCampaignIdRow.id, false);
let checkStatus = null;
try {
const entries = await fetch(rssCampaign.data.feedUrl);
let added = 0;
for (const entry of entries) {
let entryId = null;
await knex.transaction(async tx => {
const existingEntry = await tx('rss').where({
parent: rssCampaign.id,
guid: entry.guid
}).first();
if (!existingEntry) {
const campaignData = {};
let source = rssCampaign.source;
if (source === CampaignSource.CUSTOM_FROM_TEMPLATE || source === CampaignSource.CUSTOM) {
source = CampaignSource.CUSTOM_FROM_CAMPAIGN;
campaignData.sourceCampaign = rssCampaign.id;
} else {
Object.assign(campaignData, rssCampaign.data);
}
campaignData.rssEntry = entry;
const campaign = {
parent: rssCampaign.id,
type: CampaignType.RSS_ENTRY,
source,
name: entry.title || `RSS entry ${entry.guid.substr(0, 67)}`,
lists: rssCampaign.lists,
namespace: rssCampaign.namespace,
send_configuration: rssCampaign.send_configuration,
from_name_override: rssCampaign.from_name_override,
from_email_override: rssCampaign.from_email_override,
reply_to_override: rssCampaign.reply_to_override,
subject_override: rssCampaign.subject_override,
data: campaignData,
click_tracking_disabled: rssCampaign.click_tracking_disabled,
open_tracking_disabled: rssCampaign.open_tracking_disabled,
unsubscribe_url: rssCampaign.unsubscribe_url
};
const ids = await campaigns.createRssTx(tx, contextHelpers.getAdminContext(), campaign);
const campaignId = ids[0];
await tx('rss').insert({
parent: rssCampaign.id,
campaign: campaignId,
guid: entry.guid,
pubdate: entry.date,
});
added += 1;
}
});
}
if (added > 0) {
checkStatus = tLog('feedCheck.campaignsAdded', {addedMessages: added, campaignId: rssCampaign.id});
log.verbose('Feed', `Found ${added} new campaign messages from feed ${rssCampaign.id}`);
process.send({
type: 'entries-added'
});
} else {
checkStatus = tLog('feedCheck.nothingNew');
}
rssCampaign.data.checkStatus = checkStatus;
await knex('campaigns').where('id', rssCampaign.id).update({
last_check: new Date(),
data: JSON.stringify(rssCampaign.data)
});
} catch (err) {
log.error('Feed', err.message);
rssCampaign.data.checkStatus = err.message;
await knex('campaigns').where('id', rssCampaign.id).update({
last_check: new Date(),
data: JSON.stringify(rssCampaign.data)
});
}
}
running = false;
setTimeout(run, dbCheckInterval);
}
process.send({
type: 'feedcheck-started'
});
run();
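
The entries returned by fetch() are normalized into a fixed shape regardless of which of the optional feed fields are present. An illustrative entry (all values made up):

// One normalized feed entry as produced by fetch() above.
const exampleEntry = {
    title: 'Episode 42',
    date: new Date('2018-11-18T12:00:00Z'), // item.date, falling back to pubdate/pubDate/now
    guid: 'https://example.com/episodes/42', // item.guid, falling back to item.link
    link: 'https://example.com/episodes/42',
    content: '<p>Show notes</p>', // description, falling back to summary
    summary: 'Show notes', // summary, falling back to description
    image_url: 'https://example.com/cover.jpg'
};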

server/services/importer.js Normal file
@@ -0,0 +1,410 @@
'use strict';
const knex = require('../lib/knex');
const path = require('path');
const log = require('../lib/log');
const fsExtra = require('fs-extra-promise');
const {ImportSource, MappingType, ImportStatus, RunStatus} = require('../../shared/imports');
const imports = require('../models/imports');
const fields = require('../models/fields');
const subscriptions = require('../models/subscriptions');
const { Writable } = require('stream');
const { cleanupFromPost, enforce } = require('../lib/helpers');
const contextHelpers = require('../lib/context-helpers');
const tools = require('../lib/tools');
const shares = require('../models/shares');
const { tLog } = require('../lib/translate');
const csvparse = require('csv-parse');
const fs = require('fs');
let running = false;
const maxPrepareBatchSize = 100;
const maxImportBatchSize = 10;
function prepareCsv(impt) {
// Processing of CSV intake
const filePath = path.join(imports.filesDir, impt.settings.csv.filename);
const importTable = 'import_file__' + impt.id;
let finishedWithError = false;
let headerRead = false; // set once the CSV header row has been consumed
const finishWithError = async (msg, err) => {
finishedWithError = true;
log.error('Importer (CSV)', err.stack);
await knex('imports').where('id', impt.id).update({
status: ImportStatus.PREP_FAILED,
error: msg + '\n' + err.message
});
await fsExtra.removeAsync(filePath);
};
const finishWithSuccess = async () => {
if (finishedWithError) {
return;
}
log.info('Importer (CSV)', 'Preparation finished');
await knex('imports').where('id', impt.id).update({
status: ImportStatus.PREP_FINISHED,
error: null
});
await fsExtra.removeAsync(filePath);
};
const processRows = async (chunks) => {
let insertBatch = [];
for (const chunkEntry of chunks) {
const record = chunkEntry.chunk;
if (!headerRead) {
headerRead = true;
const cols = [];
let colsDef = '';
for (let idx = 0; idx < record.length; idx++) {
const colName = 'column_' + idx;
cols.push({
column: colName,
name: record[idx]
});
colsDef += ' `' + colName + '` text DEFAULT NULL,\n';
}
impt.settings.csv.columns = cols;
impt.settings.sourceTable = importTable;
await knex('imports').where({id: impt.id}).update({settings: JSON.stringify(impt.settings)});
await knex.schema.raw('CREATE TABLE `' + importTable + '` (\n' +
' `id` int(10) unsigned NOT NULL AUTO_INCREMENT,\n' +
colsDef +
' PRIMARY KEY (`id`)\n' +
') ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;\n');
} else {
const dbRecord = {};
for (let idx = 0; idx < record.length; idx++) {
dbRecord['column_' + idx] = record[idx];
}
insertBatch.push(dbRecord);
}
if (insertBatch.length >= maxPrepareBatchSize) {
await knex(importTable).insert(insertBatch);
insertBatch = [];
}
}
if (insertBatch.length > 0) {
await knex(importTable).insert(insertBatch);
}
};
const inputStream = fs.createReadStream(filePath);
const parser = csvparse({
comment: '#',
delimiter: impt.settings.csv.delimiter
});
inputStream.on('error', err => finishWithError('Error reading CSV file.', err));
parser.on('error', err => finishWithError('Error parsing CSV file.', err));
const importProcessor = new Writable({
write(chunk, encoding, callback) {
processRows([{chunk, encoding}]).then(() => callback());
},
writev(chunks, callback) {
processRows(chunks).then(() => callback());
},
final(callback) {
finishWithSuccess().then(() => callback());
},
objectMode: true
});
parser.pipe(importProcessor);
inputStream.pipe(parser);
}
async function _execImportRun(impt, handlers) {
try {
let imptRun;
// It should not really happen that we have more than one run to be processed for an import. However, to be on the safe side, we process them in a while loop.
while (imptRun = await knex('import_runs').where('import', impt.id).whereIn('status', [RunStatus.SCHEDULED]).orderBy('created', 'asc').first()) {
try {
imptRun.mapping = JSON.parse(imptRun.mapping) || {};
log.info('Importer', `Starting BASIC_SUBSCRIBE run ${impt.id}.${imptRun.id}`);
await knex('import_runs').where('id', imptRun.id).update({
status: RunStatus.RUNNING
});
const importTable = impt.settings.sourceTable;
const flds = await fields.list(contextHelpers.getAdminContext(), impt.list);
let lastId = imptRun.last_id || 0;
let countNew = imptRun.new || 0;
let countProcessed = imptRun.processed || 0;
let countFailed = imptRun.failed || 0;
while (true) {
const rows = await knex(importTable).orderBy('id', 'asc').where('id', '>', lastId).limit(maxImportBatchSize);
log.verbose('Importer', `Processing run ${impt.id}.${imptRun.id} with id > ${lastId} ... ${rows.length} entries`);
if (rows.length === 0) {
break;
}
const subscrs = [];
const unsubscrs = [];
const failures = [];
// Process all rows in parallel. This helps in case processSourceRow does a DNS check, because the checks then run concurrently.
await Promise.all(rows.map(row => handlers.processSourceRow(impt, imptRun, flds, row, subscrs, unsubscrs, failures)));
lastId = rows[rows.length - 1].id;
await knex.transaction(async tx => {
const groupedFieldsMap = await subscriptions.getGroupedFieldsMapTx(tx, impt.list);
let newRows = 0;
for (const subscr of subscrs) {
const meta = {
updateAllowed: true,
updateOfUnsubscribedAllowed: true,
subscribeIfNoExisting: true
};
try {
await subscriptions.createTxWithGroupedFieldsMap(tx, contextHelpers.getAdminContext(), impt.list, groupedFieldsMap, subscr, impt.id, meta);
if (!meta.existing) {
newRows += 1;
}
} catch (err) {
failures.push({
run: imptRun.id,
source_id: subscr.source_id,
email: subscr.email,
reason: err.message
});
}
}
for (const unsubscr of unsubscrs) {
try {
await subscriptions.unsubscribeByEmailAndGetTx(tx, contextHelpers.getAdminContext(), impt.list, unsubscr.email);
} catch (err) {
failures.push({
run: imptRun.id,
source_id: unsubscr.source_id,
email: unsubscr.email,
reason: err.message
});
}
}
countProcessed += rows.length;
countNew += newRows;
countFailed += failures.length;
if (failures.length > 0) {
await tx('import_failed').insert(failures);
}
await tx('import_runs').where('id', imptRun.id).update({
last_id: lastId,
new: countNew,
failed: countFailed,
processed: countProcessed
});
});
const imptRunStatus = await knex('import_runs').where('id', imptRun.id).select(['status']).first();
if (imptRunStatus.status === RunStatus.STOPPING) {
throw new Error('Aborted');
}
}
await knex('import_runs').where('id', imptRun.id).update({
status: RunStatus.FINISHED,
error: null,
finished: new Date()
});
log.info('Importer', `BASIC_SUBSCRIBE run ${impt.id}.${imptRun.id} finished`);
} catch (err) {
await knex('import_runs').where('id', imptRun.id).update({
status: RunStatus.FAILED,
error: err.message,
finished: new Date()
});
throw new Error('Last run failed');
}
}
await knex('imports').where('id', impt.id).update({
last_run: new Date(),
error: null,
status: ImportStatus.RUN_FINISHED
});
} catch (err) {
await knex('imports').where('id', impt.id).update({
last_run: new Date(),
error: err.message,
status: ImportStatus.RUN_FAILED
});
}
}
async function basicSubscribe(impt) {
const handlers = {
processSourceRow: async (impt, imptRun, flds, row, subscriptions, unsubscriptions, failures) => {
const mappingFields = imptRun.mapping.fields || {};
const mappingSettings = imptRun.mapping.settings || {};
const convRow = {};
for (const col in mappingFields) {
const fldMapping = mappingFields[col];
if (fldMapping && fldMapping.column) {
convRow[col] = row[fldMapping.column];
}
}
const subscription = fields.fromImport(impt.list, flds, convRow);
const email = cleanupFromPost(convRow.email);
let errorMsg;
if (!email) {
errorMsg = tLog('importer.missingEmail');
}
if (mappingSettings.checkEmails) {
const emailErr = await tools.validateEmail(email);
if (emailErr) {
errorMsg = tools.validateEmailGetMessage(emailErr, email);
}
}
if (!errorMsg) {
subscription.email = email;
subscription.source_id = row.id;
subscriptions.push(subscription);
} else {
failures.push({
run: imptRun.id,
source_id: row.id,
email: email,
reason: errorMsg
});
}
}
};
return await _execImportRun(impt, handlers);
}
async function basicUnsubscribe(impt) {
const handlers = {
processSourceRow: async (impt, imptRun, flds, row, subscriptions, unsubscriptions, failures) => {
const emailCol = imptRun.mapping.fields.email.column;
const email = cleanupFromPost(row[emailCol]);
let errorMsg;
if (!email) {
errorMsg = tLog('importer.missingEmail');
}
if (!errorMsg) {
unsubscriptions.push({
source_id: row.id,
email
});
} else {
failures.push({
run: imptRun.id,
source_id: row.id,
email: email,
reason: errorMsg
});
}
}
};
return await _execImportRun(impt, handlers);
}
async function getTask() {
return await knex.transaction(async tx => {
const impt = await tx('imports').whereIn('status', [ImportStatus.PREP_SCHEDULED, ImportStatus.RUN_SCHEDULED]).orderBy('created', 'asc').first();
if (impt) {
impt.settings = JSON.parse(impt.settings) || {};
if (impt.source === ImportSource.CSV_FILE && impt.status === ImportStatus.PREP_SCHEDULED) {
await tx('imports').where('id', impt.id).update('status', ImportStatus.PREP_RUNNING);
return () => prepareCsv(impt);
} else if (impt.status === ImportStatus.RUN_SCHEDULED && impt.mapping_type === MappingType.BASIC_SUBSCRIBE) {
await tx('imports').where('id', impt.id).update('status', ImportStatus.RUN_RUNNING);
return () => basicSubscribe(impt);
} else if (impt.status === ImportStatus.RUN_SCHEDULED && impt.mapping_type === MappingType.BASIC_UNSUBSCRIBE) {
await tx('imports').where('id', impt.id).update('status', ImportStatus.RUN_RUNNING);
return () => basicUnsubscribe(impt);
}
} else {
return null;
}
});
}
async function run() {
if (running) {
return;
}
running = true;
let task;
while ((task = await getTask()) != null) {
task();
}
running = false;
}
process.on('message', msg => {
if (msg) {
const type = msg.type;
if (type === 'scheduleCheck') {
run();
}
}
});
process.send({
type: 'importer-started'
});
run();
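
To make the CSV preparation step concrete: for a hypothetical import with id 42 whose CSV header row is EMAIL;FIRST_NAME;LAST_NAME, prepareCsv() stores the following column mapping and creates the following staging table (names and values are illustrative):

// Column mapping persisted in imports.settings.csv.columns:
const columns = [
    {column: 'column_0', name: 'EMAIL'},
    {column: 'column_1', name: 'FIRST_NAME'},
    {column: 'column_2', name: 'LAST_NAME'}
];

// Staging table generated by the raw DDL above:
// CREATE TABLE `import_file__42` (
//   `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
//   `column_0` text DEFAULT NULL,
//   `column_1` text DEFAULT NULL,
//   `column_2` text DEFAULT NULL,
//   PRIMARY KEY (`id`)
// ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;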

@@ -0,0 +1,125 @@
'use strict';
const log = require('../lib/log');
const config = require('config');
const net = require('net');
const campaigns = require('../models/campaigns');
const contextHelpers = require('../lib/context-helpers');
const { SubscriptionStatus } = require('../../shared/lists');
const seenIds = new Set();
let remainder = '';
let reading = false;
async function readNextChunks(socket) {
if (reading) {
return false;
}
reading = true;
while (true) {
const chunk = socket.read();
if (chunk === null) {
reading = false;
return;
}
const lines = (remainder + chunk.toString()).split(/\r?\n/);
remainder = lines.pop();
for (const line of lines) {
try {
const match = /\bstatus=(bounced|sent)\b/.test(line) && line.match(/\bpostfix\/\w+\[\d+\]:\s*([^:]+).*?status=(\w+)/);
if (match) {
let queueId = match[1];
let queued = '';
let queuedAs = '';
if (!seenIds.has(queueId)) {
seenIds.add(queueId);
// Losacno: Check for local requeue
let status = match[2];
log.verbose('POSTFIXBOUNCE', 'Checking message %s for local requeue (status: %s)', queueId, status);
if (status === 'sent') {
// Save new queueId to update message's previous queueId (thanks @mfechner )
queued = / relay=/.test(line) && line.match(/status=sent \((.*)\)/);
if (queued) {
queued = queued[1];
queuedAs = queued.match(/ queued as (\w+)/);
if (queuedAs) {
queuedAs = queuedAs[1];
} else {
queuedAs = '';
}
}
}
const message = await campaigns.getMessageByResponseId(queueId);
if (message) {
if (queuedAs || status === 'sent') {
log.verbose('POSTFIXBOUNCE', 'Message %s locally requeued as %s', queueId, queuedAs);
// Update message's previous queueId (thanks @mfechner )
await campaigns.updateMessageResponse(contextHelpers.getAdminContext(), message, queued, queuedAs);
log.verbose('POSTFIXBOUNCE', 'Successfully changed message queueId to %s', queuedAs);
} else {
await campaigns.changeStatusByMessage(contextHelpers.getAdminContext(), message, SubscriptionStatus.BOUNCED, true);
log.verbose('POSTFIXBOUNCE', 'Marked message %s as bounced', queueId);
}
// No need to keep in memory... free it ( thanks @witzig )
seenIds.delete(queueId);
}
}
}
} catch (err) {
log.error('POSTFIXBOUNCE', err && err.stack);
}
}
}
}
module.exports = callback => {
if (!config.postfixbounce.enabled) {
return setImmediate(callback);
}
let started = false; // Not sure why all this magic around "started". But it was there this way in Mailtrain v1, so we kept it.
const server = net.createServer(socket => {
socket.on('readable', () => readNextChunks(socket));
});
server.on('error', err => {
const port = config.postfixbounce.port;
const bind = typeof port === 'string' ? 'Pipe ' + port : 'Port ' + port;
switch (err.code) {
case 'EACCES':
log.error('POSTFIXBOUNCE', '%s requires elevated privileges.', bind);
break;
case 'EADDRINUSE':
log.error('POSTFIXBOUNCE', '%s is already in use', bind);
break;
default:
log.error('POSTFIXBOUNCE', err);
}
if (!started) {
started = true;
return callback(err);
}
});
server.listen(config.postfixbounce.port, config.postfixbounce.host, () => {
if (started) {
return server.close();
}
started = true;
log.info('POSTFIXBOUNCE', 'Server listening on port %s', config.postfixbounce.port);
setImmediate(callback);
});
};
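
To illustrate what the line parser above extracts, here are two made-up Postfix maillog lines run through the same regular expressions:

// Made-up Postfix log lines matched by the regexes in the service above.
const lines = [
    'Nov 18 15:38:52 mx postfix/smtp[1234]: 3C0FA20EA1: to=<user@example.com>, relay=mx2.example.com[192.0.2.1]:25, status=sent (250 2.0.0 OK: queued as 9F2AB10CD3)',
    'Nov 18 15:39:10 mx postfix/smtp[1234]: 7D1BB20EA2: to=<gone@example.com>, status=bounced (host mx2.example.com said: 550 5.1.1 user unknown)'
];

for (const line of lines) {
    const match = /\bstatus=(bounced|sent)\b/.test(line) && line.match(/\bpostfix\/\w+\[\d+\]:\s*([^:]+).*?status=(\w+)/);
    if (match) {
        // First line: queueId '3C0FA20EA1', status 'sent'; the service then
        // picks up '9F2AB10CD3' as the locally requeued id. Second line:
        // queueId '7D1BB20EA2', status 'bounced', which marks the message as bounced.
        console.log('queueId=%s status=%s', match[1], match[2]);
    }
}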

@@ -0,0 +1,290 @@
'use strict';
const config = require('config');
const fork = require('child_process').fork;
const log = require('../lib/log');
const path = require('path');
const knex = require('../lib/knex');
const {CampaignStatus, CampaignType} = require('../../shared/campaigns');
const { enforce } = require('../lib/helpers');
const campaigns = require('../models/campaigns');
let messageTid = 0;
const workerProcesses = new Map();
const idleWorkers = [];
let campaignSchedulerRunning = false;
let workerSchedulerRunning = false;
const campaignsCheckPeriod = 5 * 1000;
const retrieveBatchSize = 1000;
const workerBatchSize = 100;
const messageQueue = new Map(); // campaignId -> [{listId, email}]
const messageQueueCont = new Map(); // campaignId -> next batch callback
const workAssignment = new Map(); // workerId -> { campaignId, subscribers: [{listId, email}] }
let workerSchedulerCont = null;
function messagesProcessed(workerId) {
workAssignment.delete(workerId);
idleWorkers.push(workerId);
if (workerSchedulerCont) {
const cont = workerSchedulerCont;
workerSchedulerCont = null;
setImmediate(cont);
}
}
async function scheduleWorkers() {
async function getAvailableWorker() {
if (idleWorkers.length > 0) {
return idleWorkers.shift();
} else {
const workerAvailable = new Promise(resolve => {
workerSchedulerCont = resolve;
});
await workerAvailable;
return idleWorkers.shift();
}
}
if (workerSchedulerRunning) {
return;
}
workerSchedulerRunning = true;
let workerId = await getAvailableWorker();
let keepLooping = true;
while (keepLooping) {
keepLooping = false;
for (const campaignId of messageQueue.keys()) {
const queue = messageQueue.get(campaignId);
if (queue.length > 0) {
const subscribers = queue.splice(0, workerBatchSize);
workAssignment.set(workerId, {campaignId, subscribers});
if (queue.length === 0 && messageQueueCont.has(campaignId)) {
const scheduleMessages = messageQueueCont.get(campaignId);
setImmediate(scheduleMessages);
}
sendToWorker(workerId, 'process-messages', {
campaignId,
subscribers
});
workerId = await getAvailableWorker();
keepLooping = true;
}
}
}
idleWorkers.push(workerId);
workerSchedulerRunning = false;
}
async function processCampaign(campaignId) {
async function finish() {
await knex('campaigns').where('id', campaignId).update({status: CampaignStatus.FINISHED});
messageQueue.delete(campaignId);
}
const msgQueue = [];
messageQueue.set(campaignId, msgQueue);
try {
while (true) {
const cpg = await knex('campaigns').where('id', campaignId).first();
if (cpg.status === CampaignStatus.PAUSED) {
messageQueue.delete(campaignId);
return;
}
let qryGen;
await knex.transaction(async tx => {
qryGen = await campaigns.getSubscribersQueryGeneratorTx(tx, campaignId, true);
});
if (qryGen) {
let subscribersInProcessing = [...msgQueue];
for (const wa of workAssignment.values()) {
if (wa.campaignId === campaignId) {
subscribersInProcessing = subscribersInProcessing.concat(wa.subscribers);
}
}
const qry = qryGen(knex)
.whereNotIn('pending_subscriptions.email', subscribersInProcessing.map(x => x.email))
.select(['pending_subscriptions.email', 'campaign_lists.list'])
.limit(retrieveBatchSize);
const subs = await qry;
if (subs.length === 0) {
await finish();
return;
}
for (const sub of subs) {
msgQueue.push({
listId: sub.list,
email: sub.email
});
}
const nextBatchNeeded = new Promise(resolve => {
messageQueueCont.set(campaignId, resolve);
});
// noinspection JSIgnoredPromiseFromCall
setImmediate(scheduleWorkers);
await nextBatchNeeded;
} else {
await finish();
return;
}
}
} catch (err) {
log.error('Senders', `Sending campaign ${campaignId} failed with error: ${err.message}`);
log.verbose(err);
}
}
async function scheduleCampaigns() {
if (campaignSchedulerRunning) {
return;
}
campaignSchedulerRunning = true;
while (true) {
let campaignId = 0;
await knex.transaction(async tx => {
const scheduledCampaign = await tx('campaigns')
.whereIn('campaigns.type', [CampaignType.REGULAR, CampaignType.RSS_ENTRY])
.where('campaigns.status', CampaignStatus.SCHEDULED)
.where(qry => qry.whereNull('campaigns.scheduled').orWhere('campaigns.scheduled', '<=', new Date()))
.select(['id'])
.first();
if (scheduledCampaign) {
await tx('campaigns').where('id', scheduledCampaign.id).update({status: CampaignStatus.SENDING});
campaignId = scheduledCampaign.id;
}
});
if (campaignId) {
// noinspection JSIgnoredPromiseFromCall
processCampaign(campaignId);
} else {
break;
}
}
campaignSchedulerRunning = false;
}
async function spawnWorker(workerId) {
return await new Promise((resolve, reject) => {
log.verbose('Senders', `Spawning worker process ${workerId}`);
const senderProcess = fork(path.join(__dirname, 'sender-worker.js'), [workerId], {
cwd: path.join(__dirname, '..'),
env: {NODE_ENV: process.env.NODE_ENV}
});
senderProcess.on('message', msg => {
if (msg) {
if (msg.type === 'worker-started') {
log.info('Senders', `Worker process ${workerId} started`);
return resolve();
} else if (msg.type === 'messages-processed') {
messagesProcessed(workerId);
}
}
});
senderProcess.on('close', (code, signal) => {
log.error('Senders', `Worker process ${workerId} exited with code %s signal %s`, code, signal);
});
workerProcesses.set(workerId, senderProcess);
idleWorkers.push(workerId);
});
}
function sendToWorker(workerId, msgType, data) {
workerProcesses.get(workerId).send({
type: msgType,
data,
tid: messageTid
});
messageTid++;
}
function periodicCampaignsCheck() {
// noinspection JSIgnoredPromiseFromCall
scheduleCampaigns();
setTimeout(periodicCampaignsCheck, campaignsCheckPeriod);
}
async function init() {
const spawnWorkerFutures = [];
let workerId;
for (workerId = 0; workerId < config.queue.processes; workerId++) {
spawnWorkerFutures.push(spawnWorker(workerId));
}
await Promise.all(spawnWorkerFutures);
process.on('message', msg => {
if (msg) {
const type = msg.type;
if (type === 'schedule-check') {
// noinspection JSIgnoredPromiseFromCall
scheduleCampaigns();
} else if (type === 'reload-config') {
for (const worker of workerProcesses.keys()) {
sendToWorker(worker, 'reload-config', msg.data);
}
}
}
});
process.send({
type: 'master-sender-started'
});
periodicCampaignsCheck();
}
init();
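
The scheduler coordinates with workers without polling: when no worker is idle it parks on a promise and stores the resolver in workerSchedulerCont, and messagesProcessed() resolves it later (messageQueueCont works the same way for batch refills). The pattern in isolation:

// The stored-resolver pattern used above, reduced to its essentials.
let cont = null;

function waitForWorker() {
    return new Promise(resolve => {
        cont = resolve;
    });
}

function workerBecameIdle() {
    if (cont) {
        const resolve = cont;
        cont = null;
        setImmediate(resolve);
    }
}

async function demo() {
    const idle = waitForWorker();
    setTimeout(workerBecameIdle, 100); // stands in for a 'messages-processed' message
    await idle; // the scheduler resumes here once a worker is free
    console.log('got a worker');
}

demo();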

@@ -0,0 +1,60 @@
'use strict';
const config = require('config');
const log = require('../lib/log');
const mailers = require('../lib/mailers');
const CampaignSender = require('../lib/campaign-sender');
const workerId = Number.parseInt(process.argv[2]);
let running = false;
async function processMessages(campaignId, subscribers) {
if (running) {
log.error('Senders', `Worker ${workerId} assigned work while working`);
return;
}
running = true;
const cs = new CampaignSender();
await cs.init({campaignId});
for (const subData of subscribers) {
try {
await cs.sendMessage(subData.listId, subData.email);
log.verbose('Senders', 'Message sent and status updated for %s:%s', subData.listId, subData.email);
} catch (err) {
log.error('Senders', `Sending message to ${subData.listId}:${subData.email} failed with error: ${err.message}`);
log.verbose(err);
}
}
running = false;
sendToMaster('messages-processed');
}
function sendToMaster(msgType) {
process.send({
type: msgType
});
}
process.on('message', msg => {
if (msg) {
const type = msg.type;
if (type === 'reload-config') {
mailers.invalidateMailer(msg.data.sendConfigurationId);
} else if (type === 'process-messages') {
// noinspection JSIgnoredPromiseFromCall
processMessages(msg.data.campaignId, msg.data.subscribers);
}
}
});
sendToMaster('worker-started');

@@ -0,0 +1,197 @@
'use strict';
const log = require('../lib/log');
const config = require('config');
const crypto = require('crypto');
const humanize = require('humanize');
const http = require('http');
const SMTPServer = require('smtp-server').SMTPServer;
const simpleParser = require('mailparser').simpleParser;
let totalMessages = 0;
let received = 0;
const mailstore = {
accounts: {},
saveMessage(address, message) {
if (!this.accounts[address]) {
this.accounts[address] = [];
}
this.accounts[address].push(message);
},
getMail(address, callback) {
if (!this.accounts[address] || this.accounts[address].length === 0) {
let err = new Error('No mail for ' + address);
err.status = 404;
return callback(err);
}
simpleParser(this.accounts[address].shift(), (err, mail) => {
if (err) {
return callback(err.message || err);
}
callback(null, mail);
});
}
};
// Setup server
const server = new SMTPServer({
// log to console
logger: config.testServer.logger,
// not required but nice-to-have
banner: 'Welcome to My Awesome SMTP Server',
// disable STARTTLS to allow authentication in clear text mode
disabledCommands: ['STARTTLS'],
// By default only PLAIN and LOGIN are enabled
authMethods: ['PLAIN', 'LOGIN'],
// Accept messages up to 10 MB
size: 10 * 1024 * 1024,
// Setup authentication
onAuth: (auth, session, callback) => {
let username = config.testServer.username;
let password = config.testServer.password;
// check username and password
if (auth.username === username && auth.password === password) {
return callback(null, {
user: 'userdata' // value could be a user id or a user object etc.; it can be accessed from session.user afterwards
});
}
return callback(new Error('Authentication failed'));
},
// Validate MAIL FROM envelope address. Example allows all addresses that do not start with 'deny'
// If this method is not set, all addresses are allowed
onMailFrom: (address, session, callback) => {
if (/^deny/i.test(address.address)) {
return callback(new Error('Not accepted'));
}
callback();
},
// Validate RCPT TO envelope address. Example allows all addresses that do not start with 'deny'
// If this method is not set, all addresses are allowed
onRcptTo: (address, session, callback) => {
let err;
if (/^deny/i.test(address.address)) {
return callback(new Error('Not accepted'));
}
// Reject messages larger than 100 bytes to an over-quota user
if (/^full/i.test(address.address) && Number(session.envelope.mailFrom.args.SIZE) > 100) {
err = new Error('Insufficient channel storage: ' + address.address);
err.responseCode = 452;
return callback(err);
}
callback();
},
// Handle message stream
onData: (stream, session, callback) => {
let hash = crypto.createHash('md5');
let message = '';
stream.on('data', chunk => {
hash.update(chunk);
if (/^keep/i.test(session.envelope.rcptTo[0].address)) {
message += chunk;
}
});
stream.on('end', () => {
let err;
if (stream.sizeExceeded) {
err = new Error('Error: message exceeds fixed maximum message size 10 MB');
err.responseCode = 552;
return callback(err);
}
// Store message for e2e tests
if (/^keep/i.test(session.envelope.rcptTo[0].address)) {
mailstore.saveMessage(session.envelope.rcptTo[0].address, message);
}
received++;
callback(null, 'Message queued as ' + hash.digest('hex')); // accept the message once the stream is ended
});
}
});
server.on('error', err => {
log.error('Test SMTP', err.stack);
});
let mailBoxServer = http.createServer((req, res) => {
let renderer = data => (
'<!doctype html><html><head><title>' + data.title + '</title></head><body>' + data.body + '</body></html>'
);
let address = req.url.substring(1);
mailstore.getMail(address, (err, mail) => {
if (err) {
let html = renderer({
title: 'error',
body: err.message || err
});
res.writeHead(err.status || 500, { 'Content-Type': 'text/html' });
return res.end(html);
}
let html = mail.html || renderer({
title: 'error',
body: 'This mail has no HTML part'
});
// https://nodemailer.com/extras/mailparser/#mail-object
delete mail.html;
delete mail.textAsHtml;
delete mail.attachments;
let script = '<script> var mailObject = ' + JSON.stringify(mail) + '; console.log(mailObject); </script>';
html = html.replace(/<\/body\b/i, match => script + match);
html = html.replace(/target="_blank"/g, 'target="_self"');
res.writeHead(200, { 'Content-Type': 'text/html' });
res.end(html);
});
});
mailBoxServer.on('error', err => {
log.error('Test SMTP Mailbox Server', err);
});
module.exports = callback => {
if (config.testServer.enabled) {
server.listen(config.testServer.port, config.testServer.host, () => {
log.info('Test SMTP', 'Server listening on port %s', config.testServer.port);
setInterval(() => {
if (received) {
totalMessages += received;
log.verbose(
'Test SMTP',
'Received %s new message%s in last 60 sec. (total %s messages)',
humanize.numberFormat(received, 0), received === 1 ? '' : 's',
humanize.numberFormat(totalMessages, 0)
);
received = 0;
}
}, 60 * 1000);
mailBoxServer.listen(config.testServer.mailboxServerPort, config.testServer.host, () => {
log.info('Test SMTP', 'Mail Box Server listening on port %s', config.testServer.mailboxServerPort);
setImmediate(callback);
});
});
} else {
setImmediate(callback);
}
};
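
A hypothetical smoke test against this server, using nodemailer (already a Mailtrain dependency); it reads the same config keys the server does, and the addresses are illustrative:

const nodemailer = require('nodemailer');
const config = require('config');

const transport = nodemailer.createTransport({
    host: config.testServer.host,
    port: config.testServer.port,
    secure: false, // the server disables STARTTLS, so plain connection + AUTH
    auth: {
        user: config.testServer.username,
        pass: config.testServer.password
    }
});

transport.sendMail({
    from: 'sender@example.com',
    to: 'keep.smoketest@example.com', // the 'keep' prefix makes the server store the message
    subject: 'Hello',
    html: '<p>Hi there</p>'
}).then(info => {
    console.log(info.response); // '250 Message queued as <md5 hash>'
});

The stored message can then be fetched as rendered HTML from the mailbox server at http://<host>:<mailboxServerPort>/keep.smoketest@example.com.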

server/services/triggers.js Normal file
@@ -0,0 +1,180 @@
'use strict';
const log = require('../lib/log');
const knex = require('../lib/knex');
const triggers = require('../models/triggers');
const campaigns = require('../models/campaigns');
const subscriptions = require('../models/subscriptions');
const segments = require('../models/segments');
const { Entity, Event } = require('../../shared/triggers');
const { SubscriptionStatus } = require('../../shared/lists');
const links = require('../models/links');
const contextHelpers = require('../lib/context-helpers');
const triggerCheckPeriod = 15 * 1000;
const triggerFirePeriod = 60 * 1000;
async function start() {
while (true) {
const fired = await knex.transaction(async tx => {
const currentTs = new Date();
const trigger = await tx('triggers').where('enabled', true).where(qry => qry.whereNull('last_check').orWhere('last_check', '<', new Date(currentTs.getTime() - triggerFirePeriod))).orderBy('last_check', 'asc').first();
if (!trigger) {
return false;
}
const campaign = await campaigns.getByIdTx(tx, contextHelpers.getAdminContext(), trigger.campaign, false);
for (const cpgList of campaign.lists) {
const addSegmentQuery = cpgList.segment ? await segments.getQueryGeneratorTx(tx, cpgList.list, cpgList.segment) : () => {};
const subsTable = subscriptions.getSubscriptionTableName(cpgList.list);
let sqlQry = knex.from(subsTable)
.leftJoin(
function () {
return this.from('trigger_messages')
.where('trigger_messages.campaign', campaign.id)
.where('trigger_messages.list', cpgList.list)
.as('related_trigger_messages');
},
'related_trigger_messages.subscription', subsTable + '.id'
)
.where(function () {
addSegmentQuery(this);
})
.whereNull('related_trigger_messages.id') // This means only those where the trigger has not already fired at some point in the past
.select(subsTable + '.id');
let column;
if (trigger.entity === Entity.SUBSCRIPTION) {
column = subsTable + '.' + trigger.event;
} else if (trigger.entity === Entity.CAMPAIGN) {
if (trigger.event === Event[Entity.CAMPAIGN].DELIVERED) {
sqlQry = sqlQry.innerJoin(
function () {
return this.from('campaign_messages')
.where('campaign_messages.campaign', campaign.id)
.where('campaign_messages.list', cpgList.list)
.as('campaign_messages');
}, 'campaign_messages.subscription', subsTable + '.id');
column = 'campaign_messages.created';
} else if (trigger.event === Event[Entity.CAMPAIGN].OPENED) {
sqlQry = sqlQry.innerJoin(
function () {
return this.from('campaign_links')
.where('campaign_links.campaign', campaign.id)
.where('campaign_links.list', cpgList.list)
.where('campaign_links.link', links.LinkId.OPEN)
.as('campaign_links');
}, 'campaign_links.subscription', subsTable + '.id');
column = 'campaign_links.created';
} else if (trigger.event === Event[Entity.CAMPAIGN].CLICKED) {
sqlQry = sqlQry.innerJoin(
function () {
return this.from('campaign_links')
.where('campaign_links.campaign', campaign.id)
.where('campaign_links.list', cpgList.list)
.where('campaign_links.link', links.LinkId.GENERAL_CLICK)
.as('campaign_links');
}, 'campaign_links.subscription', subsTable + '.id');
column = 'campaign_links.created';
} else if (trigger.event === Event[Entity.CAMPAIGN].NOT_OPENED) {
sqlQry = sqlQry.innerJoin(
function () {
return this.from('campaign_messages')
.where('campaign_messages.campaign', campaign.id)
.where('campaign_messages.list', cpgList.list)
.as('campaign_messages');
}, 'campaign_messages.subscription', subsTable + '.id')
.whereNotExists(function () {
return this
.select('*')
.from('campaign_links')
.whereRaw(`campaign_links.subscription = ${subsTable}.id`)
.where('campaign_links.campaign', campaign.id)
.where('campaign_links.list', cpgList.list)
.where('campaign_links.link', links.LinkId.OPEN);
});
column = 'campaign_messages.created';
} else if (trigger.event === Event[Entity.CAMPAIGN].NOT_CLICKED) {
sqlQry = sqlQry.innerJoin(
function () {
return this.from('campaign_messages')
.where('campaign_messages.campaign', campaign.id)
.where('campaign_messages.list', cpgList.list)
.as('campaign_messages');
}, 'campaign_messages.subscription', subsTable + '.id')
.whereNotExists(function () {
return this
.select('*')
.from('campaign_links')
.whereRaw(`campaign_links.subscription = ${subsTable}.id`)
.where('campaign_links.campaign', campaign.id)
.where('campaign_links.list', cpgList.list)
.where('campaign_links.link', links.LinkId.GENERAL_CLICK);
});
column = 'campaign_messages.created';
}
sqlQry = sqlQry.where(column, '<=', new Date(currentTs.getTime() - trigger.seconds * 1000));
if (trigger.last_check !== null) {
sqlQry = sqlQry.where(column, '>', trigger.last_check);
}
}
const subscribers = await sqlQry;
for (const subscriber of subscribers) {
await tx('trigger_messages').insert({
campaign: campaign.id,
list: cpgList.list,
subscription: subscriber.id
});
await tx('queued').insert({
campaign: campaign.id,
list: cpgList.list,
subscription: subscriber.id,
trigger: trigger.id
});
await tx('triggers').increment('count').where('id', trigger.id);
log.verbose('Triggers', `Triggered ${trigger.name} (${trigger.id}) for subscriber ${subscriber.id}`);
}
}
await tx('triggers').update('last_check', currentTs).where('id', trigger.id);
return true;
});
if (!fired) {
const nextCycle = new Promise(resolve => {
setTimeout(resolve, triggerCheckPeriod);
});
await nextCycle;
}
}
}
module.exports.start = start;

@@ -0,0 +1,54 @@
'use strict';
// This script re-calculates timezone offsets once a day.
// We need this to be able to send messages using the subscriber's local time.
// The best option would be to use the built-in timezone data of MySQL, but
// the availability of timezone data is not guaranteed, as it is an optional add-on.
// So instead we keep a list of timezone offsets in a table that we can
// JOIN with the subscription table. The subscription table includes the timezone name for
// a subscriber and the tzoffset table includes the offset from UTC in minutes.
const moment = require('moment-timezone');
const knex = require('../lib/knex');
const log = require('../lib/log');
let lastCheck = false;
const timezone_timeout = 60 * 60 * 1000;
async function updateTimezoneOffsets() {
log.verbose('UTC', 'Updating timezone offsets');
const values = [];
for (const tz of moment.tz.names()) {
values.push({
tz: tz.toLowerCase().trim(),
offset: moment.tz(tz).utcOffset()
});
}
await knex.transaction(async tx => {
await tx('tzoffset').del();
await tx('tzoffset').insert(values);
});
}
function start() {
let curUtcDate = new Date().toISOString().split('T').shift();
if (curUtcDate !== lastCheck) {
updateTimezoneOffsets()
.then(() => {
lastCheck = curUtcDate; // mark the day as done only once the update has succeeded
setTimeout(start, timezone_timeout);
})
.catch(err => {
log.error('UTC', err);
setTimeout(start, timezone_timeout);
});
} else {
setTimeout(start, timezone_timeout);
}
}
module.exports = {
start
};
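
A sketch of how the tzoffset table can then be consumed (the subscription table and its tz column are assumptions for illustration; the join relies on timezone names being stored lowercased, matching what updateTimezoneOffsets() writes):

const knex = require('../lib/knex');

// Select subscribers whose current local hour equals `hour`, by shifting
// UTC by the per-timezone offset in minutes.
async function subscribersAtLocalHour(subsTable, hour) {
    return await knex(subsTable)
        .join('tzoffset', 'tzoffset.tz', subsTable + '.tz')
        .whereRaw('HOUR(UTC_TIMESTAMP() + INTERVAL tzoffset.offset MINUTE) = ?', [hour])
        .select(subsTable + '.*');
}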

@@ -0,0 +1,145 @@
'use strict';
const { nodeifyFunction, nodeifyPromise } = require('../lib/nodeify');
const log = require('../lib/log');
const config = require('config');
const {MailerError} = require('../lib/mailers');
const campaigns = require('../models/campaigns');
const contextHelpers = require('../lib/context-helpers');
const {SubscriptionStatus} = require('../../shared/lists');
const BounceHandler = require('bounce-handler').BounceHandler;
const SMTPServer = require('smtp-server').SMTPServer;
async function onRcptTo(address, session) {
const addrSplit = address.address.split('@');
if (addrSplit.length !== 2) {
throw new MailerError('Unknown user ' + address.address, 510);
}
const [user, host] = addrSplit;
const message = await campaigns.getMessageByCid(user);
if (!message) {
throw new MailerError('Unknown user ' + address.address, 510);
}
if (message.verp_hostname !== host) {
throw new MailerError('Unknown user ' + address.address, 510);
}
session.message = message;
log.verbose('VERP', 'Incoming message for Campaign %s, List %s, Subscription %s', message.campaign, message.list, message.subscription);
}
function onData(stream, session, callback) {
let chunks = [];
let totalLen = 0;
stream.on('data', chunk => {
if (!chunk || !chunk.length || totalLen > 60 * 1024) {
return;
}
chunks.push(chunk);
totalLen += chunk.length;
});
stream.on('end', () => nodeifyPromise(onStreamEnd(), callback));
const onStreamEnd = async () => {
const body = Buffer.concat(chunks, totalLen).toString();
const bh = new BounceHandler();
let bounceResult;
try {
bounceResult = [].concat(bh.parse_email(body) || []).shift();
} catch (E) {
log.error('Bounce', 'Failed parsing bounce message');
log.error('Bounce', JSON.stringify(body));
}
if (!bounceResult || ['failed', 'transient'].indexOf(bounceResult.action) < 0) {
return 'Message accepted';
} else {
await campaigns.changeStatusByMessage(contextHelpers.getAdminContext(), session.message, SubscriptionStatus.BOUNCED, bounceResult.action === 'failed');
log.verbose('VERP', 'Marked message %s as bounced', session.message.campaign);
}
};
}
// Setup server
const server = new SMTPServer({
// log to console
logger: false,
banner: 'Mailtrain VERP bouncer',
disabledCommands: ['AUTH', 'STARTTLS'],
onRcptTo: nodeifyFunction(onRcptTo),
onData: onData
});
module.exports = callback => {
if (!config.verp.enabled) {
return setImmediate(callback);
}
let started = false;
server.on('error', err => {
const port = config.verp.port;
const bind = typeof port === 'string' ? 'Pipe ' + port : 'Port ' + port;
switch (err.code) {
case 'EACCES':
log.error('VERP', '%s requires elevated privileges', bind);
break;
case 'EADDRINUSE':
log.error('VERP', '%s is already in use', bind);
break;
case 'ECONNRESET': // Usually happens when a client does not disconnect cleanly
case 'EPIPE': // Remote connection was closed before the server attempted to send data
default:
log.error('VERP', err);
}
if (!started) {
started = true;
return callback(err);
}
});
let hosts;
if (typeof config.verp.host === 'string' && config.verp.host) {
hosts = config.verp.host.trim().split(',').map(host => host.trim()).filter(host => !!host);
if (hosts.indexOf('*') >= 0 || hosts.indexOf('all') >= 0) {
hosts = [false];
}
} else {
hosts = [false];
}
let pos = 0;
const startNextHost = () => {
if (pos >= hosts.length) {
started = true;
return setImmediate(callback);
}
let host = hosts[pos++];
server.listen(config.verp.port, host, () => {
if (started) {
return server.close();
}
log.info('VERP', 'Server listening on %s:%s', host || '*', config.verp.port);
setImmediate(startNextHost);
});
};
startNextHost();
};
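
For reference, the bounce addresses this server accepts follow the usual VERP scheme of one unique return path per sent message (the values below are made up):

// Return-path address format parsed by onRcptTo() above:
// <message cid>@<campaign's verp_hostname>.
const returnPath = 'Hk2p9QzR@bounces.example.com';
const [user, host] = returnPath.split('@');
// user ('Hk2p9QzR') is looked up via campaigns.getMessageByCid(), and host
// must match the stored message.verp_hostname for the bounce to be accepted.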