Various fixes.
This commit is contained in:
parent
dd9b8b464a
commit
83ce716d94
21 changed files with 99 additions and 114 deletions
|
@ -8,6 +8,7 @@ const knex = require('../lib/knex');
|
|||
const {CampaignStatus, CampaignType} = require('../../shared/campaigns');
|
||||
const { enforce } = require('../lib/helpers');
|
||||
const campaigns = require('../models/campaigns');
|
||||
const builtinZoneMta = require('../lib/builtin-zone-mta');
|
||||
|
||||
let messageTid = 0;
|
||||
const workerProcesses = new Map();
|
||||
|
@ -24,6 +25,7 @@ const workerBatchSize = 100;
|
|||
|
||||
const messageQueue = new Map(); // campaignId -> [{listId, email}]
|
||||
const messageQueueCont = new Map(); // campaignId -> next batch callback
|
||||
const campaignFinishCont = new Map(); // campaignId -> worker finished callback
|
||||
|
||||
const workAssignment = new Map(); // workerId -> { campaignId, subscribers: [{listId, email}] }
|
||||
|
||||
|
@ -32,6 +34,8 @@ let queuedLastId = 0;
|
|||
|
||||
|
||||
function messagesProcessed(workerId) {
|
||||
const wa = workAssignment.get(workerId);
|
||||
|
||||
workAssignment.delete(workerId);
|
||||
idleWorkers.push(workerId);
|
||||
|
||||
|
@ -40,6 +44,11 @@ function messagesProcessed(workerId) {
|
|||
setImmediate(workerSchedulerCont);
|
||||
workerSchedulerCont = null;
|
||||
}
|
||||
|
||||
if (campaignFinishCont.has(wa.campaignId)) {
|
||||
setImmediate(campaignFinishCont.get(wa.campaignId));
|
||||
campaignFinishCont.delete(wa.campaignId);
|
||||
}
|
||||
}
|
||||
|
||||
async function scheduleWorkers() {
|
||||
|
@ -78,8 +87,8 @@ async function scheduleWorkers() {
|
|||
workAssignment.set(workerId, {campaignId, subscribers});
|
||||
|
||||
if (queue.length === 0 && messageQueueCont.has(campaignId)) {
|
||||
const scheduleMessages = messageQueueCont.get(campaignId);
|
||||
setImmediate(scheduleMessages);
|
||||
setImmediate(messageQueueCont.get(campaignId));
|
||||
messageQueueCont.delete(campaignId);
|
||||
}
|
||||
|
||||
sendToWorker(workerId, 'process-messages', {
|
||||
|
@ -99,9 +108,24 @@ async function scheduleWorkers() {
|
|||
}
|
||||
|
||||
|
||||
|
||||
async function processCampaign(campaignId) {
|
||||
async function finish() {
|
||||
let workerRunning = false;
|
||||
for (const wa of workAssignment.values()) {
|
||||
if (wa.campaignId === campaignId) {
|
||||
workerRunning = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (workerRunning) {
|
||||
const workerFinished = new Promise(resolve => {
|
||||
campaignFinishCont.set(campaignId, resolve);
|
||||
});
|
||||
|
||||
await workerFinished;
|
||||
setImmediate(finish);
|
||||
}
|
||||
|
||||
await knex('campaigns').where('id', campaignId).update({status: CampaignStatus.FINISHED});
|
||||
messageQueue.delete(campaignId);
|
||||
}
|
||||
|
@ -120,7 +144,7 @@ async function processCampaign(campaignId) {
|
|||
|
||||
let qryGen;
|
||||
await knex.transaction(async tx => {
|
||||
qryGen = await campaigns.getSubscribersQueryGeneratorTx(tx, campaignId, true);
|
||||
qryGen = await campaigns.getSubscribersQueryGeneratorTx(tx, campaignId);
|
||||
});
|
||||
|
||||
if (qryGen) {
|
||||
|
@ -262,7 +286,10 @@ async function spawnWorker(workerId) {
|
|||
|
||||
const senderProcess = fork(path.join(__dirname, 'sender-worker.js'), [workerId], {
|
||||
cwd: path.join(__dirname, '..'),
|
||||
env: {NODE_ENV: process.env.NODE_ENV}
|
||||
env: {
|
||||
NODE_ENV: process.env.NODE_ENV,
|
||||
BUILTIN_ZONE_MTA_PASSWORD: builtinZoneMta.getPassword()
|
||||
}
|
||||
});
|
||||
|
||||
senderProcess.on('message', msg => {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue