Added support for help text in custom fields.
Reimplemented the mechanism how campaign_messages are created.
This commit is contained in:
Tomas Bures 2019-07-22 23:54:24 +05:30
parent 025600e818
commit 4e4b77ca84
19 changed files with 223 additions and 200 deletions

View file

@ -5,7 +5,7 @@ const fork = require('../lib/fork').fork;
const log = require('../lib/log');
const path = require('path');
const knex = require('../lib/knex');
const {CampaignStatus, CampaignType} = require('../../shared/campaigns');
const {CampaignStatus, CampaignType, CampaignMessageStatus} = require('../../shared/campaigns');
const campaigns = require('../models/campaigns');
const builtinZoneMta = require('../lib/builtin-zone-mta');
const {CampaignActivityType} = require('../../shared/activity-log');
@ -62,10 +62,10 @@ const workerBatchSize = 10;
const sendConfigurationIdByCampaignId = new Map(); // campaignId -> sendConfigurationId
const sendConfigurationStatuses = new Map(); // sendConfigurationId -> {retryCount, postponeTill}
const sendConfigurationMessageQueue = new Map(); // sendConfigurationId -> [{queuedMessage}]
const campaignMessageQueue = new Map(); // campaignId -> [{listId, email}]
const sendConfigurationMessageQueue = new Map(); // sendConfigurationId -> [queuedMessage]
const campaignMessageQueue = new Map(); // campaignId -> [campaignMessage]
const workAssignment = new Map(); // workerId -> { type: WorkAssignmentType.CAMPAIGN, campaignId, messages: [{listId, email} } / { type: WorkAssignmentType.QUEUED, sendConfigurationId, messages: [{queuedMessage}] }
const workAssignment = new Map(); // workerId -> { type: WorkAssignmentType.CAMPAIGN, campaignId, messages: [campaignMessage] / { type: WorkAssignmentType.QUEUED, sendConfigurationId, messages: [queuedMessage] }
const WorkAssignmentType = {
CAMPAIGN: 0,
@ -330,7 +330,10 @@ async function processCampaign(campaignId) {
}
}
try {
await campaigns.prepareCampaignMessages(campaignId);
while (true) {
const cpg = await knex('campaigns').where('id', campaignId).first();
@ -350,53 +353,39 @@ async function processCampaign(campaignId) {
return await finish(true, CampaignStatus.SCHEDULED);
}
let qryGen;
await knex.transaction(async tx => {
qryGen = await campaigns.getSubscribersQueryGeneratorTx(tx, campaignId);
});
let messagesInProcessing = [...msgQueue];
for (const wa of workAssignment.values()) {
if (wa.type === WorkAssignmentType.CAMPAIGN && wa.campaignId === campaignId) {
messagesInProcessing = messagesInProcessing.concat(wa.messages);
}
}
if (qryGen) {
let messagesInProcessing = [...msgQueue];
for (const wa of workAssignment.values()) {
if (wa.type === WorkAssignmentType.CAMPAIGN && wa.campaignId === campaignId) {
messagesInProcessing = messagesInProcessing.concat(wa.messages);
}
const subs = await knex('campaign_messages')
.where({status: CampaignMessageStatus.SCHEDULED})
.whereNotIn('hash_email', messagesInProcessing.map(x => x.hash_email))
.limit(retrieveBatchSize);
if (subs.length === 0) {
if (isCompleted()) {
return await finish(false, CampaignStatus.FINISHED);
} else {
await finish(false);
// At this point, there might be messages that re-appeared because sending failed.
continue;
}
const qry = qryGen(knex)
.whereNotIn('pending_subscriptions.email', messagesInProcessing.map(x => x.email))
.select(['pending_subscriptions.email', 'campaign_lists.list'])
.limit(retrieveBatchSize);
const subs = await qry;
}
if (subs.length === 0) {
if (isCompleted()) {
return await finish(false, CampaignStatus.FINISHED);
for (const sub of subs) {
msgQueue.push(sub);
}
} else {
await finish(false);
notifier.notify('workAvailable');
// At this point, there might be messages that re-appeared because sending failed.
continue;
}
}
for (const sub of subs) {
msgQueue.push({
listId: sub.list,
email: sub.email
});
}
notifier.notify('workAvailable');
while (msgQueue.length > 0) {
await notifier.waitFor(`campaignMessageQueueEmpty:${campaignId}`);
}
} else {
return await finish(false, CampaignStatus.FINISHED);
while (msgQueue.length > 0) {
await notifier.waitFor(`campaignMessageQueueEmpty:${campaignId}`);
}
}
} catch (err) {
@ -526,7 +515,7 @@ async function processQueuedBySendConfiguration(sendConfigurationId) {
}
}
const messageIdsInProcessing = messagesInProcessing.map(x => x.queuedMessage.id);
const messageIdsInProcessing = messagesInProcessing.map(x => x.id);
const rows = await knex('queued')
.orderByRaw(`FIELD(type, ${MessageType.TRIGGERED}, ${MessageType.API_TRANSACTIONAL}, ${MessageType.TEST}, ${MessageType.SUBSCRIPTION}) DESC, id ASC`) // This orders messages in the following order MessageType.SUBSCRIPTION, MessageType.TEST, MessageType.API_TRANSACTIONAL and MessageType.TRIGGERED
@ -561,9 +550,7 @@ async function processQueuedBySendConfiguration(sendConfigurationId) {
} else {
row.data = JSON.parse(row.data);
msgQueue.push({
queuedMessage: row
});
msgQueue.push(row);
}
}