Fetch multiple unsent messages at once to speed up delivery

This commit is contained in:
Andris Reinman 2016-05-25 18:01:39 +03:00
parent 9a5d723663
commit f29a8a1b67
9 changed files with 184 additions and 77 deletions

View file

@ -16,7 +16,44 @@ let url = require('url');
let htmlToText = require('html-to-text');
let request = require('request');
// to speed things up fetch several unsent messages and store these into a cache
let fetchCache = [];
function findUnsent(callback) {
let returnUnsent = (row, campaign) => {
db.getConnection((err, connection) => {
if (err) {
return callback(err);
}
let subscription = tools.convertKeys(row);
let query = 'INSERT INTO `campaign__' + campaign.id + '` (list, segment, subscription) VALUES(?, ?,?)';
connection.query(query, [campaign.list, campaign.segment, subscription.id], (err, result) => {
connection.release();
if (err) {
if (err.code === 'ER_DUP_ENTRY') {
// race condition, try next one
return findUnsent(callback);
}
return callback(err);
}
subscription.campaign = campaign.id;
callback(null, {
id: result.insertId,
listId: campaign.list,
campaignId: campaign.id,
subscription
});
});
});
};
if (fetchCache.length) {
let cached = fetchCache.shift();
return returnUnsent(cached.row, cached.campaign);
}
db.getConnection((err, connection) => {
if (err) {
return callback(err);
@ -24,13 +61,13 @@ function findUnsent(callback) {
// Find "normal" campaigns. Ignore RSS and drip campaigns at this point
let query = 'SELECT `id`, `list`, `segment` FROM `campaigns` WHERE `status`=? AND (`scheduled` IS NULL OR `scheduled` <= NOW()) AND `type` IN (?, ?) LIMIT 1';
connection.query(query, [2, 1, 3], (err, rows) => {
connection.release();
if (err) {
connection.release();
return callback(err);
}
if (!rows || !rows.length) {
connection.release();
return callback(null, false);
}
@ -44,7 +81,7 @@ function findUnsent(callback) {
values: []
});
}
segments.getQuery(segmentId, next);
segments.getQuery(segmentId, 'subscription', next);
};
getSegmentQuery(campaign.segment, (err, queryData) => {
@ -52,15 +89,27 @@ function findUnsent(callback) {
return callback(err);
}
let tryNext = () => {
db.getConnection((err, connection) => {
if (err) {
return callback(err);
}
// TODO: Add support for localized sending time. In this case campaign messages are
// not sent before receiver's local time reaches defined time
// SELECT * FROM subscription__1 LEFT JOIN tzoffset ON tzoffset.tz=subscription__1.tz WHERE NOW() + INTERVAL IFNULL(`offset`,0) MINUTE >= localtime
let query = 'SELECT * FROM `subscription__' + campaign.list + '` subscription WHERE status=1 ' + (queryData.where ? ' AND (' + queryData.where + ')' : '') + ' AND NOT EXISTS (SELECT 1 FROM `campaign__' + campaign.id + '` campaign WHERE campaign.list = ? AND campaign.segment = ? AND campaign.subscription = subscription.id) LIMIT 1';
let query;
let values;
connection.query(query, queryData.values.concat([campaign.list, campaign.segment]), (err, rows) => {
// NOT IN
query = 'SELECT * FROM `subscription__' + campaign.list + '` AS subscription WHERE status=1 ' + (queryData.where ? ' AND (' + queryData.where + ')' : '') + ' AND id NOT IN (SELECT subscription FROM `campaign__' + campaign.id + '` campaign WHERE campaign.list = ? AND campaign.segment = ? AND campaign.subscription = subscription.id) LIMIT 100';
values = queryData.values.concat([campaign.list, campaign.segment]);
// LEFT JOIN / IS NULL
//query = 'SELECT subscription.* FROM `subscription__' + campaign.list + '` AS subscription LEFT JOIN `campaign__' + campaign.id + '` AS campaign ON campaign.list = ? AND campaign.segment = ? AND campaign.subscription = subscription.id WHERE subscription.status=1 ' + (queryData.where ? 'AND (' + queryData.where + ') ' : '') + 'AND campaign.id IS NULL LIMIT 100';
//values = [campaign.list, campaign.segment].concat(queryData.values);
connection.query(query, values, (err, rows) => {
if (err) {
connection.release();
return callback(err);
@ -73,30 +122,19 @@ function findUnsent(callback) {
return callback(null, false);
});
}
connection.release();
let subscription = tools.convertKeys(rows[0]);
let query = 'INSERT INTO `campaign__' + campaign.id + '` (list, segment, subscription) VALUES(?, ?,?)';
connection.query(query, [campaign.list, campaign.segment, subscription.id], (err, result) => {
if (err) {
if (err.code === 'ER_DUP_ENTRY') {
// race condition, try next one
return tryNext();
}
connection.release();
return callback(err);
}
connection.release();
subscription.campaign = campaign.id;
callback(null, {
id: result.insertId,
listId: campaign.list,
campaignId: campaign.id,
subscription
rows.forEach(row => {
fetchCache.push({
row,
campaign
});
});
return findUnsent(callback);
});
};
tryNext();
});
});
});
});
@ -294,41 +332,52 @@ let sendLoop = () => {
return;
}
// send the message
mailer.transport.sendMail(mail, (err, info) => {
if (err) {
log.error('Mail', err.stack);
}
let tryCount = 0;
let trySend = () => {
tryCount++;
let status = err ? 2 : 1;
let response = err && (err.response || err.message) || info.response;
let responseId = response.split(/\s+/).pop();
db.getConnection((err, connection) => {
// send the message
mailer.transport.sendMail(mail, (err, info) => {
if (err) {
log.error('Mail', err.stack);
return;
if (err.responseCode && err.responseCode >= 400 && err.responseCode < 500 && tryCount <= 5) {
// temporary error, try again
return setTimeout(trySend, tryCount * 1000);
}
}
let query = 'UPDATE `campaigns` SET `delivered`=`delivered`+1 ' + (status === 2 ? ', `bounced`=`bounced`+1 ' : '') + ' WHERE id=? LIMIT 1';
connection.query(query, [message.campaignId], err => {
let status = err ? 2 : 1;
let response = err && (err.response || err.message) || info.response;
let responseId = response.split(/\s+/).pop();
db.getConnection((err, connection) => {
if (err) {
log.error('Mail', err.stack);
return;
}
let query = 'UPDATE `campaign__' + message.campaignId + '` SET status=?, response=?, response_id=?, updated=NOW() WHERE id=? LIMIT 1';
connection.query(query, [status, response, responseId, message.id], err => {
connection.release();
let query = 'UPDATE `campaigns` SET `delivered`=`delivered`+1 ' + (status === 2 ? ', `bounced`=`bounced`+1 ' : '') + ' WHERE id=? LIMIT 1';
connection.query(query, [message.campaignId], err => {
if (err) {
log.error('Mail', err.stack);
} else {
// log.verbose('Mail', 'Message sent and status updated for %s', message.subscription.cid);
}
let query = 'UPDATE `campaign__' + message.campaignId + '` SET status=?, response=?, response_id=?, updated=NOW() WHERE id=? LIMIT 1';
connection.query(query, [status, response, responseId, message.id], err => {
connection.release();
if (err) {
log.error('Mail', err.stack);
} else {
// log.verbose('Mail', 'Message sent and status updated for %s', message.subscription.cid);
}
});
});
});
});
});
};
setImmediate(trySend);
setImmediate(getNext);
});
});