From 8ca1fbb535b20ea377fb597ec16ed97196719ad7 Mon Sep 17 00:00:00 2001 From: Andris Reinman Date: Sun, 11 Dec 2016 00:38:54 +0200 Subject: [PATCH] Added option to spawn multiple sender processes --- CHANGELOG.txt | 4 + README.md | 8 +- config/default.toml | 2 +- index.js | 5 ++ lib/caches.js | 28 ------- lib/db.js | 94 +++++++++++++++++++++ lib/mailer.js | 8 +- lib/models/campaigns.js | 32 ++++---- package.json | 4 +- services/sender.js | 176 +++++++++++++++++++++++++--------------- setup/install.sh | 32 ++++---- views/settings.hbs | 4 +- 12 files changed, 262 insertions(+), 135 deletions(-) delete mode 100644 lib/caches.js diff --git a/CHANGELOG.txt b/CHANGELOG.txt index 0949bb67..85cf5fbb 100644 --- a/CHANGELOG.txt +++ b/CHANGELOG.txt @@ -1,5 +1,9 @@ # Changelog +## 1.20.0 2016-12-11 + + * Added option to distribute sending queue between multiple processes to speed up delivery + ## 1.19.0 2016-09-15 * Changed license from GPL-V3 to MIT diff --git a/README.md b/README.md index 2f3e8aa0..6fe87a7c 100644 --- a/README.md +++ b/README.md @@ -24,16 +24,14 @@ Check out [ZoneMTA](https://github.com/zone-eu/zone-mta) as an alternative self ## Cons - * Alpha-grade software. Might or might not work as expected - * Awful code base, needs refactoring - * No tests + * Beta-grade software. Might or might not work as expected. There are several users with list sizes between 100k and 1M and Mailtrain seems to work for them but YMMV * Almost no documentation (there are some guides in the [Wiki](https://github.com/andris9/mailtrain/wiki)) ## Requirements - * Nodejs v5+ + * Nodejs v6+ * MySQL v5.5 or MariaDB - * Redis (optional, disabled by default, used only for session storage) + * Redis. Optional, disabled by default. Used for session storage and for caching state between multiple processes. If you do not have Redis enabled then you can only use a single sender process ## Installation diff --git a/config/default.toml b/config/default.toml index c736bda3..29e1c69e 100644 --- a/config/default.toml +++ b/config/default.toml @@ -112,5 +112,5 @@ host="127.0.0.1" [queue] # How many parallel sender processes to spawn -# Do not use more than 1 for now as it would create race conditions +# You can use more than 1 process only if you have Redis enabled processes=1 diff --git a/index.js b/index.js index 42a3ab22..0fd4854e 100644 --- a/index.js +++ b/index.js @@ -72,6 +72,11 @@ function spawnSenders(callback) { let spawned = 0; let returned = false; + if (processes > 1 && !config.redis.enabled) { + log.error('Queue', '%s processes requested but Redis is not enabled, spawning 1 process', processes); + processes = 1; + } + let spawnSender = function () { if (spawned >= processes) { if (!returned) { diff --git a/lib/caches.js b/lib/caches.js deleted file mode 100644 index b7c308ba..00000000 --- a/lib/caches.js +++ /dev/null @@ -1,28 +0,0 @@ -'use strict'; - -let cache = module.exports.cache = new Map(); - -module.exports.push = (name, value) => { - if (!cache.has(name)) { - cache.set(name, []); - } else if (!Array.isArray(cache.get(name))) { - cache.set(name, [].concat(cache.get(name) || [])); - } - cache.get(name).push(value); -}; - -module.exports.shift = name => { - if (!cache.has(name)) { - return false; - } - if (!Array.isArray(cache.get(name))) { - let value = cache.get(name); - cache.delete(name); - return value; - } - let value = cache.get(name).shift(); - if (!cache.get(name).length) { - cache.delete(name); - } - return value; -}; diff --git a/lib/db.js b/lib/db.js index 2c3a7b21..095608ae 100644 --- a/lib/db.js +++ b/lib/db.js @@ -2,5 +2,99 @@ let config = require('config'); let mysql = require('mysql'); +let redis = require('redis'); +let Lock = require('redfour'); module.exports = mysql.createPool(config.mysql); +if (config.redis.enabled) { + + module.exports.redis = redis.createClient(config.redis); + + let queueLock = new Lock({ + redis: config.redis, + namespace: 'mailtrain:lock' + }); + + module.exports.getLock = (id, callback) => { + queueLock.waitAcquireLock(id, 60 * 1000 /* Lock expires after 60sec */ , 10 * 1000 /* Wait for lock for up to 10sec */ , (err, lock) => { + if (err) { + return callback(err); + } + if (!lock) { + return callback(null, false); + } + return callback(null, { + lock, + release(done) { + queueLock.releaseLock(lock, done); + } + }); + }); + }; + + module.exports.clearCache = (key, callback) => { + module.exports.redis.del(key, err => callback(err)); + }; + + module.exports.addToCache = (key, value, callback) => { + if (!value) { + return setImmediate(() => callback()); + } + module.exports.redis.multi(). + lpush('mailtrain:cache:' + key, JSON.stringify(value)). + expire('mailtrain:cache:' + key, 24 * 3600). + exec(err => callback(err)); + }; + + module.exports.getFromCache = (key, callback) => { + module.exports.redis.rpop('mailtrain:cache:' + key, (err, value) => { + if (err) { + return callback(err); + } + try { + value = JSON.parse(value); + } catch (E) { + return callback(E); + } + + return callback(null, value); + }); + }; + +} else { + // fakelock. does not lock anything + module.exports.getLock = (id, callback) => { + setImmediate(() => callback(null, { + lock: false, + release(done) { + setImmediate(done); + } + })); + }; + + let caches = new Map(); + + module.exports.clearCache = (key, callback) => { + caches.delete(key); + setImmediate(() => callback()); + }; + + module.exports.addToCache = (key, value, callback) => { + if (!caches.has(key)) { + caches.set(key, []); + } + caches.get(key).push(value); + setImmediate(() => callback()); + }; + + module.exports.getFromCache = (key, callback) => { + let value; + if (caches.has(key)) { + value = caches.get(key).shift(); + if (!caches.get(key).length) { + caches.delete(key); + } + } + setImmediate(() => callback(null, value)); + }; +} diff --git a/lib/mailer.js b/lib/mailer.js index 0decf469..c50aee66 100644 --- a/lib/mailer.js +++ b/lib/mailer.js @@ -6,7 +6,7 @@ let nodemailer = require('nodemailer'); let openpgpEncrypt = require('nodemailer-openpgp').openpgpEncrypt; let settings = require('./models/settings'); let tools = require('./tools'); -let caches = require('./caches'); +let db = require('./db'); let Handlebars = require('handlebars'); let fs = require('fs'); let path = require('path'); @@ -156,6 +156,7 @@ function createMailer(callback) { rejectUnauthorized: !configItems.smtpSelfSigned } }, config.nodemailer); + module.exports.transport.use('stream', openpgpEncrypt({ signingKey: configItems.pgpPrivateKey, passphrase: configItems.pgpPassphrase @@ -187,8 +188,9 @@ function createMailer(callback) { } }; - caches.cache.delete('sender queue'); - return callback(null, module.exports.transport); + db.clearCache('sender', () => { + callback(null, module.exports.transport); + }); }); } diff --git a/lib/models/campaigns.js b/lib/models/campaigns.js index 78114c59..6e545f1b 100644 --- a/lib/models/campaigns.js +++ b/lib/models/campaigns.js @@ -11,7 +11,6 @@ let isUrl = require('is-url'); let feed = require('../feed'); let log = require('npmlog'); let mailer = require('../mailer'); -let caches = require('../caches'); let humanize = require('humanize'); let allowedKeys = ['description', 'from', 'address', 'reply_to', 'subject', 'template', 'source_url', 'list', 'segment', 'html', 'text', 'tracking_disabled']; @@ -894,8 +893,9 @@ module.exports.delete = (id, callback) => { return callback(err); } - caches.cache.delete('sender queue'); - return callback(null, affected); + db.clearCache('sender', () => { + callback(null, affected); + }); }); }); }); @@ -959,8 +959,9 @@ module.exports.pause = (id, callback) => { if (err) { return callback(err); } - caches.cache.delete('sender queue'); - return callback(null, true); + db.clearCache('sender', () => { + callback(null, true); + }); }); }); }); @@ -987,23 +988,24 @@ module.exports.reset = (id, callback) => { return callback(err); } - caches.cache.delete('sender queue'); - connection.query('UPDATE links SET `clicks`=0 WHERE campaign=?', [id], err => { - if (err) { - connection.release(); - return callback(err); - } - connection.query('TRUNCATE TABLE `campaign__' + id + '`', [id], err => { + db.clearCache('sender', () => { + connection.query('UPDATE links SET `clicks`=0 WHERE campaign=?', [id], err => { if (err) { connection.release(); return callback(err); } - connection.query('TRUNCATE TABLE `campaign_tracker__' + id + '`', [id], err => { - connection.release(); + connection.query('TRUNCATE TABLE `campaign__' + id + '`', [id], err => { if (err) { + connection.release(); return callback(err); } - return callback(null, true); + connection.query('TRUNCATE TABLE `campaign_tracker__' + id + '`', [id], err => { + connection.release(); + if (err) { + return callback(err); + } + return callback(null, true); + }); }); }); }); diff --git a/package.json b/package.json index 141931f2..0af0dcf8 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "mailtrain", "private": true, - "version": "1.19.1", + "version": "1.20.0", "description": "Self hosted email newsletter app", "main": "index.js", "scripts": { @@ -67,6 +67,8 @@ "openpgp": "^2.3.5", "passport": "^0.3.2", "passport-local": "^1.0.0", + "redfour": "^1.0.0", + "redis": "^2.6.3", "request": "^2.79.0", "serve-favicon": "^2.3.2", "shortid": "^2.2.6", diff --git a/services/sender.js b/services/sender.js index d2735c32..105e531d 100644 --- a/services/sender.js +++ b/services/sender.js @@ -15,7 +15,6 @@ let shortid = require('shortid'); let url = require('url'); let htmlToText = require('html-to-text'); let request = require('request'); -let caches = require('../lib/caches'); let libmime = require('libmime'); let attachmentCache = new Map(); @@ -105,94 +104,140 @@ function findUnsent(callback) { }); }; - if (caches.cache.has('sender queue')) { - let cached = caches.shift('sender queue'); - return returnUnsent(cached.row, cached.campaign); - } - db.getConnection((err, connection) => { + db.getFromCache('sender', (err, cached) => { if (err) { return callback(err); } + if (cached) { + return returnUnsent(cached.row, cached.campaign); + } - // Find "normal" campaigns. Ignore RSS and drip campaigns at this point - let query = 'SELECT `id`, `list`, `segment` FROM `campaigns` WHERE `status`=? AND (`scheduled` IS NULL OR `scheduled` <= NOW()) AND `type` IN (?, ?) LIMIT 1'; - connection.query(query, [2, 1, 3], (err, rows) => { - connection.release(); + db.getLock('queue', (err, lock) => { if (err) { return callback(err); } - if (!rows || !rows.length) { - return checkQueued(); + + if (!lock) { + return setTimeout(() => findUnsent(callback), 10 * 1000); } - let campaign = tools.convertKeys(rows[0]); - - let getSegmentQuery = (segmentId, next) => { - segmentId = Number(segmentId); - if (!segmentId) { - return next(null, { - where: '', - values: [] - }); - } - - segments.getQuery(segmentId, 'subscription', next); - }; - - getSegmentQuery(campaign.segment, (err, queryData) => { + // try again to fetch a key from cache, maybe there was some other instance that held the lock + db.getFromCache('sender', (err, cached) => { if (err) { return callback(err); } + if (cached) { + return lock.release(() => { + returnUnsent(cached.row, cached.campaign); + }); + } + + let done = function () { + lock.release(() => { + callback(...arguments); + }); + }; db.getConnection((err, connection) => { if (err) { - return callback(err); + return done(err); } - // TODO: Add support for localized sending time. In this case campaign messages are - // not sent before receiver's local time reaches defined time - // SELECT * FROM subscription__1 LEFT JOIN tzoffset ON tzoffset.tz=subscription__1.tz WHERE NOW() + INTERVAL IFNULL(`offset`,0) MINUTE >= localtime - - let query; - let values; - - // NOT IN - query = 'SELECT * FROM `subscription__' + campaign.list + '` AS subscription WHERE status=1 ' + (queryData.where ? ' AND (' + queryData.where + ')' : '') + ' AND id NOT IN (SELECT subscription FROM `campaign__' + campaign.id + '` campaign WHERE campaign.list = ? AND campaign.segment = ? AND campaign.subscription = subscription.id) LIMIT 150'; - values = queryData.values.concat([campaign.list, campaign.segment]); - - // LEFT JOIN / IS NULL - //query = 'SELECT subscription.* FROM `subscription__' + campaign.list + '` AS subscription LEFT JOIN `campaign__' + campaign.id + '` AS campaign ON campaign.list = ? AND campaign.segment = ? AND campaign.subscription = subscription.id WHERE subscription.status=1 ' + (queryData.where ? 'AND (' + queryData.where + ') ' : '') + 'AND campaign.id IS NULL LIMIT 150'; - //values = [campaign.list, campaign.segment].concat(queryData.values); - - connection.query(query, values, (err, rows) => { - - if (err) { - connection.release(); - return callback(err); - } - - if (!rows || !rows.length) { - // everything already processed for this campaign - connection.query('UPDATE campaigns SET `status`=3, `status_change`=NOW() WHERE id=? LIMIT 1', [campaign.id], () => { - connection.release(); - return callback(null, false); - }); - return; - } + // Find "normal" campaigns. Ignore RSS and drip campaigns at this point + let query = 'SELECT `id`, `list`, `segment` FROM `campaigns` WHERE `status`=? AND (`scheduled` IS NULL OR `scheduled` <= NOW()) AND `type` IN (?, ?) LIMIT 1'; + connection.query(query, [2, 1, 3], (err, rows) => { connection.release(); + if (err) { + return done(err); + } + if (!rows || !rows.length) { + return checkQueued(); + } - rows.forEach(row => { - caches.push('sender queue', { - row, - campaign + let campaign = tools.convertKeys(rows[0]); + + let getSegmentQuery = (segmentId, next) => { + segmentId = Number(segmentId); + if (!segmentId) { + return next(null, { + where: '', + values: [] + }); + } + + segments.getQuery(segmentId, 'subscription', next); + }; + + getSegmentQuery(campaign.segment, (err, queryData) => { + if (err) { + return done(err); + } + + db.getConnection((err, connection) => { + if (err) { + return done(err); + } + + // TODO: Add support for localized sending time. In this case campaign messages are + // not sent before receiver's local time reaches defined time + // SELECT * FROM subscription__1 LEFT JOIN tzoffset ON tzoffset.tz=subscription__1.tz WHERE NOW() + INTERVAL IFNULL(`offset`,0) MINUTE >= localtime + + let query; + let values; + + // NOT IN + query = 'SELECT * FROM `subscription__' + campaign.list + '` AS subscription WHERE status=1 ' + (queryData.where ? ' AND (' + queryData.where + ')' : '') + ' AND id NOT IN (SELECT subscription FROM `campaign__' + campaign.id + '` campaign WHERE campaign.list = ? AND campaign.segment = ? AND campaign.subscription = subscription.id) LIMIT 1000'; + values = queryData.values.concat([campaign.list, campaign.segment]); + + // LEFT JOIN / IS NULL + //query = 'SELECT subscription.* FROM `subscription__' + campaign.list + '` AS subscription LEFT JOIN `campaign__' + campaign.id + '` AS campaign ON campaign.list = ? AND campaign.segment = ? AND campaign.subscription = subscription.id WHERE subscription.status=1 ' + (queryData.where ? 'AND (' + queryData.where + ') ' : '') + 'AND campaign.id IS NULL LIMIT 150'; + //values = [campaign.list, campaign.segment].concat(queryData.values); + + connection.query(query, values, (err, rows) => { + + if (err) { + connection.release(); + return done(err); + } + + if (!rows || !rows.length) { + // everything already processed for this campaign + connection.query('UPDATE campaigns SET `status`=3, `status_change`=NOW() WHERE id=? AND `status`=? LIMIT 1', [campaign.id, 2], () => { + connection.release(); + return done(null, false); + }); + return; + } + connection.release(); + + let pos = 0; + let addToCache = () => { + if (pos >= rows.length) { + lock.release(() => { + findUnsent(callback); + }); + return; + } + let row = rows[pos++]; + db.addToCache('sender', { + row, + campaign + }, err => { + if (err) { + return done(err); + } + setImmediate(addToCache); + }); + }; + + addToCache(); + }); }); - }); - return findUnsent(callback); + }); }); }); - }); }); }); @@ -494,6 +539,7 @@ let sendLoop = () => { }; mailer.transport.on('idle', () => mailer.transport.checkThrottling(getNext)); + setImmediate(() => mailer.transport.checkThrottling(getNext)); }); }; diff --git a/setup/install.sh b/setup/install.sh index dd2433b5..16d1a6bf 100755 --- a/setup/install.sh +++ b/setup/install.sh @@ -116,7 +116,7 @@ fi mkdir -p /opt/zone-mta cd /opt/zone-mta git clone git://github.com/zone-eu/zone-mta.git . -git checkout 1c07b2c6 +git checkout 6964091273 # Ensure queue folder mkdir -p /var/data/zone-mta/mailtrain @@ -124,6 +124,7 @@ mkdir -p /var/data/zone-mta/mailtrain # Setup installation configuration cat >> config/production.json <> config/production.json <Max connections
- The count of maximum simultaneous connections to make against the SMTP server (defaults to 5) + The count of maximum simultaneous connections to make against the SMTP server (defaults to 5). This limit is per sending process.
@@ -234,7 +234,7 @@
- Maximum number of messages to send in an hour. Leave empty or zero for no throttling. If your provider uses a different speed limit (messages/minute or messages/second) then convert this limit into messages/hour (1m/s => 3600m/h). + Maximum number of messages to send in an hour. Leave empty or zero for no throttling. If your provider uses a different speed limit (messages/minute or messages/second) then convert this limit into messages/hour (1m/s => 3600m/h). This limit is per sending process.