Work in progress on refactoring all mail sending to use the message sender an sender workers.

Some fixes related to subscriptions and password reset.
This commit is contained in:
Tomas Bures 2019-06-30 10:47:09 +02:00
parent 4e9f6bd57b
commit 450b930cc5
8 changed files with 95 additions and 76 deletions

View file

@ -49,7 +49,7 @@ export default class Account extends Component {
} }
submitFormValuesMutator(data) { submitFormValuesMutator(data) {
return filterData(data, ['password']); return filterData(data, ['username', 'password', 'resetToken']);
} }
@withAsyncErrorHandler @withAsyncErrorHandler

View file

@ -22,6 +22,7 @@ const {getPublicUrl} = require('./urls');
const blacklist = require('../models/blacklist'); const blacklist = require('../models/blacklist');
const libmime = require('libmime'); const libmime = require('libmime');
const { enforce } = require('./helpers'); const { enforce } = require('./helpers');
const senders = require('./senders');
const MessageType = { const MessageType = {
REGULAR: 0, REGULAR: 0,
@ -95,9 +96,6 @@ class MessageSender {
this.listsByCid.set(list.cid, list); this.listsByCid.set(list.cid, list);
this.listsFieldsGrouped.set(list.id, await fields.listGroupedTx(tx, list.id)); this.listsFieldsGrouped.set(list.id, await fields.listGroupedTx(tx, list.id));
} }
} else {
enforce(false);
} }
if (settings.attachments) { if (settings.attachments) {
@ -119,7 +117,7 @@ class MessageSender {
} }
if (settings.renderedHtml !== undefined) { if (settings.renderedHtml !== undefined) {
this.rendereHtml = settings.rendereHtml; this.renderedHtml = settings.renderedHtml;
this.renderedText = settings.renderedText; this.renderedText = settings.renderedText;
} else if (settings.html !== undefined) { } else if (settings.html !== undefined) {
@ -146,6 +144,7 @@ class MessageSender {
let renderTags = false; let renderTags = false;
const campaign = this.campaign; const campaign = this.campaign;
if (this.renderedHtml !== undefined) { if (this.renderedHtml !== undefined) {
html = this.renderedHtml; html = this.renderedHtml;
text = this.renderedText; text = this.renderedText;
@ -497,6 +496,12 @@ class MessageSender {
} }
} }
async function dropQueuedMessage(queuedMessage) {
await knex('queued')
.where({id: queuedMessage.id})
.del();
}
async function sendQueuedMessage(queuedMessage) { async function sendQueuedMessage(queuedMessage) {
const msgData = queuedMessage.data; const msgData = queuedMessage.data;
@ -574,11 +579,13 @@ async function queueSubscriptionMessage(sendConfigurationId, to, subject, encryp
encryptionKeys encryptionKeys
}; };
await tx('queued').insert({ await knex('queued').insert({
send_configuration: sendConfigurationId, send_configuration: sendConfigurationId,
type: MessageType.SUBSCRIPTION, type: MessageType.SUBSCRIPTION,
data: JSON.stringify(msgData) data: JSON.stringify(msgData)
}); });
senders.scheduleCheck();
} }
module.exports.MessageSender = MessageSender; module.exports.MessageSender = MessageSender;
@ -586,3 +593,4 @@ module.exports.MessageType = MessageType;
module.exports.sendQueuedMessage = sendQueuedMessage; module.exports.sendQueuedMessage = sendQueuedMessage;
module.exports.queueCampaignMessageTx = queueCampaignMessageTx; module.exports.queueCampaignMessageTx = queueCampaignMessageTx;
module.exports.queueSubscriptionMessage = queueSubscriptionMessage; module.exports.queueSubscriptionMessage = queueSubscriptionMessage;
module.exports.dropQueuedMessage = dropQueuedMessage;

View file

@ -9,6 +9,7 @@ const contextHelpers = require('./context-helpers');
const {getFieldColumn} = require('../../shared/lists'); const {getFieldColumn} = require('../../shared/lists');
const forms = require('../models/forms'); const forms = require('../models/forms');
const messageSender = require('./message-sender'); const messageSender = require('./message-sender');
const tools = require('./tools');
module.exports = { module.exports = {
sendAlreadySubscribed, sendAlreadySubscribed,
@ -64,37 +65,12 @@ async function sendUnsubscriptionConfirmed(locale, list, email, subscription) {
await _sendMail(list, email, 'unsubscription_confirmed', locale, tMark('listUnsubscriptionConfirmed'), relativeUrls, subscription); await _sendMail(list, email, 'unsubscription_confirmed', locale, tMark('listUnsubscriptionConfirmed'), relativeUrls, subscription);
} }
function getDisplayName(flds, subscription) {
let firstName, lastName, name;
for (const fld of flds) {
if (fld.key === 'FIRST_NAME') {
firstName = subscription[fld.column];
}
if (fld.key === 'LAST_NAME') {
lastName = subscription[fld.column];
}
if (fld.key === 'NAME') {
name = subscription[fld.column];
}
}
if (name) {
return name;
} else if (firstName && lastName) {
return firstName + ' ' + lastName;
} else if (lastName) {
return lastName;
} else if (firstName) {
return firstName;
} else {
return '';
}
}
async function _sendMail(list, email, template, locale, subjectKey, relativeUrls, subscription) { async function _sendMail(list, email, template, locale, subjectKey, relativeUrls, subscription) {
subscription = {
...subscription,
email
};
const flds = await fields.list(contextHelpers.getAdminContext(), list.id); const flds = await fields.list(contextHelpers.getAdminContext(), list.id);
const encryptionKeys = []; const encryptionKeys = [];
@ -136,11 +112,13 @@ async function _sendMail(list, email, template, locale, subjectKey, relativeUrls
} }
try { try {
const mergeTags = fields.getMergeTags(flds, subscription);
if (list.send_configuration) { if (list.send_configuration) {
await messageSender.queueSubscriptionMessage( await messageSender.queueSubscriptionMessage(
list.send_configuration, list.send_configuration,
{ {
name: getDisplayName(flds, subscription), name: list.to_name === null ? undefined : tools.formatTemplate(list.to_name, {}, mergeTags, false),
address: email address: email
}, },
tUI(subjectKey, locale, { list: list.name }), tUI(subjectKey, locale, { list: list.name }),

View file

@ -1,12 +1,9 @@
'use strict'; 'use strict';
const util = require('util');
const isemail = require('isemail'); const isemail = require('isemail');
const path = require('path'); const path = require('path');
const {getPublicUrl} = require('./urls'); const {getPublicUrl} = require('./urls');
const {enforce} = require('./helpers');
const bluebird = require('bluebird');
const hasher = require('node-object-hash')(); const hasher = require('node-object-hash')();
const mjml2html = require('mjml'); const mjml2html = require('mjml');

View file

@ -125,8 +125,6 @@ async function listDTAjax(context, params) {
async function _validateAndPreprocess(tx, entity, isCreate, isOwnAccount) { async function _validateAndPreprocess(tx, entity, isCreate, isOwnAccount) {
enforce(await tools.validateEmail(entity.email) === 0, 'Invalid email'); enforce(await tools.validateEmail(entity.email) === 0, 'Invalid email');
await namespaceHelpers.validateEntity(tx, entity);
const otherUserWithSameEmailQuery = tx('users').where('email', entity.email); const otherUserWithSameEmailQuery = tx('users').where('email', entity.email);
if (entity.id) { if (entity.id) {
otherUserWithSameEmailQuery.andWhereNot('id', entity.id); otherUserWithSameEmailQuery.andWhereNot('id', entity.id);
@ -138,6 +136,9 @@ async function _validateAndPreprocess(tx, entity, isCreate, isOwnAccount) {
if (!isOwnAccount) { if (!isOwnAccount) {
await namespaceHelpers.validateEntity(tx, entity);
enforce(entity.role in config.roles.global, 'Unknown role');
const otherUserWithSameUsernameQuery = tx('users').where('username', entity.username); const otherUserWithSameUsernameQuery = tx('users').where('username', entity.username);
if (!isCreate) { if (!isCreate) {
otherUserWithSameUsernameQuery.andWhereNot('id', entity.id); otherUserWithSameUsernameQuery.andWhereNot('id', entity.id);
@ -148,8 +149,6 @@ async function _validateAndPreprocess(tx, entity, isCreate, isOwnAccount) {
} }
} }
enforce(entity.role in config.roles.global, 'Unknown role');
enforce(!isCreate || entity.password.length > 0, 'Password not set'); enforce(!isCreate || entity.password.length > 0, 'Password not set');
if (entity.password) { if (entity.password) {

View file

@ -5,7 +5,6 @@ const log = require('../lib/log');
const knex = require('../lib/knex'); const knex = require('../lib/knex');
const feedparser = require('feedparser-promised'); const feedparser = require('feedparser-promised');
const { CampaignType, CampaignStatus, CampaignSource } = require('../../shared/campaigns'); const { CampaignType, CampaignStatus, CampaignSource } = require('../../shared/campaigns');
const util = require('util');
const campaigns = require('../models/campaigns'); const campaigns = require('../models/campaigns');
const contextHelpers = require('../lib/context-helpers'); const contextHelpers = require('../lib/context-helpers');
require('../lib/fork'); require('../lib/fork');

View file

@ -175,6 +175,10 @@ async function workersLoop() {
return idleWorkers.shift(); return idleWorkers.shift();
} }
function cancelWorker(workerId) {
idleWorkers.push(workerId);
}
function selectNextTask() { function selectNextTask() {
const allocationMap = new Map(); const allocationMap = new Map();
const allocation = []; const allocation = [];
@ -251,11 +255,10 @@ async function workersLoop() {
while (true) { while (true) {
const workerId = await getAvailableWorker();
const task = selectNextTask(); const task = selectNextTask();
if (task) { if (task) {
const workerId = await getAvailableWorker();
const attrName = task.attrName; const attrName = task.attrName;
const sendConfigurationId = task.sendConfigurationId; const sendConfigurationId = task.sendConfigurationId;
const sendConfigurationStatus = getSendConfigurationStatus(sendConfigurationId); const sendConfigurationStatus = getSendConfigurationStatus(sendConfigurationId);
@ -280,7 +283,9 @@ async function workersLoop() {
[attrName]: task.id, [attrName]: task.id,
messages messages
}); });
} else { } else {
cancelWorker(workerId);
await notifier.waitFor('workAvailable'); await notifier.waitFor('workAvailable');
} }
} }
@ -394,7 +399,7 @@ async function scheduleCampaigns() {
campaignSchedulerRunning = true; campaignSchedulerRunning = true;
try { try {
// finish old campaigns // Finish old campaigns
const nowDate = new Date(); const nowDate = new Date();
const now = nowDate.valueOf(); const now = nowDate.valueOf();
@ -405,6 +410,22 @@ async function scheduleCampaigns() {
.where('campaigns.start_at', '<', expirationThreshold) .where('campaigns.start_at', '<', expirationThreshold)
.update({status: CampaignStatus.FINISHED}); .update({status: CampaignStatus.FINISHED});
// Empty message queues for PAUSING campaigns. A pausing campaign typically waits for campaignMessageQueueEmpty before it can check for PAUSING
// We speed this up by discarding messages in the message queue of the campaign.
const pausingCampaigns = await knex('campaigns')
.whereIn('campaigns.type', [CampaignType.REGULAR, CampaignType.RSS_ENTRY])
.where('campaigns.status', CampaignStatus.PAUSING)
.select(['id'])
.forUpdate();
for (const cpg of pausingCampaigns) {
const campaignId = cpg.id;
const queue = campaignMessageQueue.get(campaignId);
queue.splice(0);
notifier.notify(`campaignMessageQueueEmpty:${campaignId}`);
}
while (true) { while (true) {
let campaignId = 0; let campaignId = 0;
const postponedSendConfigurationIds = getPostponedSendConfigurationIds(); const postponedSendConfigurationIds = getPostponedSendConfigurationIds();
@ -513,17 +534,15 @@ async function processQueuedBySendConfiguration(sendConfigurationId) {
const expirationThresholds = getExpirationThresholds(); const expirationThresholds = getExpirationThresholds();
const expirationCounters = {}; const expirationCounters = {};
for (const type of Object.keys(expirationThresholds)) { for (const type in expirationThresholds) {
expirationCounters[type] = 0; expirationCounters[type] = 0;
} }
for (const row of rows) { for (const row of rows) {
for (const type of Object.keys(expirationThresholds)) { const expirationThreshold = expirationThresholds[row.type];
if (row.type === type) {
const expirationThreshold = expirationThresholds[type];
if (row.created < expirationThreshold.threshold) { if (row.created < expirationThreshold.threshold) {
expirationCounters[type] += 1; expirationCounters[row.type] += 1;
await knex('queued').where('id', row.id).del(); await knex('queued').where('id', row.id).del();
} else { } else {
@ -533,10 +552,8 @@ async function processQueuedBySendConfiguration(sendConfigurationId) {
}); });
} }
} }
}
}
for (const type of Object.keys(expirationThresholds)) { for (const type in expirationThresholds) {
const expirationThreshold = expirationThresholds[type]; const expirationThreshold = expirationThresholds[type];
if (expirationCounters[type] > 0) { if (expirationCounters[type] > 0) {
log.warn('Senders', `Discarded ${expirationCounters[type]} expired ${expirationThreshold.title} message(s).`); log.warn('Senders', `Discarded ${expirationCounters[type]} expired ${expirationThreshold.title} message(s).`);
@ -568,7 +585,7 @@ async function scheduleQueued() {
// prune old messages // prune old messages
const expirationThresholds = getExpirationThresholds(); const expirationThresholds = getExpirationThresholds();
for (const type of Object.keys(expirationThresholds)) { for (const type in expirationThresholds) {
const expirationThreshold = expirationThresholds[type]; const expirationThreshold = expirationThresholds[type];
const expiredCount = await knex('queued') const expiredCount = await knex('queued')

View file

@ -17,25 +17,25 @@ async function processCampaignMessages(campaignId, messages) {
running = true; running = true;
const cs = new MessageSender(); const cs = new messageSender.MessageSender();
await cs.initByCampaignId(campaignId); await cs.initByCampaignId(campaignId);
let withErrors = false; let withErrors = false;
for (const msgData of messages) { for (const msg of messages) {
try { try {
await cs.sendRegularMessage(msgData.listId, msgData.email); await cs.sendRegularMessage(msg.listId, msg.email);
log.verbose('Senders', 'Message sent and status updated for %s:%s', msgData.listId, msgData.email); log.verbose('Senders', 'Message sent and status updated for %s:%s', msg.listId, msg.email);
} catch (err) { } catch (err) {
if (err instanceof mailers.SendConfigurationError) { if (err instanceof mailers.SendConfigurationError) {
log.error('Senders', `Sending message to ${msgData.listId}:${msgData.email} failed with error: ${err.message}. Will retry the message if within retention interval.`); log.error('Senders', `Sending message to ${msg.listId}:${msg.email} failed with error: ${err.message}. Will retry the message if within retention interval.`);
withErrors = true; withErrors = true;
break; break;
} else { } else {
log.error('Senders', `Sending message to ${msgData.listId}:${msgData.email} failed with error: ${err.message}. Dropping the message.`); log.error('Senders', `Sending message to ${msg.listId}:${msg.email} failed with error: ${err.message}.`);
log.verbose(err.stack); log.verbose(err.stack);
} }
} }
@ -56,19 +56,40 @@ async function processQueuedMessages(sendConfigurationId, messages) {
let withErrors = false; let withErrors = false;
for (const msgData of messages) { for (const msg of messages) {
const queuedMessage = msgData.queuedMessage; const queuedMessage = msg.queuedMessage;
const msgData = queuedMessage.data;
let target = '';
if (msgData.listId && msgData.subscriptionId) {
target = `${msgData.listId}:${msgData.subscriptionId}`;
} else if (msgData.to) {
if (msgData.to.name && msgData.to.address) {
target = `${msgData.to.name} <${msgData.to.address}>`;
} else if (msgData.to.address) {
target = msgData.to.address;
} else {
target = msgData.to.toString();
}
}
try { try {
await messageSender.sendQueuedMessage(queuedMessage); await messageSender.sendQueuedMessage(queuedMessage);
log.verbose('Senders', 'Message sent and status updated for %s:%s', queuedMessage.list, queuedMessage.subscription); log.verbose('Senders', `Message sent and status updated for ${target}`);
} catch (err) { } catch (err) {
if (err instanceof mailers.SendConfigurationError) { if (err instanceof mailers.SendConfigurationError) {
log.error('Senders', `Sending message to ${queuedMessage.list}:${queuedMessage.subscription} failed with error: ${err.message}. Will retry the message if within retention interval.`); log.error('Senders', `Sending message to ${target} failed with error: ${err.message}. Will retry the message if within retention interval.`);
withErrors = true; withErrors = true;
break; break;
} else { } else {
log.error('Senders', `Sending message to ${queuedMessage.list}:${queuedMessage.subscription} failed with error: ${err.message}. Dropping the message.`); log.error('Senders', `Sending message to ${target} failed with error: ${err.message}. Dropping the message.`);
log.verbose(err.stack); log.verbose(err.stack);
try {
await messageSender.dropQueuedMessage(queuedMessage);
} catch (err) {
log.error(err.stack);
}
} }
} }
} }