Work on sending campaigns. The campaign status page is about half done and does not work yet.
parent 67d7129f7b
commit d1fa4f4211
66 changed files with 1653 additions and 525 deletions
@@ -5,35 +5,187 @@ const fork = require('child_process').fork;
const log = require('npmlog');
const path = require('path');
const knex = require('../lib/knex');
const {CampaignStatus, CampaignType} = require('../shared/campaigns');
const { enforce } = require('../lib/helpers');
const campaigns = require('../models/campaigns');
const subscriptions = require('../models/subscriptions');
const { SubscriptionStatus } = require('../shared/lists');
const segments = require('../models/segments');

let messageTid = 0;
-let workerProcesses = new Map();
+const workerProcesses = new Map();

let running = false;
const idleWorkers = [];

let campaignSchedulerRunning = false;
let workerSchedulerRunning = false;

const campaignsCheckPeriod = 5 * 1000;
const retrieveBatchSize = 1000;
const workerBatchSize = 100;

const messageQueue = new Map(); // campaignId -> [{listId, email}]
const messageQueueCont = new Map(); // campaignId -> next batch callback

let workerSchedulerCont = null;


function messagesProcessed(workerId) {
    idleWorkers.push(workerId);

    if (workerSchedulerCont) {
        const cont = workerSchedulerCont;
        setImmediate(cont);
        workerSchedulerCont = null;
    }
}

async function scheduleWorkers() {
    async function getAvailableWorker() {
        if (idleWorkers.length > 0) {
            return idleWorkers.shift();

        } else {
            const workerAvailable = new Promise(resolve => {
                workerSchedulerCont = resolve;
            });

            await workerAvailable;
            return idleWorkers.shift();
        }
    }


    if (workerSchedulerRunning) {
        return;
    }

    workerSchedulerRunning = true;
    let workerId = await getAvailableWorker();

    let keepLooping = true;

    while (keepLooping) {
        keepLooping = false;

        for (const campaignId of messageQueue.keys()) {
            const queue = messageQueue.get(campaignId);

            if (queue.length > 0) {
                const msgs = queue.splice(0, workerBatchSize);

                if (queue.length === 0 && messageQueueCont.has(campaignId)) {
                    const scheduleMessages = messageQueueCont.get(campaignId);
                    setImmediate(scheduleMessages);
                }

                sendToWorker(workerId, 'process-messages', msgs);
                workerId = await getAvailableWorker();

                keepLooping = true;
            }
        }
    }

    idleWorkers.push(workerId);

    workerSchedulerRunning = false;
}

/*
const path = require('path');
const log = require('npmlog');
const fsExtra = require('fs-extra-promise');
const {ImportSource, MappingType, ImportStatus, RunStatus} = require('../shared/imports');
const imports = require('../models/imports');
const fields = require('../models/fields');
const subscriptions = require('../models/subscriptions');
const { Writable } = require('stream');
const { cleanupFromPost, enforce } = require('../lib/helpers');
const contextHelpers = require('../lib/context-helpers');
const tools = require('../lib/tools');
const shares = require('../models/shares');
const _ = require('../lib/translate')._;
*/


async function processCampaign(campaignId) {
    const campaignSubscribersTable = 'campaign__' + campaignId;
    async function finish() {
        await knex('campaigns').where('id', campaignId).update({status: CampaignStatus.FINISHED});
        messageQueue.delete(campaignId);
    }

    const msgQueue = [];
    messageQueue.set(campaignId, msgQueue);

    while (true) {
        const cpg = await knex('campaigns').where('id', campaignId).first();

        if (cpg.status === CampaignStatus.PAUSED) {
            await finish();
            return;
        }

        let qryGen;
        await knex.transaction(async tx => {
            qryGen = await campaigns.getSubscribersQueryGeneratorTx(tx, campaignId, true, retrieveBatchSize);
        });

        if (qryGen) {
            const qry = qryGen(knex).select(['pending_subscriptions.email', 'campaign_lists.list']);
            const subs = await qry;

            if (subs.length === 0) {
                await finish();
                return;
            }

            for (const sub of subs) {
                msgQueue.push({
                    listId: sub.list,
                    email: sub.email
                });
            }

            const nextBatchNeeded = new Promise(resolve => {
                messageQueueCont.set(campaignId, resolve);
            });

            // noinspection JSIgnoredPromiseFromCall
            setImmediate(scheduleWorkers);

            await nextBatchNeeded;

        } else {
            await finish();
            return;
        }
    }
}


async function scheduleCampaigns() {
    if (campaignSchedulerRunning) {
        return;
    }

    campaignSchedulerRunning = true;

    while (true) {
        let campaignId = 0;

        await knex.transaction(async tx => {
            const scheduledCampaign = await tx('campaigns')
                .whereIn('campaigns.type', [CampaignType.REGULAR, CampaignType.RSS_ENTRY])
                .where('campaigns.status', CampaignStatus.SCHEDULED)
                .where(qry => qry.whereNull('campaigns.scheduled').orWhere('campaigns.scheduled', '<=', new Date()))
                .select(['id'])
                .first();

            if (scheduledCampaign) {
                await tx('campaigns').where('id', scheduledCampaign.id).update({status: CampaignStatus.SENDING});
                campaignId = scheduledCampaign.id;
            }
        });

        if (campaignId) {
            // noinspection JSIgnoredPromiseFromCall
            processCampaign(campaignId);

        } else {
            break;
        }
    }

    campaignSchedulerRunning = false;
}


async function spawnWorker(workerId) {
    return await new Promise((resolve, reject) => {
        log.verbose('Senders', `Spawning worker process ${workerId}`);
@@ -48,7 +200,11 @@ async function spawnWorker(workerId) {
            if (msg.type === 'worker-started') {
                log.info('Senders', `Worker process ${workerId} started`);
                return resolve();

            } else if (msg.type === 'messages-processed') {
                messagesProcessed(workerId);
            }

        }
    });
@@ -57,21 +213,10 @@ async function spawnWorker(workerId) {
        });

        workerProcesses.set(workerId, senderProcess);
        idleWorkers.push(workerId);
    });
}

async function run() {
    if (running) {
        return;
    }

    running = true;

    // FIXME

    running = false;
}

function sendToWorker(workerId, msgType, data) {
    workerProcesses.get(workerId).send({
        type: msgType,
@@ -82,6 +227,14 @@ function sendToWorker(workerId, msgType, data) {
    messageTid++;
}


function periodicCampaignsCheck() {
    // noinspection JSIgnoredPromiseFromCall
    scheduleCampaigns();

    setTimeout(periodicCampaignsCheck, campaignsCheckPeriod);
}

async function init() {
    const spawnWorkerFutures = [];
    let workerId;
@@ -95,12 +248,13 @@ async function init() {
        if (msg) {
            const type = msg.type;

-            if (type === 'scheduleCheck') {
-                // FIXME
+            if (type === 'schedule-check') {
+                // noinspection JSIgnoredPromiseFromCall
+                scheduleCampaigns();

-            } else if (type === 'reloadConfig') {
+            } else if (type === 'reload-config') {
                for (const worker of workerProcesses.keys()) {
-                    sendToWorker(workerId, 'reloadConfig', msg.data);
+                    sendToWorker(worker, 'reload-config', msg.data);
                }
            }
        }
@@ -110,7 +264,7 @@ async function init() {
        type: 'sender-started'
    });

-    run();
+    periodicCampaignsCheck();
}

init();
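Aside (not part of the commit): the scheduler above parks itself in getAvailableWorker() and is woken by messagesProcessed() through a stored resolve callback. The following standalone sketch, with illustrative names only, shows that idle-worker handshake in isolation and can be run directly with node.

'use strict';

// A queue of free worker ids and, at most, one parked scheduler waiting for one.
const idle = [1, 2];
let waiter = null;

function getIdleWorker() {
    if (idle.length > 0) {
        return Promise.resolve(idle.shift());
    }
    // Park here until workerDone() picks up the stored resolve callback.
    return new Promise(resolve => {
        waiter = resolve;
    }).then(() => idle.shift());
}

function workerDone(workerId) {
    idle.push(workerId);
    if (waiter) {
        const cont = waiter;
        waiter = null;
        setImmediate(cont); // wake the parked scheduler on the next tick
    }
}

async function demo() {
    console.log('got worker', await getIdleWorker()); // 1
    console.log('got worker', await getIdleWorker()); // 2
    setTimeout(() => workerDone(1), 100);              // a worker frees up later
    console.log('got worker', await getIdleWorker()); // 1, roughly 100 ms later
}

demo();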
@@ -23,7 +23,7 @@ const shares = require('../models/shares');
const _ = require('../lib/translate')._;
*/

-async function sendMail() {
+async function processMessages(msgs) {
    if (running) {
        log.error('Senders', `Worker ${workerId} assigned work while working`);
        return;
@@ -31,9 +31,12 @@ async function sendMail() {

    running = true;

    console.log(msgs);
    // FIXME

    running = false;

    sendToMaster('messages-processed');
}

function sendToMaster(msgType) {
@@ -46,11 +49,12 @@ process.on('message', msg => {
    if (msg) {
        const type = msg.type;

-        if (type === 'reloadConfig') {
+        if (type === 'reload-config') {
            mailers.invalidateMailer(msg.data.sendConfigurationId);

-        } else if (type === 'sendMail') {
-            // FIXME
+        } else if (type === 'process-messages') {
+            // noinspection JSIgnoredPromiseFromCall
+            processMessages(msg.data)
        }

    }
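Aside (not part of the commit): the two files above communicate over Node's child-process IPC channel; sendToWorker() sends {type, data, tid} down and the worker answers with {type: 'messages-processed'}. A minimal, runnable sketch of that round trip (illustrative file layout, not the project's) could look like this:

'use strict';

const { fork } = require('child_process');

if (process.argv[2] === 'worker') {
    // Child side: roughly what the worker's process.on('message') handler does.
    process.on('message', msg => {
        if (msg && msg.type === 'process-messages') {
            console.log(`worker: got ${msg.data.length} message(s), tid ${msg.tid}`);
            process.send({ type: 'messages-processed' });
        }
    });
} else {
    // Parent side: roughly sendToWorker() plus the 'messages-processed' handler.
    const child = fork(__filename, ['worker']);

    child.on('message', msg => {
        if (msg.type === 'messages-processed') {
            console.log('parent: worker is idle again');
            child.disconnect();
        }
    });

    child.send({ type: 'process-messages', data: [{ listId: 1, email: 'a@example.com' }], tid: 0 });
}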
services/verp-server.js (new file, 145 lines)

@@ -0,0 +1,145 @@
'use strict';

const { nodeifyFunction, nodeifyPromise } = require('../lib/nodeify');
const log = require('npmlog');
const config = require('config');
const {MailerError} = require('../lib/mailers');
const campaigns = require('../models/campaigns');
const contextHelpers = require('../lib/context-helpers');
const {SubscriptionStatus} = require('../shared/lists');

const BounceHandler = require('bounce-handler').BounceHandler;
const SMTPServer = require('smtp-server').SMTPServer;

async function onRcptTo(address, session) {
    const addrSplit = address.address.split('@');

    if (addrSplit.length !== 2) {
        throw new MailerError('Unknown user ' + address.address, 510);
    }

    const [user, host] = addrSplit;

    const message = await campaigns.getMessageByCid(user);

    if (!message) {
        throw new MailerError('Unknown user ' + address.address, 510);
    }

    if (message.verp_hostname !== host) {
        throw new MailerError('Unknown user ' + address.address, 510);
    }

    session.message = message;

    log.verbose('VERP', 'Incoming message for Campaign %s, List %s, Subscription %s', message.campaign, message.list, message.subscription);
}

function onData(stream, session, callback) {
    let chunks = [];
    let totalLen = 0;

    stream.on('data', chunk => {
        if (!chunk || !chunk.length || totalLen > 60 * 1024) {
            return;
        }
        chunks.push(chunk);
        totalLen += chunk.length;
    });

    stream.on('end', () => nodeifyPromise(onStreamEnd(), callback));

    const onStreamEnd = async () => {
        const body = Buffer.concat(chunks, totalLen).toString();

        const bh = new BounceHandler();
        let bounceResult;

        try {
            bounceResult = [].concat(bh.parse_email(body) || []).shift();
        } catch (E) {
            log.error('Bounce', 'Failed parsing bounce message');
            log.error('Bounce', JSON.stringify(body));
        }

        if (!bounceResult || ['failed', 'transient'].indexOf(bounceResult.action) < 0) {
            return 'Message accepted';
        } else {
            await campaigns.changeStatusByMessage(contextHelpers.getAdminContext(), session.message, SubscriptionStatus.BOUNCED, bounceResult.action === 'failed');
            log.verbose('VERP', 'Marked message %s as unsubscribed', session.message.campaign);
        }
    };
}

// Setup server
const server = new SMTPServer({

    // log to console
    logger: false,

    banner: 'Mailtrain VERP bouncer',

    disabledCommands: ['AUTH', 'STARTTLS'],

    onRcptTo: nodeifyFunction(onRcptTo),
    onData: onData
});

module.exports = callback => {
    if (!config.verp.enabled) {
        return setImmediate(callback);
    }

    let started = false;

    server.on('error', err => {
        const port = config.verp.port;
        const bind = typeof port === 'string' ? 'Pipe ' + port : 'Port ' + port;

        switch (err.code) {
            case 'EACCES':
                log.error('VERP', '%s requires elevated privileges', bind);
                break;
            case 'EADDRINUSE':
                log.error('VERP', '%s is already in use', bind);
                break;
            case 'ECONNRESET': // Usually happens when a client does not disconnect cleanly
            case 'EPIPE': // Remote connection was closed before the server attempted to send data
            default:
                log.error('VERP', err);
        }

        if (!started) {
            started = true;
            return callback(err);
        }
    });

    let hosts;
    if (typeof config.verp.host === 'string' && config.verp.host) {
        hosts = config.verp.host.trim().split(',').map(host => host.trim()).filter(host => !!host);
        if (hosts.indexOf('*') >= 0 || hosts.indexOf('all') >= 0) {
            hosts = [false];
        }
    } else {
        hosts = [false];
    }

    let pos = 0;
    const startNextHost = () => {
        if (pos >= hosts.length) {
            started = true;
            return setImmediate(callback);
        }
        let host = hosts[pos++];
        server.listen(config.verp.port, host, () => {
            if (started) {
                return server.close();
            }
            log.info('VERP', 'Server listening on %s:%s', host || '*', config.verp.port);
            setImmediate(startNextHost);
        });
    };

    startNextHost();
};
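Aside (not part of the commit): verp-server.js exports a start function and reads config.verp through the config package. A hedged usage sketch, with an assumed caller and path that are not shown in this commit, could look like this:

// Illustrative usage sketch; the actual caller and config file are outside this commit.
// The module reads config.verp.enabled, config.verp.host (optionally a comma-separated
// list, where '*' or 'all' means every interface) and config.verp.port.
const startVerpServer = require('./services/verp-server');

startVerpServer(err => {
    if (err) {
        console.error('VERP bounce server failed to start:', err);
    } else {
        console.log('VERP bounce server started (or disabled in config)');
    }
});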