Save new queueId/response on locally requeued messages
This commit is contained in:
parent
219c77606d
commit
f90e67d775
2 changed files with 49 additions and 16 deletions
|
@ -1086,6 +1086,24 @@ module.exports.updateMessage = (message, status, updateSubscription, callback) =
|
|||
});
|
||||
};
|
||||
|
||||
module.exports.updateMessageResponse = (message, response, response_id, callback) => {
|
||||
db.getConnection((err, connection) => {
|
||||
if (err) {
|
||||
return callback(err);
|
||||
}
|
||||
|
||||
let query = 'UPDATE `campaign__' + message.campaign + '` SET `response`=?, `response_id`=? WHERE id=? LIMIT 1';
|
||||
connection.query(query, [response, response_id, message.id], err => {
|
||||
connection.release();
|
||||
if (err) {
|
||||
return callback(err);
|
||||
}
|
||||
return callback(null, true);
|
||||
});
|
||||
|
||||
});
|
||||
};
|
||||
|
||||
function createCampaignTables(id, callback) {
|
||||
let query = 'CREATE TABLE `campaign__' + id + '` LIKE campaign';
|
||||
db.getConnection((err, connection) => {
|
||||
|
|
|
@ -6,7 +6,6 @@ let net = require('net');
|
|||
let campaigns = require('../lib/models/campaigns');
|
||||
|
||||
let seenIds = new Set();
|
||||
let queueIds = {};
|
||||
|
||||
let server = net.createServer(socket => {
|
||||
let remainder = '';
|
||||
|
@ -32,6 +31,8 @@ let server = net.createServer(socket => {
|
|||
let match = /\bstatus=(bounced|sent)\b/.test(line) && line.match(/\bpostfix\/\w+\[\d+\]:\s*([^:]+).*?status=(\w+)/);
|
||||
if (match) {
|
||||
let queueId = match[1];
|
||||
let queued = '';
|
||||
let queued_as = '';
|
||||
|
||||
if (seenIds.has(queueId)) {
|
||||
return checkNextLine();
|
||||
|
@ -42,21 +43,30 @@ let server = net.createServer(socket => {
|
|||
let status = match[2];
|
||||
log.verbose('POSTFIXBOUNCE', 'Checking message %s for local requeue (status: %s)', queueId, status);
|
||||
if ( status === 'sent' ) {
|
||||
let queued = / relay=127\.0\.0\.1/.test(line) && line.match(/ queued as (\w+)\)/);
|
||||
if ( queued ) {
|
||||
log.verbose('POSTFIXBOUNCE', 'Marked message %s as locally requeued as %s', queueId, queued[1]);
|
||||
queueIds[queued[1]] = queueId;
|
||||
// Save new queueId to update message's previous queueId (thanks @mfechner )
|
||||
if ( queued = / relay=127\.0\.0\.1/.test(line) && line.match(/status=sent \((.*)\)/) ) {
|
||||
queued = queued[1];
|
||||
queued_as = queued.match(/ queued as (\w+)/);
|
||||
queued_as = queued_as[1];
|
||||
}
|
||||
return checkNextLine();
|
||||
} else if ( queueId in queueIds ) {
|
||||
log.verbose('POSTFIXBOUNCE', 'Message %s was requeued from %s', queueId, queueIds[queueId]);
|
||||
queueId = queueIds[queueId];
|
||||
}
|
||||
|
||||
campaigns.findMailByResponse(queueId, (err, message) => {
|
||||
if (err || !message) {
|
||||
return checkNextLine();
|
||||
}
|
||||
if ( queued_as ) {
|
||||
log.verbose('POSTFIXBOUNCE', 'Message %s locally requeued as %s', queueId, queued_as);
|
||||
// Update message's previous queueId (thanks @mfechner )
|
||||
campaigns.updateMessageResponse(message, queued, queued_as, (err, updated) => {
|
||||
if (err) {
|
||||
log.error('POSTFIXBOUNCE', 'Failed updating message: %s', err && err.stack);
|
||||
} else if (updated) {
|
||||
log.verbose('POSTFIXBOUNCE', 'Successfully changed message queueId to %s', queued_as);
|
||||
}
|
||||
});
|
||||
|
||||
} else {
|
||||
campaigns.updateMessage(message, 'bounced', true, (err, updated) => {
|
||||
if (err) {
|
||||
log.error('POSTFIXBOUNCE', 'Failed updating message: %s', err && err.stack);
|
||||
|
@ -64,6 +74,11 @@ let server = net.createServer(socket => {
|
|||
log.verbose('POSTFIXBOUNCE', 'Marked message %s as bounced', queueId);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// No need to keep in memory... free it ( thanks @witzig )
|
||||
seenIds.delete(queueId);
|
||||
|
||||
return checkNextLine();
|
||||
});
|
||||
return;
|
||||
|
|
Loading…
Reference in a new issue