From f90e67d775900633fb006efce8c3e7c9d92a125f Mon Sep 17 00:00:00 2001 From: Luc LosCan Date: Mon, 10 Jul 2017 12:13:17 +0200 Subject: [PATCH] Save new queueId/response on locally requeued messages --- lib/models/campaigns.js | 18 ++++++++++++ services/postfix-bounce-server.js | 47 ++++++++++++++++++++----------- 2 files changed, 49 insertions(+), 16 deletions(-) diff --git a/lib/models/campaigns.js b/lib/models/campaigns.js index 0fb4323d..bfef231f 100644 --- a/lib/models/campaigns.js +++ b/lib/models/campaigns.js @@ -1086,6 +1086,24 @@ module.exports.updateMessage = (message, status, updateSubscription, callback) = }); }; +module.exports.updateMessageResponse = (message, response, response_id, callback) => { + db.getConnection((err, connection) => { + if (err) { + return callback(err); + } + + let query = 'UPDATE `campaign__' + message.campaign + '` SET `response`=?, `response_id`=? WHERE id=? LIMIT 1'; + connection.query(query, [response, response_id, message.id], err => { + connection.release(); + if (err) { + return callback(err); + } + return callback(null, true); + }); + + }); +}; + function createCampaignTables(id, callback) { let query = 'CREATE TABLE `campaign__' + id + '` LIKE campaign'; db.getConnection((err, connection) => { diff --git a/services/postfix-bounce-server.js b/services/postfix-bounce-server.js index f41602cf..d08233c7 100644 --- a/services/postfix-bounce-server.js +++ b/services/postfix-bounce-server.js @@ -6,7 +6,6 @@ let net = require('net'); let campaigns = require('../lib/models/campaigns'); let seenIds = new Set(); -let queueIds = {}; let server = net.createServer(socket => { let remainder = ''; @@ -32,6 +31,8 @@ let server = net.createServer(socket => { let match = /\bstatus=(bounced|sent)\b/.test(line) && line.match(/\bpostfix\/\w+\[\d+\]:\s*([^:]+).*?status=(\w+)/); if (match) { let queueId = match[1]; + let queued = ''; + let queued_as = ''; if (seenIds.has(queueId)) { return checkNextLine(); @@ -42,28 +43,42 @@ let server = net.createServer(socket => { let status = match[2]; log.verbose('POSTFIXBOUNCE', 'Checking message %s for local requeue (status: %s)', queueId, status); if ( status === 'sent' ) { - let queued = / relay=127\.0\.0\.1/.test(line) && line.match(/ queued as (\w+)\)/); - if ( queued ) { - log.verbose('POSTFIXBOUNCE', 'Marked message %s as locally requeued as %s', queueId, queued[1]); - queueIds[queued[1]] = queueId; + // Save new queueId to update message's previous queueId (thanks @mfechner ) + if ( queued = / relay=127\.0\.0\.1/.test(line) && line.match(/status=sent \((.*)\)/) ) { + queued = queued[1]; + queued_as = queued.match(/ queued as (\w+)/); + queued_as = queued_as[1]; } - return checkNextLine(); - } else if ( queueId in queueIds ) { - log.verbose('POSTFIXBOUNCE', 'Message %s was requeued from %s', queueId, queueIds[queueId]); - queueId = queueIds[queueId]; } campaigns.findMailByResponse(queueId, (err, message) => { if (err || !message) { return checkNextLine(); } - campaigns.updateMessage(message, 'bounced', true, (err, updated) => { - if (err) { - log.error('POSTFIXBOUNCE', 'Failed updating message: %s', err && err.stack); - } else if (updated) { - log.verbose('POSTFIXBOUNCE', 'Marked message %s as bounced', queueId); - } - }); + if ( queued_as ) { + log.verbose('POSTFIXBOUNCE', 'Message %s locally requeued as %s', queueId, queued_as); + // Update message's previous queueId (thanks @mfechner ) + campaigns.updateMessageResponse(message, queued, queued_as, (err, updated) => { + if (err) { + log.error('POSTFIXBOUNCE', 'Failed updating message: %s', err && err.stack); + } else if (updated) { + log.verbose('POSTFIXBOUNCE', 'Successfully changed message queueId to %s', queued_as); + } + }); + + } else { + campaigns.updateMessage(message, 'bounced', true, (err, updated) => { + if (err) { + log.error('POSTFIXBOUNCE', 'Failed updating message: %s', err && err.stack); + } else if (updated) { + log.verbose('POSTFIXBOUNCE', 'Marked message %s as bounced', queueId); + } + }); + } + + // No need to keep in memory... free it ( thanks @witzig ) + seenIds.delete(queueId); + return checkNextLine(); }); return;