Merge pull request #269 from losacno/master

Check for locally requeued messages in postfix
This commit is contained in:
Andris Reinman 2017-07-10 18:22:04 +03:00 committed by GitHub
commit 091c00a218
2 changed files with 58 additions and 8 deletions

View file

@ -1086,6 +1086,24 @@ module.exports.updateMessage = (message, status, updateSubscription, callback) =
});
};
module.exports.updateMessageResponse = (message, response, response_id, callback) => {
db.getConnection((err, connection) => {
if (err) {
return callback(err);
}
let query = 'UPDATE `campaign__' + message.campaign + '` SET `response`=?, `response_id`=? WHERE id=? LIMIT 1';
connection.query(query, [response, response_id, message.id], err => {
connection.release();
if (err) {
return callback(err);
}
return callback(null, true);
});
});
};
function createCampaignTables(id, callback) {
let query = 'CREATE TABLE `campaign__' + id + '` LIKE campaign';
db.getConnection((err, connection) => {

View file

@ -28,19 +28,46 @@ let server = net.createServer(socket => {
return readNextChunk();
}
let line = lines[pos++];
let match = /\bstatus=bounced\b/.test(line) && line.match(/\bpostfix\/\w+\[\d+\]:\s*([^:]+)/);
let match = /\bstatus=(bounced|sent)\b/.test(line) && line.match(/\bpostfix\/\w+\[\d+\]:\s*([^:]+).*?status=(\w+)/);
if (match) {
let queueId = match[1];
let queued = '';
let queued_as = '';
if (seenIds.has(queueId)) {
return checkNextLine();
}
seenIds.add(queueId);
// Losacno: Check for local requeue
let status = match[2];
log.verbose('POSTFIXBOUNCE', 'Checking message %s for local requeue (status: %s)', queueId, status);
if ( status === 'sent' ) {
// Save new queueId to update message's previous queueId (thanks @mfechner )
queued = / relay=/.test(line) && line.match(/status=sent \((.*)\)/);
if ( queued ) {
queued = queued[1];
queued_as = queued.match(/ queued as (\w+)/);
queued_as = queued_as[1];
}
}
campaigns.findMailByResponse(queueId, (err, message) => {
if (err || !message) {
return checkNextLine();
}
if ( queued_as ) {
log.verbose('POSTFIXBOUNCE', 'Message %s locally requeued as %s', queueId, queued_as);
// Update message's previous queueId (thanks @mfechner )
campaigns.updateMessageResponse(message, queued, queued_as, (err, updated) => {
if (err) {
log.error('POSTFIXBOUNCE', 'Failed updating message: %s', err && err.stack);
} else if (updated) {
log.verbose('POSTFIXBOUNCE', 'Successfully changed message queueId to %s', queued_as);
}
});
} else {
campaigns.updateMessage(message, 'bounced', true, (err, updated) => {
if (err) {
log.error('POSTFIXBOUNCE', 'Failed updating message: %s', err && err.stack);
@ -48,6 +75,11 @@ let server = net.createServer(socket => {
log.verbose('POSTFIXBOUNCE', 'Marked message %s as bounced', queueId);
}
});
}
// No need to keep in memory... free it ( thanks @witzig )
seenIds.delete(queueId);
return checkNextLine();
});
return;