Por tof the postfix bounce server. Not tested.
This commit is contained in:
parent
63765f7222
commit
907d548e02
5 changed files with 143 additions and 153 deletions
12
index.js
12
index.js
|
@ -96,12 +96,12 @@ dbcheck(err => { // Check if database needs upgrading before starting the server
|
||||||
feedcheck.spawn(() => {
|
feedcheck.spawn(() => {
|
||||||
senders.spawn(() => {
|
senders.spawn(() => {
|
||||||
//triggers(() => {
|
//triggers(() => {
|
||||||
//postfixBounceServer(async () => {
|
postfixBounceServer(async () => {
|
||||||
(async () => {
|
(async () => {
|
||||||
await reportProcessor.init();
|
await reportProcessor.init();
|
||||||
log.info('Service', 'All services started');
|
log.info('Service', 'All services started');
|
||||||
})();
|
})();
|
||||||
//});
|
});
|
||||||
//});
|
//});
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
|
@ -583,6 +583,17 @@ async function changeStatusByMessage(context, message, subscriptionStatus, updat
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async function updateMessageResponse(context, message, response, responseId) {
|
||||||
|
await knex.transaction(async tx => {
|
||||||
|
await shares.enforceEntityPermissionTx(tx, context, 'campaign', message.campaign, 'manageMessages');
|
||||||
|
|
||||||
|
await tx('campaign_messages').where('id', message.id).update({
|
||||||
|
response,
|
||||||
|
response_id: responseId
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
async function getSubscribersQueryGeneratorTx(tx, campaignId, onlyUnsent, batchSize) {
|
async function getSubscribersQueryGeneratorTx(tx, campaignId, onlyUnsent, batchSize) {
|
||||||
/*
|
/*
|
||||||
This is supposed to produce queries like this:
|
This is supposed to produce queries like this:
|
||||||
|
@ -728,6 +739,7 @@ module.exports.getMessageByResponseId = getMessageByResponseId;
|
||||||
|
|
||||||
module.exports.changeStatusByCampaignCidAndSubscriptionIdTx = changeStatusByCampaignCidAndSubscriptionIdTx;
|
module.exports.changeStatusByCampaignCidAndSubscriptionIdTx = changeStatusByCampaignCidAndSubscriptionIdTx;
|
||||||
module.exports.changeStatusByMessage = changeStatusByMessage;
|
module.exports.changeStatusByMessage = changeStatusByMessage;
|
||||||
|
module.exports.updateMessageResponse = updateMessageResponse;
|
||||||
|
|
||||||
module.exports.getSubscribersQueryGeneratorTx = getSubscribersQueryGeneratorTx;
|
module.exports.getSubscribersQueryGeneratorTx = getSubscribersQueryGeneratorTx;
|
||||||
|
|
||||||
|
|
|
@ -1,147 +0,0 @@
|
||||||
'use strict';
|
|
||||||
|
|
||||||
// FIXME - port for the new campaigns model
|
|
||||||
|
|
||||||
const log = require('npmlog');
|
|
||||||
const config = require('config');
|
|
||||||
const net = require('net');
|
|
||||||
const campaigns = require('../lib/models/campaigns');
|
|
||||||
|
|
||||||
const seenIds = new Set();
|
|
||||||
|
|
||||||
const server = net.createServer(socket => {
|
|
||||||
let remainder = '';
|
|
||||||
|
|
||||||
let reading = false;
|
|
||||||
let readNextChunk = () => {
|
|
||||||
let chunk = socket.read();
|
|
||||||
if (chunk === null) {
|
|
||||||
reading = false;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
reading = true;
|
|
||||||
|
|
||||||
let lines = (remainder + chunk.toString()).split(/\r?\n/);
|
|
||||||
remainder = lines.pop();
|
|
||||||
|
|
||||||
let pos = 0;
|
|
||||||
let checkNextLine = () => {
|
|
||||||
if (pos >= lines.length) {
|
|
||||||
return readNextChunk();
|
|
||||||
}
|
|
||||||
let line = lines[pos++];
|
|
||||||
let match = /\bstatus=(bounced|sent)\b/.test(line) && line.match(/\bpostfix\/\w+\[\d+\]:\s*([^:]+).*?status=(\w+)/);
|
|
||||||
if (match) {
|
|
||||||
let queueId = match[1];
|
|
||||||
let queued = '';
|
|
||||||
let queued_as = '';
|
|
||||||
|
|
||||||
if (seenIds.has(queueId)) {
|
|
||||||
return checkNextLine();
|
|
||||||
}
|
|
||||||
seenIds.add(queueId);
|
|
||||||
|
|
||||||
// Losacno: Check for local requeue
|
|
||||||
let status = match[2];
|
|
||||||
log.verbose('POSTFIXBOUNCE', 'Checking message %s for local requeue (status: %s)', queueId, status);
|
|
||||||
if ( status === 'sent' ) {
|
|
||||||
// Save new queueId to update message's previous queueId (thanks @mfechner )
|
|
||||||
queued = / relay=/.test(line) && line.match(/status=sent \((.*)\)/);
|
|
||||||
if ( queued ) {
|
|
||||||
queued = queued[1];
|
|
||||||
queued_as = queued.match(/ queued as (\w+)/);
|
|
||||||
if (queued_as) {
|
|
||||||
queued_as = queued_as[1];
|
|
||||||
} else {
|
|
||||||
queued_as = '';
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
campaigns.findMailByResponse(queueId, (err, message) => {
|
|
||||||
if (err || !message) {
|
|
||||||
return checkNextLine();
|
|
||||||
}
|
|
||||||
if ( queued_as || status === 'sent' ) {
|
|
||||||
log.verbose('POSTFIXBOUNCE', 'Message %s locally requeued as %s', queueId, queued_as);
|
|
||||||
// Update message's previous queueId (thanks @mfechner )
|
|
||||||
campaigns.updateMessageResponse(message, queued, queued_as, (err, updated) => {
|
|
||||||
if (err) {
|
|
||||||
log.error('POSTFIXBOUNCE', 'Failed updating message: %s', err && err.stack);
|
|
||||||
} else if (updated) {
|
|
||||||
log.verbose('POSTFIXBOUNCE', 'Successfully changed message queueId to %s', queued_as);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
} else {
|
|
||||||
campaigns.updateMessage(message, 'bounced', true, (err, updated) => {
|
|
||||||
if (err) {
|
|
||||||
log.error('POSTFIXBOUNCE', 'Failed updating message: %s', err && err.stack);
|
|
||||||
} else if (updated) {
|
|
||||||
log.verbose('POSTFIXBOUNCE', 'Marked message %s as bounced', queueId);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
// No need to keep in memory... free it ( thanks @witzig )
|
|
||||||
seenIds.delete(queueId);
|
|
||||||
|
|
||||||
return checkNextLine();
|
|
||||||
});
|
|
||||||
return;
|
|
||||||
|
|
||||||
} else {
|
|
||||||
return checkNextLine();
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
checkNextLine();
|
|
||||||
};
|
|
||||||
|
|
||||||
|
|
||||||
socket.on('readable', () => {
|
|
||||||
if (reading) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
readNextChunk();
|
|
||||||
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
module.exports = callback => {
|
|
||||||
if (!config.postfixbounce.enabled) {
|
|
||||||
return setImmediate(callback);
|
|
||||||
}
|
|
||||||
|
|
||||||
let started = false;
|
|
||||||
|
|
||||||
server.on('error', err => {
|
|
||||||
const port = config.postfixbounce.port;
|
|
||||||
const bind = typeof port === 'string' ? 'Pipe ' + port : 'Port ' + port;
|
|
||||||
|
|
||||||
switch (err.code) {
|
|
||||||
case 'EACCES':
|
|
||||||
log.error('POSTFIXBOUNCE', '%s requires elevated privileges.', bind);
|
|
||||||
break;
|
|
||||||
case 'EADDRINUSE':
|
|
||||||
log.error('POSTFIXBOUNCE', '%s is already in use', bind);
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
log.error('POSTFIXBOUNCE', err);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!started) {
|
|
||||||
started = true;
|
|
||||||
return callback(err);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
server.listen(config.postfixbounce.port, config.postfixbounce.host, () => {
|
|
||||||
if (started) {
|
|
||||||
return server.close();
|
|
||||||
}
|
|
||||||
started = true;
|
|
||||||
log.info('POSTFIXBOUNCE', 'Server listening on port %s', config.postfixbounce.port);
|
|
||||||
setImmediate(callback);
|
|
||||||
});
|
|
||||||
};
|
|
125
services/postfix-bounce-server.js
Normal file
125
services/postfix-bounce-server.js
Normal file
|
@ -0,0 +1,125 @@
|
||||||
|
'use strict';
|
||||||
|
|
||||||
|
const log = require('npmlog');
|
||||||
|
const config = require('config');
|
||||||
|
const net = require('net');
|
||||||
|
const campaigns = require('../models/campaigns');
|
||||||
|
const contextHelpers = require('../lib/context-helpers');
|
||||||
|
const { SubscriptionStatus } = require('../shared/lists');
|
||||||
|
|
||||||
|
const seenIds = new Set();
|
||||||
|
|
||||||
|
let remainder = '';
|
||||||
|
let reading = false;
|
||||||
|
|
||||||
|
async function readNextChunks() {
|
||||||
|
if (reading) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
reading = true;
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
const chunk = socket.read();
|
||||||
|
if (chunk === null) {
|
||||||
|
reading = false;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const lines = (remainder + chunk.toString()).split(/\r?\n/);
|
||||||
|
remainder = lines.pop();
|
||||||
|
|
||||||
|
for (const line of lines) {
|
||||||
|
try {
|
||||||
|
const match = /\bstatus=(bounced|sent)\b/.test(line) && line.match(/\bpostfix\/\w+\[\d+\]:\s*([^:]+).*?status=(\w+)/);
|
||||||
|
if (match) {
|
||||||
|
let queueId = match[1];
|
||||||
|
let queued = '';
|
||||||
|
let queuedAs = '';
|
||||||
|
|
||||||
|
if (!seenIds.has(queueId)) {
|
||||||
|
seenIds.add(queueId);
|
||||||
|
|
||||||
|
// Losacno: Check for local requeue
|
||||||
|
let status = match[2];
|
||||||
|
log.verbose('POSTFIXBOUNCE', 'Checking message %s for local requeue (status: %s)', queueId, status);
|
||||||
|
if (status === 'sent') {
|
||||||
|
// Save new queueId to update message's previous queueId (thanks @mfechner )
|
||||||
|
queued = / relay=/.test(line) && line.match(/status=sent \((.*)\)/);
|
||||||
|
if (queued) {
|
||||||
|
queued = queued[1];
|
||||||
|
queuedAs = queued.match(/ queued as (\w+)/);
|
||||||
|
if (queuedAs) {
|
||||||
|
queuedAs = queuedAs[1];
|
||||||
|
} else {
|
||||||
|
queuedAs = '';
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const message = await campaigns.getMessageByResponseId(queueId);
|
||||||
|
if (message) {
|
||||||
|
if (queuedAs || status === 'sent') {
|
||||||
|
log.verbose('POSTFIXBOUNCE', 'Message %s locally requeued as %s', queueId, queuedAs);
|
||||||
|
// Update message's previous queueId (thanks @mfechner )
|
||||||
|
campaigns.updateMessageResponse(contextHelpers.getAdminContext(), message, queued, queuedAs);
|
||||||
|
log.verbose('POSTFIXBOUNCE', 'Successfully changed message queueId to %s', queuedAs);
|
||||||
|
} else {
|
||||||
|
campaigns.changeStatusByMessage(contextHelpers.getAdminContext(), message, SubscriptionStatus.BOUNCED, true);
|
||||||
|
log.verbose('POSTFIXBOUNCE', 'Marked message %s as bounced', queueId);
|
||||||
|
}
|
||||||
|
|
||||||
|
// No need to keep in memory... free it ( thanks @witzig )
|
||||||
|
seenIds.delete(queueId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (err) {
|
||||||
|
log.error('POSTFIXBOUNCE', err && err.stack);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports = callback => {
|
||||||
|
if (!config.postfixbounce.enabled) {
|
||||||
|
return setImmediate(callback);
|
||||||
|
}
|
||||||
|
|
||||||
|
let started = false; // Not sure why all this magic around "started". But it was there this way in Mailtrain v1, so we kept it.
|
||||||
|
|
||||||
|
const server = net.createServer(socket => {
|
||||||
|
socket.on('readable', readNextChunks);
|
||||||
|
});
|
||||||
|
|
||||||
|
server.on('error', err => {
|
||||||
|
const port = config.postfixbounce.port;
|
||||||
|
const bind = typeof port === 'string' ? 'Pipe ' + port : 'Port ' + port;
|
||||||
|
|
||||||
|
switch (err.code) {
|
||||||
|
case 'EACCES':
|
||||||
|
log.error('POSTFIXBOUNCE', '%s requires elevated privileges.', bind);
|
||||||
|
break;
|
||||||
|
case 'EADDRINUSE':
|
||||||
|
log.error('POSTFIXBOUNCE', '%s is already in use', bind);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
log.error('POSTFIXBOUNCE', err);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!started) {
|
||||||
|
started = true;
|
||||||
|
return callback(err);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
server.listen(config.postfixbounce.port, config.postfixbounce.host, () => {
|
||||||
|
if (started) {
|
||||||
|
return server.close();
|
||||||
|
}
|
||||||
|
started = true;
|
||||||
|
log.info('POSTFIXBOUNCE', 'Server listening on port %s', config.postfixbounce.port);
|
||||||
|
setImmediate(callback);
|
||||||
|
});
|
||||||
|
};
|
Loading…
Add table
Add a link
Reference in a new issue