Work in progress on refactoring all mail sending to use the message sender an sender workers. No yet finished.

This commit is contained in:
Tomas Bures 2019-06-29 23:19:56 +02:00
parent 355e03900a
commit 4e9f6bd57b
22 changed files with 811 additions and 444 deletions

View file

@ -323,11 +323,11 @@ class SendControls extends Component {
</Form> </Form>
<ButtonRow className={campaignsStyles.sendButtonRow}> <ButtonRow className={campaignsStyles.sendButtonRow}>
{this.getFormValue('sendLater') ? {this.getFormValue('sendLater') ?
<Button className="btn-primary" icon="send" label={(entity.scheduled ? t('rescheduleSend') : t('scheduleSend')) + subscrInfo} onClickAsync={::this.scheduleAsync}/> <Button className="btn-primary" icon="play" label={(entity.scheduled ? t('rescheduleSend') : t('scheduleSend')) + subscrInfo} onClickAsync={::this.scheduleAsync}/>
: :
<Button className="btn-primary" icon="send" label={t('send') + subscrInfo} onClickAsync={::this.confirmStart}/> <Button className="btn-primary" icon="play" label={t('send') + subscrInfo} onClickAsync={::this.confirmStart}/>
} }
{entity.status === CampaignStatus.PAUSED && <Button className="btn-primary" icon="refresh" label={t('reset')} onClickAsync={::this.resetAsync}/>} {entity.status === CampaignStatus.PAUSED && <Button className="btn-primary" icon="redo" label={t('reset')} onClickAsync={::this.resetAsync}/>}
{entity.status === CampaignStatus.PAUSED && <LinkButton className="btn-secondary" icon="signal" label={t('viewStatistics')} to={`/campaigns/${entity.id}/statistics`}/>} {entity.status === CampaignStatus.PAUSED && <LinkButton className="btn-secondary" icon="signal" label={t('viewStatistics')} to={`/campaigns/${entity.id}/statistics`}/>}
{testButtons} {testButtons}
</ButtonRow> </ButtonRow>
@ -341,6 +341,7 @@ class SendControls extends Component {
{t('Campaign is being paused. Please wait.')} {t('Campaign is being paused. Please wait.')}
</AlignedRow> </AlignedRow>
<ButtonRow> <ButtonRow>
<Button className="btn-primary" icon="pause" label={t('Pausing')} disabled={true}/>
<LinkButton className="btn-secondary" icon="signal" label={t('viewStatistics')} to={`/campaigns/${entity.id}/statistics`}/> <LinkButton className="btn-secondary" icon="signal" label={t('viewStatistics')} to={`/campaigns/${entity.id}/statistics`}/>
{testButtons} {testButtons}
</ButtonRow> </ButtonRow>
@ -354,7 +355,7 @@ class SendControls extends Component {
{t('campaignIsBeingSentOut')} {t('campaignIsBeingSentOut')}
</AlignedRow> </AlignedRow>
<ButtonRow> <ButtonRow>
<Button className="btn-primary" icon="stop" label={t('stop')} onClickAsync={::this.stopAsync}/> <Button className="btn-primary" icon="pause" label={t('Pause')} onClickAsync={::this.stopAsync}/>
<LinkButton className="btn-secondary" icon="signal" label={t('viewStatistics')} to={`/campaigns/${entity.id}/statistics`}/> <LinkButton className="btn-secondary" icon="signal" label={t('viewStatistics')} to={`/campaigns/${entity.id}/statistics`}/>
{testButtons} {testButtons}
</ButtonRow> </ButtonRow>
@ -371,7 +372,7 @@ class SendControls extends Component {
</AlignedRow> </AlignedRow>
<ButtonRow> <ButtonRow>
<Button className="btn-primary" icon="play" label={t('continue') + subscrInfo} onClickAsync={::this.confirmStart}/> <Button className="btn-primary" icon="play" label={t('continue') + subscrInfo} onClickAsync={::this.confirmStart}/>
<Button className="btn-primary" icon="refresh" label={t('reset')} onClickAsync={::this.resetAsync}/> <Button className="btn-primary" icon="redo" label={t('reset')} onClickAsync={::this.resetAsync}/>
<LinkButton className="btn-secondary" icon="signal" label={t('viewStatistics')} to={`/campaigns/${entity.id}/statistics`}/> <LinkButton className="btn-secondary" icon="signal" label={t('viewStatistics')} to={`/campaigns/${entity.id}/statistics`}/>
{testButtons} {testButtons}
</ButtonRow> </ButtonRow>

View file

@ -165,6 +165,21 @@ queue:
# How many parallel sender processes to spawn # How many parallel sender processes to spawn
processes: 2 processes: 2
# For how long (in seconds) to try to send an email before Mailtrain stops to trying. An email can normally
# be sent out almost immediately. However if the send configuration is not correct or the mail server is not reachable,
# Mailtrain will keep retrying until the email expires.
# Due to Mailtrain's internal timeouts, the values should be at least 60 seconds.
retention:
# Regular and RSS campaign. Once this expires, the campaign is considered finished. The remaining recipients
# are included in the set of those recipients to whom the message would be delivered if the campaign is again started.
campaign: 86400 # 1 day
# Triggered campaign. Once this expires, the message gets discarded.
triggered: 86400 # 1 day
# Test send (in campaign or template)
test: 300 # 5 minutes
# Subscription and password reset related emails
subscription: 300 # 5 minutes
cors: cors:
# Allow subscription widgets to be embedded # Allow subscription widgets to be embedded
# origins: ['https://www.example.com'] # origins: ['https://www.example.com']

View file

@ -4,17 +4,31 @@ const config = require('config');
const fork = require('./fork').fork; const fork = require('./fork').fork;
const log = require('./log'); const log = require('./log');
const path = require('path'); const path = require('path');
const fs = require('fs-extra') const fs = require('fs-extra');
const crypto = require('crypto'); const crypto = require('crypto');
const bluebird = require('bluebird'); const bluebird = require('bluebird');
let zoneMtaProcess; let zoneMtaProcess = null;
const zoneMtaDir = path.join(__dirname, '..', '..', 'zone-mta'); const zoneMtaDir = path.join(__dirname, '..', '..', 'zone-mta');
const zoneMtaBuiltingConfig = path.join(zoneMtaDir, 'config', 'builtin-zonemta.json'); const zoneMtaBuiltingConfig = path.join(zoneMtaDir, 'config', 'builtin-zonemta.json');
const password = process.env.BUILTIN_ZONE_MTA_PASSWORD || crypto.randomBytes(20).toString('hex').toLowerCase(); const password = process.env.BUILTIN_ZONE_MTA_PASSWORD || crypto.randomBytes(20).toString('hex').toLowerCase();
let restartCount = 0;
let lastRestartCount = 0;
let restartBackoffIdx = 0;
const restartBackoff = [0, 30, 60, 300]; // in seconds
setInterval(() => {
if (restartCount === lastRestartCount) {
restartBackoffIdx = 0;
}
lastRestartCount = restartCount;
}, 300000 /* 5 mins */);
function getUsername() { function getUsername() {
return 'mailtrain'; return 'mailtrain';
} }
@ -119,11 +133,14 @@ async function createConfig() {
await fs.writeFile(zoneMtaBuiltingConfig, JSON.stringify(cnf, null, 2)); await fs.writeFile(zoneMtaBuiltingConfig, JSON.stringify(cnf, null, 2));
} }
function spawn(callback) { function restart(callback) {
if (config.builtinZoneMTA.enabled) { if (zoneMtaProcess) return callback();
createConfig().then(() => { if (restartCount === 0) {
log.info('ZoneMTA', 'Starting built-in Zone MTA process'); log.info('ZoneMTA', 'Starting built-in Zone MTA process');
} else {
log.info('ZoneMTA', `Restarting built-in Zone MTA process (restart count ${restartCount})`);
}
zoneMtaProcess = fork( zoneMtaProcess = fork(
path.join(zoneMtaDir, 'index.js'), path.join(zoneMtaDir, 'index.js'),
@ -138,17 +155,36 @@ function spawn(callback) {
if (msg) { if (msg) {
if (msg.type === 'zone-mta-started') { if (msg.type === 'zone-mta-started') {
log.info('ZoneMTA', 'ZoneMTA process started'); log.info('ZoneMTA', 'ZoneMTA process started');
if (callback) {
return callback(); return callback();
} else if (msg.type === 'entries-added') { } else {
senders.scheduleCheck(); return;
}
} }
} }
}); });
zoneMtaProcess.on('close', (code, signal) => { zoneMtaProcess.on('close', (code, signal) => {
log.error('ZoneMTA', 'ZoneMTA process exited with code %s signal %s', code, signal); log.error('ZoneMTA', 'ZoneMTA process exited with code %s signal %s', code, signal);
});
zoneMtaProcess = null;
restartCount += 1;
const backoffTimeout = restartBackoff[restartBackoffIdx] * 1000;
if (restartBackoffIdx < restartBackoff.length - 1) {
restartBackoffIdx += 1;
}
setTimeout(restart, backoffTimeout);
});
}
function spawn(callback) {
if (config.builtinZoneMTA.enabled) {
createConfig().then(() => {
restart(callback);
}).catch(err => callback(err)); }).catch(err => callback(err));
} else { } else {

View file

@ -23,6 +23,12 @@ const knex = require('knex')({
//, debug: true //, debug: true
}); });
/*
This is to enable logging on mysql side:
SET GLOBAL general_log = 'ON';
SET GLOBAL general_log_file = '/tmp/mysql-all.log';
*/
module.exports = knex; module.exports = knex;

View file

@ -3,8 +3,6 @@
const log = require('./log'); const log = require('./log');
const config = require('config'); const config = require('config');
const Handlebars = require('handlebars');
const util = require('util');
const nodemailer = require('nodemailer'); const nodemailer = require('nodemailer');
const aws = require('aws-sdk'); const aws = require('aws-sdk');
const openpgpEncrypt = require('nodemailer-openpgp').openpgpEncrypt; const openpgpEncrypt = require('nodemailer-openpgp').openpgpEncrypt;
@ -14,13 +12,21 @@ const builtinZoneMta = require('./builtin-zone-mta');
const contextHelpers = require('./context-helpers'); const contextHelpers = require('./context-helpers');
const settings = require('../models/settings'); const settings = require('../models/settings');
const tools = require('./tools');
const htmlToText = require('html-to-text');
const bluebird = require('bluebird'); const bluebird = require('bluebird');
const transports = new Map(); const transports = new Map();
class SendConfigurationError extends Error {
constructor(sendConfigurationId, ...args) {
super(...args);
this.sendConfigurationId = sendConfigurationId;
Error.captureStackTrace(this, SendConfigurationError);
}
}
async function getOrCreateMailer(sendConfigurationId) { async function getOrCreateMailer(sendConfigurationId) {
let sendConfiguration; let sendConfiguration;
@ -73,25 +79,18 @@ function _addDkimKeys(transport, mail) {
async function _sendMail(transport, mail, template) { async function _sendMail(transport, mail, template) {
_addDkimKeys(transport, mail); _addDkimKeys(transport, mail);
let tryCount = 0; try {
const trySend = (callback) => { return await transport.sendMailAsync(mail);
tryCount++;
transport.sendMail(mail, (err, info) => {
if (err) {
log.error('Mail', err);
if (err.responseCode && err.responseCode >= 400 && err.responseCode < 500 && tryCount <= 5) {
// temporary error, try again
log.verbose('Mail', 'Retrying after %s sec. ...', tryCount);
return setTimeout(trySend, tryCount * 1000);
}
return callback(err);
}
return callback(null, info);
});
};
const trySendAsync = bluebird.promisify(trySend); } catch (err) {
return await trySendAsync(); if ( (err.responseCode && err.responseCode >= 400 && err.responseCode < 500) ||
(err.code === 'ECONNECTION' && err.errno === 'ECONNREFUSED')
) {
throw new SendConfigurationError(transport.mailer.sendConfiguration.id, 'Cannot connect to service specified by send configuration ' + transport.mailer.sendConfiguration.id);
}
throw err;
}
} }
async function _sendTransactionalMail(transport, mail) { async function _sendTransactionalMail(transport, mail) {
@ -103,39 +102,6 @@ async function _sendTransactionalMail(transport, mail) {
return await _sendMail(transport, mail); return await _sendMail(transport, mail);
} }
async function _sendTransactionalMailBasedOnTemplate(transport, mail, template) {
const sendConfiguration = transport.mailer.sendConfiguration;
mail.from = {
name: sendConfiguration.from_name,
address: sendConfiguration.from_email
};
const htmlRenderer = await tools.getTemplate(template.html, template.locale);
if (htmlRenderer) {
mail.html = htmlRenderer(template.data || {});
}
const preparedHtml = await tools.prepareHtml(mail.html);
if (preparedHtml) {
mail.html = preparedHtml;
}
const textRenderer = await tools.getTemplate(template.text, template.locale);
if (textRenderer) {
mail.text = textRenderer(template.data || {});
} else if (mail.html) {
mail.text = htmlToText.fromString(mail.html, {
wordwrap: 130
});
}
return await _sendTransactionalMail(transport, mail);
}
async function _createTransport(sendConfiguration) { async function _createTransport(sendConfiguration) {
const mailerSettings = sendConfiguration.mailer_settings; const mailerSettings = sendConfiguration.mailer_settings;
const mailerType = sendConfiguration.mailer_type; const mailerType = sendConfiguration.mailer_type;
@ -222,6 +188,7 @@ async function _createTransport(sendConfiguration) {
} }
const transport = nodemailer.createTransport(transportOptions, config.nodemailer); const transport = nodemailer.createTransport(transportOptions, config.nodemailer);
transport.sendMailAsync = bluebird.promisify(transport.sendMail.bind(transport));
transport.use('stream', openpgpEncrypt({ transport.use('stream', openpgpEncrypt({
signingKey: configItems.pgpPrivateKey, signingKey: configItems.pgpPrivateKey,
@ -267,8 +234,7 @@ async function _createTransport(sendConfiguration) {
transport.mailer = { transport.mailer = {
sendConfiguration, sendConfiguration,
throttleWait: bluebird.promisify(throttleWait), throttleWait: bluebird.promisify(throttleWait),
sendTransactionalMail: async (mail, template) => await _sendTransactionalMail(transport, mail), sendTransactionalMail: async (mail) => await _sendTransactionalMail(transport, mail),
sendTransactionalMailBasedOnTemplate: async (mail, template) => await _sendTransactionalMailBasedOnTemplate(transport, mail, template),
sendMassMail: async (mail, template) => await _sendMail(transport, mail) sendMassMail: async (mail, template) => await _sendMail(transport, mail)
}; };
@ -286,3 +252,4 @@ class MailerError extends Error {
module.exports.getOrCreateMailer = getOrCreateMailer; module.exports.getOrCreateMailer = getOrCreateMailer;
module.exports.invalidateMailer = invalidateMailer; module.exports.invalidateMailer = invalidateMailer;
module.exports.MailerError = MailerError; module.exports.MailerError = MailerError;
module.exports.SendConfigurationError = SendConfigurationError;

View file

@ -15,9 +15,9 @@ const links = require('../models/links');
const {CampaignSource, CampaignType} = require('../../shared/campaigns'); const {CampaignSource, CampaignType} = require('../../shared/campaigns');
const {SubscriptionStatus} = require('../../shared/lists'); const {SubscriptionStatus} = require('../../shared/lists');
const tools = require('./tools'); const tools = require('./tools');
const htmlToText = require('html-to-text');
const request = require('request-promise'); const request = require('request-promise');
const files = require('../models/files'); const files = require('../models/files');
const htmlToText = require('html-to-text');
const {getPublicUrl} = require('./urls'); const {getPublicUrl} = require('./urls');
const blacklist = require('../models/blacklist'); const blacklist = require('../models/blacklist');
const libmime = require('libmime'); const libmime = require('libmime');
@ -26,10 +26,11 @@ const { enforce } = require('./helpers');
const MessageType = { const MessageType = {
REGULAR: 0, REGULAR: 0,
TRIGGERED: 1, TRIGGERED: 1,
TEST: 2 TEST: 2,
SUBSCRIPTION: 3
}; };
class CampaignSender { class MessageSender {
constructor() { constructor() {
} }
@ -40,6 +41,8 @@ class CampaignSender {
- sendConfiguration, listId, attachments, html, text, subject - sendConfiguration, listId, attachments, html, text, subject
*/ */
async _init(settings) { async _init(settings) {
this.type = settings.type;
this.listsById = new Map(); // listId -> list this.listsById = new Map(); // listId -> list
this.listsByCid = new Map(); // listCid -> list this.listsByCid = new Map(); // listCid -> list
this.listsFieldsGrouped = new Map(); // listId -> fieldsGrouped this.listsFieldsGrouped = new Map(); // listId -> fieldsGrouped
@ -47,11 +50,13 @@ class CampaignSender {
await knex.transaction(async tx => { await knex.transaction(async tx => {
if (settings.campaignCid) { if (settings.campaignCid) {
this.campaign = await campaigns.rawGetByTx(tx, 'cid', settings.campaignCid); this.campaign = await campaigns.rawGetByTx(tx, 'cid', settings.campaignCid);
this.isMassMail = true;
} else if (settings.campaignId) { } else if (settings.campaignId) {
this.campaign = await campaigns.rawGetByTx(tx, 'id', settings.campaignId); this.campaign = await campaigns.rawGetByTx(tx, 'id', settings.campaignId);
this.isMassMail = true;
} else { } else if (this.type === MessageType.TEST) {
// We are not within scope of a campaign (i.e. templates in MessageType.TEST message) // We are not within scope of a campaign (i.e. templates in MessageType.TEST message)
// This is to fake the campaign for getMessageLinks, which is called inside formatMessage // This is to fake the campaign for getMessageLinks, which is called inside formatMessage
this.campaign = { this.campaign = {
@ -60,11 +65,15 @@ class CampaignSender {
from_email_override: null, from_email_override: null,
reply_to_override: null reply_to_override: null
}; };
this.isMassMail = true;
} else {
this.isMassMail = false;
} }
if (settings.sendConfigurationId) { if (settings.sendConfigurationId) {
this.sendConfiguration = await sendConfigurations.getByIdTx(tx, contextHelpers.getAdminContext(), settings.sendConfigurationId, false, true); this.sendConfiguration = await sendConfigurations.getByIdTx(tx, contextHelpers.getAdminContext(), settings.sendConfigurationId, false, true);
} else if (this.campaign.send_configuration) { } else if (this.campaign && this.campaign.send_configuration) {
this.sendConfiguration = await sendConfigurations.getByIdTx(tx, contextHelpers.getAdminContext(), this.campaign.send_configuration, false, true); this.sendConfiguration = await sendConfigurations.getByIdTx(tx, contextHelpers.getAdminContext(), this.campaign.send_configuration, false, true);
} else { } else {
enforce(false); enforce(false);
@ -79,7 +88,7 @@ class CampaignSender {
this.listsByCid.set(list.cid, list); this.listsByCid.set(list.cid, list);
this.listsFieldsGrouped.set(list.id, await fields.listGroupedTx(tx, list.id)); this.listsFieldsGrouped.set(list.id, await fields.listGroupedTx(tx, list.id));
} else if (this.campaign.lists) { } else if (this.campaign && this.campaign.lists) {
for (const listSpec of this.campaign.lists) { for (const listSpec of this.campaign.lists) {
const list = await lists.getByIdTx(tx, contextHelpers.getAdminContext(), listSpec.list); const list = await lists.getByIdTx(tx, contextHelpers.getAdminContext(), listSpec.list);
this.listsById.set(list.id, list); this.listsById.set(list.id, list);
@ -94,7 +103,7 @@ class CampaignSender {
if (settings.attachments) { if (settings.attachments) {
this.attachments = settings.attachments; this.attachments = settings.attachments;
} else if (this.campaign.id) { } else if (this.campaign && this.campaign.id) {
const attachments = await files.listTx(tx, contextHelpers.getAdminContext(), 'campaign', 'attachment', this.campaign.id); const attachments = await files.listTx(tx, contextHelpers.getAdminContext(), 'campaign', 'attachment', this.campaign.id);
this.attachments = []; this.attachments = [];
@ -109,16 +118,21 @@ class CampaignSender {
this.attachments = []; this.attachments = [];
} }
if (settings.html !== undefined) { if (settings.renderedHtml !== undefined) {
this.rendereHtml = settings.rendereHtml;
this.renderedText = settings.renderedText;
} else if (settings.html !== undefined) {
this.html = settings.html; this.html = settings.html;
this.text = settings.text; this.text = settings.text;
} else if (this.campaign.source === CampaignSource.TEMPLATE) {
} else if (this.campaign && this.campaign.source === CampaignSource.TEMPLATE) {
this.template = await templates.getByIdTx(tx, contextHelpers.getAdminContext(), this.campaign.data.sourceTemplate, false); this.template = await templates.getByIdTx(tx, contextHelpers.getAdminContext(), this.campaign.data.sourceTemplate, false);
} }
if (settings.subject !== undefined) { if (settings.subject !== undefined) {
this.subject = settings.subject; this.subject = settings.subject;
} else if (this.campaign.subject !== undefined) { } else if (this.campaign && this.campaign.subject !== undefined) {
this.subject = this.campaign.subject; this.subject = this.campaign.subject;
} else { } else {
enforce(false); enforce(false);
@ -142,7 +156,7 @@ class CampaignSender {
text = this.text; text = this.text;
renderTags = true; renderTags = true;
} else { } else if (campaign) {
if (campaign.source === CampaignSource.URL) { if (campaign.source === CampaignSource.URL) {
const form = tools.getMessageLinks(campaign, list, subscriptionGrouped); const form = tools.getMessageLinks(campaign, list, subscriptionGrouped);
for (const key in mergeTags) { for (const key in mergeTags) {
@ -222,14 +236,16 @@ class CampaignSender {
} }
async initByCampaignCid(campaignCid) { async initByCampaignCid(campaignCid) {
await this._init({campaignCid}); await this._init({type: MessageType.REGULAR, campaignCid});
} }
async initByCampaignId(campaignId) { async initByCampaignId(campaignId) {
await this._init({campaignId}); await this._init({type: MessageType.REGULAR, campaignId});
} }
async getMessage(listCid, subscriptionCid) { async getMessage(listCid, subscriptionCid) {
enforce(this.type === MessageType.REGULAR);
const list = this.listsByCid.get(listCid); const list = this.listsByCid.get(listCid);
const subscriptionGrouped = await subscriptions.getByCid(contextHelpers.getAdminContext(), list.id, subscriptionCid); const subscriptionGrouped = await subscriptions.getByCid(contextHelpers.getAdminContext(), list.id, subscriptionCid);
const flds = this.listsFieldsGrouped.get(list.id); const flds = this.listsFieldsGrouped.get(list.id);
@ -242,52 +258,52 @@ class CampaignSender {
/* /*
subData is one of: subData is one of:
- queuedMessage - subscriptionId, listId, attachments
or or
- email, listId - email, listId
or
- to, subject
*/ */
async _sendMessage(subData) { async _sendMessage(subData) {
let msgType; let msgType = this.type;
let subscriptionGrouped; let to, email;
let envelope = false;
let sender = false;
let headers = {};
let listHeader = false;
let encryptionKeys = [];
let subject;
let message;
let subscriptionGrouped, list; // May be undefined
const campaign = this.campaign; // May be undefined
if (subData.listId) {
let listId; let listId;
subscriptionGrouped;
if (subData.queuedMessage) { if (subData.subscriptionId) {
const queuedMessage = subData.queuedMessage; listId = subData.listId;
msgType = queuedMessage.type; subscriptionGrouped = await subscriptions.getById(contextHelpers.getAdminContext(), listId, subData.subscriptionId);
listId = queuedMessage.list;
subscriptionGrouped = await subscriptions.getById(contextHelpers.getAdminContext(), listId, queuedMessage.subscription);
} else { } else if (subData.email) {
enforce(subData.email);
enforce(subData.listId);
msgType = MessageType.REGULAR;
listId = subData.listId; listId = subData.listId;
subscriptionGrouped = await subscriptions.getByEmail(contextHelpers.getAdminContext(), listId, subData.email); subscriptionGrouped = await subscriptions.getByEmail(contextHelpers.getAdminContext(), listId, subData.email);
} }
const email = subscriptionGrouped.email; list = this.listsById.get(listId);
email = subscriptionGrouped.email;
if (await blacklist.isBlacklisted(email)) {
return;
}
const list = this.listsById.get(listId);
const flds = this.listsFieldsGrouped.get(list.id); const flds = this.listsFieldsGrouped.get(list.id);
const campaign = this.campaign;
const mergeTags = fields.getMergeTags(flds, subscriptionGrouped, this._getExtraTags(campaign)); const mergeTags = fields.getMergeTags(flds, subscriptionGrouped, this._getExtraTags(campaign));
const encryptionKeys = [];
for (const fld of flds) { for (const fld of flds) {
if (fld.type === 'gpg' && mergeTags[fld.key]) { if (fld.type === 'gpg' && mergeTags[fld.key]) {
encryptionKeys.push(mergeTags[fld.key].trim()); encryptionKeys.push(mergeTags[fld.key].trim());
} }
} }
const sendConfiguration = this.sendConfiguration; message = await this._getMessage(list, subscriptionGrouped, mergeTags, true);
const {html, text, attachments} = await this._getMessage(list, subscriptionGrouped, mergeTags, true);
const campaignAddress = [campaign.cid, list.cid, subscriptionGrouped.cid].join('.'); const campaignAddress = [campaign.cid, list.cid, subscriptionGrouped.cid].join('.');
@ -298,37 +314,25 @@ class CampaignSender {
: getPublicUrl('/subscription/' + list.cid + '/unsubscribe/' + subscriptionGrouped.cid); : getPublicUrl('/subscription/' + list.cid + '/unsubscribe/' + subscriptionGrouped.cid);
} }
const mailer = await mailers.getOrCreateMailer(sendConfiguration.id); to = {
await mailer.throttleWait();
const getOverridable = key => {
if (sendConfiguration[key + '_overridable'] && this.campaign[key + '_override'] !== null) {
return campaign[key + '_override'] || '';
} else {
return sendConfiguration[key] || '';
}
};
const mail = {
from: {
name: getOverridable('from_name'),
address: getOverridable('from_email')
},
replyTo: getOverridable('reply_to'),
xMailer: sendConfiguration.x_mailer ? sendConfiguration.x_mailer : false,
to: {
name: list.to_name === null ? undefined : tools.formatMessage(campaign, list, subscriptionGrouped, mergeTags, list.to_name, false), name: list.to_name === null ? undefined : tools.formatMessage(campaign, list, subscriptionGrouped, mergeTags, list.to_name, false),
address: subscriptionGrouped.email address: subscriptionGrouped.email
}, };
sender: this.useVerpSenderHeader ? campaignAddress + '@' + sendConfiguration.verp_hostname : false,
envelope: this.useVerp ? { subject = tools.formatMessage(campaign, list, subscriptionGrouped, mergeTags, this.subject, false);
if (this.useVerp) {
envelope = {
from: campaignAddress + '@' + sendConfiguration.verp_hostname, from: campaignAddress + '@' + sendConfiguration.verp_hostname,
to: subscriptionGrouped.email to: subscriptionGrouped.email
} : false, };
}
headers: { if (this.useVerpSenderHeader) {
sender = campaignAddress + '@' + sendConfiguration.verp_hostname;
}
headers = {
'x-fbl': campaignAddress, 'x-fbl': campaignAddress,
// custom header for SparkPost // custom header for SparkPost
'x-msys-api': JSON.stringify({ 'x-msys-api': JSON.stringify({
@ -348,27 +352,63 @@ class CampaignSender {
prepared: true, prepared: true,
value: libmime.encodeWords(list.name) + ' <' + list.cid + '.' + getPublicUrl() + '>' value: libmime.encodeWords(list.name) + ' <' + list.cid + '.' + getPublicUrl() + '>'
} }
}, };
list: {
unsubscribe: listUnsubscribe
},
subject: tools.formatMessage(campaign, list, subscriptionGrouped, mergeTags, this.subject, false),
html,
text,
attachments, listHeader = {
unsubscribe: listUnsubscribe
};
} else if (subData.to) {
to = subData.to;
email = to.address;
subject = this.subject;
encryptionKeys = subData.encryptionKeys;
message = await this._getMessage();
}
if (await blacklist.isBlacklisted(email)) {
return;
}
const sendConfiguration = this.sendConfiguration;
const mailer = await mailers.getOrCreateMailer(sendConfiguration.id);
await mailer.throttleWait();
const getOverridable = key => {
if (campaign && sendConfiguration[key + '_overridable'] && this.campaign[key + '_override'] !== null) {
return campaign[key + '_override'] || '';
} else {
return sendConfiguration[key] || '';
}
};
const mail = {
from: {
name: getOverridable('from_name'),
address: getOverridable('from_email')
},
replyTo: getOverridable('reply_to'),
xMailer: sendConfiguration.x_mailer ? sendConfiguration.x_mailer : false,
to,
sender,
envelope,
headers,
list: listHeader,
subject,
html: message.html,
text: message.text,
attachments: message.attachments || [],
encryptionKeys encryptionKeys
}; };
let status;
let response; let response;
let responseId = null; let responseId = null;
try {
const info = await mailer.sendMassMail(mail);
status = SubscriptionStatus.SUBSCRIBED;
log.verbose('CampaignSender', `response: ${info.response} messageId: ${info.messageId}`); const info = this.isMassMail ? await mailer.sendMassMail(mail) : await mailer.sendTransactionalMail(mail);
log.verbose('MessageSender', `response: ${info.response} messageId: ${info.messageId}`);
let match; let match;
if ((match = info.response.match(/^250 Message queued as ([0-9a-f]+)$/))) { if ((match = info.response.match(/^250 Message queued as ([0-9a-f]+)$/))) {
@ -410,113 +450,139 @@ class CampaignSender {
if (msgType === MessageType.REGULAR || msgType === MessageType.TRIGGERED) { if (msgType === MessageType.REGULAR || msgType === MessageType.TRIGGERED) {
await knex('campaigns').where('id', campaign.id).increment('delivered'); await knex('campaigns').where('id', campaign.id).increment('delivered');
} }
} catch (err) {
console.log(err);
/*
{ Error: connect ECONNREFUSED 127.0.0.1:55871
at TCPConnectWrap.afterConnect [as oncomplete] (net.js:1097:14)
cause:
{ Error: connect ECONNREFUSED 127.0.0.1:55871
at TCPConnectWrap.afterConnect [as oncomplete] (net.js:1097:14)
stack:
'Error: connect ECONNREFUSED 127.0.0.1:55871\n at TCPConnectWrap.afterConnect [as oncomplete] (net.js:1097:14)',
errno: 'ECONNREFUSED',
code: 'ECONNECTION',
syscall: 'connect',
address: '127.0.0.1',
port: 55871,
command: 'CONN' },
isOperational: true,
errno: 'ECONNREFUSED',
code: 'ECONNECTION',
syscall: 'connect',
address: '127.0.0.1',
port: 55871,
command: 'CONN' }
*/
status = SubscriptionStatus.BOUNCED;
response = err.response || err.message;
if (msgType === MessageType.REGULAR || msgType === MessageType.TRIGGERED) {
await knex('campaigns').where('id', campaign.id).increment('delivered').increment('bounced');
}
}
const now = new Date(); const now = new Date();
if (msgType === MessageType.REGULAR) { if (msgType === MessageType.REGULAR) {
enforce(list);
enforce(subscriptionGrouped);
await knex('campaign_messages').insert({ await knex('campaign_messages').insert({
campaign: this.campaign.id, campaign: this.campaign.id,
list: list.id, list: list.id,
subscription: subscriptionGrouped.id, subscription: subscriptionGrouped.id,
send_configuration: sendConfiguration.id, send_configuration: sendConfiguration.id,
status, status: SubscriptionStatus.SUBSCRIBED,
response, response,
response_id: responseId, response_id: responseId,
updated: now updated: now
}); });
} else if (msgType === MessageType.TRIGGERED || msgType === MessageType.TEST) { } else if (msgType === MessageType.TRIGGERED || msgType === MessageType.TEST || msgType === MessageType.SUBSCRIPTION) {
if (subData.queuedMessage.data.attachments) { if (subData.attachments) {
for (const attachment of subData.queuedMessage.data.attachments) { for (const attachment of subData.attachments) {
try { try {
// We ignore any errors here because we already sent the message. Thus we have to mark it as completed to avoid sending it again. // We ignore any errors here because we already sent the message. Thus we have to mark it as completed to avoid sending it again.
await knex.transaction(async tx => { await knex.transaction(async tx => {
await files.unlockTx(tx, 'campaign', 'attachment', attachment.id); await files.unlockTx(tx, 'campaign', 'attachment', attachment.id);
}); });
} catch (err) { } catch (err) {
log.error('CampaignSender', `Error when unlocking attachment ${attachment.id} for ${listId}:${subscriptionGrouped.email} (queuedId: ${subData.queuedMessage.id})`); log.error('MessageSender', `Error when unlocking attachment ${attachment.id} for ${email} (queuedId: ${subData.queuedId})`);
log.verbose(err.stack); log.verbose(err.stack);
} }
} }
} }
await knex('queued') await knex('queued')
.where({id: subData.queuedMessage.id}) .where({id: subData.queuedId})
.del(); .del();
} }
} }
async sendRegularMessage(listId, email) { async sendRegularMessage(listId, email) {
enforce(this.type === MessageType.REGULAR);
await this._sendMessage({listId, email}); await this._sendMessage({listId, email});
} }
} }
CampaignSender.sendQueuedMessage = async (queuedMessage) => { async function sendQueuedMessage(queuedMessage) {
const msgData = queuedMessage.data; const msgData = queuedMessage.data;
const cs = new CampaignSender(); const cs = new MessageSender();
await cs._init({ await cs._init({
type: queuedMessage.type,
campaignId: msgData.campaignId, campaignId: msgData.campaignId,
listId: queuedMessage.list, listId: msgData.listId,
sendConfigurationId: queuedMessage.send_configuration, sendConfigurationId: queuedMessage.send_configuration,
attachments: msgData.attachments, attachments: msgData.attachments,
html: msgData.html, html: msgData.html,
text: msgData.text, text: msgData.text,
subject: msgData.subject subject: msgData.subject,
renderedHtml: msgData.renderedHtml,
renderedText: msgData.renderedText
}); });
await cs._sendMessage({queuedMessage}); await cs._sendMessage({
}; subscriptionId: msgData.subscriptionId,
listId: msgData.listId,
to: msgData.to,
attachments: msgData.attachments,
encryptionKeys: msgData.encryptionKeys,
queuedId: queuedMessage.id
});
}
CampaignSender.queueMessageTx = async (tx, sendConfigurationId, listId, subscriptionId, messageType, messageData) => { async function queueCampaignMessageTx(tx, sendConfigurationId, listId, subscriptionId, messageType, messageData) {
if (messageData.attachments) { enforce(messageType === MessageType.TRIGGERED || messageType === MessageType.TEST);
const msgData = {...messageData};
if (msgData.attachments) {
for (const attachment of messageData.attachments) { for (const attachment of messageData.attachments) {
await files.lockTx(tx,'campaign', 'attachment', attachment.id); await files.lockTx(tx,'campaign', 'attachment', attachment.id);
} }
} }
msgData.listId = listId;
msgData.subscriptionId = subscriptionId;
await tx('queued').insert({ await tx('queued').insert({
send_configuration: sendConfigurationId, send_configuration: sendConfigurationId,
list: listId,
subscription: subscriptionId,
type: messageType, type: messageType,
data: JSON.stringify(messageData) data: JSON.stringify(msgData)
}); });
}
async function queueSubscriptionMessage(sendConfigurationId, to, subject, encryptionKeys, template) {
let html, text;
const htmlRenderer = await tools.getTemplate(template.html, template.locale);
if (htmlRenderer) {
html = htmlRenderer(template.data || {});
if (html) {
html = await tools.prepareHtml(html);
}
}
const textRenderer = await tools.getTemplate(template.text, template.locale);
if (textRenderer) {
text = textRenderer(template.data || {});
} else if (html) {
text = htmlToText.fromString(html, {
wordwrap: 130
});
}
const msgData = {
renderedHtml: html,
renderedText: text,
to,
subject,
encryptionKeys
}; };
module.exports.CampaignSender = CampaignSender; await tx('queued').insert({
send_configuration: sendConfigurationId,
type: MessageType.SUBSCRIPTION,
data: JSON.stringify(msgData)
});
}
module.exports.MessageSender = MessageSender;
module.exports.MessageType = MessageType; module.exports.MessageType = MessageType;
module.exports.sendQueuedMessage = sendQueuedMessage;
module.exports.queueCampaignMessageTx = queueCampaignMessageTx;
module.exports.queueSubscriptionMessage = queueSubscriptionMessage;

View file

@ -15,6 +15,7 @@ function spawn(callback) {
log.verbose('Senders', 'Spawning master sender process'); log.verbose('Senders', 'Spawning master sender process');
knex('campaigns').where('status', CampaignStatus.SENDING).update({status: CampaignStatus.SCHEDULED}) knex('campaigns').where('status', CampaignStatus.SENDING).update({status: CampaignStatus.SCHEDULED})
.then(() => knex('campaigns').where('status', CampaignStatus.PAUSING).update({status: CampaignStatus.PAUSED}))
.then(() => { .then(() => {
senderProcess = fork(path.join(__dirname, '..', 'services', 'sender-master.js'), [], { senderProcess = fork(path.join(__dirname, '..', 'services', 'sender-master.js'), [], {
cwd: path.join(__dirname, '..'), cwd: path.join(__dirname, '..'),

View file

@ -5,11 +5,10 @@ const fields = require('../models/fields');
const settings = require('../models/settings'); const settings = require('../models/settings');
const {getTrustedUrl, getPublicUrl} = require('./urls'); const {getTrustedUrl, getPublicUrl} = require('./urls');
const { tUI, tMark } = require('./translate'); const { tUI, tMark } = require('./translate');
const util = require('util');
const contextHelpers = require('./context-helpers'); const contextHelpers = require('./context-helpers');
const {getFieldColumn} = require('../../shared/lists'); const {getFieldColumn} = require('../../shared/lists');
const forms = require('../models/forms'); const forms = require('../models/forms');
const mailers = require('./mailers'); const messageSender = require('./message-sender');
module.exports = { module.exports = {
sendAlreadySubscribed, sendAlreadySubscribed,
@ -138,20 +137,21 @@ async function _sendMail(list, email, template, locale, subjectKey, relativeUrls
try { try {
if (list.send_configuration) { if (list.send_configuration) {
const mailer = await mailers.getOrCreateMailer(list.send_configuration); await messageSender.queueSubscriptionMessage(
await mailer.sendTransactionalMailBasedOnTemplate({ list.send_configuration,
to: { {
name: getDisplayName(flds, subscription), name: getDisplayName(flds, subscription),
address: email address: email
}, },
subject: tUI(subjectKey, locale, { list: list.name }), tUI(subjectKey, locale, { list: list.name }),
encryptionKeys encryptionKeys,
}, { {
html, html,
text, text,
locale, locale,
data data
}); }
);
} else { } else {
log.warn('Subscription', `Not sending email for list id:${list.id} because not send configuration is set.`); log.warn('Subscription', `Not sending email for list id:${list.id} because not send configuration is set.`);
} }

View file

@ -42,9 +42,7 @@ async function getLocalizedFile(basePath, fileName, language) {
} }
async function getTemplate(template, locale) { async function getTemplate(template, locale) {
if (!template) { enforce(template);
return false;
}
const key = getLangCodeFromExpressLocale(locale) + ':' + ((typeof template === 'object') ? hasher.hash(template) : template); const key = getLangCodeFromExpressLocale(locale) + ':' + ((typeof template === 'object') ? hasher.hash(template) : template);
@ -148,7 +146,7 @@ function validateEmailGetMessage(result, address, language) {
} }
function formatMessage(campaign, list, subscription, mergeTags, message, isHTML) { function formatMessage(campaign, list, subscription, mergeTags, message, isHTML) {
const links = getMessageLinks(campaign, list, subscription); const links = campaign && list && subscription ? getMessageLinks(campaign, list, subscription) : {};
return formatTemplate(message, links, mergeTags, isHTML); return formatTemplate(message, links, mergeTags, isHTML);
} }
@ -192,10 +190,6 @@ function formatTemplate(template, links, mergeTags, isHTML) {
} }
async function prepareHtml(html) { async function prepareHtml(html) {
if (!(html || '').toString().trim()) {
return false;
}
const { window } = new JSDOM(html); const { window } = new JSDOM(html);
const head = window.document.querySelector('head'); const head = window.document.querySelector('head');

View file

@ -17,11 +17,11 @@ const {SubscriptionStatus} = require('../../shared/lists');
const subscriptions = require('./subscriptions'); const subscriptions = require('./subscriptions');
const segments = require('./segments'); const segments = require('./segments');
const senders = require('../lib/senders'); const senders = require('../lib/senders');
const {LinkId} = require('./links'); const links = require('./links');
const feedcheck = require('../lib/feedcheck'); const feedcheck = require('../lib/feedcheck');
const contextHelpers = require('../lib/context-helpers'); const contextHelpers = require('../lib/context-helpers');
const {convertFileURLs} = require('../lib/campaign-content'); const {convertFileURLs} = require('../lib/campaign-content');
const {CampaignSender, MessageType} = require('../lib/campaign-sender'); const messageSender = require('../lib/message-sender');
const lists = require('./lists'); const lists = require('./lists');
const {EntityActivityType, CampaignActivityType} = require('../../shared/activity-log'); const {EntityActivityType, CampaignActivityType} = require('../../shared/activity-log');
@ -298,7 +298,7 @@ async function listOpensDTAjax(context, campaignId, params) {
return this.from('campaign_links') return this.from('campaign_links')
.where('campaign_links.campaign', campaignId) .where('campaign_links.campaign', campaignId)
.where('campaign_links.list', cpgList.list) .where('campaign_links.list', cpgList.list)
.where('campaign_links.link', LinkId.OPEN) .where('campaign_links.link', links.LinkId.OPEN)
.as('related_campaign_links'); .as('related_campaign_links');
}, },
'related_campaign_links.subscription', subsTable + '.id') 'related_campaign_links.subscription', subsTable + '.id')
@ -705,16 +705,12 @@ async function getMessageByCid(messageCid, withVerpHostname = false) { // withVe
} }
async function getMessageByResponseId(responseId) { async function getMessageByResponseId(responseId) {
return await knex.transaction(async tx => { return await knex('campaign_messages')
const message = await tx('campaign_messages')
.where('campaign_messages.response_id', responseId) .where('campaign_messages.response_id', responseId)
.select([ .select([
'campaign_messages.id', 'campaign_messages.campaign', 'campaign_messages.list', 'campaign_messages.subscription', 'campaign_messages.status' 'campaign_messages.id', 'campaign_messages.campaign', 'campaign_messages.list', 'campaign_messages.subscription', 'campaign_messages.status'
]) ])
.first(); .first();
return message;
});
} }
const statusFieldMapping = { const statusFieldMapping = {
@ -747,7 +743,6 @@ async function _changeStatusByMessageTx(tx, context, message, subscriptionStatus
} }
async function changeStatusByCampaignCidAndSubscriptionIdTx(tx, context, campaignCid, listId, subscriptionId, subscriptionStatus) { async function changeStatusByCampaignCidAndSubscriptionIdTx(tx, context, campaignCid, listId, subscriptionId, subscriptionStatus) {
const campaign = await tx('campaigns').where('cid', campaignCid);
const message = await tx('campaign_messages') const message = await tx('campaign_messages')
.innerJoin('campaigns', 'campaign_messages.campaign', 'campaigns.id') .innerJoin('campaigns', 'campaign_messages.campaign', 'campaigns.id')
.where('campaigns.cid', campaignCid) .where('campaigns.cid', campaignCid)
@ -856,7 +851,7 @@ async function getSubscribersQueryGeneratorTx(tx, campaignId) {
} }
} }
async function _changeStatus(context, campaignId, permittedCurrentStates, newState, invalidStateMessage, scheduled = null) { async function _changeStatus(context, campaignId, permittedCurrentStates, newState, invalidStateMessage, startAt) {
await knex.transaction(async tx => { await knex.transaction(async tx => {
const entity = await getByIdTx(tx, context, campaignId, false); const entity = await getByIdTx(tx, context, campaignId, false);
@ -866,10 +861,18 @@ async function _changeStatus(context, campaignId, permittedCurrentStates, newSta
throw new interoperableErrors.InvalidStateError(invalidStateMessage); throw new interoperableErrors.InvalidStateError(invalidStateMessage);
} }
await tx('campaigns').where('id', campaignId).update({ const updateData = {
status: newState, status: newState,
scheduled };
});
if (startAt !== undefined) {
updateData.scheduled = startAt;
if (!startAt || startAt.valueOf() < Date.now()) {
updateData.start_at = new Date();
}
}
await tx('campaigns').where('id', campaignId).update(updateData);
await activityLog.logEntityActivity('campaign', CampaignActivityType.STATUS_CHANGE, campaignId, {status: newState}); await activityLog.logEntityActivity('campaign', CampaignActivityType.STATUS_CHANGE, campaignId, {status: newState});
}); });
@ -929,8 +932,8 @@ async function getStatisticsOpened(context, id) {
return await knex.transaction(async tx => { return await knex.transaction(async tx => {
await shares.enforceEntityPermissionTx(tx, context, 'campaign', id, 'viewStats'); await shares.enforceEntityPermissionTx(tx, context, 'campaign', id, 'viewStats');
const devices = await tx('campaign_links').where('campaign', id).where('link', LinkId.OPEN).groupBy('device_type').select('device_type AS key').count('* as count'); const devices = await tx('campaign_links').where('campaign', id).where('link', links.LinkId.OPEN).groupBy('device_type').select('device_type AS key').count('* as count');
const countries = await tx('campaign_links').where('campaign', id).where('link', LinkId.OPEN).groupBy('country').select('country AS key').count('* as count'); const countries = await tx('campaign_links').where('campaign', id).where('link', links.LinkId.OPEN).groupBy('country').select('country AS key').count('* as count');
return { return {
devices, devices,
@ -959,7 +962,7 @@ async function testSend(context, data) {
await knex.transaction(async tx => { await knex.transaction(async tx => {
const processSubscriber = async (sendConfigurationId, listId, subscriptionId, messageData) => { const processSubscriber = async (sendConfigurationId, listId, subscriptionId, messageData) => {
await CampaignSender.queueMessageTx(tx, sendConfigurationId, listId, subscriptionId, MessageType.TEST, messageData); await messageSender.queueCampaignMessageTx(tx, sendConfigurationId, listId, subscriptionId, messageSender.MessageType.TEST, messageData);
await activityLog.logEntityActivity('campaign', CampaignActivityType.TEST_SEND, campaignId, {list: listId, subscription: subscriptionId}); await activityLog.logEntityActivity('campaign', CampaignActivityType.TEST_SEND, campaignId, {list: listId, subscription: subscriptionId});
}; };

View file

@ -21,7 +21,7 @@ async function addConfirmation(listId, action, ip, data) {
*/ */
async function takeConfirmation(cid) { async function takeConfirmation(cid) {
return await knex.transaction(async tx => { return await knex.transaction(async tx => {
const entry = await tx('confirmations').select(['cid', 'list', 'action', 'ip', 'data']).where('cid', cid).first(); const entry = await tx('confirmations').select(['cid', 'list', 'action', 'ip', 'data']).where('cid', cid).forUpdate().first();
if (!entry) { if (!entry) {
return false; return false;

View file

@ -504,7 +504,7 @@ async function serverValidate(context, listId, data) {
async function _validateAndPreprocess(tx, listId, groupedFieldsMap, entity, meta, isCreate) { async function _validateAndPreprocess(tx, listId, groupedFieldsMap, entity, meta, isCreate) {
enforce(entity.email, 'Email must be set'); enforce(entity.email, 'Email must be set');
const existingWithKeyQuery = tx(getSubscriptionTableName(listId)).where('hash_email', hashEmail(entity.email)); const existingWithKeyQuery = tx(getSubscriptionTableName(listId)).where('hash_email', hashEmail(entity.email)).forUpdate();
if (!isCreate) { if (!isCreate) {
existingWithKeyQuery.whereNot('id', entity.id); existingWithKeyQuery.whereNot('id', entity.id);

View file

@ -153,7 +153,7 @@ async function remove(context, id) {
const MAX_EMAIL_COUNT = 100; const MAX_EMAIL_COUNT = 100;
async function sendAsTransactionalEmail(context, templateId, sendConfigurationId, emails, subject, mergeTags) { async function sendAsTransactionalEmail(context, templateId, sendConfigurationId, emails, subject, mergeTags) {
// TODO - Update this to use CampaignSender.queueMessageTx (with renderedHtml and renderedText) // TODO - Update this to use MessageSender.queueMessageTx (with renderedHtml and renderedText)
if (emails.length > MAX_EMAIL_COUNT) { if (emails.length > MAX_EMAIL_COUNT) {
throw new Error(`Cannot send more than ${MAX_EMAIL_COUNT} emails at once`); throw new Error(`Cannot send more than ${MAX_EMAIL_COUNT} emails at once`);

View file

@ -12,6 +12,8 @@ const crypto = require('crypto');
const settings = require('./settings'); const settings = require('./settings');
const {getTrustedUrl} = require('../lib/urls'); const {getTrustedUrl} = require('../lib/urls');
const { tUI } = require('../lib/translate'); const { tUI } = require('../lib/translate');
const messageSender = require('../lib/message-sender');
const {getSystemSendConfigurationId} = require('../../shared/send-configurations');
const bluebird = require('bluebird'); const bluebird = require('bluebird');
@ -19,8 +21,6 @@ const bcrypt = require('bcrypt-nodejs');
const bcryptHash = bluebird.promisify(bcrypt.hash.bind(bcrypt)); const bcryptHash = bluebird.promisify(bcrypt.hash.bind(bcrypt));
const bcryptCompare = bluebird.promisify(bcrypt.compare.bind(bcrypt)); const bcryptCompare = bluebird.promisify(bcrypt.compare.bind(bcrypt));
const mailers = require('../lib/mailers');
const passport = require('../lib/passport'); const passport = require('../lib/passport');
const namespaceHelpers = require('../lib/namespace-helpers'); const namespaceHelpers = require('../lib/namespace-helpers');
@ -297,26 +297,31 @@ async function resetAccessToken(userId) {
async function sendPasswordReset(locale, usernameOrEmail) { async function sendPasswordReset(locale, usernameOrEmail) {
enforce(passport.isAuthMethodLocal, 'Local user management is required'); enforce(passport.isAuthMethodLocal, 'Local user management is required');
const resetToken = crypto.randomBytes(16).toString('base64').replace(/[^a-z0-9]/gi, '');
let user;
await knex.transaction(async tx => { await knex.transaction(async tx => {
const user = await tx('users').where('username', usernameOrEmail).orWhere('email', usernameOrEmail).select(['id', 'username', 'email', 'name']).first(); user = await tx('users').where('username', usernameOrEmail).orWhere('email', usernameOrEmail).select(['id', 'username', 'email', 'name']).forUpdate().first();
if (user) { if (user) {
const resetToken = crypto.randomBytes(16).toString('base64').replace(/[^a-z0-9]/gi, '');
await tx('users').where('id', user.id).update({ await tx('users').where('id', user.id).update({
reset_token: resetToken, reset_token: resetToken,
reset_expire: new Date(Date.now() + 60 * 60 * 1000) reset_expire: new Date(Date.now() + 60 * 60 * 1000)
}); });
}
// We intentionally silently ignore the situation when user is not found. This is not to reveal if a user exists in the system.
const { adminEmail } = await settings.get(contextHelpers.getAdminContext(), ['adminEmail']); });
const mailer = await mailers.getOrCreateMailer(); if (user) {
await mailer.sendTransactionalMailBasedOnTemplate({ await messageSender.queueSubscriptionMessage(
to: { getSystemSendConfigurationId(),
{
address: user.email address: user.email
}, },
subject: tUI('mailerPasswordChangeRequest', locale) tUI('mailerPasswordChangeRequest', locale),
}, { null,
{
html: 'users/password-reset-html.hbs', html: 'users/password-reset-html.hbs',
text: 'users/password-reset-text.hbs', text: 'users/password-reset-text.hbs',
locale, locale,
@ -326,10 +331,9 @@ async function sendPasswordReset(locale, usernameOrEmail) {
name: user.name, name: user.name,
confirmUrl: getTrustedUrl(`login/reset/${encodeURIComponent(user.username)}/${encodeURIComponent(resetToken)}`) confirmUrl: getTrustedUrl(`login/reset/${encodeURIComponent(user.username)}/${encodeURIComponent(resetToken)}`)
} }
});
} }
// We intentionally silently ignore the situation when user is not found. This is not to reveal if a user exists in the system. );
}); }
} }
async function isPasswordResetTokenValid(username, resetToken) { async function isPasswordResetTokenValid(username, resetToken) {

View file

@ -1,11 +1,11 @@
'use strict'; 'use strict';
const router = require('../lib/router-async').create(); const router = require('../lib/router-async').create();
const {CampaignSender} = require('../lib/campaign-sender'); const {MessageSender} = require('../lib/message-sender');
router.get('/:campaign/:list/:subscription', (req, res, next) => { router.get('/:campaign/:list/:subscription', (req, res, next) => {
const cs = new CampaignSender(); const cs = new MessageSender();
cs.initByCampaignCid(req.params.campaign) cs.initByCampaignCid(req.params.campaign)
.then(() => cs.getMessage(req.params.list, req.params.subscription)) .then(() => cs.getMessage(req.params.list, req.params.subscription))
.then(result => { .then(result => {

View file

@ -10,7 +10,7 @@ const campaigns = require('../models/campaigns');
const builtinZoneMta = require('../lib/builtin-zone-mta'); const builtinZoneMta = require('../lib/builtin-zone-mta');
const {CampaignActivityType} = require('../../shared/activity-log'); const {CampaignActivityType} = require('../../shared/activity-log');
const activityLog = require('../lib/activity-log'); const activityLog = require('../lib/activity-log');
const {MessageType} = require('../lib/campaign-sender') const {MessageType} = require('../lib/message-sender');
require('../lib/fork'); require('../lib/fork');
class Notifications { class Notifications {
@ -55,19 +55,111 @@ const idleWorkers = [];
let campaignSchedulerRunning = false; let campaignSchedulerRunning = false;
let queuedSchedulerRunning = false; let queuedSchedulerRunning = false;
const campaignsCheckPeriod = 30 * 1000; const checkPeriod = 30 * 1000;
const retrieveBatchSize = 1000; const retrieveBatchSize = 1000;
const workerBatchSize = 10; const workerBatchSize = 10;
const sendConfigurationIdByCampaignId = new Map(); // campaignId -> sendConfigurationId
const sendConfigurationStatuses = new Map(); // sendConfigurationId -> {retryCount, postponeTill}
const sendConfigurationMessageQueue = new Map(); // sendConfigurationId -> [{queuedMessage}] const sendConfigurationMessageQueue = new Map(); // sendConfigurationId -> [{queuedMessage}]
const campaignMessageQueue = new Map(); // campaignId -> [{listId, email}] const campaignMessageQueue = new Map(); // campaignId -> [{listId, email}]
const workAssignment = new Map(); // workerId -> { campaignId, messages: [{listId, email} } / { sendConfigurationId, messages: [{queuedMessage}] } const workAssignment = new Map(); // workerId -> { type: WorkAssignmentType.CAMPAIGN, campaignId, messages: [{listId, email} } / { type: WorkAssignmentType.QUEUED, sendConfigurationId, messages: [{queuedMessage}] }
const WorkAssignmentType = {
CAMPAIGN: 0,
QUEUED: 1
};
const retryBackoff = [10, 20, 30, 30, 60, 60, 120, 120, 300]; // in seconds
function getSendConfigurationStatus(sendConfigurationId) {
let status = sendConfigurationStatuses.get(sendConfigurationId);
if (!status) {
status = {
retryCount: 0,
postponeTill: 0
};
sendConfigurationStatuses.set(sendConfigurationId, status);
}
return status;
}
function setSendConfigurationRetryCount(sendConfigurationStatus, newRetryCount) {
sendConfigurationStatus.retryCount = newRetryCount;
let next = 0;
if (newRetryCount > 0) {
let backoff;
if (newRetryCount > retryBackoff.length) {
backoff = retryBackoff[retryBackoff.length - 1];
} else {
backoff = retryBackoff[newRetryCount - 1];
}
next = Date.now() + backoff * 1000;
setTimeout(scheduleCheck, backoff * 1000);
}
sendConfigurationStatus.postponeTill = next;
}
function isSendConfigurationPostponed(sendConfigurationId) {
const now = Date.now();
const sendConfigurationStatus = getSendConfigurationStatus(sendConfigurationId);
return sendConfigurationStatus.postponeTill > now;
}
function getPostponedSendConfigurationIds() {
const result = [];
const now = Date.now();
for (const entry of sendConfigurationStatuses.entries()) {
if (entry[1].postponeTill > now) {
result.push(entry[0]);
}
}
return result;
}
function messagesProcessed(workerId) { function getExpirationThresholds() {
const now = Date.now();
return {
[MessageType.TRIGGERED]: {
threshold: now - config.queue.retention.triggered * 1000,
title: 'triggered campaign'
},
[MessageType.TEST]: {
threshold: now - config.queue.retention.test * 1000,
title: 'test campaign'
},
[MessageType.SUBSCRIPTION]: {
threshold: now - config.queue.retention.subscription * 1000,
title: 'subscription and password-related'
}
};
}
function messagesProcessed(workerId, withErrors) {
const wa = workAssignment.get(workerId); const wa = workAssignment.get(workerId);
const sendConfigurationStatus = getSendConfigurationStatus(wa.sendConfigurationId);
if (withErrors) {
if (sendConfigurationStatus.retryCount === wa.sendConfigurationRetryCount) { // This is to avoid multiple increments when more workers simultaneously fail to send messages ot the same send configuration
setSendConfigurationRetryCount(sendConfigurationStatus, sendConfigurationStatus.retryCount + 1);
}
} else {
setSendConfigurationRetryCount(sendConfigurationStatus, 0);
}
workAssignment.delete(workerId); workAssignment.delete(workerId);
idleWorkers.push(workerId); idleWorkers.push(workerId);
@ -75,8 +167,6 @@ function messagesProcessed(workerId) {
} }
async function workersLoop() { async function workersLoop() {
const reservedWorkersForTestCount = workersCount > 1 ? 1 : 0;
async function getAvailableWorker() { async function getAvailableWorker() {
while (idleWorkers.length === 0) { while (idleWorkers.length === 0) {
await notifier.waitFor('workerFinished'); await notifier.waitFor('workerFinished');
@ -85,64 +175,42 @@ async function workersLoop() {
return idleWorkers.shift(); return idleWorkers.shift();
} }
function assignCampaignTaskToWorker(workerId, task) {
const campaignId = task.campaignId;
const queue = task.queue;
const messages = queue.splice(0, workerBatchSize);
workAssignment.set(workerId, {campaignId, messages});
if (queue.length === 0) {
notifier.notify(`campaignMessageQueueEmpty:${campaignId}`);
}
sendToWorker(workerId, 'process-campaign-messages', {
campaignId,
messages
});
}
function assignSendConfigurationTaskToWorker(workerId, task) {
const sendConfigurationId = task.sendConfigurationId;
const queue = task.queue;
const messages = queue.splice(0, workerBatchSize);
workAssignment.set(workerId, {sendConfigurationId, messages});
if (queue.length === 0) {
notifier.notify(`sendConfigurationMessageQueueEmpty:${sendConfigurationId}`);
}
sendToWorker(workerId, 'process-queued-messages', {
sendConfigurationId,
messages
});
}
function selectNextTask() { function selectNextTask() {
const allocationMap = new Map(); const allocationMap = new Map();
const allocation = []; const allocation = [];
function initAllocation(attrName, queues, assignWorkerHandler) { function initAllocation(waType, attrName, queues, workerMsg, getSendConfigurationId, getQueueEmptyEvent) {
for (const id of queues.keys()) { for (const id of queues.keys()) {
const sendConfigurationId = getSendConfigurationId(id);
const key = attrName + ':' + id; const key = attrName + ':' + id;
const queue = queues.get(id); const queue = queues.get(id);
const postponed = isSendConfigurationPostponed(sendConfigurationId);
const task = { const task = {
[attrName]: id, type: waType,
id,
existingWorkers: 0, existingWorkers: 0,
isEmpty: queue.length === 0, isValid: queue.length > 0 && !postponed,
queue, queue,
assignWorkerHandler workerMsg,
attrName,
getQueueEmptyEvent,
sendConfigurationId
}; };
allocationMap.set(key, task); allocationMap.set(key, task);
allocation.push(task); allocation.push(task);
if (postponed && queue.length > 0) {
queue.splice(0);
notifier.notify(task.getQueueEmptyEvent(task));
}
} }
for (const wa of workAssignment.values()) { for (const wa of workAssignment.values()) {
if (wa[attrName]) { if (wa.type === waType) {
const key = attrName + ':' + wa[attrName]; const key = attrName + ':' + wa[attrName];
const task = allocationMap.get(key); const task = allocationMap.get(key);
task.existingWorkers += 1; task.existingWorkers += 1;
@ -150,14 +218,29 @@ async function workersLoop() {
} }
} }
initAllocation('sendConfigurationId', sendConfigurationMessageQueue, assignSendConfigurationTaskToWorker); initAllocation(
initAllocation('campaignId', campaignMessageQueue, assignCampaignTaskToWorker); WorkAssignmentType.QUEUED,
'sendConfigurationId',
sendConfigurationMessageQueue,
'process-queued-messages',
id => id,
task => `sendConfigurationMessageQueueEmpty:${task.id}`
);
initAllocation(
WorkAssignmentType.CAMPAIGN,
'campaignId',
campaignMessageQueue,
'process-campaign-messages',
id => sendConfigurationIdByCampaignId.get(id),
task => `campaignMessageQueueEmpty:${task.id}`
);
let minTask = null; let minTask = null;
let minExistingWorkers; let minExistingWorkers;
for (const task of allocation) { for (const task of allocation) {
if (!task.isEmpty && (minTask === null || minExistingWorkers > task.existingWorkers)) { if (task.isValid && (minTask === null || minExistingWorkers > task.existingWorkers)) {
minTask = task; minTask = task;
minExistingWorkers = task.existingWorkers; minExistingWorkers = task.existingWorkers;
} }
@ -172,8 +255,31 @@ async function workersLoop() {
if (task) { if (task) {
const workerId = await getAvailableWorker(); const workerId = await getAvailableWorker();
task.assignWorkerHandler(workerId, task);
const attrName = task.attrName;
const sendConfigurationId = task.sendConfigurationId;
const sendConfigurationStatus = getSendConfigurationStatus(sendConfigurationId);
const sendConfigurationRetryCount = sendConfigurationStatus.retryCount;
const queue = task.queue;
const messages = queue.splice(0, workerBatchSize);
workAssignment.set(workerId, {
type: task.type,
[attrName]: task.id,
sendConfigurationId,
sendConfigurationRetryCount,
messages
});
if (queue.length === 0) {
notifier.notify(task.getQueueEmptyEvent(task));
}
sendToWorker(workerId, task.workerMsg, {
[attrName]: task.id,
messages
});
} else { } else {
await notifier.waitFor('workAvailable'); await notifier.waitFor('workAvailable');
} }
@ -184,14 +290,18 @@ async function workersLoop() {
async function processCampaign(campaignId) { async function processCampaign(campaignId) {
const msgQueue = campaignMessageQueue.get(campaignId); const msgQueue = campaignMessageQueue.get(campaignId);
async function finish(newStatus) { async function finish(clearMsgQueue, newStatus) {
if (clearMsgQueue) {
msgQueue.splice(0);
}
const isCompleted = () => { const isCompleted = () => {
if (msgQueue.length > 0) return false; if (msgQueue.length > 0) return false;
let workerRunning = false; let workerRunning = false;
for (const wa of workAssignment.values()) { for (const wa of workAssignment.values()) {
if (wa.campaignId === campaignId) { if (wa.type === WorkAssignmentType.CAMPAIGN && wa.campaignId === campaignId) {
workerRunning = true; workerRunning = true;
} }
} }
@ -213,10 +323,20 @@ async function processCampaign(campaignId) {
while (true) { while (true) {
const cpg = await knex('campaigns').where('id', campaignId).first(); const cpg = await knex('campaigns').where('id', campaignId).first();
const expirationThreshold = Date.now() - config.queue.retention.campaign * 1000;
if (cpg.start_at.valueOf() < expirationThreshold) {
return await finish(true, CampaignStatus.FINISHED);
}
if (cpg.status === CampaignStatus.PAUSING) { if (cpg.status === CampaignStatus.PAUSING) {
msgQueue.splice(0); return await finish(true, CampaignStatus.PAUSED);
await finish(CampaignStatus.PAUSED); }
return;
sendConfigurationIdByCampaignId.set(cpg.id, cpg.send_configuration);
if (isSendConfigurationPostponed(cpg.send_configuration)) {
// postpone campaign if its send configuration is problematic
return await finish(true, CampaignStatus.SCHEDULED);
} }
let qryGen; let qryGen;
@ -227,7 +347,7 @@ async function processCampaign(campaignId) {
if (qryGen) { if (qryGen) {
let messagesInProcessing = [...msgQueue]; let messagesInProcessing = [...msgQueue];
for (const wa of workAssignment.values()) { for (const wa of workAssignment.values()) {
if (wa.campaignId === campaignId) { if (wa.type === WorkAssignmentType.CAMPAIGN && wa.campaignId === campaignId) {
messagesInProcessing = messagesInProcessing.concat(wa.messages); messagesInProcessing = messagesInProcessing.concat(wa.messages);
} }
} }
@ -239,8 +359,7 @@ async function processCampaign(campaignId) {
const subs = await qry; const subs = await qry;
if (subs.length === 0) { if (subs.length === 0) {
await finish(CampaignStatus.FINISHED); return await finish(false, CampaignStatus.FINISHED);
return;
} }
for (const sub of subs) { for (const sub of subs) {
@ -257,8 +376,7 @@ async function processCampaign(campaignId) {
} }
} else { } else {
await finish(CampaignStatus.FINISHED); return await finish(false, CampaignStatus.FINISHED);
return;
} }
} }
} catch (err) { } catch (err) {
@ -276,15 +394,29 @@ async function scheduleCampaigns() {
campaignSchedulerRunning = true; campaignSchedulerRunning = true;
try { try {
// finish old campaigns
const nowDate = new Date();
const now = nowDate.valueOf();
const expirationThreshold = new Date(now - config.queue.retention.campaign * 1000);
const expiredCampaigns = await knex('campaigns')
.whereIn('campaigns.type', [CampaignType.REGULAR, CampaignType.RSS_ENTRY])
.whereIn('campaigns.status', [CampaignStatus.SCHEDULED, CampaignStatus.PAUSED])
.where('campaigns.start_at', '<', expirationThreshold)
.update({status: CampaignStatus.FINISHED});
while (true) { while (true) {
let campaignId = 0; let campaignId = 0;
const postponedSendConfigurationIds = getPostponedSendConfigurationIds();
await knex.transaction(async tx => { await knex.transaction(async tx => {
const scheduledCampaign = await tx('campaigns') const scheduledCampaign = await tx('campaigns')
.whereIn('campaigns.type', [CampaignType.REGULAR, CampaignType.RSS_ENTRY]) .whereIn('campaigns.type', [CampaignType.REGULAR, CampaignType.RSS_ENTRY])
.whereNotIn('campaigns.send_configuration', postponedSendConfigurationIds)
.where('campaigns.status', CampaignStatus.SCHEDULED) .where('campaigns.status', CampaignStatus.SCHEDULED)
.where(qry => qry.whereNull('campaigns.scheduled').orWhere('campaigns.scheduled', '<=', new Date())) .where('campaigns.start_at', '<=', nowDate)
.select(['id']) .select(['id'])
.forUpdate()
.first(); .first();
if (scheduledCampaign) { if (scheduledCampaign) {
@ -322,7 +454,7 @@ async function processQueuedBySendConfiguration(sendConfigurationId) {
let workerRunning = false; let workerRunning = false;
for (const wa of workAssignment.values()) { for (const wa of workAssignment.values()) {
if (wa.sendConfigurationId === sendConfigurationId) { if (wa.type === WorkAssignmentType.QUEUED && wa.sendConfigurationId === sendConfigurationId) {
workerRunning = true; workerRunning = true;
} }
} }
@ -330,19 +462,40 @@ async function processQueuedBySendConfiguration(sendConfigurationId) {
return !workerRunning; return !workerRunning;
}; };
async function finish(clearMsgQueue, deleteMsgQueue) {
if (clearMsgQueue) {
msgQueue.splice(0);
}
while (!isCompleted()) {
await notifier.waitFor('workerFinished');
}
if (deleteMsgQueue) {
sendConfigurationMessageQueue.delete(sendConfigurationId);
}
}
try { try {
while (true) { while (true) {
if (isSendConfigurationPostponed(sendConfigurationId)) {
return finish(true, true);
}
let messagesInProcessing = [...msgQueue]; let messagesInProcessing = [...msgQueue];
for (const wa of workAssignment.values()) { for (const wa of workAssignment.values()) {
if (wa.sendConfigurationId === sendConfigurationId) { if (wa.type === WorkAssignmentType.QUEUED && wa.sendConfigurationId === sendConfigurationId) {
messagesInProcessing = messagesInProcessing.concat(wa.messages); messagesInProcessing = messagesInProcessing.concat(wa.messages);
} }
} }
const messageIdsInProcessing = messagesInProcessing.map(x => x.queuedMessage.id);
const rows = await knex('queued') const rows = await knex('queued')
.orderByRaw(`FIELD(type, ${MessageType.TRIGGERED}, ${MessageType.TEST}) DESC, id ASC`) // This orders MessageType.TEST messages before MessageType.TRIGGERED ones .orderByRaw(`FIELD(type, ${MessageType.TRIGGERED}, ${MessageType.TEST}, ${MessageType.SUBSCRIPTION}) DESC, id ASC`) // This orders messages in the following order MessageType.SUBSCRIPTION, MessageType.TEST and MessageType.TRIGGERED
.where('send_configuration', sendConfigurationId) .where('send_configuration', sendConfigurationId)
.whereNotIn('id', messagesInProcessing.map(x => x.queuedMessage.id)) .whereNotIn('id', messageIdsInProcessing)
.limit(retrieveBatchSize); .limit(retrieveBatchSize);
if (rows.length === 0) { if (rows.length === 0) {
@ -351,21 +504,44 @@ async function processQueuedBySendConfiguration(sendConfigurationId) {
return; return;
} else { } else {
while (!isCompleted()) { finish(false, false);
await notifier.waitFor('workerFinished');
}
// At this point, there might be new messages in the queued that could belong to us. Thus we have to try again instead for returning. // At this point, there might be new messages in the queued that could belong to us. Thus we have to try again instead for returning.
continue; continue;
} }
} }
const expirationThresholds = getExpirationThresholds();
const expirationCounters = {};
for (const type of Object.keys(expirationThresholds)) {
expirationCounters[type] = 0;
}
for (const row of rows) { for (const row of rows) {
for (const type of Object.keys(expirationThresholds)) {
if (row.type === type) {
const expirationThreshold = expirationThresholds[type];
if (row.created < expirationThreshold.threshold) {
expirationCounters[type] += 1;
await knex('queued').where('id', row.id).del();
} else {
row.data = JSON.parse(row.data); row.data = JSON.parse(row.data);
msgQueue.push({ msgQueue.push({
queuedMessage: row queuedMessage: row
}); });
} }
}
}
}
for (const type of Object.keys(expirationThresholds)) {
const expirationThreshold = expirationThresholds[type];
if (expirationCounters[type] > 0) {
log.warn('Senders', `Discarded ${expirationCounters[type]} expired ${expirationThreshold.title} message(s).`);
}
}
notifier.notify('workAvailable'); notifier.notify('workAvailable');
@ -387,11 +563,27 @@ async function scheduleQueued() {
queuedSchedulerRunning = true; queuedSchedulerRunning = true;
try { try {
while (true) { const sendConfigurationsIdsInProcessing = [...sendConfigurationMessageQueue.keys()];
const sendConfigurationsInProcessing = [...sendConfigurationMessageQueue.keys()]; const postponedSendConfigurationIds = getPostponedSendConfigurationIds();
// prune old messages
const expirationThresholds = getExpirationThresholds();
for (const type of Object.keys(expirationThresholds)) {
const expirationThreshold = expirationThresholds[type];
const expiredCount = await knex('queued')
.whereNotIn('send_configuration', sendConfigurationsIdsInProcessing)
.where('type', type)
.where('created', '<', expirationThreshold.threshold)
.del();
if (expiredCount) {
log.warn('Senders', `Discarded ${expiredCount} expired ${expirationThreshold.title} message(s).`);
}
}
const rows = await knex('queued') const rows = await knex('queued')
.whereNotIn('send_configuration', sendConfigurationsInProcessing) .whereNotIn('send_configuration', [...sendConfigurationsIdsInProcessing, ...postponedSendConfigurationIds])
.groupBy('send_configuration') .groupBy('send_configuration')
.select(['send_configuration']); .select(['send_configuration']);
@ -402,7 +594,6 @@ async function scheduleQueued() {
// noinspection JSIgnoredPromiseFromCall // noinspection JSIgnoredPromiseFromCall
processQueuedBySendConfiguration(sendConfigurationId); processQueuedBySendConfiguration(sendConfigurationId);
} }
}
} catch (err) { } catch (err) {
log.error('Senders', `Scheduling queued messages failed with error: ${err.message}`); log.error('Senders', `Scheduling queued messages failed with error: ${err.message}`);
log.verbose(err.stack); log.verbose(err.stack);
@ -431,7 +622,7 @@ async function spawnWorker(workerId) {
return resolve(); return resolve();
} else if (msg.type === 'messages-processed') { } else if (msg.type === 'messages-processed') {
messagesProcessed(workerId); messagesProcessed(workerId, msg.data.withErrors);
} }
} }
@ -457,16 +648,22 @@ function sendToWorker(workerId, msgType, data) {
} }
function periodicCampaignsCheck() { function scheduleCheck() {
// noinspection JSIgnoredPromiseFromCall // noinspection JSIgnoredPromiseFromCall
scheduleCampaigns(); scheduleCampaigns();
// noinspection JSIgnoredPromiseFromCall // noinspection JSIgnoredPromiseFromCall
scheduleQueued(); scheduleQueued();
setTimeout(periodicCampaignsCheck, campaignsCheckPeriod);
} }
function periodicCheck() {
// noinspection JSIgnoredPromiseFromCall
scheduleCheck();
setTimeout(periodicCheck, checkPeriod);
}
async function init() { async function init() {
const spawnWorkerFutures = []; const spawnWorkerFutures = [];
let workerId; let workerId;
@ -482,10 +679,18 @@ async function init() {
if (type === 'schedule-check') { if (type === 'schedule-check') {
// noinspection JSIgnoredPromiseFromCall // noinspection JSIgnoredPromiseFromCall
scheduleCampaigns(); scheduleCheck();
scheduleQueued();
} else if (type === 'reload-config') { } else if (type === 'reload-config') {
const sendConfigurationStatus = getSendConfigurationStatus(msg.data.sendConfigurationId);
if (sendConfigurationStatus.retryCount > 0) {
const sendConfigurationStatus = getSendConfigurationStatus(msg.data.sendConfigurationId)
setSendConfigurationRetryCount(sendConfigurationStatus, 0);
// noinspection JSIgnoredPromiseFromCall
scheduleCheck();
}
for (const workerId of workerProcesses.keys()) { for (const workerId of workerProcesses.keys()) {
sendToWorker(workerId, 'reload-config', msg.data); sendToWorker(workerId, 'reload-config', msg.data);
} }
@ -501,7 +706,7 @@ async function init() {
type: 'master-sender-started' type: 'master-sender-started'
}); });
periodicCampaignsCheck(); periodicCheck();
setImmediate(workersLoop); setImmediate(workersLoop);
} }

View file

@ -3,7 +3,7 @@
const config = require('config'); const config = require('config');
const log = require('../lib/log'); const log = require('../lib/log');
const mailers = require('../lib/mailers'); const mailers = require('../lib/mailers');
const {CampaignSender} = require('../lib/campaign-sender'); const messageSender = require('../lib/message-sender');
require('../lib/fork'); require('../lib/fork');
const workerId = Number.parseInt(process.argv[2]); const workerId = Number.parseInt(process.argv[2]);
@ -17,23 +17,33 @@ async function processCampaignMessages(campaignId, messages) {
running = true; running = true;
const cs = new CampaignSender(); const cs = new MessageSender();
await cs.initByCampaignId(campaignId); await cs.initByCampaignId(campaignId);
let withErrors = false;
for (const msgData of messages) { for (const msgData of messages) {
try { try {
await cs.sendRegularMessage(msgData.listId, msgData.email); await cs.sendRegularMessage(msgData.listId, msgData.email);
log.verbose('Senders', 'Message sent and status updated for %s:%s', msgData.listId, msgData.email); log.verbose('Senders', 'Message sent and status updated for %s:%s', msgData.listId, msgData.email);
} catch (err) { } catch (err) {
log.error('Senders', `Sending message to ${msgData.listId}:${msgData.email} failed with error: ${err.message}`);
if (err instanceof mailers.SendConfigurationError) {
log.error('Senders', `Sending message to ${msgData.listId}:${msgData.email} failed with error: ${err.message}. Will retry the message if within retention interval.`);
withErrors = true;
break;
} else {
log.error('Senders', `Sending message to ${msgData.listId}:${msgData.email} failed with error: ${err.message}. Dropping the message.`);
log.verbose(err.stack); log.verbose(err.stack);
} }
} }
}
running = false; running = false;
sendToMaster('messages-processed'); sendToMaster('messages-processed', { withErrors });
} }
async function processQueuedMessages(sendConfigurationId, messages) { async function processQueuedMessages(sendConfigurationId, messages) {
@ -44,26 +54,34 @@ async function processQueuedMessages(sendConfigurationId, messages) {
running = true; running = true;
let withErrors = false;
for (const msgData of messages) { for (const msgData of messages) {
const queuedMessage = msgData.queuedMessage; const queuedMessage = msgData.queuedMessage;
try { try {
await CampaignSender.sendQueuedMessage(queuedMessage); await messageSender.sendQueuedMessage(queuedMessage);
log.verbose('Senders', 'Message sent and status updated for %s:%s', queuedMessage.list, queuedMessage.subscription); log.verbose('Senders', 'Message sent and status updated for %s:%s', queuedMessage.list, queuedMessage.subscription);
} catch (err) { } catch (err) {
log.error('Senders', `Sending message to ${queuedMessage.list}:${queuedMessage.subscription} failed with error: ${err.message}`); if (err instanceof mailers.SendConfigurationError) {
log.error('Senders', `Sending message to ${queuedMessage.list}:${queuedMessage.subscription} failed with error: ${err.message}. Will retry the message if within retention interval.`);
withErrors = true;
break;
} else {
log.error('Senders', `Sending message to ${queuedMessage.list}:${queuedMessage.subscription} failed with error: ${err.message}. Dropping the message.`);
log.verbose(err.stack); log.verbose(err.stack);
} }
} }
}
running = false; running = false;
sendToMaster('messages-processed'); sendToMaster('messages-processed', { withErrors });
} }
function sendToMaster(msgType) { function sendToMaster(msgType, data) {
process.send({ process.send({
type: msgType type: msgType,
data
}); });
} }

View file

@ -10,7 +10,7 @@ const { Entity, Event } = require('../../shared/triggers');
const { SubscriptionStatus } = require('../../shared/lists'); const { SubscriptionStatus } = require('../../shared/lists');
const links = require('../models/links'); const links = require('../models/links');
const contextHelpers = require('../lib/context-helpers'); const contextHelpers = require('../lib/context-helpers');
const {MessageType, CampaignSender} = require('../lib/campaign-sender'); const messageSender = require('../lib/message-sender');
const triggerCheckPeriod = 30 * 1000; const triggerCheckPeriod = 30 * 1000;
const triggerFirePeriod = 120 * 1000; const triggerFirePeriod = 120 * 1000;
@ -152,8 +152,8 @@ async function run() {
subscription: subscriber.id subscription: subscriber.id
}); });
await CampaignSender.queueMessageTx(tx, await messageSender.queueCampaignMessageTx(tx,
campaign.send_configuration, cpgList.list, subscriber.id, MessageType.TRIGGERED, campaign.send_configuration, cpgList.list, subscriber.id, messageSender.MessageType.TRIGGERED,
{ {
campaignId: campaign.id, campaignId: campaign.id,
triggerId: trigger.id triggerId: trigger.id

View file

@ -15,7 +15,7 @@ const entityTypesWithFiles = {
exports.up = (knex, Promise) => (async() => { exports.up = (knex, Promise) => (async() => {
await knex.schema.table('queued', table => { await knex.schema.table('queued', table => {
table.integer('send_configuration').unsigned().notNullable(); table.integer('send_configuration').unsigned().notNullable();
table.integer('type').unsigned().notNullable(); // The values come from campaign-sender.js:MessageType table.integer('type').unsigned().notNullable(); // The values come from message-sender.js:MessageType
table.text('data', 'longtext'); table.text('data', 'longtext');
}); });

View file

@ -0,0 +1,26 @@
const { CampaignType, CampaignStatus } = require('../../../../shared/campaigns');
exports.up = (knex, Promise) => (async() => {
await knex.schema.table('campaigns', table => {
table.timestamp('start_at').nullable().defaultTo(null);
});
await knex('campaigns')
.whereIn('type', [CampaignType.REGULAR, CampaignType.RSS_ENTRY])
.whereIn('status', [CampaignStatus.SCHEDULED, CampaignStatus.SENDING, CampaignStatus.PAUSING, CampaignStatus.PAUSED])
.whereNotNull('scheduled')
.update({
start_at: knex.raw('scheduled')
});
await knex('campaigns')
.whereIn('type', [CampaignType.REGULAR, CampaignType.RSS_ENTRY])
.whereIn('status', [CampaignStatus.SCHEDULED, CampaignStatus.SENDING, CampaignStatus.PAUSING, CampaignStatus.PAUSED])
.whereNull('scheduled')
.update({
start_at: new Date()
});
})();
exports.down = (knex, Promise) => (async() => {
})();

View file

@ -0,0 +1,25 @@
exports.up = (knex, Promise) => (async() => {
const queued = await knex('queued');
for (const queuedEntry of queued) {
const data = JSON.parse(queuedEntry.data);
data.listId = queuedEntry.list;
data.subscriptionId = queuedEntry.subscription;
knex('queued')
.where('id', queuedEntry.id)
.update({
data: JSON.stringify(data)
});
}
await knex.schema.table('queued', table => {
table.dropColumn('list');
table.dropColumn('subscription');
});
})();
exports.down = (knex, Promise) => (async() => {
})();