WiP on mailers

This commit is contained in:
Tomas Bures 2018-04-29 18:13:40 +02:00
parent e97415c237
commit a4ee1534cc
46 changed files with 1263 additions and 529 deletions

View file

@ -1,5 +1,7 @@
'use strict';
// FIXME - revisit and rewrite if necessary
let log = require('npmlog');
let db = require('../lib/db');

View file

@ -1,5 +1,7 @@
'use strict';
// FIXME - revisit and rewrite if necessary
let log = require('npmlog');
let db = require('../lib/db');

View file

@ -1,13 +1,15 @@
'use strict';
let log = require('npmlog');
let config = require('config');
let net = require('net');
let campaigns = require('../lib/models/campaigns');
// FIXME - port for the new campaigns model
let seenIds = new Set();
const log = require('npmlog');
const config = require('config');
const net = require('net');
const campaigns = require('../lib/models/campaigns');
let server = net.createServer(socket => {
const seenIds = new Set();
const server = net.createServer(socket => {
let remainder = '';
let reading = false;

View file

@ -1,5 +1,7 @@
'use strict';
// FIXME - update/rewrite
const { nodeifyFunction } = require('../lib/nodeify');
const getSettings = nodeifyFunction(require('../models/settings').get);
@ -7,7 +9,7 @@ let log = require('npmlog');
let config = require('config');
let db = require('../lib/db');
let tools = require('../lib/tools');
let mailer = require('../lib/mailer');
let mailer = require('../lib/mailers');
let campaigns = require('../lib/models/campaigns');
let segments = require('../lib/models/segments');
let lists = require('../lib/models/lists');
@ -462,7 +464,7 @@ function formatMessage(message, callback) {
}
let sendLoop = () => {
mailer.getMailer(err => {
mailers.getMailer(err => {
if (err) {
log.error('Mail', err.stack);
return setTimeout(sendLoop, 10 * 1000);
@ -471,14 +473,14 @@ let sendLoop = () => {
let isThrottled = false;
let getNext = () => {
if (!mailer.transport.isIdle() || isThrottled) {
// only retrieve new messages if there are free slots in the mailer queue
if (!mailers.transport.isIdle() || isThrottled) {
// only retrieve new messages if there are free slots in the mailers queue
return;
}
isThrottled = true;
mailer.transport.checkThrottling(() => {
mailers.transport.checkThrottling(() => {
isThrottled = false;
@ -495,7 +497,7 @@ let sendLoop = () => {
}
// log.verbose('Mail', 'Found new message to be delivered: %s', message.subscription.cid);
// format message to nodemailer message format
// format message to nodemailers message format
formatMessage(message, (err, mail) => {
if (err) {
log.error('Mail', err.stack);
@ -516,7 +518,7 @@ let sendLoop = () => {
tryCount++;
// send the message
mailer.transport.sendMail(mail, (err, info) => {
mailers.transport.sendMail(mail, (err, info) => {
if (err) {
log.error('Mail', err.stack);
if (err.responseCode && err.responseCode >= 400 && err.responseCode < 500 && tryCount <= 5) {
@ -589,7 +591,7 @@ let sendLoop = () => {
});
};
mailer.transport.on('idle', getNext);
mailers.transport.on('idle', getNext);
setImmediate(getNext);
});
};
@ -598,7 +600,7 @@ sendLoop();
process.on('message', m => {
if (m && m.reload) {
log.info('Sender/' + process.pid, 'Reloading mailer config');
mailer.update();
log.info('Sender/' + process.pid, 'Reloading mailers config');
mailers.update();
}
});

View file

@ -1,18 +1,18 @@
'use strict';
let log = require('npmlog');
let config = require('config');
let crypto = require('crypto');
let humanize = require('humanize');
let http = require('http');
const log = require('npmlog');
const config = require('config');
const crypto = require('crypto');
const humanize = require('humanize');
const http = require('http');
let SMTPServer = require('smtp-server').SMTPServer;
let simpleParser = require('mailparser').simpleParser;
const SMTPServer = require('smtp-server').SMTPServer;
const simpleParser = require('mailparser').simpleParser;
let totalMessages = 0;
let received = 0;
let mailstore = {
const mailstore = {
accounts: {},
saveMessage(address, message) {
if (!this.accounts[address]) {
@ -36,7 +36,7 @@ let mailstore = {
};
// Setup server
let server = new SMTPServer({
const server = new SMTPServer({
// log to console
logger: config.testserver.logger,

View file

@ -1,5 +1,7 @@
'use strict';
// FIXME - revisit and rewrite if necessary
let log = require('npmlog');
let db = require('../lib/db');
let tools = require('../lib/tools');

View file

@ -8,58 +8,47 @@
// JOIN with subscription table. Subscription table includes timezone name for
// a subscriber and tzoffset table includes offset from UTC in minutes
let moment = require('moment-timezone');
let db = require('../lib/db');
let log = require('npmlog');
const moment = require('moment-timezone');
const knex = require('../lib/knex');
const log = require('npmlog');
let lastCheck = false;
const timezone_timeout = 60 * 60 * 1000;
function updateTimezoneOffsets(callback) {
async function updateTimezoneOffsets() {
log.verbose('UTC', 'Updating timezone offsets');
db.getConnection((err, connection) => {
if (err) {
return callback(err);
}
let values = [];
moment.tz.names().forEach(tz => {
let time = moment();
values.push('(' + connection.escape(tz.toLowerCase().trim()) + ',' + connection.escape(time.tz(tz).utcOffset()) + ')');
const values = [];
for (const tz of moment.tz.names()) {
values.push({
tz: tz.toLowerCase().trim(),
offset: moment.tz(tz).utcOffset()
});
}
let query = 'INSERT INTO tzoffset (`tz`, `offset`) VALUES ' + values.join(', ') + ' ON DUPLICATE KEY UPDATE `offset` = VALUES(`offset`)';
connection.query(query, values, (err, result) => {
connection.release();
if (err) {
return callback(err);
}
return callback(null, result);
});
await knex.transaction(async tx => {
await tx('tzoffset').del();
await tx('tzoffset').insert(values);
});
}
module.exports = callback => {
updateTimezoneOffsets(err => {
if (err) {
return callback(err);
}
let checkLoop = () => {
let curUtcDate = new Date().toISOString().split('T').shift();
if (curUtcDate !== lastCheck) {
updateTimezoneOffsets(err => {
if (err) {
log.error('UTC', err);
}
setTimeout(checkLoop, timezone_timeout);
});
} else {
setTimeout(checkLoop, timezone_timeout);
}
lastCheck = curUtcDate;
};
setTimeout(checkLoop, timezone_timeout);
callback(null, true);
});
function start() {
let curUtcDate = new Date().toISOString().split('T').shift();
if (curUtcDate !== lastCheck) {
updateTimezoneOffsets()
.then(() => {
setTimeout(start, timezone_timeout)
})
.catch(err => {
log.error('UTC', err);
setTimeout(start, timezone_timeout);
});
} else {
setTimeout(start, timezone_timeout);
}
lastCheck = curUtcDate;
}
module.exports = {
start
};

View file

@ -1,5 +1,7 @@
'use strict';
// FIXME - port for the new campaigns model
const { nodeifyFunction } = require('../lib/nodeify');
const getSettings = nodeifyFunction(require('../models/settings').get);