Added support for processing Postfix logs to find bounces

This commit is contained in:
Andris Reinman 2016-09-22 12:04:30 +03:00
parent 73127ed7b4
commit f0784156fe
4 changed files with 114 additions and 15 deletions

View file

@ -172,6 +172,7 @@ Mailtrain uses webhooks integration to detect bounces and spam complaints. Curre
* **SendGrid** use `http://domain/webhooks/sendgrid` as the webhook URL for bounces and complaints ([instructions](https://github.com/andris9/mailtrain/wiki/Setting-up-Webhooks-for-SendGrid))
* **Mailgun** use `http://domain/webhooks/mailgun` as the webhook URL for bounces and complaints ([instructions](https://github.com/andris9/mailtrain/wiki/Setting-up-Webhooks-for-Mailgun))
* **ZoneMTA** use `http://domain/webhooks/zone-mta` as the webhook URL for bounces. If you install Mailtrain with the included installation script then this route gets set up automatically during the installation process
* **Postfix** This is not a webhook but a TCP server on port 5699 to listen for piped Postfix log. Enable it with the `[postfixbounce]` config option. To use it, pipe the log to that port: `tail -f -n +0 /var/log/mail.log | nc localhost 5699 -` (if Mailtrain restarts then you need to re-establish the tail pipe), alternatively you could send the log with a cron job periodically `tail -n 100 | nc localhost 5699 -`)
Additionally Mailtrain (v1.1+) is able to use VERP-based bounce handling. This would require to have a compatible SMTP relay (the services mentioned above strip out or block VERP addresses in the SMTP envelope) and you also need to set up special MX DNS name that points to your Mailtrain installation server.

View file

@ -96,3 +96,12 @@ port=3002
baseDN="ou=users,dc=company"
filter="(|(username={{username}})(mail={{username}}))"
passwordresetlink=""
[postfixbounce]
# Enable to allow writing Postfix bounce log to Mailtrain listener
# If enabled, tail mail.log to Mailtrain with the following command:
# tail -f -n +0 /var/log/mail.log | nc localhost 5699 -
enabled=false
port=5699
# allow connections from localhost only
host="127.0.0.1"

View file

@ -13,6 +13,7 @@ let triggers = require('./services/triggers');
let importer = require('./services/importer');
let verpServer = require('./services/verp-server');
let testServer = require('./services/test-server');
let postfixBounceServer = require('./services/postfix-bounce-server');
let tzupdate = require('./services/tzupdate');
let feedcheck = require('./services/feedcheck');
let dbcheck = require('./lib/dbcheck');
@ -79,23 +80,25 @@ server.on('listening', () => {
triggers(() => {
sender(() => {
feedcheck(() => {
log.info('Service', 'All services started');
if (config.group) {
try {
process.setgid(config.group);
log.info('Service', 'Changed group to "%s" (%s)', config.group, process.getgid());
} catch (E) {
log.info('Service', 'Failed to change group to "%s" (%s)', config.group, E.message);
postfixBounceServer(() => {
log.info('Service', 'All services started');
if (config.group) {
try {
process.setgid(config.group);
log.info('Service', 'Changed group to "%s" (%s)', config.group, process.getgid());
} catch (E) {
log.info('Service', 'Failed to change group to "%s" (%s)', config.group, E.message);
}
}
}
if (config.user) {
try {
process.setuid(config.user);
log.info('Service', 'Changed user to "%s" (%s)', config.user, process.getuid());
} catch (E) {
log.info('Service', 'Failed to change user to "%s" (%s)', config.user, E.message);
if (config.user) {
try {
process.setuid(config.user);
log.info('Service', 'Changed user to "%s" (%s)', config.user, process.getuid());
} catch (E) {
log.info('Service', 'Failed to change user to "%s" (%s)', config.user, E.message);
}
}
}
});
});
});
});

View file

@ -0,0 +1,86 @@
'use strict';
let log = require('npmlog');
let config = require('config');
let net = require('net');
let campaigns = require('../lib/models/campaigns');
let seenIds = new Set();
let server = net.createServer(socket => {
let remainder = '';
let reading = false;
let readNextChunk = () => {
let chunk = socket.read();
if (chunk === null) {
reading = false;
return;
}
reading = true;
let lines = (remainder + chunk.toString()).split(/\r?\n/);
remainder = lines.pop();
let pos = 0;
let checkNextLine = () => {
if (pos >= lines.length) {
return readNextChunk();
}
let line = lines[pos++];
let match = /\bstatus=bounced\b/.test(line) && line.match(/\bpostfix\/\w+\[\d+\]:\s*([^:]+)/);
if (match) {
let queueId = match[1];
if (seenIds.has(queueId)) {
return checkNextLine();
}
seenIds.add(queueId);
campaigns.findMailByResponse(queueId, (err, message) => {
if (err || !message) {
return checkNextLine();
}
campaigns.updateMessage(message, 'bounced', true, (err, updated) => {
if (err) {
log.error('POSTFIXBOUNCE', 'Failed updating message: %s', err.stack);
} else if (updated) {
log.verbose('POSTFIXBOUNCE', 'Marked message %s as bounced', queueId);
}
});
return checkNextLine();
});
return;
} else {
return checkNextLine();
}
};
checkNextLine();
};
socket.on('readable', () => {
if (reading) {
return false;
}
readNextChunk();
});
});
server.on('error', err => {
log.error('POSTFIXBOUNCE', err.stack);
});
module.exports = callback => {
if (config.postfixbounce.enabled) {
server.listen(config.postfixbounce.port, config.postfixbounce.host, () => {
log.info('POSTFIXBOUNCE', 'Server listening on port %s', config.postfixbounce.port);
setImmediate(callback);
});
} else {
setImmediate(callback);
}
};