From 450b930cc5c43b7bf33fd578494aa31b40c2973c Mon Sep 17 00:00:00 2001 From: Tomas Bures Date: Sun, 30 Jun 2019 10:47:09 +0200 Subject: [PATCH] Work in progress on refactoring all mail sending to use the message sender an sender workers. Some fixes related to subscriptions and password reset. --- client/src/login/Reset.js | 2 +- server/lib/message-sender.js | 18 +++++--- server/lib/subscription-mail-helpers.js | 40 ++++-------------- server/lib/tools.js | 5 +-- server/models/users.js | 7 ++-- server/services/feedcheck.js | 1 - server/services/sender-master.js | 55 ++++++++++++++++--------- server/services/sender-worker.js | 43 ++++++++++++++----- 8 files changed, 95 insertions(+), 76 deletions(-) diff --git a/client/src/login/Reset.js b/client/src/login/Reset.js index f9fe5b6c..0743ca0d 100644 --- a/client/src/login/Reset.js +++ b/client/src/login/Reset.js @@ -49,7 +49,7 @@ export default class Account extends Component { } submitFormValuesMutator(data) { - return filterData(data, ['password']); + return filterData(data, ['username', 'password', 'resetToken']); } @withAsyncErrorHandler diff --git a/server/lib/message-sender.js b/server/lib/message-sender.js index f666288e..c19448a2 100644 --- a/server/lib/message-sender.js +++ b/server/lib/message-sender.js @@ -22,6 +22,7 @@ const {getPublicUrl} = require('./urls'); const blacklist = require('../models/blacklist'); const libmime = require('libmime'); const { enforce } = require('./helpers'); +const senders = require('./senders'); const MessageType = { REGULAR: 0, @@ -95,9 +96,6 @@ class MessageSender { this.listsByCid.set(list.cid, list); this.listsFieldsGrouped.set(list.id, await fields.listGroupedTx(tx, list.id)); } - - } else { - enforce(false); } if (settings.attachments) { @@ -119,7 +117,7 @@ class MessageSender { } if (settings.renderedHtml !== undefined) { - this.rendereHtml = settings.rendereHtml; + this.renderedHtml = settings.renderedHtml; this.renderedText = settings.renderedText; } else if (settings.html !== undefined) { @@ -146,6 +144,7 @@ class MessageSender { let renderTags = false; const campaign = this.campaign; + if (this.renderedHtml !== undefined) { html = this.renderedHtml; text = this.renderedText; @@ -497,6 +496,12 @@ class MessageSender { } } +async function dropQueuedMessage(queuedMessage) { + await knex('queued') + .where({id: queuedMessage.id}) + .del(); +} + async function sendQueuedMessage(queuedMessage) { const msgData = queuedMessage.data; @@ -574,11 +579,13 @@ async function queueSubscriptionMessage(sendConfigurationId, to, subject, encryp encryptionKeys }; - await tx('queued').insert({ + await knex('queued').insert({ send_configuration: sendConfigurationId, type: MessageType.SUBSCRIPTION, data: JSON.stringify(msgData) }); + + senders.scheduleCheck(); } module.exports.MessageSender = MessageSender; @@ -586,3 +593,4 @@ module.exports.MessageType = MessageType; module.exports.sendQueuedMessage = sendQueuedMessage; module.exports.queueCampaignMessageTx = queueCampaignMessageTx; module.exports.queueSubscriptionMessage = queueSubscriptionMessage; +module.exports.dropQueuedMessage = dropQueuedMessage; \ No newline at end of file diff --git a/server/lib/subscription-mail-helpers.js b/server/lib/subscription-mail-helpers.js index 9ffa161a..1c057243 100644 --- a/server/lib/subscription-mail-helpers.js +++ b/server/lib/subscription-mail-helpers.js @@ -9,6 +9,7 @@ const contextHelpers = require('./context-helpers'); const {getFieldColumn} = require('../../shared/lists'); const forms = require('../models/forms'); const messageSender = require('./message-sender'); +const tools = require('./tools'); module.exports = { sendAlreadySubscribed, @@ -64,37 +65,12 @@ async function sendUnsubscriptionConfirmed(locale, list, email, subscription) { await _sendMail(list, email, 'unsubscription_confirmed', locale, tMark('listUnsubscriptionConfirmed'), relativeUrls, subscription); } -function getDisplayName(flds, subscription) { - let firstName, lastName, name; - - for (const fld of flds) { - if (fld.key === 'FIRST_NAME') { - firstName = subscription[fld.column]; - } - - if (fld.key === 'LAST_NAME') { - lastName = subscription[fld.column]; - } - - if (fld.key === 'NAME') { - name = subscription[fld.column]; - } - } - - if (name) { - return name; - } else if (firstName && lastName) { - return firstName + ' ' + lastName; - } else if (lastName) { - return lastName; - } else if (firstName) { - return firstName; - } else { - return ''; - } -} - async function _sendMail(list, email, template, locale, subjectKey, relativeUrls, subscription) { + subscription = { + ...subscription, + email + }; + const flds = await fields.list(contextHelpers.getAdminContext(), list.id); const encryptionKeys = []; @@ -136,11 +112,13 @@ async function _sendMail(list, email, template, locale, subjectKey, relativeUrls } try { + const mergeTags = fields.getMergeTags(flds, subscription); + if (list.send_configuration) { await messageSender.queueSubscriptionMessage( list.send_configuration, { - name: getDisplayName(flds, subscription), + name: list.to_name === null ? undefined : tools.formatTemplate(list.to_name, {}, mergeTags, false), address: email }, tUI(subjectKey, locale, { list: list.name }), diff --git a/server/lib/tools.js b/server/lib/tools.js index 3e6f0888..66c64647 100644 --- a/server/lib/tools.js +++ b/server/lib/tools.js @@ -1,12 +1,9 @@ 'use strict'; -const util = require('util'); const isemail = require('isemail'); const path = require('path'); const {getPublicUrl} = require('./urls'); - -const bluebird = require('bluebird'); - +const {enforce} = require('./helpers'); const hasher = require('node-object-hash')(); const mjml2html = require('mjml'); diff --git a/server/models/users.js b/server/models/users.js index b35ceaae..5ffea2f8 100644 --- a/server/models/users.js +++ b/server/models/users.js @@ -125,8 +125,6 @@ async function listDTAjax(context, params) { async function _validateAndPreprocess(tx, entity, isCreate, isOwnAccount) { enforce(await tools.validateEmail(entity.email) === 0, 'Invalid email'); - await namespaceHelpers.validateEntity(tx, entity); - const otherUserWithSameEmailQuery = tx('users').where('email', entity.email); if (entity.id) { otherUserWithSameEmailQuery.andWhereNot('id', entity.id); @@ -138,6 +136,9 @@ async function _validateAndPreprocess(tx, entity, isCreate, isOwnAccount) { if (!isOwnAccount) { + await namespaceHelpers.validateEntity(tx, entity); + enforce(entity.role in config.roles.global, 'Unknown role'); + const otherUserWithSameUsernameQuery = tx('users').where('username', entity.username); if (!isCreate) { otherUserWithSameUsernameQuery.andWhereNot('id', entity.id); @@ -148,8 +149,6 @@ async function _validateAndPreprocess(tx, entity, isCreate, isOwnAccount) { } } - enforce(entity.role in config.roles.global, 'Unknown role'); - enforce(!isCreate || entity.password.length > 0, 'Password not set'); if (entity.password) { diff --git a/server/services/feedcheck.js b/server/services/feedcheck.js index ed23ee5e..e42d6e49 100644 --- a/server/services/feedcheck.js +++ b/server/services/feedcheck.js @@ -5,7 +5,6 @@ const log = require('../lib/log'); const knex = require('../lib/knex'); const feedparser = require('feedparser-promised'); const { CampaignType, CampaignStatus, CampaignSource } = require('../../shared/campaigns'); -const util = require('util'); const campaigns = require('../models/campaigns'); const contextHelpers = require('../lib/context-helpers'); require('../lib/fork'); diff --git a/server/services/sender-master.js b/server/services/sender-master.js index 36fc5b8b..49215a9c 100644 --- a/server/services/sender-master.js +++ b/server/services/sender-master.js @@ -175,6 +175,10 @@ async function workersLoop() { return idleWorkers.shift(); } + function cancelWorker(workerId) { + idleWorkers.push(workerId); + } + function selectNextTask() { const allocationMap = new Map(); const allocation = []; @@ -251,11 +255,10 @@ async function workersLoop() { while (true) { + const workerId = await getAvailableWorker(); const task = selectNextTask(); if (task) { - const workerId = await getAvailableWorker(); - const attrName = task.attrName; const sendConfigurationId = task.sendConfigurationId; const sendConfigurationStatus = getSendConfigurationStatus(sendConfigurationId); @@ -280,7 +283,9 @@ async function workersLoop() { [attrName]: task.id, messages }); + } else { + cancelWorker(workerId); await notifier.waitFor('workAvailable'); } } @@ -394,7 +399,7 @@ async function scheduleCampaigns() { campaignSchedulerRunning = true; try { - // finish old campaigns + // Finish old campaigns const nowDate = new Date(); const now = nowDate.valueOf(); @@ -405,6 +410,22 @@ async function scheduleCampaigns() { .where('campaigns.start_at', '<', expirationThreshold) .update({status: CampaignStatus.FINISHED}); + // Empty message queues for PAUSING campaigns. A pausing campaign typically waits for campaignMessageQueueEmpty before it can check for PAUSING + // We speed this up by discarding messages in the message queue of the campaign. + const pausingCampaigns = await knex('campaigns') + .whereIn('campaigns.type', [CampaignType.REGULAR, CampaignType.RSS_ENTRY]) + .where('campaigns.status', CampaignStatus.PAUSING) + .select(['id']) + .forUpdate(); + + for (const cpg of pausingCampaigns) { + const campaignId = cpg.id; + const queue = campaignMessageQueue.get(campaignId); + queue.splice(0); + notifier.notify(`campaignMessageQueueEmpty:${campaignId}`); + } + + while (true) { let campaignId = 0; const postponedSendConfigurationIds = getPostponedSendConfigurationIds(); @@ -513,30 +534,26 @@ async function processQueuedBySendConfiguration(sendConfigurationId) { const expirationThresholds = getExpirationThresholds(); const expirationCounters = {}; - for (const type of Object.keys(expirationThresholds)) { + for (const type in expirationThresholds) { expirationCounters[type] = 0; } for (const row of rows) { - for (const type of Object.keys(expirationThresholds)) { - if (row.type === type) { - const expirationThreshold = expirationThresholds[type]; + const expirationThreshold = expirationThresholds[row.type]; - if (row.created < expirationThreshold.threshold) { - expirationCounters[type] += 1; - await knex('queued').where('id', row.id).del(); + if (row.created < expirationThreshold.threshold) { + expirationCounters[row.type] += 1; + await knex('queued').where('id', row.id).del(); - } else { - row.data = JSON.parse(row.data); - msgQueue.push({ - queuedMessage: row - }); - } - } + } else { + row.data = JSON.parse(row.data); + msgQueue.push({ + queuedMessage: row + }); } } - for (const type of Object.keys(expirationThresholds)) { + for (const type in expirationThresholds) { const expirationThreshold = expirationThresholds[type]; if (expirationCounters[type] > 0) { log.warn('Senders', `Discarded ${expirationCounters[type]} expired ${expirationThreshold.title} message(s).`); @@ -568,7 +585,7 @@ async function scheduleQueued() { // prune old messages const expirationThresholds = getExpirationThresholds(); - for (const type of Object.keys(expirationThresholds)) { + for (const type in expirationThresholds) { const expirationThreshold = expirationThresholds[type]; const expiredCount = await knex('queued') diff --git a/server/services/sender-worker.js b/server/services/sender-worker.js index ff830adb..447068ec 100644 --- a/server/services/sender-worker.js +++ b/server/services/sender-worker.js @@ -17,25 +17,25 @@ async function processCampaignMessages(campaignId, messages) { running = true; - const cs = new MessageSender(); + const cs = new messageSender.MessageSender(); await cs.initByCampaignId(campaignId); let withErrors = false; - for (const msgData of messages) { + for (const msg of messages) { try { - await cs.sendRegularMessage(msgData.listId, msgData.email); + await cs.sendRegularMessage(msg.listId, msg.email); - log.verbose('Senders', 'Message sent and status updated for %s:%s', msgData.listId, msgData.email); + log.verbose('Senders', 'Message sent and status updated for %s:%s', msg.listId, msg.email); } catch (err) { if (err instanceof mailers.SendConfigurationError) { - log.error('Senders', `Sending message to ${msgData.listId}:${msgData.email} failed with error: ${err.message}. Will retry the message if within retention interval.`); + log.error('Senders', `Sending message to ${msg.listId}:${msg.email} failed with error: ${err.message}. Will retry the message if within retention interval.`); withErrors = true; break; } else { - log.error('Senders', `Sending message to ${msgData.listId}:${msgData.email} failed with error: ${err.message}. Dropping the message.`); + log.error('Senders', `Sending message to ${msg.listId}:${msg.email} failed with error: ${err.message}.`); log.verbose(err.stack); } } @@ -56,19 +56,40 @@ async function processQueuedMessages(sendConfigurationId, messages) { let withErrors = false; - for (const msgData of messages) { - const queuedMessage = msgData.queuedMessage; + for (const msg of messages) { + const queuedMessage = msg.queuedMessage; + + const msgData = queuedMessage.data; + let target = ''; + if (msgData.listId && msgData.subscriptionId) { + target = `${msgData.listId}:${msgData.subscriptionId}`; + } else if (msgData.to) { + if (msgData.to.name && msgData.to.address) { + target = `${msgData.to.name} <${msgData.to.address}>`; + } else if (msgData.to.address) { + target = msgData.to.address; + } else { + target = msgData.to.toString(); + } + } + try { await messageSender.sendQueuedMessage(queuedMessage); - log.verbose('Senders', 'Message sent and status updated for %s:%s', queuedMessage.list, queuedMessage.subscription); + log.verbose('Senders', `Message sent and status updated for ${target}`); } catch (err) { if (err instanceof mailers.SendConfigurationError) { - log.error('Senders', `Sending message to ${queuedMessage.list}:${queuedMessage.subscription} failed with error: ${err.message}. Will retry the message if within retention interval.`); + log.error('Senders', `Sending message to ${target} failed with error: ${err.message}. Will retry the message if within retention interval.`); withErrors = true; break; } else { - log.error('Senders', `Sending message to ${queuedMessage.list}:${queuedMessage.subscription} failed with error: ${err.message}. Dropping the message.`); + log.error('Senders', `Sending message to ${target} failed with error: ${err.message}. Dropping the message.`); log.verbose(err.stack); + + try { + await messageSender.dropQueuedMessage(queuedMessage); + } catch (err) { + log.error(err.stack); + } } } }