Fixes. Reimplementation of the API transaction sender.

This commit is contained in:
Tomas Bures 2019-07-16 21:03:37 +05:30
parent a22187ef12
commit 8606652101
13 changed files with 350 additions and 276 deletions

View file

@ -142,6 +142,10 @@ function getExpirationThresholds() {
[MessageType.SUBSCRIPTION]: {
threshold: now - config.queue.retention.subscription * 1000,
title: 'subscription and password-related'
},
[MessageType.API_TRANSACTIONAL]: {
threshold: now - config.queue.retention.apiTransactional * 1000,
title: 'transactional (API)'
}
};
}
@ -295,33 +299,35 @@ async function workersLoop() {
async function processCampaign(campaignId) {
const msgQueue = campaignMessageQueue.get(campaignId);
const isCompleted = () => {
if (msgQueue.length > 0) return false;
let workerRunning = false;
for (const wa of workAssignment.values()) {
if (wa.type === WorkAssignmentType.CAMPAIGN && wa.campaignId === campaignId) {
workerRunning = true;
}
}
return !workerRunning;
};
async function finish(clearMsgQueue, newStatus) {
if (clearMsgQueue) {
msgQueue.splice(0);
}
const isCompleted = () => {
if (msgQueue.length > 0) return false;
let workerRunning = false;
for (const wa of workAssignment.values()) {
if (wa.type === WorkAssignmentType.CAMPAIGN && wa.campaignId === campaignId) {
workerRunning = true;
}
}
return !workerRunning;
};
while (!isCompleted()) {
await notifier.waitFor('workerFinished');
}
campaignMessageQueue.delete(campaignId);
if (newStatus) {
campaignMessageQueue.delete(campaignId);
await knex('campaigns').where('id', campaignId).update({status: newStatus});
await activityLog.logEntityActivity('campaign', CampaignActivityType.STATUS_CHANGE, campaignId, {status: newStatus});
await knex('campaigns').where('id', campaignId).update({status: newStatus});
await activityLog.logEntityActivity('campaign', CampaignActivityType.STATUS_CHANGE, campaignId, {status: newStatus});
}
}
try {
@ -364,7 +370,16 @@ async function processCampaign(campaignId) {
const subs = await qry;
if (subs.length === 0) {
return await finish(false, CampaignStatus.FINISHED);
if (isCompleted()) {
return await finish(false, CampaignStatus.FINISHED);
} else {
await finish(false);
// At this point, there might be messages that re-appeared because sending failed.
continue;
}
}
for (const sub of subs) {
@ -501,7 +516,7 @@ async function processQueuedBySendConfiguration(sendConfigurationId) {
try {
while (true) {
if (isSendConfigurationPostponed(sendConfigurationId)) {
return finish(true, true);
return await finish(true, true);
}
let messagesInProcessing = [...msgQueue];
@ -514,18 +529,17 @@ async function processQueuedBySendConfiguration(sendConfigurationId) {
const messageIdsInProcessing = messagesInProcessing.map(x => x.queuedMessage.id);
const rows = await knex('queued')
.orderByRaw(`FIELD(type, ${MessageType.TRIGGERED}, ${MessageType.TEST}, ${MessageType.SUBSCRIPTION}) DESC, id ASC`) // This orders messages in the following order MessageType.SUBSCRIPTION, MessageType.TEST and MessageType.TRIGGERED
.orderByRaw(`FIELD(type, ${MessageType.TRIGGERED}, ${MessageType.API_TRANSACTIONAL}, ${MessageType.TEST}, ${MessageType.SUBSCRIPTION}) DESC, id ASC`) // This orders messages in the following order MessageType.SUBSCRIPTION, MessageType.TEST, MessageType.API_TRANSACTIONAL and MessageType.TRIGGERED
.where('send_configuration', sendConfigurationId)
.whereNotIn('id', messageIdsInProcessing)
.limit(retrieveBatchSize);
if (rows.length === 0) {
if (isCompleted()) {
sendConfigurationMessageQueue.delete(sendConfigurationId);
return;
return await finish(false, true);
} else {
finish(false, false);
await finish(false, false);
// At this point, there might be new messages in the queued that could belong to us. Thus we have to try again instead for returning.
continue;