2016-09-22 09:04:30 +00:00
'use strict';
let log = require('npmlog');
let config = require('config');
let net = require('net');
let campaigns = require('../lib/models/campaigns');
let seenIds = new Set();
let server = net.createServer(socket => {
let remainder = '';
let reading = false;
let readNextChunk = () => {
let chunk = socket.read();
if (chunk === null) {
reading = false;
reading = true;
let lines = (remainder + chunk.toString()).split(/\r?\n/);
remainder = lines.pop();
let pos = 0;
let checkNextLine = () => {
if (pos >= lines.length) {
return readNextChunk();
let line = lines[pos++];
2017-06-27 16:22:12 +00:00
let match = /\bstatus=(bounced|sent)\b/.test(line) && line.match(/\bpostfix\/\w+\[\d+\]:\s*([^:]+).*?status=(\w+)/);
2016-09-22 09:04:30 +00:00
if (match) {
let queueId = match[1];
2017-07-10 10:13:17 +00:00
let queued = '';
let queued_as = '';
2016-09-22 09:04:30 +00:00
if (seenIds.has(queueId)) {
return checkNextLine();
2017-06-27 16:22:12 +00:00
// Losacno: Check for local requeue
let status = match[2];
log.verbose('POSTFIXBOUNCE', 'Checking message %s for local requeue (status: %s)', queueId, status);
2017-06-28 08:31:08 +00:00
if ( status === 'sent' ) {
2017-07-10 10:13:17 +00:00
// Save new queueId to update message's previous queueId (thanks @mfechner )
2017-07-10 15:14:36 +00:00
queued = / relay=/.test(line) && line.match(/status=sent \((.*)\)/);
2017-07-10 10:22:01 +00:00
if ( queued ) {
2017-07-10 10:13:17 +00:00
queued = queued[1];
queued_as = queued.match(/ queued as (\w+)/);
queued_as = queued_as[1];
2017-06-27 16:22:12 +00:00
2016-09-22 09:04:30 +00:00
campaigns.findMailByResponse(queueId, (err, message) => {
if (err || !message) {
return checkNextLine();
2017-07-10 10:13:17 +00:00
if ( queued_as ) {
log.verbose('POSTFIXBOUNCE', 'Message %s locally requeued as %s', queueId, queued_as);
// Update message's previous queueId (thanks @mfechner )
campaigns.updateMessageResponse(message, queued, queued_as, (err, updated) => {
if (err) {
log.error('POSTFIXBOUNCE', 'Failed updating message: %s', err && err.stack);
} else if (updated) {
log.verbose('POSTFIXBOUNCE', 'Successfully changed message queueId to %s', queued_as);
} else {
campaigns.updateMessage(message, 'bounced', true, (err, updated) => {
if (err) {
log.error('POSTFIXBOUNCE', 'Failed updating message: %s', err && err.stack);
} else if (updated) {
log.verbose('POSTFIXBOUNCE', 'Marked message %s as bounced', queueId);
// No need to keep in memory... free it ( thanks @witzig )
2016-09-22 09:04:30 +00:00
return checkNextLine();
} else {
return checkNextLine();
socket.on('readable', () => {
if (reading) {
return false;
module.exports = callback => {
2017-06-22 16:00:13 +00:00
if (!config.postfixbounce.enabled) {
return setImmediate(callback);
2016-09-22 09:04:30 +00:00
2017-06-22 16:00:13 +00:00
let started = false;
server.on('error', err => {
const port = config.postfixbounce.port;
const bind = typeof port === 'string' ? 'Pipe ' + port : 'Port ' + port;
switch (err.code) {
case 'EACCES':
log.error('POSTFIXBOUNCE', '%s requires elevated privileges.', bind);
log.error('POSTFIXBOUNCE', '%s is already in use', bind);
log.error('POSTFIXBOUNCE', err);
if (!started) {
started = true;
return callback(err);
server.listen(config.postfixbounce.port, config.postfixbounce.host, () => {
2017-06-22 16:16:26 +00:00
if (started) {
return server.close();
2017-06-22 16:00:13 +00:00
started = true;
log.info('POSTFIXBOUNCE', 'Server listening on port %s', config.postfixbounce.port);
2016-09-22 09:04:30 +00:00