From 907d548e027e926d839285ec1131a9ae92224913 Mon Sep 17 00:00:00 2001 From: Tomas Bures Date: Tue, 18 Sep 2018 11:03:36 +0200 Subject: [PATCH] Por tof the postfix bounce server. Not tested. --- index.js | 12 +- models/campaigns.js | 12 ++ obsolete/services/postfix-bounce-server.js | 147 --------------------- {tasks => obsolete/tasks}/jsxgettext.js | 0 services/postfix-bounce-server.js | 125 ++++++++++++++++++ 5 files changed, 143 insertions(+), 153 deletions(-) delete mode 100644 obsolete/services/postfix-bounce-server.js rename {tasks => obsolete/tasks}/jsxgettext.js (100%) create mode 100644 services/postfix-bounce-server.js diff --git a/index.js b/index.js index 9c3b9017..83fdafb0 100644 --- a/index.js +++ b/index.js @@ -96,12 +96,12 @@ dbcheck(err => { // Check if database needs upgrading before starting the server feedcheck.spawn(() => { senders.spawn(() => { //triggers(() => { - //postfixBounceServer(async () => { - (async () => { - await reportProcessor.init(); - log.info('Service', 'All services started'); - })(); - //}); + postfixBounceServer(async () => { + (async () => { + await reportProcessor.init(); + log.info('Service', 'All services started'); + })(); + }); //}); }); }); diff --git a/models/campaigns.js b/models/campaigns.js index 6a980a43..7a1da65f 100644 --- a/models/campaigns.js +++ b/models/campaigns.js @@ -583,6 +583,17 @@ async function changeStatusByMessage(context, message, subscriptionStatus, updat }); } +async function updateMessageResponse(context, message, response, responseId) { + await knex.transaction(async tx => { + await shares.enforceEntityPermissionTx(tx, context, 'campaign', message.campaign, 'manageMessages'); + + await tx('campaign_messages').where('id', message.id).update({ + response, + response_id: responseId + }); + }); +} + async function getSubscribersQueryGeneratorTx(tx, campaignId, onlyUnsent, batchSize) { /* This is supposed to produce queries like this: @@ -728,6 +739,7 @@ module.exports.getMessageByResponseId = getMessageByResponseId; module.exports.changeStatusByCampaignCidAndSubscriptionIdTx = changeStatusByCampaignCidAndSubscriptionIdTx; module.exports.changeStatusByMessage = changeStatusByMessage; +module.exports.updateMessageResponse = updateMessageResponse; module.exports.getSubscribersQueryGeneratorTx = getSubscribersQueryGeneratorTx; diff --git a/obsolete/services/postfix-bounce-server.js b/obsolete/services/postfix-bounce-server.js deleted file mode 100644 index 340287b6..00000000 --- a/obsolete/services/postfix-bounce-server.js +++ /dev/null @@ -1,147 +0,0 @@ -'use strict'; - -// FIXME - port for the new campaigns model - -const log = require('npmlog'); -const config = require('config'); -const net = require('net'); -const campaigns = require('../lib/models/campaigns'); - -const seenIds = new Set(); - -const server = net.createServer(socket => { - let remainder = ''; - - let reading = false; - let readNextChunk = () => { - let chunk = socket.read(); - if (chunk === null) { - reading = false; - return; - } - reading = true; - - let lines = (remainder + chunk.toString()).split(/\r?\n/); - remainder = lines.pop(); - - let pos = 0; - let checkNextLine = () => { - if (pos >= lines.length) { - return readNextChunk(); - } - let line = lines[pos++]; - let match = /\bstatus=(bounced|sent)\b/.test(line) && line.match(/\bpostfix\/\w+\[\d+\]:\s*([^:]+).*?status=(\w+)/); - if (match) { - let queueId = match[1]; - let queued = ''; - let queued_as = ''; - - if (seenIds.has(queueId)) { - return checkNextLine(); - } - seenIds.add(queueId); - - // Losacno: Check for local requeue - let status = match[2]; - log.verbose('POSTFIXBOUNCE', 'Checking message %s for local requeue (status: %s)', queueId, status); - if ( status === 'sent' ) { - // Save new queueId to update message's previous queueId (thanks @mfechner ) - queued = / relay=/.test(line) && line.match(/status=sent \((.*)\)/); - if ( queued ) { - queued = queued[1]; - queued_as = queued.match(/ queued as (\w+)/); - if (queued_as) { - queued_as = queued_as[1]; - } else { - queued_as = ''; - } - } - } - - campaigns.findMailByResponse(queueId, (err, message) => { - if (err || !message) { - return checkNextLine(); - } - if ( queued_as || status === 'sent' ) { - log.verbose('POSTFIXBOUNCE', 'Message %s locally requeued as %s', queueId, queued_as); - // Update message's previous queueId (thanks @mfechner ) - campaigns.updateMessageResponse(message, queued, queued_as, (err, updated) => { - if (err) { - log.error('POSTFIXBOUNCE', 'Failed updating message: %s', err && err.stack); - } else if (updated) { - log.verbose('POSTFIXBOUNCE', 'Successfully changed message queueId to %s', queued_as); - } - }); - - } else { - campaigns.updateMessage(message, 'bounced', true, (err, updated) => { - if (err) { - log.error('POSTFIXBOUNCE', 'Failed updating message: %s', err && err.stack); - } else if (updated) { - log.verbose('POSTFIXBOUNCE', 'Marked message %s as bounced', queueId); - } - }); - } - - // No need to keep in memory... free it ( thanks @witzig ) - seenIds.delete(queueId); - - return checkNextLine(); - }); - return; - - } else { - return checkNextLine(); - } - }; - - checkNextLine(); - }; - - - socket.on('readable', () => { - if (reading) { - return false; - } - readNextChunk(); - - }); -}); - -module.exports = callback => { - if (!config.postfixbounce.enabled) { - return setImmediate(callback); - } - - let started = false; - - server.on('error', err => { - const port = config.postfixbounce.port; - const bind = typeof port === 'string' ? 'Pipe ' + port : 'Port ' + port; - - switch (err.code) { - case 'EACCES': - log.error('POSTFIXBOUNCE', '%s requires elevated privileges.', bind); - break; - case 'EADDRINUSE': - log.error('POSTFIXBOUNCE', '%s is already in use', bind); - break; - default: - log.error('POSTFIXBOUNCE', err); - } - - if (!started) { - started = true; - return callback(err); - } - }); - - server.listen(config.postfixbounce.port, config.postfixbounce.host, () => { - if (started) { - return server.close(); - } - started = true; - log.info('POSTFIXBOUNCE', 'Server listening on port %s', config.postfixbounce.port); - setImmediate(callback); - }); -}; diff --git a/tasks/jsxgettext.js b/obsolete/tasks/jsxgettext.js similarity index 100% rename from tasks/jsxgettext.js rename to obsolete/tasks/jsxgettext.js diff --git a/services/postfix-bounce-server.js b/services/postfix-bounce-server.js new file mode 100644 index 00000000..07f916a4 --- /dev/null +++ b/services/postfix-bounce-server.js @@ -0,0 +1,125 @@ +'use strict'; + +const log = require('npmlog'); +const config = require('config'); +const net = require('net'); +const campaigns = require('../models/campaigns'); +const contextHelpers = require('../lib/context-helpers'); +const { SubscriptionStatus } = require('../shared/lists'); + +const seenIds = new Set(); + +let remainder = ''; +let reading = false; + +async function readNextChunks() { + if (reading) { + return false; + } + + reading = true; + + while (true) { + const chunk = socket.read(); + if (chunk === null) { + reading = false; + return; + } + + const lines = (remainder + chunk.toString()).split(/\r?\n/); + remainder = lines.pop(); + + for (const line of lines) { + try { + const match = /\bstatus=(bounced|sent)\b/.test(line) && line.match(/\bpostfix\/\w+\[\d+\]:\s*([^:]+).*?status=(\w+)/); + if (match) { + let queueId = match[1]; + let queued = ''; + let queuedAs = ''; + + if (!seenIds.has(queueId)) { + seenIds.add(queueId); + + // Losacno: Check for local requeue + let status = match[2]; + log.verbose('POSTFIXBOUNCE', 'Checking message %s for local requeue (status: %s)', queueId, status); + if (status === 'sent') { + // Save new queueId to update message's previous queueId (thanks @mfechner ) + queued = / relay=/.test(line) && line.match(/status=sent \((.*)\)/); + if (queued) { + queued = queued[1]; + queuedAs = queued.match(/ queued as (\w+)/); + if (queuedAs) { + queuedAs = queuedAs[1]; + } else { + queuedAs = ''; + } + } + } + + const message = await campaigns.getMessageByResponseId(queueId); + if (message) { + if (queuedAs || status === 'sent') { + log.verbose('POSTFIXBOUNCE', 'Message %s locally requeued as %s', queueId, queuedAs); + // Update message's previous queueId (thanks @mfechner ) + campaigns.updateMessageResponse(contextHelpers.getAdminContext(), message, queued, queuedAs); + log.verbose('POSTFIXBOUNCE', 'Successfully changed message queueId to %s', queuedAs); + } else { + campaigns.changeStatusByMessage(contextHelpers.getAdminContext(), message, SubscriptionStatus.BOUNCED, true); + log.verbose('POSTFIXBOUNCE', 'Marked message %s as bounced', queueId); + } + + // No need to keep in memory... free it ( thanks @witzig ) + seenIds.delete(queueId); + } + } + } + } catch (err) { + log.error('POSTFIXBOUNCE', err && err.stack); + } + } + + } +} + +module.exports = callback => { + if (!config.postfixbounce.enabled) { + return setImmediate(callback); + } + + let started = false; // Not sure why all this magic around "started". But it was there this way in Mailtrain v1, so we kept it. + + const server = net.createServer(socket => { + socket.on('readable', readNextChunks); + }); + + server.on('error', err => { + const port = config.postfixbounce.port; + const bind = typeof port === 'string' ? 'Pipe ' + port : 'Port ' + port; + + switch (err.code) { + case 'EACCES': + log.error('POSTFIXBOUNCE', '%s requires elevated privileges.', bind); + break; + case 'EADDRINUSE': + log.error('POSTFIXBOUNCE', '%s is already in use', bind); + break; + default: + log.error('POSTFIXBOUNCE', err); + } + + if (!started) { + started = true; + return callback(err); + } + }); + + server.listen(config.postfixbounce.port, config.postfixbounce.host, () => { + if (started) { + return server.close(); + } + started = true; + log.info('POSTFIXBOUNCE', 'Server listening on port %s', config.postfixbounce.port); + setImmediate(callback); + }); +};