733 lines
23 KiB
JavaScript
733 lines
23 KiB
JavaScript
'use strict';
|
|
|
|
const config = require('../lib/config');
|
|
const fork = require('../lib/fork').fork;
|
|
const log = require('../lib/log');
|
|
const path = require('path');
|
|
const knex = require('../lib/knex');
|
|
const {CampaignStatus, CampaignType, CampaignMessageStatus} = require('../../shared/campaigns');
|
|
const campaigns = require('../models/campaigns');
|
|
const builtinZoneMta = require('../lib/builtin-zone-mta');
|
|
const {CampaignActivityType} = require('../../shared/activity-log');
|
|
const activityLog = require('../lib/activity-log');
|
|
const {MessageType} = require('../lib/message-sender');
|
|
require('../lib/fork');
|
|
|
|
class Notifications {
|
|
constructor() {
|
|
this.conts = new Map();
|
|
}
|
|
|
|
notify(id) {
|
|
const cont = this.conts.get(id);
|
|
if (cont) {
|
|
for (const cb of cont) {
|
|
setImmediate(cb);
|
|
}
|
|
this.conts.delete(id);
|
|
}
|
|
}
|
|
|
|
async waitFor(id) {
|
|
let cont = this.conts.get(id);
|
|
if (!cont) {
|
|
cont = [];
|
|
}
|
|
|
|
const notified = new Promise(resolve => {
|
|
cont.push(resolve);
|
|
});
|
|
|
|
this.conts.set(id, cont);
|
|
|
|
await notified;
|
|
}
|
|
}
|
|
|
|
const notifier = new Notifications();
|
|
|
|
let messageTid = 0;
|
|
const workerProcesses = new Map();
|
|
|
|
const workersCount = config.queue.processes;
|
|
const idleWorkers = [];
|
|
|
|
let campaignSchedulerRunning = false;
|
|
let queuedSchedulerRunning = false;
|
|
|
|
const checkPeriod = 30 * 1000;
|
|
const retrieveBatchSize = 1000;
|
|
const workerBatchSize = 10;
|
|
|
|
const sendConfigurationIdByCampaignId = new Map(); // campaignId -> sendConfigurationId
|
|
const sendConfigurationStatuses = new Map(); // sendConfigurationId -> {retryCount, postponeTill}
|
|
|
|
const sendConfigurationMessageQueue = new Map(); // sendConfigurationId -> [queuedMessage]
|
|
const campaignMessageQueue = new Map(); // campaignId -> [campaignMessage]
|
|
|
|
const workAssignment = new Map(); // workerId -> { type: WorkAssignmentType.CAMPAIGN, campaignId, messages: [campaignMessage] / { type: WorkAssignmentType.QUEUED, sendConfigurationId, messages: [queuedMessage] }
|
|
|
|
const WorkAssignmentType = {
|
|
CAMPAIGN: 0,
|
|
QUEUED: 1
|
|
};
|
|
|
|
const retryBackoff = [10, 20, 30, 30, 60, 60, 120, 120, 300]; // in seconds
|
|
|
|
function getSendConfigurationStatus(sendConfigurationId) {
|
|
let status = sendConfigurationStatuses.get(sendConfigurationId);
|
|
if (!status) {
|
|
status = {
|
|
retryCount: 0,
|
|
postponeTill: 0
|
|
};
|
|
|
|
sendConfigurationStatuses.set(sendConfigurationId, status);
|
|
}
|
|
|
|
return status;
|
|
}
|
|
|
|
function setSendConfigurationRetryCount(sendConfigurationStatus, newRetryCount) {
|
|
sendConfigurationStatus.retryCount = newRetryCount;
|
|
|
|
let next = 0;
|
|
if (newRetryCount > 0) {
|
|
let backoff;
|
|
if (newRetryCount > retryBackoff.length) {
|
|
backoff = retryBackoff[retryBackoff.length - 1];
|
|
} else {
|
|
backoff = retryBackoff[newRetryCount - 1];
|
|
}
|
|
|
|
next = Date.now() + backoff * 1000;
|
|
setTimeout(scheduleCheck, backoff * 1000);
|
|
}
|
|
|
|
sendConfigurationStatus.postponeTill = next;
|
|
}
|
|
|
|
function isSendConfigurationPostponed(sendConfigurationId) {
|
|
const now = Date.now();
|
|
const sendConfigurationStatus = getSendConfigurationStatus(sendConfigurationId);
|
|
return sendConfigurationStatus.postponeTill > now;
|
|
}
|
|
|
|
function getPostponedSendConfigurationIds() {
|
|
const result = [];
|
|
const now = Date.now();
|
|
|
|
for (const entry of sendConfigurationStatuses.entries()) {
|
|
if (entry[1].postponeTill > now) {
|
|
result.push(entry[0]);
|
|
}
|
|
}
|
|
|
|
return result;
|
|
}
|
|
|
|
|
|
function getExpirationThresholds() {
|
|
const now = Date.now();
|
|
|
|
return {
|
|
[MessageType.TRIGGERED]: {
|
|
threshold: now - config.queue.retention.triggered * 1000,
|
|
title: 'triggered campaign'
|
|
},
|
|
[MessageType.TEST]: {
|
|
threshold: now - config.queue.retention.test * 1000,
|
|
title: 'test campaign'
|
|
},
|
|
[MessageType.SUBSCRIPTION]: {
|
|
threshold: now - config.queue.retention.subscription * 1000,
|
|
title: 'subscription and password-related'
|
|
},
|
|
[MessageType.API_TRANSACTIONAL]: {
|
|
threshold: now - config.queue.retention.apiTransactional * 1000,
|
|
title: 'transactional (API)'
|
|
}
|
|
};
|
|
}
|
|
|
|
|
|
function messagesProcessed(workerId, withErrors) {
|
|
const wa = workAssignment.get(workerId);
|
|
|
|
const sendConfigurationStatus = getSendConfigurationStatus(wa.sendConfigurationId);
|
|
if (withErrors) {
|
|
if (sendConfigurationStatus.retryCount === wa.sendConfigurationRetryCount) { // This is to avoid multiple increments when more workers simultaneously fail to send messages ot the same send configuration
|
|
setSendConfigurationRetryCount(sendConfigurationStatus, sendConfigurationStatus.retryCount + 1);
|
|
}
|
|
} else {
|
|
setSendConfigurationRetryCount(sendConfigurationStatus, 0);
|
|
}
|
|
|
|
|
|
workAssignment.delete(workerId);
|
|
idleWorkers.push(workerId);
|
|
|
|
notifier.notify('workerFinished');
|
|
}
|
|
|
|
async function workersLoop() {
|
|
async function getAvailableWorker() {
|
|
while (idleWorkers.length === 0) {
|
|
await notifier.waitFor('workerFinished');
|
|
}
|
|
|
|
return idleWorkers.shift();
|
|
}
|
|
|
|
function cancelWorker(workerId) {
|
|
idleWorkers.push(workerId);
|
|
}
|
|
|
|
function selectNextTask() {
|
|
const allocationMap = new Map();
|
|
const allocation = [];
|
|
|
|
function initAllocation(waType, attrName, queues, workerMsg, getSendConfigurationId, getQueueEmptyEvent) {
|
|
for (const id of queues.keys()) {
|
|
const sendConfigurationId = getSendConfigurationId(id);
|
|
const key = attrName + ':' + id;
|
|
|
|
const queue = queues.get(id);
|
|
|
|
const postponed = isSendConfigurationPostponed(sendConfigurationId);
|
|
|
|
const task = {
|
|
type: waType,
|
|
id,
|
|
existingWorkers: 0,
|
|
isValid: queue.length > 0 && !postponed,
|
|
queue,
|
|
workerMsg,
|
|
attrName,
|
|
getQueueEmptyEvent,
|
|
sendConfigurationId
|
|
};
|
|
|
|
allocationMap.set(key, task);
|
|
allocation.push(task);
|
|
|
|
if (postponed && queue.length > 0) {
|
|
queue.splice(0);
|
|
notifier.notify(task.getQueueEmptyEvent(task));
|
|
}
|
|
}
|
|
|
|
for (const wa of workAssignment.values()) {
|
|
if (wa.type === waType) {
|
|
const key = attrName + ':' + wa[attrName];
|
|
const task = allocationMap.get(key);
|
|
task.existingWorkers += 1;
|
|
}
|
|
}
|
|
}
|
|
|
|
initAllocation(
|
|
WorkAssignmentType.QUEUED,
|
|
'sendConfigurationId',
|
|
sendConfigurationMessageQueue,
|
|
'process-queued-messages',
|
|
id => id,
|
|
task => `sendConfigurationMessageQueueEmpty:${task.id}`
|
|
);
|
|
|
|
initAllocation(
|
|
WorkAssignmentType.CAMPAIGN,
|
|
'campaignId',
|
|
campaignMessageQueue,
|
|
'process-campaign-messages',
|
|
id => sendConfigurationIdByCampaignId.get(id),
|
|
task => `campaignMessageQueueEmpty:${task.id}`
|
|
);
|
|
|
|
let minTask = null;
|
|
let minExistingWorkers;
|
|
|
|
for (const task of allocation) {
|
|
if (task.isValid && (minTask === null || minExistingWorkers > task.existingWorkers)) {
|
|
minTask = task;
|
|
minExistingWorkers = task.existingWorkers;
|
|
}
|
|
}
|
|
|
|
return minTask;
|
|
}
|
|
|
|
|
|
while (true) {
|
|
const workerId = await getAvailableWorker();
|
|
const task = selectNextTask();
|
|
|
|
if (task) {
|
|
const attrName = task.attrName;
|
|
const sendConfigurationId = task.sendConfigurationId;
|
|
const sendConfigurationStatus = getSendConfigurationStatus(sendConfigurationId);
|
|
const sendConfigurationRetryCount = sendConfigurationStatus.retryCount;
|
|
|
|
const queue = task.queue;
|
|
|
|
const messages = queue.splice(0, workerBatchSize);
|
|
workAssignment.set(workerId, {
|
|
type: task.type,
|
|
[attrName]: task.id,
|
|
sendConfigurationId,
|
|
sendConfigurationRetryCount,
|
|
messages
|
|
});
|
|
|
|
if (queue.length === 0) {
|
|
notifier.notify(task.getQueueEmptyEvent(task));
|
|
}
|
|
|
|
sendToWorker(workerId, task.workerMsg, {
|
|
[attrName]: task.id,
|
|
messages
|
|
});
|
|
|
|
} else {
|
|
cancelWorker(workerId);
|
|
await notifier.waitFor('workAvailable');
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
async function processCampaign(campaignId) {
|
|
const msgQueue = campaignMessageQueue.get(campaignId);
|
|
|
|
const isCompleted = () => {
|
|
if (msgQueue.length > 0) return false;
|
|
|
|
let workerRunning = false;
|
|
|
|
for (const wa of workAssignment.values()) {
|
|
if (wa.type === WorkAssignmentType.CAMPAIGN && wa.campaignId === campaignId) {
|
|
workerRunning = true;
|
|
}
|
|
}
|
|
|
|
return !workerRunning;
|
|
};
|
|
|
|
async function finish(clearMsgQueue, newStatus) {
|
|
if (clearMsgQueue) {
|
|
msgQueue.splice(0);
|
|
}
|
|
|
|
while (!isCompleted()) {
|
|
await notifier.waitFor('workerFinished');
|
|
}
|
|
|
|
if (newStatus) {
|
|
campaignMessageQueue.delete(campaignId);
|
|
|
|
await knex('campaigns').where('id', campaignId).update({status: newStatus});
|
|
await activityLog.logEntityActivity('campaign', CampaignActivityType.STATUS_CHANGE, campaignId, {status: newStatus});
|
|
}
|
|
}
|
|
|
|
|
|
try {
|
|
await campaigns.prepareCampaignMessages(campaignId);
|
|
|
|
while (true) {
|
|
const cpg = await knex('campaigns').where('id', campaignId).first();
|
|
|
|
if (cpg.status === CampaignStatus.PAUSING) {
|
|
return await finish(true, CampaignStatus.PAUSED);
|
|
}
|
|
|
|
const expirationThreshold = Date.now() - config.queue.retention.campaign * 1000;
|
|
if (cpg.start_at && cpg.start_at.valueOf() < expirationThreshold) {
|
|
return await finish(true, CampaignStatus.FINISHED);
|
|
}
|
|
|
|
sendConfigurationIdByCampaignId.set(cpg.id, cpg.send_configuration);
|
|
|
|
if (isSendConfigurationPostponed(cpg.send_configuration)) {
|
|
// postpone campaign if its send configuration is problematic
|
|
return await finish(true, CampaignStatus.SCHEDULED);
|
|
}
|
|
|
|
let messagesInProcessing = [...msgQueue];
|
|
for (const wa of workAssignment.values()) {
|
|
if (wa.type === WorkAssignmentType.CAMPAIGN && wa.campaignId === campaignId) {
|
|
messagesInProcessing = messagesInProcessing.concat(wa.messages);
|
|
}
|
|
}
|
|
|
|
const subs = await knex('campaign_messages')
|
|
.where({status: CampaignMessageStatus.SCHEDULED, campaign: campaignId})
|
|
.whereNotIn('hash_email', messagesInProcessing.map(x => x.hash_email))
|
|
.limit(retrieveBatchSize);
|
|
|
|
if (subs.length === 0) {
|
|
if (isCompleted()) {
|
|
return await finish(false, CampaignStatus.FINISHED);
|
|
|
|
} else {
|
|
await finish(false);
|
|
|
|
// At this point, there might be messages that re-appeared because sending failed.
|
|
continue;
|
|
}
|
|
|
|
}
|
|
|
|
for (const sub of subs) {
|
|
msgQueue.push(sub);
|
|
}
|
|
|
|
notifier.notify('workAvailable');
|
|
|
|
while (msgQueue.length > 0) {
|
|
await notifier.waitFor(`campaignMessageQueueEmpty:${campaignId}`);
|
|
}
|
|
}
|
|
} catch (err) {
|
|
log.error('Senders', `Sending campaign ${campaignId} failed with error: ${err.message}`);
|
|
log.verbose(err.stack);
|
|
}
|
|
}
|
|
|
|
|
|
async function scheduleCampaigns() {
|
|
if (campaignSchedulerRunning) {
|
|
return;
|
|
}
|
|
|
|
campaignSchedulerRunning = true;
|
|
|
|
try {
|
|
// Finish old campaigns
|
|
const nowDate = new Date();
|
|
const now = nowDate.valueOf();
|
|
|
|
const expirationThreshold = new Date(now - config.queue.retention.campaign * 1000);
|
|
const expiredCampaigns = await knex('campaigns')
|
|
.whereIn('campaigns.type', [CampaignType.REGULAR, CampaignType.RSS_ENTRY])
|
|
.whereIn('campaigns.status', [CampaignStatus.SCHEDULED, CampaignStatus.PAUSED])
|
|
.where('campaigns.start_at', '<', expirationThreshold)
|
|
.update({status: CampaignStatus.FINISHED});
|
|
|
|
// Empty message queues for PAUSING campaigns. A pausing campaign typically waits for campaignMessageQueueEmpty before it can check for PAUSING
|
|
// We speed this up by discarding messages in the message queue of the campaign.
|
|
const pausingCampaigns = await knex('campaigns')
|
|
.whereIn('campaigns.type', [CampaignType.REGULAR, CampaignType.RSS_ENTRY])
|
|
.where('campaigns.status', CampaignStatus.PAUSING)
|
|
.select(['id'])
|
|
.forUpdate();
|
|
|
|
for (const cpg of pausingCampaigns) {
|
|
const campaignId = cpg.id;
|
|
const queue = campaignMessageQueue.get(campaignId);
|
|
queue.splice(0);
|
|
notifier.notify(`campaignMessageQueueEmpty:${campaignId}`);
|
|
}
|
|
|
|
|
|
while (true) {
|
|
let campaignId = 0;
|
|
const postponedSendConfigurationIds = getPostponedSendConfigurationIds();
|
|
|
|
await knex.transaction(async tx => {
|
|
const scheduledCampaign = await tx('campaigns')
|
|
.whereIn('campaigns.type', [CampaignType.REGULAR, CampaignType.RSS_ENTRY])
|
|
.whereNotIn('campaigns.send_configuration', postponedSendConfigurationIds)
|
|
.where('campaigns.status', CampaignStatus.SCHEDULED)
|
|
.where('campaigns.start_at', '<=', nowDate)
|
|
.select(['id'])
|
|
.forUpdate()
|
|
.first();
|
|
|
|
if (scheduledCampaign) {
|
|
await tx('campaigns').where('id', scheduledCampaign.id).update({status: CampaignStatus.SENDING});
|
|
await activityLog.logEntityActivity('campaign', CampaignActivityType.STATUS_CHANGE, scheduledCampaign.id, {status: CampaignStatus.SENDING});
|
|
campaignId = scheduledCampaign.id;
|
|
}
|
|
});
|
|
|
|
if (campaignId) {
|
|
campaignMessageQueue.set(campaignId, []);
|
|
|
|
// noinspection JSIgnoredPromiseFromCall
|
|
processCampaign(campaignId);
|
|
|
|
} else {
|
|
break;
|
|
}
|
|
}
|
|
} catch (err) {
|
|
log.error('Senders', `Scheduling campaigns failed with error: ${err.message}`);
|
|
log.verbose(err.stack);
|
|
}
|
|
|
|
campaignSchedulerRunning = false;
|
|
}
|
|
|
|
|
|
async function processQueuedBySendConfiguration(sendConfigurationId) {
|
|
const msgQueue = sendConfigurationMessageQueue.get(sendConfigurationId);
|
|
|
|
const isCompleted = () => {
|
|
if (msgQueue.length > 0) return false;
|
|
|
|
let workerRunning = false;
|
|
|
|
for (const wa of workAssignment.values()) {
|
|
if (wa.type === WorkAssignmentType.QUEUED && wa.sendConfigurationId === sendConfigurationId) {
|
|
workerRunning = true;
|
|
}
|
|
}
|
|
|
|
return !workerRunning;
|
|
};
|
|
|
|
async function finish(clearMsgQueue, deleteMsgQueue) {
|
|
if (clearMsgQueue) {
|
|
msgQueue.splice(0);
|
|
}
|
|
|
|
while (!isCompleted()) {
|
|
await notifier.waitFor('workerFinished');
|
|
}
|
|
|
|
if (deleteMsgQueue) {
|
|
sendConfigurationMessageQueue.delete(sendConfigurationId);
|
|
}
|
|
}
|
|
|
|
|
|
try {
|
|
while (true) {
|
|
if (isSendConfigurationPostponed(sendConfigurationId)) {
|
|
return await finish(true, true);
|
|
}
|
|
|
|
let messagesInProcessing = [...msgQueue];
|
|
for (const wa of workAssignment.values()) {
|
|
if (wa.type === WorkAssignmentType.QUEUED && wa.sendConfigurationId === sendConfigurationId) {
|
|
messagesInProcessing = messagesInProcessing.concat(wa.messages);
|
|
}
|
|
}
|
|
|
|
const messageIdsInProcessing = messagesInProcessing.map(x => x.id);
|
|
|
|
const rows = await knex('queued')
|
|
.orderByRaw(`FIELD(type, ${MessageType.TRIGGERED}, ${MessageType.API_TRANSACTIONAL}, ${MessageType.TEST}, ${MessageType.SUBSCRIPTION}) DESC, id ASC`) // This orders messages in the following order MessageType.SUBSCRIPTION, MessageType.TEST, MessageType.API_TRANSACTIONAL and MessageType.TRIGGERED
|
|
.where('send_configuration', sendConfigurationId)
|
|
.whereNotIn('id', messageIdsInProcessing)
|
|
.limit(retrieveBatchSize);
|
|
|
|
if (rows.length === 0) {
|
|
if (isCompleted()) {
|
|
return await finish(false, true);
|
|
|
|
} else {
|
|
await finish(false, false);
|
|
|
|
// At this point, there might be new messages in the queued that could belong to us. Thus we have to try again instead for returning.
|
|
continue;
|
|
}
|
|
}
|
|
|
|
const expirationThresholds = getExpirationThresholds();
|
|
const expirationCounters = {};
|
|
for (const type in expirationThresholds) {
|
|
expirationCounters[type] = 0;
|
|
}
|
|
|
|
for (const row of rows) {
|
|
const expirationThreshold = expirationThresholds[row.type];
|
|
|
|
if (row.created < expirationThreshold.threshold) {
|
|
expirationCounters[row.type] += 1;
|
|
await knex('queued').where('id', row.id).del();
|
|
|
|
} else {
|
|
row.data = JSON.parse(row.data);
|
|
msgQueue.push(row);
|
|
}
|
|
}
|
|
|
|
for (const type in expirationThresholds) {
|
|
const expirationThreshold = expirationThresholds[type];
|
|
if (expirationCounters[type] > 0) {
|
|
log.warn('Senders', `Discarded ${expirationCounters[type]} expired ${expirationThreshold.title} message(s).`);
|
|
}
|
|
}
|
|
|
|
notifier.notify('workAvailable');
|
|
|
|
while (msgQueue.length > 0) {
|
|
await notifier.waitFor(`sendConfigurationMessageQueueEmpty:${sendConfigurationId}`);
|
|
}
|
|
}
|
|
} catch (err) {
|
|
log.error('Senders', `Sending queued messages for send configuration ${sendConfigurationId} failed with error: ${err.message}`);
|
|
log.verbose(err.stack);
|
|
}
|
|
}
|
|
|
|
async function scheduleQueued() {
|
|
if (queuedSchedulerRunning) {
|
|
return;
|
|
}
|
|
|
|
queuedSchedulerRunning = true;
|
|
|
|
try {
|
|
const sendConfigurationsIdsInProcessing = [...sendConfigurationMessageQueue.keys()];
|
|
const postponedSendConfigurationIds = getPostponedSendConfigurationIds();
|
|
|
|
// prune old messages
|
|
const expirationThresholds = getExpirationThresholds();
|
|
for (const type in expirationThresholds) {
|
|
const expirationThreshold = expirationThresholds[type];
|
|
|
|
const expiredCount = await knex('queued')
|
|
.whereNotIn('send_configuration', sendConfigurationsIdsInProcessing)
|
|
.where('type', type)
|
|
.where('created', '<', new Date(expirationThreshold.threshold))
|
|
.del();
|
|
|
|
if (expiredCount) {
|
|
log.warn('Senders', `Discarded ${expiredCount} expired ${expirationThreshold.title} message(s).`);
|
|
}
|
|
}
|
|
|
|
const rows = await knex('queued')
|
|
.whereNotIn('send_configuration', [...sendConfigurationsIdsInProcessing, ...postponedSendConfigurationIds])
|
|
.groupBy('send_configuration')
|
|
.select(['send_configuration']);
|
|
|
|
for (const row of rows) {
|
|
const sendConfigurationId = row.send_configuration;
|
|
sendConfigurationMessageQueue.set(sendConfigurationId, []);
|
|
|
|
// noinspection JSIgnoredPromiseFromCall
|
|
processQueuedBySendConfiguration(sendConfigurationId);
|
|
}
|
|
} catch (err) {
|
|
log.error('Senders', `Scheduling queued messages failed with error: ${err.message}`);
|
|
log.verbose(err.stack);
|
|
}
|
|
|
|
queuedSchedulerRunning = false;
|
|
}
|
|
|
|
|
|
async function spawnWorker(workerId) {
|
|
return await new Promise((resolve, reject) => {
|
|
log.verbose('Senders', `Spawning worker process ${workerId}`);
|
|
|
|
const senderProcess = fork(path.join(__dirname, 'sender-worker.js'), [workerId], {
|
|
cwd: path.join(__dirname, '..'),
|
|
env: {
|
|
NODE_ENV: process.env.NODE_ENV,
|
|
BUILTIN_ZONE_MTA_PASSWORD: builtinZoneMta.getPassword()
|
|
}
|
|
});
|
|
|
|
senderProcess.on('message', msg => {
|
|
if (msg) {
|
|
if (msg.type === 'worker-started') {
|
|
log.info('Senders', `Worker process ${workerId} started`);
|
|
return resolve();
|
|
|
|
} else if (msg.type === 'messages-processed') {
|
|
messagesProcessed(workerId, msg.data.withErrors);
|
|
}
|
|
|
|
}
|
|
});
|
|
|
|
senderProcess.on('close', (code, signal) => {
|
|
log.error('Senders', `Worker process ${workerId} exited with code %s signal %s`, code, signal);
|
|
});
|
|
|
|
workerProcesses.set(workerId, senderProcess);
|
|
idleWorkers.push(workerId);
|
|
});
|
|
}
|
|
|
|
function sendToWorker(workerId, msgType, data) {
|
|
workerProcesses.get(workerId).send({
|
|
type: msgType,
|
|
data,
|
|
tid: messageTid
|
|
});
|
|
|
|
messageTid++;
|
|
}
|
|
|
|
|
|
function scheduleCheck() {
|
|
// noinspection JSIgnoredPromiseFromCall
|
|
scheduleCampaigns();
|
|
|
|
// noinspection JSIgnoredPromiseFromCall
|
|
scheduleQueued();
|
|
}
|
|
|
|
function periodicCheck() {
|
|
// noinspection JSIgnoredPromiseFromCall
|
|
scheduleCheck();
|
|
|
|
setTimeout(periodicCheck, checkPeriod);
|
|
}
|
|
|
|
|
|
async function init() {
|
|
const spawnWorkerFutures = [];
|
|
let workerId;
|
|
for (workerId = 0; workerId < workersCount; workerId++) {
|
|
spawnWorkerFutures.push(spawnWorker(workerId));
|
|
}
|
|
|
|
await Promise.all(spawnWorkerFutures);
|
|
|
|
process.on('message', msg => {
|
|
if (msg) {
|
|
const type = msg.type;
|
|
|
|
if (type === 'schedule-check') {
|
|
// noinspection JSIgnoredPromiseFromCall
|
|
scheduleCheck();
|
|
|
|
} else if (type === 'reload-config') {
|
|
const sendConfigurationStatus = getSendConfigurationStatus(msg.data.sendConfigurationId);
|
|
if (sendConfigurationStatus.retryCount > 0) {
|
|
const sendConfigurationStatus = getSendConfigurationStatus(msg.data.sendConfigurationId)
|
|
setSendConfigurationRetryCount(sendConfigurationStatus, 0);
|
|
|
|
// noinspection JSIgnoredPromiseFromCall
|
|
scheduleCheck();
|
|
}
|
|
|
|
for (const workerId of workerProcesses.keys()) {
|
|
sendToWorker(workerId, 'reload-config', msg.data);
|
|
}
|
|
}
|
|
}
|
|
});
|
|
|
|
if (config.title) {
|
|
process.title = config.title + ': sender/master';
|
|
}
|
|
|
|
process.send({
|
|
type: 'master-sender-started'
|
|
});
|
|
|
|
periodicCheck();
|
|
|
|
setImmediate(workersLoop);
|
|
}
|
|
|
|
// noinspection JSIgnoredPromiseFromCall
|
|
init();
|