From cff908887ffc3ff6d22c5bb49d8b20817b08fd0c Mon Sep 17 00:00:00 2001 From: witzig Date: Thu, 1 Jun 2017 13:08:45 +0200 Subject: [PATCH] Fixed throttling and pausing #243 --- lib/db.js | 15 ++++- lib/mailer.js | 25 +++++-- services/sender.js | 160 ++++++++++++++++++++++++--------------------- 3 files changed, 117 insertions(+), 83 deletions(-) diff --git a/lib/db.js b/lib/db.js index 2f221d44..deec4531 100644 --- a/lib/db.js +++ b/lib/db.js @@ -4,6 +4,7 @@ let config = require('config'); let mysql = require('mysql'); let redis = require('redis'); let Lock = require('redfour'); +let tools = require('./tools'); module.exports = mysql.createPool(config.mysql); if (config.redis && config.redis.enabled) { @@ -33,7 +34,7 @@ if (config.redis && config.redis.enabled) { }; module.exports.clearCache = (key, callback) => { - module.exports.redis.del(key, err => callback(err)); + module.exports.redis.del('mailtrain:cache:' + key, err => callback(err)); }; module.exports.addToCache = (key, value, callback) => { @@ -76,9 +77,21 @@ if (config.redis && config.redis.enabled) { module.exports.clearCache = (key, callback) => { caches.delete(key); + tools.workers.forEach(child => { + child.send({ + cmd: 'db.clearCache', + key + }); + }); setImmediate(() => callback()); }; + process.on('message', m => { + if (m && m.cmd === 'db.clearCache' && m.key) { + caches.delete(m.key); + } + }); + module.exports.addToCache = (key, value, callback) => { if (!caches.has(key)) { caches.set(key, []); diff --git a/lib/mailer.js b/lib/mailer.js index 577508f3..4ef5ad0a 100644 --- a/lib/mailer.js +++ b/lib/mailer.js @@ -182,10 +182,10 @@ function createMailer(callback) { module.exports.transport.checkThrottling = null; } - let throttling = Number(configItems.smtpThrottling) || 0; - if (throttling) { + let sendingRate = Number(configItems.smtpThrottling) || 0; + if (sendingRate) { // convert to messages/second - throttling = 1 / (throttling / (3600 * 1000)); + sendingRate = sendingRate / 3600; } let transportOptions; @@ -236,7 +236,7 @@ function createMailer(callback) { error: logfunc.bind(null, 'error') }, maxConnections: Number(configItems.smtpMaxConnections), - sendingRate: throttling, + sendingRate, tls: { rejectUnauthorized: !configItems.smtpSelfSigned } @@ -257,19 +257,30 @@ function createMailer(callback) { oldListeners.forEach(listener => module.exports.transport.on('idle', listener)); } - let lastCheck = Date.now(); + if (configItems.mailTransport === 'smtp' || !configItems.mailTransport) { + + let throttling = Number(configItems.smtpThrottling) || 0; + if (throttling) { + throttling = 1 / (throttling / (3600 * 1000)); + } + + let lastCheck = Date.now(); + module.exports.transport.checkThrottling = function (next) { if (!throttling) { return next(); } let nextCheck = Date.now(); let checkDiff = (nextCheck - lastCheck); - lastCheck = nextCheck; if (checkDiff < throttling) { log.verbose('Mail', 'Throttling next message in %s sec.', (throttling - checkDiff) / 1000); - setTimeout(next, throttling - checkDiff); + setTimeout(() => { + lastCheck = Date.now(); + next(); + }, throttling - checkDiff); } else { + lastCheck = nextCheck; next(); } }; diff --git a/services/sender.js b/services/sender.js index f04dd488..d81b4315 100644 --- a/services/sender.js +++ b/services/sender.js @@ -466,119 +466,129 @@ let sendLoop = () => { return setTimeout(sendLoop, 10 * 1000); } + let isThrottled = false; + let getNext = () => { - if (!mailer.transport.isIdle()) { + if (!mailer.transport.isIdle() || isThrottled) { // only retrieve new messages if there are free slots in the mailer queue return; } - // find an unsent message - findUnsent((err, message) => { - if (err) { - log.error('Mail', err.stack); - setTimeout(getNext, mailing_timeout); - return; - } - if (!message) { - setTimeout(getNext, mailing_timeout); - return; - } + isThrottled = true; - //log.verbose('Mail', 'Found new message to be delivered: %s', message.subscription.cid); - // format message to nodemailer message format - formatMessage(message, (err, mail) => { + mailer.transport.checkThrottling(() => { + + isThrottled = false; + + // find an unsent message + findUnsent((err, message) => { if (err) { log.error('Mail', err.stack); setTimeout(getNext, mailing_timeout); return; } + if (!message) { + setTimeout(getNext, mailing_timeout); + return; + } - blacklist.isblacklisted(mail.to.address, (err, blacklisted) => { - if (err) { - log.error('Mail', err); - setTimeout(getNext, mailing_timeout); - return; - } - if (!blacklisted) { - let tryCount = 0; - let trySend = () => { - tryCount++; + // log.verbose('Mail', 'Found new message to be delivered: %s', message.subscription.cid); + // format message to nodemailer message format + formatMessage(message, (err, mail) => { + if (err) { + log.error('Mail', err.stack); + setTimeout(getNext, mailing_timeout); + return; + } - // send the message - mailer.transport.sendMail(mail, (err, info) => { - if (err) { - log.error('Mail', err.stack); - if (err.responseCode && err.responseCode >= 400 && err.responseCode < 500 && tryCount <= 5) { - // temporary error, try again - return setTimeout(trySend, tryCount * 1000); - } - } + blacklist.isblacklisted(mail.to.address, (err, blacklisted) => { + if (err) { + log.error('Mail', err); + setTimeout(getNext, mailing_timeout); + return; + } - let status = err ? 2 : 1; - let response = err && (err.response || err.message) || info.response || info.messageId; - let responseId = response.split(/\s+/).pop(); + if (!blacklisted) { + let tryCount = 0; + let trySend = () => { + tryCount++; + // send the message + mailer.transport.sendMail(mail, (err, info) => { + if (err) { + log.error('Mail', err.stack); + if (err.responseCode && err.responseCode >= 400 && err.responseCode < 500 && tryCount <= 5) { + // temporary error, try again + return setTimeout(trySend, tryCount * 1000); + } + } + + let status = err ? 2 : 1; + let response = err && (err.response || err.message) || info.response || info.messageId; + let responseId = response.split(/\s+/).pop(); + + db.getConnection((err, connection) => { + if (err) { + log.error('Mail', err.stack); + return; + } + + let query = 'UPDATE `campaigns` SET `delivered`=`delivered`+1 ' + (status === 2 ? ', `bounced`=`bounced`+1 ' : '') + ' WHERE id=? LIMIT 1'; + + connection.query(query, [message.campaignId], err => { + if (err) { + log.error('Mail', err.stack); + } + + let query = 'UPDATE `campaign__' + message.campaignId + '` SET status=?, response=?, response_id=?, updated=NOW() WHERE id=? LIMIT 1'; + + connection.query(query, [status, response, responseId, message.id], err => { + connection.release(); + if (err) { + log.error('Mail', err.stack); + } else { + // log.verbose('Mail', 'Message sent and status updated for %s', message.subscription.cid); + } + }); + }); + }); + }); + }; + setImmediate(trySend); + } else { db.getConnection((err, connection) => { if (err) { - log.error('Mail', err.stack); + log.error('Mail', err); return; } - let query = 'UPDATE `campaigns` SET `delivered`=`delivered`+1 ' + (status === 2 ? ', `bounced`=`bounced`+1 ' : '') + ' WHERE id=? LIMIT 1'; + let query = 'UPDATE `campaigns` SET `blacklisted`=`blacklisted`+1 WHERE id=? LIMIT 1'; connection.query(query, [message.campaignId], err => { if (err) { - log.error('Mail', err.stack); + log.error('Mail', err); } let query = 'UPDATE `campaign__' + message.campaignId + '` SET status=?, response=?, response_id=?, updated=NOW() WHERE id=? LIMIT 1'; - connection.query(query, [status, response, responseId, message.id], err => { + connection.query(query, [5, 'blacklisted', 'blacklisted', message.id], err => { connection.release(); if (err) { - log.error('Mail', err.stack); - } else { - // log.verbose('Mail', 'Message sent and status updated for %s', message.subscription.cid); + log.error('Mail', err); } }); }); }); - }); - }; - setImmediate(trySend); - } else { - db.getConnection((err, connection) => { - if (err) { - log.error('Mail', err); - return; - } - - let query = 'UPDATE `campaigns` SET `blacklisted`=`blacklisted`+1 WHERE id=? LIMIT 1'; - - connection.query(query, [message.campaignId], err => { - if (err) { - log.error('Mail', err); - } - - let query = 'UPDATE `campaign__' + message.campaignId + '` SET status=?, response=?, response_id=?, updated=NOW() WHERE id=? LIMIT 1'; - - connection.query(query, [5, 'blacklisted', 'blacklisted', message.id], err => { - connection.release(); - if (err) { - log.error('Mail', err); - } - }); - }); - }); - } - setImmediate(() => mailer.transport.checkThrottling(getNext)); + } + setImmediate(getNext); + }); }); }); }); }; - mailer.transport.on('idle', () => mailer.transport.checkThrottling(getNext)); - setImmediate(() => mailer.transport.checkThrottling(getNext)); + mailer.transport.on('idle', getNext); + setImmediate(getNext); }); };