Finished support for triggered campaigns. So far only smoke-tested for subscription trigger.

This commit is contained in:
Tomas Bures 2018-11-21 01:41:10 +03:00
parent 4f5b2d10e4
commit b37ad9863c
56 changed files with 416 additions and 213 deletions

View file

@ -15,6 +15,7 @@ const workerProcesses = new Map();
const idleWorkers = [];
let campaignSchedulerRunning = false;
let queuedSchedulerRunning = false;
let workerSchedulerRunning = false;
const campaignsCheckPeriod = 5 * 1000;
@ -27,6 +28,7 @@ const messageQueueCont = new Map(); // campaignId -> next batch callback
const workAssignment = new Map(); // workerId -> { campaignId, subscribers: [{listId, email}] }
let workerSchedulerCont = null;
let queuedLastId = 0;
function messagesProcessed(workerId) {
@ -151,7 +153,6 @@ async function processCampaign(campaignId) {
messageQueueCont.set(campaignId, resolve);
});
// noinspection JSIgnoredPromiseFromCall
setImmediate(scheduleWorkers);
await nextBatchNeeded;
@ -175,36 +176,86 @@ async function scheduleCampaigns() {
campaignSchedulerRunning = true;
while (true) {
let campaignId = 0;
try {
while (true) {
let campaignId = 0;
await knex.transaction(async tx => {
const scheduledCampaign = await tx('campaigns')
.whereIn('campaigns.type', [CampaignType.REGULAR, CampaignType.RSS_ENTRY])
.where('campaigns.status', CampaignStatus.SCHEDULED)
.where(qry => qry.whereNull('campaigns.scheduled').orWhere('campaigns.scheduled', '<=', new Date()))
.select(['id'])
.first();
await knex.transaction(async tx => {
const scheduledCampaign = await tx('campaigns')
.whereIn('campaigns.type', [CampaignType.REGULAR, CampaignType.RSS_ENTRY])
.where('campaigns.status', CampaignStatus.SCHEDULED)
.where(qry => qry.whereNull('campaigns.scheduled').orWhere('campaigns.scheduled', '<=', new Date()))
.select(['id'])
.first();
if (scheduledCampaign) {
await tx('campaigns').where('id', scheduledCampaign.id).update({status: CampaignStatus.SENDING});
campaignId = scheduledCampaign.id;
if (scheduledCampaign) {
await tx('campaigns').where('id', scheduledCampaign.id).update({status: CampaignStatus.SENDING});
campaignId = scheduledCampaign.id;
}
});
if (campaignId) {
// noinspection JSIgnoredPromiseFromCall
processCampaign(campaignId);
} else {
break;
}
});
if (campaignId) {
// noinspection JSIgnoredPromiseFromCall
processCampaign(campaignId);
} else {
break;
}
} catch (err) {
log.error('Senders', `Scheduling campaigns failed with error: ${err.message}`)
log.verbose(err);
}
campaignSchedulerRunning = false;
}
async function processQueued() {
if (queuedSchedulerRunning) {
return;
}
queuedSchedulerRunning = true;
try {
while (true) {
const rows = await knex('queued')
.orderBy('id', 'asc')
.where('id', '>', queuedLastId)
.limit(retrieveBatchSize);
if (rows.length === 0) {
break;
}
for (const row of rows) {
let msgQueue = messageQueue.get(row.campaign);
if (!msgQueue) {
msgQueue = [];
messageQueue.set(row.campaign, msgQueue);
}
msgQueue.push({
listId: row.list,
subscriptionId: row.subscription
});
}
queuedLastId = rows[rows.length - 1].id;
setImmediate(scheduleWorkers);
}
} catch (err) {
log.error('Senders', `Processing queued messages failed with error: ${err.message}`)
log.verbose(err);
}
queuedSchedulerRunning = false;
}
async function spawnWorker(workerId) {
return await new Promise((resolve, reject) => {
log.verbose('Senders', `Spawning worker process ${workerId}`);
@ -251,6 +302,9 @@ function periodicCampaignsCheck() {
// noinspection JSIgnoredPromiseFromCall
scheduleCampaigns();
// noinspection JSIgnoredPromiseFromCall
processQueued();
setTimeout(periodicCampaignsCheck, campaignsCheckPeriod);
}
@ -286,5 +340,6 @@ async function init() {
periodicCampaignsCheck();
}
// noinspection JSIgnoredPromiseFromCall
init();

View file

@ -4,6 +4,7 @@ const config = require('config');
const log = require('../lib/log');
const mailers = require('../lib/mailers');
const CampaignSender = require('../lib/campaign-sender');
const {enforce} = require('../lib/helpers');
const workerId = Number.parseInt(process.argv[2]);
let running = false;
@ -21,8 +22,17 @@ async function processMessages(campaignId, subscribers) {
for (const subData of subscribers) {
try {
await cs.sendMessage(subData.listId, subData.email);
log.verbose('Senders', 'Message sent and status updated for %s:%s', subData.listId, subData.email);
if (subData.email) {
await cs.sendMessageByEmail(subData.listId, subData.email);
} else if (subData.subscriptionId) {
await cs.sendMessageBySubscriptionId(subData.listId, subData.subscriptionId);
} else {
enforce(false);
}
log.verbose('Senders', 'Message sent and status updated for %s:%s', subData.listId, subData.email || subData.subscriptionId);
} catch (err) {
log.error('Senders', `Sending message to ${subData.listId}:${subData.email} failed with error: ${err.message}`)
log.verbose(err);

View file

@ -15,17 +15,21 @@ const triggerCheckPeriod = 15 * 1000;
const triggerFirePeriod = 60 * 1000;
async function start() {
async function run() {
while (true) {
const fired = await knex.transaction(async tx => {
const currentTs = new Date();
const currentTs = Date.now();
const trigger = await tx('triggers').where('enabled', true).andWhere('last_check', '<', currentTs - triggerFirePeriod).orderBy('last_check', 'asc').first();
const trigger = await tx('triggers')
.where('enabled', true)
.where(qry => qry.whereNull('last_check').orWhere('last_check', '<', new Date(currentTs - triggerFirePeriod)))
.orderBy('last_check', 'asc')
.first();
if (!trigger) {
return false;
}
const campaign = campaigns.getByIdTx(tx, contextHelpers.getAdminContext(), trigger.campaign, false);
const campaign = await campaigns.getByIdTx(tx, contextHelpers.getAdminContext(), trigger.campaign, false);
for (const cpgList of campaign.lists) {
const addSegmentQuery = cpgList.segment ? await segments.getQueryGeneratorTx(tx, cpgList.list, cpgList.segment) : () => {
@ -36,8 +40,10 @@ async function start() {
.leftJoin(
function () {
return this.from('trigger_messages')
.where('trigger_messages.campaign', campaign.id)
.innerJoin('triggers', 'trigger_messages.trigger', 'triggers.id')
.where('triggers.campaign', campaign.id)
.where('trigger_messages.list', cpgList.list)
.select(['id', 'subscription'])
.as('related_trigger_messages');
},
'related_trigger_messages.subscription', subsTable + '.id'
@ -45,7 +51,7 @@ async function start() {
.where(function () {
addSegmentQuery(this);
})
.whereNotNull('related_trigger_messages.id') // This means only those where the trigger has not fired yet somewhen in the past
.whereNull('related_trigger_messages.id') // This means only those where the trigger has not fired yet somewhen in the past
.select(subsTable + '.id');
let column;
@ -129,20 +135,18 @@ async function start() {
column = 'campaign_messages.created';
}
sqlQry = sqlQry.where(column, '<=', currentTs - trigger.seconds);
if (trigger.last_check !== null) {
sqlQry = sqlQry.where(column, '>', trigger.last_check);
}
}
sqlQry = sqlQry.where(column, '<=', new Date(currentTs - trigger.seconds));
if (trigger.last_check !== null) {
sqlQry = sqlQry.where(column, '>', trigger.last_check);
}
const subscribers = await sqlQry;
for (const subscriber of subscribers) {
await tx('trigger_messages').insert({
campaign: campaign.id,
trigger: trigger.id,
list: cpgList.list,
subscription: subscriber.id
});
@ -161,7 +165,7 @@ async function start() {
}
await tx('triggers').update('last_check', currentTs).where('id', trigger.id);
await tx('triggers').update('last_check', new Date(currentTs)).where('id', trigger.id);
return true;
});
@ -177,4 +181,11 @@ async function start() {
}
}
function start() {
log.info('Triggers', 'Starting trigger check service');
run().catch(err => {
log.error('Triggers', err);
});
}
module.exports.start = start;