Added option to spawn multiple sender processes

This commit is contained in:
Andris Reinman 2016-12-11 00:38:54 +02:00
parent 88fe24a709
commit 8ca1fbb535
12 changed files with 262 additions and 135 deletions

View file

@ -15,7 +15,6 @@ let shortid = require('shortid');
let url = require('url');
let htmlToText = require('html-to-text');
let request = require('request');
let caches = require('../lib/caches');
let libmime = require('libmime');
let attachmentCache = new Map();
@ -105,94 +104,140 @@ function findUnsent(callback) {
});
};
if (caches.cache.has('sender queue')) {
let cached = caches.shift('sender queue');
return returnUnsent(cached.row, cached.campaign);
}
db.getConnection((err, connection) => {
db.getFromCache('sender', (err, cached) => {
if (err) {
return callback(err);
}
if (cached) {
return returnUnsent(cached.row, cached.campaign);
}
// Find "normal" campaigns. Ignore RSS and drip campaigns at this point
let query = 'SELECT `id`, `list`, `segment` FROM `campaigns` WHERE `status`=? AND (`scheduled` IS NULL OR `scheduled` <= NOW()) AND `type` IN (?, ?) LIMIT 1';
connection.query(query, [2, 1, 3], (err, rows) => {
connection.release();
db.getLock('queue', (err, lock) => {
if (err) {
return callback(err);
}
if (!rows || !rows.length) {
return checkQueued();
if (!lock) {
return setTimeout(() => findUnsent(callback), 10 * 1000);
}
let campaign = tools.convertKeys(rows[0]);
let getSegmentQuery = (segmentId, next) => {
segmentId = Number(segmentId);
if (!segmentId) {
return next(null, {
where: '',
values: []
});
}
segments.getQuery(segmentId, 'subscription', next);
};
getSegmentQuery(campaign.segment, (err, queryData) => {
// try again to fetch a key from cache, maybe there was some other instance that held the lock
db.getFromCache('sender', (err, cached) => {
if (err) {
return callback(err);
}
if (cached) {
return lock.release(() => {
returnUnsent(cached.row, cached.campaign);
});
}
let done = function () {
lock.release(() => {
callback(...arguments);
});
};
db.getConnection((err, connection) => {
if (err) {
return callback(err);
return done(err);
}
// TODO: Add support for localized sending time. In this case campaign messages are
// not sent before receiver's local time reaches defined time
// SELECT * FROM subscription__1 LEFT JOIN tzoffset ON tzoffset.tz=subscription__1.tz WHERE NOW() + INTERVAL IFNULL(`offset`,0) MINUTE >= localtime
let query;
let values;
// NOT IN
query = 'SELECT * FROM `subscription__' + campaign.list + '` AS subscription WHERE status=1 ' + (queryData.where ? ' AND (' + queryData.where + ')' : '') + ' AND id NOT IN (SELECT subscription FROM `campaign__' + campaign.id + '` campaign WHERE campaign.list = ? AND campaign.segment = ? AND campaign.subscription = subscription.id) LIMIT 150';
values = queryData.values.concat([campaign.list, campaign.segment]);
// LEFT JOIN / IS NULL
//query = 'SELECT subscription.* FROM `subscription__' + campaign.list + '` AS subscription LEFT JOIN `campaign__' + campaign.id + '` AS campaign ON campaign.list = ? AND campaign.segment = ? AND campaign.subscription = subscription.id WHERE subscription.status=1 ' + (queryData.where ? 'AND (' + queryData.where + ') ' : '') + 'AND campaign.id IS NULL LIMIT 150';
//values = [campaign.list, campaign.segment].concat(queryData.values);
connection.query(query, values, (err, rows) => {
if (err) {
connection.release();
return callback(err);
}
if (!rows || !rows.length) {
// everything already processed for this campaign
connection.query('UPDATE campaigns SET `status`=3, `status_change`=NOW() WHERE id=? LIMIT 1', [campaign.id], () => {
connection.release();
return callback(null, false);
});
return;
}
// Find "normal" campaigns. Ignore RSS and drip campaigns at this point
let query = 'SELECT `id`, `list`, `segment` FROM `campaigns` WHERE `status`=? AND (`scheduled` IS NULL OR `scheduled` <= NOW()) AND `type` IN (?, ?) LIMIT 1';
connection.query(query, [2, 1, 3], (err, rows) => {
connection.release();
if (err) {
return done(err);
}
if (!rows || !rows.length) {
return checkQueued();
}
rows.forEach(row => {
caches.push('sender queue', {
row,
campaign
let campaign = tools.convertKeys(rows[0]);
let getSegmentQuery = (segmentId, next) => {
segmentId = Number(segmentId);
if (!segmentId) {
return next(null, {
where: '',
values: []
});
}
segments.getQuery(segmentId, 'subscription', next);
};
getSegmentQuery(campaign.segment, (err, queryData) => {
if (err) {
return done(err);
}
db.getConnection((err, connection) => {
if (err) {
return done(err);
}
// TODO: Add support for localized sending time. In this case campaign messages are
// not sent before receiver's local time reaches defined time
// SELECT * FROM subscription__1 LEFT JOIN tzoffset ON tzoffset.tz=subscription__1.tz WHERE NOW() + INTERVAL IFNULL(`offset`,0) MINUTE >= localtime
let query;
let values;
// NOT IN
query = 'SELECT * FROM `subscription__' + campaign.list + '` AS subscription WHERE status=1 ' + (queryData.where ? ' AND (' + queryData.where + ')' : '') + ' AND id NOT IN (SELECT subscription FROM `campaign__' + campaign.id + '` campaign WHERE campaign.list = ? AND campaign.segment = ? AND campaign.subscription = subscription.id) LIMIT 1000';
values = queryData.values.concat([campaign.list, campaign.segment]);
// LEFT JOIN / IS NULL
//query = 'SELECT subscription.* FROM `subscription__' + campaign.list + '` AS subscription LEFT JOIN `campaign__' + campaign.id + '` AS campaign ON campaign.list = ? AND campaign.segment = ? AND campaign.subscription = subscription.id WHERE subscription.status=1 ' + (queryData.where ? 'AND (' + queryData.where + ') ' : '') + 'AND campaign.id IS NULL LIMIT 150';
//values = [campaign.list, campaign.segment].concat(queryData.values);
connection.query(query, values, (err, rows) => {
if (err) {
connection.release();
return done(err);
}
if (!rows || !rows.length) {
// everything already processed for this campaign
connection.query('UPDATE campaigns SET `status`=3, `status_change`=NOW() WHERE id=? AND `status`=? LIMIT 1', [campaign.id, 2], () => {
connection.release();
return done(null, false);
});
return;
}
connection.release();
let pos = 0;
let addToCache = () => {
if (pos >= rows.length) {
lock.release(() => {
findUnsent(callback);
});
return;
}
let row = rows[pos++];
db.addToCache('sender', {
row,
campaign
}, err => {
if (err) {
return done(err);
}
setImmediate(addToCache);
});
};
addToCache();
});
});
});
return findUnsent(callback);
});
});
});
});
});
});
@ -494,6 +539,7 @@ let sendLoop = () => {
};
mailer.transport.on('idle', () => mailer.transport.checkThrottling(getNext));
setImmediate(() => mailer.transport.checkThrottling(getNext));
});
};