Bugfixes in sending campaigns

This commit is contained in:
Tomas Bures 2018-09-27 21:32:35 +02:00
parent 2d667523a1
commit 1448d9e914
34 changed files with 95 additions and 55 deletions

View file

@ -7,7 +7,7 @@
const reportHelpers = require('../lib/report-helpers');
const fork = require('child_process').fork;
const path = require('path');
const log = require('npmlog');
const log = require('../lib/log');
const fs = require('fs');
const privilegeHelpers = require('../lib/privilege-helpers');

View file

@ -1,6 +1,6 @@
'use strict';
const log = require('npmlog');
const log = require('../lib/log');
const knex = require('../lib/knex');
const feedparser = require('feedparser-promised');
const { CampaignType, CampaignStatus, CampaignSource } = require('../shared/campaigns');

View file

@ -2,7 +2,7 @@
const knex = require('../lib/knex');
const path = require('path');
const log = require('npmlog');
const log = require('../lib/log');
const fsExtra = require('fs-extra-promise');
const {ImportSource, MappingType, ImportStatus, RunStatus} = require('../shared/imports');
const imports = require('../models/imports');

View file

@ -1,6 +1,6 @@
'use strict';
const log = require('npmlog');
const log = require('../lib/log');
const config = require('config');
const net = require('net');
const campaigns = require('../models/campaigns');

View file

@ -2,7 +2,7 @@
const config = require('config');
const fork = require('child_process').fork;
const log = require('npmlog');
const log = require('../lib/log');
const path = require('path');
const knex = require('../lib/knex');
const {CampaignStatus, CampaignType} = require('../shared/campaigns');
@ -24,10 +24,13 @@ const workerBatchSize = 100;
const messageQueue = new Map(); // campaignId -> [{listId, email}]
const messageQueueCont = new Map(); // campaignId -> next batch callback
const workAssignment = new Map(); // workerId -> { campaignId, subscribers: [{listId, email}] }
let workerSchedulerCont = null;
function messagesProcessed(workerId) {
workAssignment.delete(workerId);
idleWorkers.push(workerId);
if (workerSchedulerCont) {
@ -70,6 +73,7 @@ async function scheduleWorkers() {
if (queue.length > 0) {
const subscribers = queue.splice(0, workerBatchSize);
workAssignment.set(workerId, {campaignId, subscribers});
if (queue.length === 0 && messageQueueCont.has(campaignId)) {
const scheduleMessages = messageQueueCont.get(campaignId);
@ -113,11 +117,21 @@ async function processCampaign(campaignId) {
let qryGen;
await knex.transaction(async tx => {
qryGen = await campaigns.getSubscribersQueryGeneratorTx(tx, campaignId, true, retrieveBatchSize);
qryGen = await campaigns.getSubscribersQueryGeneratorTx(tx, campaignId, true);
});
if (qryGen) {
const qry = qryGen(knex).select(['pending_subscriptions.email', 'campaign_lists.list']);
let subscribersInProcessing = [...msgQueue];
for (const wa of workAssignment.values()) {
if (wa.campaignId === campaignId) {
subscribersInProcessing = subscribersInProcessing.concat(wa.subscribers);
}
}
const qry = qryGen(knex)
.whereNotIn('pending_subscriptions.email', subscribersInProcessing.map(x => x.email))
.select(['pending_subscriptions.email', 'campaign_lists.list'])
.limit(retrieveBatchSize);
const subs = await qry;
if (subs.length === 0) {
@ -261,7 +275,7 @@ async function init() {
});
process.send({
type: 'sender-started'
type: 'master-sender-started'
});
periodicCampaignsCheck();

View file

@ -1,7 +1,7 @@
'use strict';
const config = require('config');
const log = require('npmlog');
const log = require('../lib/log');
const mailers = require('../lib/mailers');
const CampaignSender = require('../lib/campaign-sender');
@ -25,6 +25,7 @@ async function processMessages(campaignId, subscribers) {
log.verbose('Senders', 'Message sent and status updated for %s:%s', subData.listId, subData.email);
} catch (err) {
log.error('Senders', `Sending message to ${subData.listId}:${subData.email} failed with error: ${err.message}`)
log.verbose(err);
}
}

View file

@ -1,6 +1,6 @@
'use strict';
const log = require('npmlog');
const log = require('../lib/log');
const config = require('config');
const crypto = require('crypto');
const humanize = require('humanize');

View file

@ -1,6 +1,6 @@
'use strict';
const log = require('npmlog');
const log = require('../lib/log');
const knex = require('../lib/knex');
const triggers = require('../models/triggers');
const campaigns = require('../models/campaigns');

View file

@ -10,7 +10,7 @@
const moment = require('moment-timezone');
const knex = require('../lib/knex');
const log = require('npmlog');
const log = require('../lib/log');
let lastCheck = false;
const timezone_timeout = 60 * 60 * 1000;

View file

@ -1,7 +1,7 @@
'use strict';
const { nodeifyFunction, nodeifyPromise } = require('../lib/nodeify');
const log = require('npmlog');
const log = require('../lib/log');
const config = require('config');
const {MailerError} = require('../lib/mailers');
const campaigns = require('../models/campaigns');