Fixed throttling and pausing #243

This commit is contained in:
witzig 2017-06-01 13:08:45 +02:00
parent a7b2c33b30
commit cff908887f
3 changed files with 117 additions and 83 deletions

View file

@ -4,6 +4,7 @@ let config = require('config');
let mysql = require('mysql'); let mysql = require('mysql');
let redis = require('redis'); let redis = require('redis');
let Lock = require('redfour'); let Lock = require('redfour');
let tools = require('./tools');
module.exports = mysql.createPool(config.mysql); module.exports = mysql.createPool(config.mysql);
if (config.redis && config.redis.enabled) { if (config.redis && config.redis.enabled) {
@ -33,7 +34,7 @@ if (config.redis && config.redis.enabled) {
}; };
module.exports.clearCache = (key, callback) => { module.exports.clearCache = (key, callback) => {
module.exports.redis.del(key, err => callback(err)); module.exports.redis.del('mailtrain:cache:' + key, err => callback(err));
}; };
module.exports.addToCache = (key, value, callback) => { module.exports.addToCache = (key, value, callback) => {
@ -76,9 +77,21 @@ if (config.redis && config.redis.enabled) {
module.exports.clearCache = (key, callback) => { module.exports.clearCache = (key, callback) => {
caches.delete(key); caches.delete(key);
tools.workers.forEach(child => {
child.send({
cmd: 'db.clearCache',
key
});
});
setImmediate(() => callback()); setImmediate(() => callback());
}; };
process.on('message', m => {
if (m && m.cmd === 'db.clearCache' && m.key) {
caches.delete(m.key);
}
});
module.exports.addToCache = (key, value, callback) => { module.exports.addToCache = (key, value, callback) => {
if (!caches.has(key)) { if (!caches.has(key)) {
caches.set(key, []); caches.set(key, []);

View file

@ -182,10 +182,10 @@ function createMailer(callback) {
module.exports.transport.checkThrottling = null; module.exports.transport.checkThrottling = null;
} }
let throttling = Number(configItems.smtpThrottling) || 0; let sendingRate = Number(configItems.smtpThrottling) || 0;
if (throttling) { if (sendingRate) {
// convert to messages/second // convert to messages/second
throttling = 1 / (throttling / (3600 * 1000)); sendingRate = sendingRate / 3600;
} }
let transportOptions; let transportOptions;
@ -236,7 +236,7 @@ function createMailer(callback) {
error: logfunc.bind(null, 'error') error: logfunc.bind(null, 'error')
}, },
maxConnections: Number(configItems.smtpMaxConnections), maxConnections: Number(configItems.smtpMaxConnections),
sendingRate: throttling, sendingRate,
tls: { tls: {
rejectUnauthorized: !configItems.smtpSelfSigned rejectUnauthorized: !configItems.smtpSelfSigned
} }
@ -257,19 +257,30 @@ function createMailer(callback) {
oldListeners.forEach(listener => module.exports.transport.on('idle', listener)); oldListeners.forEach(listener => module.exports.transport.on('idle', listener));
} }
let lastCheck = Date.now();
if (configItems.mailTransport === 'smtp' || !configItems.mailTransport) { if (configItems.mailTransport === 'smtp' || !configItems.mailTransport) {
let throttling = Number(configItems.smtpThrottling) || 0;
if (throttling) {
throttling = 1 / (throttling / (3600 * 1000));
}
let lastCheck = Date.now();
module.exports.transport.checkThrottling = function (next) { module.exports.transport.checkThrottling = function (next) {
if (!throttling) { if (!throttling) {
return next(); return next();
} }
let nextCheck = Date.now(); let nextCheck = Date.now();
let checkDiff = (nextCheck - lastCheck); let checkDiff = (nextCheck - lastCheck);
lastCheck = nextCheck;
if (checkDiff < throttling) { if (checkDiff < throttling) {
log.verbose('Mail', 'Throttling next message in %s sec.', (throttling - checkDiff) / 1000); log.verbose('Mail', 'Throttling next message in %s sec.', (throttling - checkDiff) / 1000);
setTimeout(next, throttling - checkDiff); setTimeout(() => {
lastCheck = Date.now();
next();
}, throttling - checkDiff);
} else { } else {
lastCheck = nextCheck;
next(); next();
} }
}; };

View file

@ -466,119 +466,129 @@ let sendLoop = () => {
return setTimeout(sendLoop, 10 * 1000); return setTimeout(sendLoop, 10 * 1000);
} }
let isThrottled = false;
let getNext = () => { let getNext = () => {
if (!mailer.transport.isIdle()) { if (!mailer.transport.isIdle() || isThrottled) {
// only retrieve new messages if there are free slots in the mailer queue // only retrieve new messages if there are free slots in the mailer queue
return; return;
} }
// find an unsent message isThrottled = true;
findUnsent((err, message) => {
if (err) {
log.error('Mail', err.stack);
setTimeout(getNext, mailing_timeout);
return;
}
if (!message) {
setTimeout(getNext, mailing_timeout);
return;
}
//log.verbose('Mail', 'Found new message to be delivered: %s', message.subscription.cid); mailer.transport.checkThrottling(() => {
// format message to nodemailer message format
formatMessage(message, (err, mail) => { isThrottled = false;
// find an unsent message
findUnsent((err, message) => {
if (err) { if (err) {
log.error('Mail', err.stack); log.error('Mail', err.stack);
setTimeout(getNext, mailing_timeout); setTimeout(getNext, mailing_timeout);
return; return;
} }
if (!message) {
setTimeout(getNext, mailing_timeout);
return;
}
blacklist.isblacklisted(mail.to.address, (err, blacklisted) => { // log.verbose('Mail', 'Found new message to be delivered: %s', message.subscription.cid);
if (err) { // format message to nodemailer message format
log.error('Mail', err); formatMessage(message, (err, mail) => {
setTimeout(getNext, mailing_timeout); if (err) {
return; log.error('Mail', err.stack);
} setTimeout(getNext, mailing_timeout);
if (!blacklisted) { return;
let tryCount = 0; }
let trySend = () => {
tryCount++;
// send the message blacklist.isblacklisted(mail.to.address, (err, blacklisted) => {
mailer.transport.sendMail(mail, (err, info) => { if (err) {
if (err) { log.error('Mail', err);
log.error('Mail', err.stack); setTimeout(getNext, mailing_timeout);
if (err.responseCode && err.responseCode >= 400 && err.responseCode < 500 && tryCount <= 5) { return;
// temporary error, try again }
return setTimeout(trySend, tryCount * 1000);
}
}
let status = err ? 2 : 1; if (!blacklisted) {
let response = err && (err.response || err.message) || info.response || info.messageId; let tryCount = 0;
let responseId = response.split(/\s+/).pop(); let trySend = () => {
tryCount++;
// send the message
mailer.transport.sendMail(mail, (err, info) => {
if (err) {
log.error('Mail', err.stack);
if (err.responseCode && err.responseCode >= 400 && err.responseCode < 500 && tryCount <= 5) {
// temporary error, try again
return setTimeout(trySend, tryCount * 1000);
}
}
let status = err ? 2 : 1;
let response = err && (err.response || err.message) || info.response || info.messageId;
let responseId = response.split(/\s+/).pop();
db.getConnection((err, connection) => {
if (err) {
log.error('Mail', err.stack);
return;
}
let query = 'UPDATE `campaigns` SET `delivered`=`delivered`+1 ' + (status === 2 ? ', `bounced`=`bounced`+1 ' : '') + ' WHERE id=? LIMIT 1';
connection.query(query, [message.campaignId], err => {
if (err) {
log.error('Mail', err.stack);
}
let query = 'UPDATE `campaign__' + message.campaignId + '` SET status=?, response=?, response_id=?, updated=NOW() WHERE id=? LIMIT 1';
connection.query(query, [status, response, responseId, message.id], err => {
connection.release();
if (err) {
log.error('Mail', err.stack);
} else {
// log.verbose('Mail', 'Message sent and status updated for %s', message.subscription.cid);
}
});
});
});
});
};
setImmediate(trySend);
} else {
db.getConnection((err, connection) => { db.getConnection((err, connection) => {
if (err) { if (err) {
log.error('Mail', err.stack); log.error('Mail', err);
return; return;
} }
let query = 'UPDATE `campaigns` SET `delivered`=`delivered`+1 ' + (status === 2 ? ', `bounced`=`bounced`+1 ' : '') + ' WHERE id=? LIMIT 1'; let query = 'UPDATE `campaigns` SET `blacklisted`=`blacklisted`+1 WHERE id=? LIMIT 1';
connection.query(query, [message.campaignId], err => { connection.query(query, [message.campaignId], err => {
if (err) { if (err) {
log.error('Mail', err.stack); log.error('Mail', err);
} }
let query = 'UPDATE `campaign__' + message.campaignId + '` SET status=?, response=?, response_id=?, updated=NOW() WHERE id=? LIMIT 1'; let query = 'UPDATE `campaign__' + message.campaignId + '` SET status=?, response=?, response_id=?, updated=NOW() WHERE id=? LIMIT 1';
connection.query(query, [status, response, responseId, message.id], err => { connection.query(query, [5, 'blacklisted', 'blacklisted', message.id], err => {
connection.release(); connection.release();
if (err) { if (err) {
log.error('Mail', err.stack); log.error('Mail', err);
} else {
// log.verbose('Mail', 'Message sent and status updated for %s', message.subscription.cid);
} }
}); });
}); });
}); });
}); }
}; setImmediate(getNext);
setImmediate(trySend); });
} else {
db.getConnection((err, connection) => {
if (err) {
log.error('Mail', err);
return;
}
let query = 'UPDATE `campaigns` SET `blacklisted`=`blacklisted`+1 WHERE id=? LIMIT 1';
connection.query(query, [message.campaignId], err => {
if (err) {
log.error('Mail', err);
}
let query = 'UPDATE `campaign__' + message.campaignId + '` SET status=?, response=?, response_id=?, updated=NOW() WHERE id=? LIMIT 1';
connection.query(query, [5, 'blacklisted', 'blacklisted', message.id], err => {
connection.release();
if (err) {
log.error('Mail', err);
}
});
});
});
}
setImmediate(() => mailer.transport.checkThrottling(getNext));
}); });
}); });
}); });
}; };
mailer.transport.on('idle', () => mailer.transport.checkThrottling(getNext)); mailer.transport.on('idle', getNext);
setImmediate(() => mailer.transport.checkThrottling(getNext)); setImmediate(getNext);
}); });
}; };