diff --git a/client/src/lists/subscriptions/CUD.js b/client/src/lists/subscriptions/CUD.js index 54456e67..a14ccd1b 100644 --- a/client/src/lists/subscriptions/CUD.js +++ b/client/src/lists/subscriptions/CUD.js @@ -51,6 +51,11 @@ export default class CUD extends Component { extra: ['id'] }, }); + + this.timezoneOptions = [ + { key: '', label: t('notSelected') }, + ...moment.tz.names().map(tz => ({ key: tz.toLowerCase(), label: tz })) + ]; } static propTypes = { @@ -185,11 +190,6 @@ export default class CUD extends Component { const statusOptions = Object.keys(this.subscriptionStatusLabels) .map(key => ({key, label: this.subscriptionStatusLabels[key]})); - const tzOptions = [ - { key: '', label: t('notSelected') }, - ...moment.tz.names().map(tz => ({ key: tz.toLowerCase(), label: tz })) - ]; - const customFields = []; for (const fld of this.props.fieldsGrouped) { customFields.push(this.fieldTypes[fld.type].form(fld)); @@ -219,10 +219,9 @@ export default class CUD extends Component { {customFields} -
- + diff --git a/server/config/default.yaml b/server/config/default.yaml index 127223c6..ce7bb86c 100644 --- a/server/config/default.yaml +++ b/server/config/default.yaml @@ -184,6 +184,8 @@ queue: test: 300 # 5 minutes # Subscription and password reset related emails subscription: 300 # 5 minutes + # Transactional emails sent via API (i.e. /templates/:templateId/send) + apiTransactional: 3600 # 60 minutes cors: # Allow subscription widgets to be embedded diff --git a/server/lib/message-sender.js b/server/lib/message-sender.js index c01000f8..0dd96530 100644 --- a/server/lib/message-sender.js +++ b/server/lib/message-sender.js @@ -13,7 +13,8 @@ const fields = require('../models/fields'); const sendConfigurations = require('../models/send-configurations'); const links = require('../models/links'); const {CampaignSource, CampaignType} = require('../../shared/campaigns'); -const {SubscriptionStatus, toNameTagLangauge} = require('../../shared/lists'); +const {toNameTagLangauge} = require('../../shared/lists'); +const {CampaignMessageStatus} = require('../../shared/campaigns'); const tools = require('./tools'); const htmlToText = require('html-to-text'); const request = require('request-promise'); @@ -28,7 +29,9 @@ const MessageType = { REGULAR: 0, TRIGGERED: 1, TEST: 2, - SUBSCRIPTION: 3 + SUBSCRIPTION: 3, + API_TRANSACTIONAL: 4 + }; class MessageSender { @@ -36,10 +39,23 @@ class MessageSender { } /* - settings is one of: + Accepted combinations of settings: + + Option #1 + - settings.type in [MessageType.REGULAR, MessageType.TRIGGERED, MessageType.TEST] - campaignCid / campaignId - or - - sendConfiguration, listId, attachments, html, text, subject, tagLanguage + - listId / listCid [optional if campaign is provided] + - sendConfigurationId [optional if campaign is provided] + - attachments [optional] + - renderedHtml + renderedText / html + text + tagLanguage [optional if campaign is provided] + - subject [optional if campaign is provided] + + Option #2 + - settings.type in [MessageType.SUBSCRIPTION, MessageType.API_TRANSACTIONAL] + - sendConfigurationId + - attachments [optional] + - renderedHtml + renderedText / html + text + tagLanguage + - subject */ async _init(settings) { this.type = settings.type; @@ -49,52 +65,56 @@ class MessageSender { this.listsFieldsGrouped = new Map(); // listId -> fieldsGrouped await knex.transaction(async tx => { - if (settings.campaignCid) { - this.campaign = await campaigns.rawGetByTx(tx, 'cid', settings.campaignCid); + if (this.type === MessageType.REGULAR || this.type === MessageType.TRIGGERED || this.type === MessageType.TEST) { this.isMassMail = true; - } else if (settings.campaignId) { - this.campaign = await campaigns.rawGetByTx(tx, 'id', settings.campaignId); - this.isMassMail = true; + if (settings.campaignCid) { + this.campaign = await campaigns.rawGetByTx(tx, 'cid', settings.campaignCid); + } else if (settings.campaignId) { + this.campaign = await campaigns.rawGetByTx(tx, 'id', settings.campaignId); + } - } else if (this.type === MessageType.TEST) { - this.isMassMail = true; + if (settings.sendConfigurationId) { + this.sendConfiguration = await sendConfigurations.getByIdTx(tx, contextHelpers.getAdminContext(), settings.sendConfigurationId, false, true); + } else if (this.campaign && this.campaign.send_configuration) { + this.sendConfiguration = await sendConfigurations.getByIdTx(tx, contextHelpers.getAdminContext(), this.campaign.send_configuration, false, true); + } else { + enforce(false); + } - } else { + this.useVerp = config.verp.enabled && this.sendConfiguration.verp_hostname; + this.useVerpSenderHeader = this.useVerp && !this.sendConfiguration.verp_disable_sender_header; + + + if (settings.listId) { + const list = await lists.getByIdTx(tx, contextHelpers.getAdminContext(), settings.listId); + this.listsById.set(list.id, list); + this.listsByCid.set(list.cid, list); + this.listsFieldsGrouped.set(list.id, await fields.listGroupedTx(tx, list.id)); + + } else if (settings.listCid) { + const list = await lists.getByCidTx(tx, contextHelpers.getAdminContext(), settings.listCid); + this.listsById.set(list.id, list); + this.listsByCid.set(list.cid, list); + this.listsFieldsGrouped.set(list.id, await fields.listGroupedTx(tx, list.id)); + + } else if (this.campaign && this.campaign.lists) { + for (const listSpec of this.campaign.lists) { + const list = await lists.getByIdTx(tx, contextHelpers.getAdminContext(), listSpec.list); + this.listsById.set(list.id, list); + this.listsByCid.set(list.cid, list); + this.listsFieldsGrouped.set(list.id, await fields.listGroupedTx(tx, list.id)); + } + } + + } else if (this.type === MessageType.SUBSCRIPTION || this.type === MessageType.API_TRANSACTIONAL) { this.isMassMail = false; - } - - if (settings.sendConfigurationId) { this.sendConfiguration = await sendConfigurations.getByIdTx(tx, contextHelpers.getAdminContext(), settings.sendConfigurationId, false, true); - } else if (this.campaign && this.campaign.send_configuration) { - this.sendConfiguration = await sendConfigurations.getByIdTx(tx, contextHelpers.getAdminContext(), this.campaign.send_configuration, false, true); + } else { enforce(false); } - this.useVerp = config.verp.enabled && this.sendConfiguration.verp_hostname; - this.useVerpSenderHeader = this.useVerp && !this.sendConfiguration.verp_disable_sender_header; - - if (settings.listId) { - const list = await lists.getByIdTx(tx, contextHelpers.getAdminContext(), settings.listId); - this.listsById.set(list.id, list); - this.listsByCid.set(list.cid, list); - this.listsFieldsGrouped.set(list.id, await fields.listGroupedTx(tx, list.id)); - - } else if (settings.listCid) { - const list = await lists.getByCidTx(tx, contextHelpers.getAdminContext(), settings.listCid); - this.listsById.set(list.id, list); - this.listsByCid.set(list.cid, list); - this.listsFieldsGrouped.set(list.id, await fields.listGroupedTx(tx, list.id)); - - } else if (this.campaign && this.campaign.lists) { - for (const listSpec of this.campaign.lists) { - const list = await lists.getByIdTx(tx, contextHelpers.getAdminContext(), listSpec.list); - this.listsById.set(list.id, list); - this.listsByCid.set(list.cid, list); - this.listsFieldsGrouped.set(list.id, await fields.listGroupedTx(tx, list.id)); - } - } if (settings.attachments) { this.attachments = settings.attachments; @@ -147,7 +167,7 @@ class MessageSender { }); } - async _getMessage(list, subscriptionGrouped, mergeTags, replaceDataImgs) { + async _getMessage(mergeTags, list, subscriptionGrouped, replaceDataImgs) { let html = ''; let text = ''; let renderTags = false; @@ -160,11 +180,15 @@ class MessageSender { renderTags = false; } else if (this.html !== undefined) { + enforce(mergeTags); + html = this.html; text = this.text; renderTags = true; } else if (campaign && campaign.source === CampaignSource.URL) { + const mergeTags = subData.mergeTags; + const form = tools.getMessageLinks(campaign, list, subscriptionGrouped); for (const key in mergeTags) { form[key] = mergeTags[key]; @@ -204,6 +228,7 @@ class MessageSender { html = await links.updateLinks(html, this.tagLanguage, mergeTags, campaign, list, subscriptionGrouped); } + // When no list and subscriptionGrouped is provided, formatCampaignTemplate works the same way as formatTemplate html = tools.formatCampaignTemplate(html, this.tagLanguage, mergeTags, true, campaign, list, subscriptionGrouped); } @@ -211,9 +236,8 @@ class MessageSender { if (generateText) { text = htmlToText.fromString(html, {wordwrap: 130}); } else { - if (renderTags) { - text = tools.formatCampaignTemplate(text, this.tagLanguage, mergeTags, false, campaign, list, subscriptionGrouped) - } + // When no list and subscriptionGrouped is provided, formatCampaignTemplate works the same way as formatTemplate + text = tools.formatCampaignTemplate(text, this.tagLanguage, mergeTags, false, campaign, list, subscriptionGrouped) } return { @@ -246,15 +270,19 @@ class MessageSender { /* - subData is one of: - - subscriptionId, listId, attachments - or - - email, listId - or - - to, subject + Accepted combinations of subData: + + Option #1 + - listId + - subscriptionId / email + - mergeTags [optional, used only when campaign / html+text is provided] + + Option #2: + - to ... email / { name, address } + - encryptionKeys [optional] + - mergeTags [used only when campaign / html+text is provided] */ async _sendMessage(subData) { - let msgType = this.type; let to, email; let envelope = false; let sender = false; @@ -267,9 +295,10 @@ class MessageSender { let subscriptionGrouped, list; // May be undefined const campaign = this.campaign; // May be undefined + let mergeTags = subData.mergeTags; + if (subData.listId) { let listId; - subscriptionGrouped; if (subData.subscriptionId) { listId = subData.listId; @@ -284,7 +313,10 @@ class MessageSender { email = subscriptionGrouped.email; const flds = this.listsFieldsGrouped.get(list.id); - const mergeTags = fields.getMergeTags(flds, subscriptionGrouped, this._getExtraTags(campaign)); + + if (!mergeTags) { + mergeTags = fields.getMergeTags(flds, subscriptionGrouped, this._getExtraTags(campaign)); + } for (const fld of flds) { if (fld.type === 'gpg' && mergeTags[fld.key]) { @@ -292,7 +324,7 @@ class MessageSender { } } - message = await this._getMessage(list, subscriptionGrouped, mergeTags, true); + message = await this._getMessage(mergeTags, list, subscriptionGrouped, true); let listUnsubscribe = null; if (!list.listunsubscribe_disabled) { @@ -356,7 +388,7 @@ class MessageSender { email = to.address; subject = this.subject; encryptionKeys = subData.encryptionKeys; - message = await this._getMessage(); + message = await this._getMessage(mergeTags); } if (await blacklist.isBlacklisted(email)) { @@ -391,7 +423,7 @@ class MessageSender { subject, html: message.html, text: message.text, - attachments: message.attachments || [], + attachments: message.attachments, encryptionKeys }; @@ -440,92 +472,72 @@ class MessageSender { } - if (msgType === MessageType.REGULAR || msgType === MessageType.TRIGGERED) { - await knex('campaigns').where('id', campaign.id).increment('delivered'); + const result = { + response, + response_id: responseId, + list, + subscriptionGrouped, + email + }; + + return result; + } + + async sendRegularCampaignMessage(listId, email) { + enforce(this.type === MessageType.REGULAR); + + // We insert into campaign_messages before the message is actually sent. This is to avoid multiple delivery + // if by chance we run out of disk space and couldn't insert in the database after the message has been sent out + const ids = await knex('campaign_messages').insert({ + campaign: this.campaign.id, + list: result.list.id, + subscription: result.subscriptionGrouped.id, + send_configuration: this.sendConfiguration.id, + status: CampaignMessageStatus.SENDING + }); + + const campaignMessageId = ids[0]; + + let result; + try { + result = await this._sendMessage({listId, email}); + } catch (err) { + await knex('campaign_messages') + .where({id: campaignMessageId}) + .del(); + + throw err; } + enforce(result.list); + enforce(result.subscriptionGrouped); const now = new Date(); - if (msgType === MessageType.REGULAR) { - enforce(list); - enforce(subscriptionGrouped); - - await knex('campaign_messages').insert({ - campaign: campaign.id, - list: list.id, - subscription: subscriptionGrouped.id, - send_configuration: sendConfiguration.id, - status: SubscriptionStatus.SUBSCRIBED, - response, - response_id: responseId, + await knex('campaign_messages') + .where({id: campaignMessageId}) + .update({ + status: CampaignMessageStatus.SENT, + response: result.response, + response_id: result.responseId, updated: now }); - } - - if (campaign && msgType === MessageType.TEST) { - enforce(list); - enforce(subscriptionGrouped); - - try { - // Insert an entry to test_messages. This allows us to remember test sends to lists that are not - // listed in the campaign - see the check in getMessage - await knex('test_messages').insert({ - campaign: campaign.id, - list: list.id, - subscription: subscriptionGrouped.id - }); - } catch (err) { - if (err.code === 'ER_DUP_ENTRY') { - // The entry is already there, so we can ignore this error - } else { - throw err; - } - } - } - - if (msgType === MessageType.TRIGGERED || msgType === MessageType.TEST || msgType === MessageType.SUBSCRIPTION) { - - if (subData.attachments) { - for (const attachment of subData.attachments) { - try { - // We ignore any errors here because we already sent the message. Thus we have to mark it as completed to avoid sending it again. - await knex.transaction(async tx => { - await files.unlockTx(tx, 'campaign', 'attachment', attachment.id); - }); - } catch (err) { - log.error('MessageSender', `Error when unlocking attachment ${attachment.id} for ${email} (queuedId: ${subData.queuedId})`); - log.verbose(err.stack); - } - } - } - - await knex('queued') - .where({id: subData.queuedId}) - .del(); - } - } - - async sendRegularMessage(listId, email) { - enforce(this.type === MessageType.REGULAR); - - await this._sendMessage({listId, email}); + await knex('campaigns').where('id', this.campaign.id).increment('delivered'); } } -async function dropQueuedMessage(queuedMessage) { - await knex('queued') - .where({id: queuedMessage.id}) - .del(); -} async function sendQueuedMessage(queuedMessage) { + const messageType = queuedMessage.type; + + enforce(messageType === MessageType.TRIGGERED || messageType === MessageType.TEST || messageType === MessageType.SUBSCRIPTION || messageType === MessageType.API_TRANSACTIONAL); + const msgData = queuedMessage.data; const cs = new MessageSender(); await cs._init({ - type: queuedMessage.type, + type: messageType, campaignId: msgData.campaignId, listId: msgData.listId, sendConfigurationId: queuedMessage.send_configuration, @@ -538,16 +550,79 @@ async function sendQueuedMessage(queuedMessage) { renderedText: msgData.renderedText }); - await cs._sendMessage({ - subscriptionId: msgData.subscriptionId, - listId: msgData.listId, - to: msgData.to, - attachments: msgData.attachments, - encryptionKeys: msgData.encryptionKeys, - queuedId: queuedMessage.id - }); + const campaign = cs.campaign; + + await knex('queued') + .where({id: queuedMessage.id}) + .del(); + + let result; + try { + result = await cs._sendMessage({ + subscriptionId: msgData.subscriptionId, + listId: msgData.listId, + to: msgData.to, + encryptionKeys: msgData.encryptionKeys + }); + } catch (err) { + await knex.insert({ + id: queuedMessage.id, + send_configuration: queuedMessage.send_configuration, + type: queuedMessage.type, + data: JSON.stringify(queuedMessage.data) + }); + + throw err; + } + + if (messageType === MessageType.TRIGGERED) { + await knex('campaigns').where('id', campaign.id).increment('delivered'); + } + + if (campaign && messageType === MessageType.TEST) { + enforce(result.list); + enforce(result.subscriptionGrouped); + + try { + // Insert an entry to test_messages. This allows us to remember test sends to lists that are not + // listed in the campaign - see the check in getMessage + await knex('test_messages').insert({ + campaign: campaign.id, + list: result.list.id, + subscription: result.subscriptionGrouped.id + }); + } catch (err) { + if (err.code === 'ER_DUP_ENTRY') { + // The entry is already there, so we can ignore this error + } else { + throw err; + } + } + } + + for (const attachment of msgData.attachments) { + if (attachment.id) { // This means that it is an attachment recorded in table files_campaign_attachment + try { + // We ignore any errors here because we already sent the message. Thus we have to mark it as completed to avoid sending it again. + await knex.transaction(async tx => { + await files.unlockTx(tx, 'campaign', 'attachment', attachment.id); + }); + } catch (err) { + log.error('MessageSender', `Error when unlocking attachment ${attachment.id} for ${result.email} (queuedId: ${queuedMessage.id})`); + log.verbose(err.stack); + } + } + } } +async function dropQueuedMessage(queuedMessage) { + await knex('queued') + .where({id: queuedMessage.id}) + .del(); +} + + + async function queueCampaignMessageTx(tx, sendConfigurationId, listId, subscriptionId, messageType, messageData) { enforce(messageType === MessageType.TRIGGERED || messageType === MessageType.TEST); @@ -569,6 +644,26 @@ async function queueCampaignMessageTx(tx, sendConfigurationId, listId, subscript }); } +async function queueAPITransactionalMessage(sendConfigurationId, email, subject, html, text, tagLanguage, mergeTags, attachments) { + const msgData = { + to: { + address: email + }, + html, + text, + tagLanguage, + subject, + mergeTags, + attachments + }; + + await tx('queued').insert({ + send_configuration: sendConfigurationId, + type: MessageType.API_TRANSACTIONAL, + data: JSON.stringify(msgData) + }); +} + async function queueSubscriptionMessage(sendConfigurationId, to, subject, encryptionKeys, template) { let html, text; @@ -645,7 +740,7 @@ async function getMessage(campaignCid, listCid, subscriptionCid) { const flds = cs.listsFieldsGrouped.get(list.id); const mergeTags = fields.getMergeTags(flds, subscriptionGrouped, cs._getExtraTags(campaign)); - return await cs._getMessage(list, subscriptionGrouped, mergeTags, false); + return await cs._getMessage(mergeTags, list, subscriptionGrouped, false); } module.exports.MessageSender = MessageSender; @@ -654,4 +749,5 @@ module.exports.sendQueuedMessage = sendQueuedMessage; module.exports.queueCampaignMessageTx = queueCampaignMessageTx; module.exports.queueSubscriptionMessage = queueSubscriptionMessage; module.exports.dropQueuedMessage = dropQueuedMessage; -module.exports.getMessage = getMessage; \ No newline at end of file +module.exports.getMessage = getMessage; +module.exports.queueAPITransactionalMessage = queueAPITransactionalMessage; \ No newline at end of file diff --git a/server/models/campaigns.js b/server/models/campaigns.js index 00d28798..70061aef 100644 --- a/server/models/campaigns.js +++ b/server/models/campaigns.js @@ -11,7 +11,7 @@ const namespaceHelpers = require('../lib/namespace-helpers'); const files = require('./files'); const templates = require('./templates'); const { allTagLanguages } = require('../../shared/templates'); -const { CampaignStatus, CampaignSource, CampaignType, getSendConfigurationPermissionRequiredForSend } = require('../../shared/campaigns'); +const { CampaignMessageStatus, CampaignStatus, CampaignSource, CampaignType, getSendConfigurationPermissionRequiredForSend } = require('../../shared/campaigns'); const sendConfigurations = require('./send-configurations'); const triggers = require('./triggers'); const {SubscriptionStatus} = require('../../shared/lists'); @@ -724,36 +724,31 @@ async function getMessageByResponseId(responseId) { .first(); } -const statusFieldMapping = { - [SubscriptionStatus.UNSUBSCRIBED]: 'unsubscribed', - [SubscriptionStatus.BOUNCED]: 'bounced', - [SubscriptionStatus.COMPLAINED]: 'complained' -}; +const statusFieldMapping = new Map(); +statusFieldMapping.set(CampaignMessageStatus.UNSUBSCRIBED, 'unsubscribed'); +statusFieldMapping.set(CampaignMessageStatus.BOUNCED, 'bounced'); +statusFieldMapping.set(CampaignMessageStatus.COMPLAINED, 'complained'); -async function _changeStatusByMessageTx(tx, context, message, subscriptionStatus) { - enforce(subscriptionStatus !== SubscriptionStatus.SUBSCRIBED); +async function _changeStatusByMessageTx(tx, context, message, campaignMessageStatus) { + enforce(statusFieldMapping.has(campaignMessageStatus)); - if (message.status === SubscriptionStatus.SUBSCRIBED) { + if (message.status === SubscriptionStatus.SENT) { await shares.enforceEntityPermissionTx(tx, context, 'campaign', message.campaign, 'manageMessages'); - if (!subscriptionStatus in statusFieldMapping) { - throw new Error('Unrecognized message status'); - } - - const statusField = statusFieldMapping[subscriptionStatus]; + const statusField = statusFieldMapping.get(campaignMessageStatus); await tx('campaigns').increment(statusField, 1).where('id', message.campaign); await tx('campaign_messages') .where('id', message.id) .update({ - status: subscriptionStatus, + status: campaignMessageStatus, updated: knex.fn.now() }); } } -async function changeStatusByCampaignCidAndSubscriptionIdTx(tx, context, campaignCid, listId, subscriptionId, subscriptionStatus) { +async function changeStatusByCampaignCidAndSubscriptionIdTx(tx, context, campaignCid, listId, subscriptionId, campaignMessageStatus) { const message = await tx('campaign_messages') .innerJoin('campaigns', 'campaign_messages.campaign', 'campaigns.id') .where('campaigns.cid', campaignCid) @@ -767,17 +762,23 @@ async function changeStatusByCampaignCidAndSubscriptionIdTx(tx, context, campaig throw new Error('Invalid campaign.'); } - await _changeStatusByMessageTx(tx, context, message, subscriptionStatus); + await _changeStatusByMessageTx(tx, context, message, campaignMessageStatus); } -async function changeStatusByMessage(context, message, subscriptionStatus, updateSubscription) { + +const campaignMessageStatusToSubscriptionStatusMapping = new Map(); +campaignMessageStatusToSubscriptionStatusMapping.set(CampaignMessageStatus.BOUNCED, SubscriptionStatus.BOUNCED); +campaignMessageStatusToSubscriptionStatusMapping.set(CampaignMessageStatus.UNSUBSCRIBED, SubscriptionStatus.UNSUBSCRIBED); +campaignMessageStatusToSubscriptionStatusMapping.set(CampaignMessageStatus.COMPLAINED, SubscriptionStatus.COMPLAINED); + +async function changeStatusByMessage(context, message, campaignMessageStatus, updateSubscription) { await knex.transaction(async tx => { if (updateSubscription) { - await subscriptions.changeStatusTx(tx, context, message.list, message.subscription, subscriptionStatus); + enforce(campaignMessageStatusToSubscriptionStatusMapping.has(campaignMessageStatus)); + await subscriptions.changeStatusTx(tx, context, message.list, message.subscription, campaignMessageStatusToSubscriptionStatusMapping.get(campaignMessageStatus)); } - await _changeStatusByMessageTx(tx, context, message, subscriptionStatus); - + await _changeStatusByMessageTx(tx, context, message, campaignMessageStatus); }); } diff --git a/server/models/subscriptions.js b/server/models/subscriptions.js index a2f0ba1d..6e01e57d 100644 --- a/server/models/subscriptions.js +++ b/server/models/subscriptions.js @@ -9,6 +9,7 @@ const interoperableErrors = require('../../shared/interoperable-errors'); const shares = require('./shares'); const fields = require('./fields'); const { SubscriptionSource, SubscriptionStatus, getFieldColumn } = require('../../shared/lists'); +const { CampaignMessageStatus } = require('../../shared/campaigns'); const segments = require('./segments'); const { enforce, filterObject } = require('../lib/helpers'); const moment = require('moment'); @@ -783,7 +784,7 @@ async function unsubscribeByCidAndGet(context, listId, subscriptionCid, campaign const existing = await tx(getSubscriptionTableName(listId)).where('cid', subscriptionCid).first(); if (campaignCid) { - await campaigns.changeStatusByCampaignCidAndSubscriptionIdTx(tx, context, campaignCid, listId, existing.id, SubscriptionStatus.UNSUBSCRIBED); + await campaigns.changeStatusByCampaignCidAndSubscriptionIdTx(tx, context, campaignCid, listId, existing.id, CampaignMessageStatus.UNSUBSCRIBED); } return await _unsubscribeExistingAndGetTx(tx, context, listId, existing); diff --git a/server/models/templates.js b/server/models/templates.js index d947ecb1..adccd329 100644 --- a/server/models/templates.js +++ b/server/models/templates.js @@ -10,13 +10,8 @@ const shares = require('./shares'); const files = require('./files'); const dependencyHelpers = require('../lib/dependency-helpers'); const {convertFileURLs} = require('../lib/campaign-content'); - -const mailers = require('../lib/mailers'); -const tools = require('../lib/tools'); -const sendConfigurations = require('./send-configurations'); -const { getMergeTagsForBases, allTagLanguages } = require('../../shared/templates'); -const { getTrustedUrl, getSandboxUrl, getPublicUrl } = require('../lib/urls'); -const htmlToText = require('html-to-text'); +const { allTagLanguages } = require('../../shared/templates'); +const messageSender = require('../lib/message-sender'); const allowedKeys = new Set(['name', 'description', 'type', 'tag_language', 'data', 'html', 'text', 'namespace']); @@ -154,66 +149,14 @@ async function remove(context, id) { }); } -const MAX_EMAIL_COUNT = 100; -async function sendAsTransactionalEmail(context, templateId, sendConfigurationId, emails, subject, mergeTags) { - // TODO - Update this to use MessageSender.queueMessageTx (with renderedHtml and renderedText) +async function sendAsTransactionalEmail(context, templateId, sendConfigurationId, emails, subject, mergeTags, attachments) { + const template = await getById(context, templateId, false); - /* - if (emails.length > MAX_EMAIL_COUNT) { - throw new Error(`Cannot send more than ${MAX_EMAIL_COUNT} emails at once`); + await shares.enforceEntityPermission(context, 'sendConfiguration', sendConfigurationId, 'sendWithoutOverrides'); + + for (const email of emails) { + await messageSender.queueAPITransactionalMessage(sendConfigurationId, email, subject, template.html, template.text, template.tag_language, {...mergeTags, EMAIL: email }, attachments); } - - await knex.transaction(async tx => { - const template = await getByIdTx(tx, context, templateId,false); - const sendConfiguration = await sendConfigurations.getByIdTx(tx, context, sendConfigurationId, false, false); - - await shares.enforceEntityPermissionTx(tx, context, 'sendConfiguration', sendConfigurationId, 'sendWithoutOverrides'); - - const mailer = await mailers.getOrCreateMailer(sendConfigurationId); - - const variablesSkeleton = { - ...getMergeTagsForBases(getTrustedUrl(), getSandboxUrl(), getPublicUrl()), - ...mergeTags - }; - - for (const email of emails) { - const variables = { - ...variablesSkeleton, - EMAIL: email - }; - - const html = tools.formatTemplate( - TODO - tag langauge - template.html, - null, - variables, - true - ); - - const text = (template.text || '').trim() - ? tools.formatTemplate( - TODO - tag langauge - template.text, - null, - variables, - false - ) : htmlToText.fromString(html, {wordwrap: 130}); - - return mailer.sendTransactionalMail( - { - to: email, - subject, - from: { - name: sendConfiguration.from_name, - address: sendConfiguration.from_email - }, - html, - text - } - ); - } - }); - */ } diff --git a/server/routes/api.js b/server/routes/api.js index 06740ec6..4d2b5116 100644 --- a/server/routes/api.js +++ b/server/routes/api.js @@ -310,11 +310,14 @@ router.postAsync('/templates/:templateId/send', async (req, res) => { } const emails = input.EMAIL.split(','); + const mergeTags = input.TAGS || {}; + const subject = input.SUBJECT || ''; + const attachments = input.ATTACHMENTS || []; - // TODO: attachments: input.ATTACHMENTS || [] - const info = await templates.sendAsTransactionalEmail(req.context, templateId, sendConfigurationId, emails, input.SUBJECT, input.VARIABLES); - res.json({ data: info }); + const result = await templates.sendAsTransactionalEmail(req.context, templateId, sendConfigurationId, emails, subject, mergeTags, attachments); + + res.json({ data: result }); }); module.exports = router; diff --git a/server/routes/webhooks.js b/server/routes/webhooks.js index 0304c832..2dcdbe6f 100644 --- a/server/routes/webhooks.js +++ b/server/routes/webhooks.js @@ -5,7 +5,7 @@ const request = require('request-promise'); const campaigns = require('../models/campaigns'); const sendConfigurations = require('../models/send-configurations'); const contextHelpers = require('../lib/context-helpers'); -const {SubscriptionStatus} = require('../../shared/lists'); +const {CampaignMessageStatus} = require('../../shared/campaigns'); const {MailerType} = require('../../shared/send-configurations'); const log = require('../lib/log'); const multer = require('multer'); @@ -44,13 +44,13 @@ router.postAsync('/aws', async (req, res) => { switch (req.body.Message.notificationType) { case 'Bounce': - await campaigns.changeStatusByMessage(contextHelpers.getAdminContext(), message, SubscriptionStatus.BOUNCED, req.body.Message.bounce.bounceType === 'Permanent'); + await campaigns.changeStatusByMessage(contextHelpers.getAdminContext(), message, CampaignMessageStatus.BOUNCED, req.body.Message.bounce.bounceType === 'Permanent'); log.verbose('AWS', 'Marked message %s as bounced', req.body.Message.mail.messageId); break; case 'Complaint': if (req.body.Message.complaint) { - await campaigns.changeStatusByMessage(contextHelpers.getAdminContext(), message, SubscriptionStatus.COMPLAINED, true); + await campaigns.changeStatusByMessage(contextHelpers.getAdminContext(), message, CampaignMessageStatus.COMPLAINED, true); log.verbose('AWS', 'Marked message %s as complaint', req.body.Message.mail.messageId); } break; @@ -93,17 +93,17 @@ router.postAsync('/sparkpost', async (req, res) => { switch (evt.type) { case 'bounce': // https://support.sparkpost.com/customer/portal/articles/1929896 - await campaigns.changeStatusByMessage(contextHelpers.getAdminContext(), message, SubscriptionStatus.BOUNCED, [1, 10, 25, 30, 50].indexOf(Number(evt.bounce_class)) >= 0); + await campaigns.changeStatusByMessage(contextHelpers.getAdminContext(), message, CampaignMessageStatus.BOUNCED, [1, 10, 25, 30, 50].indexOf(Number(evt.bounce_class)) >= 0); log.verbose('Sparkpost', 'Marked message %s as bounced', evt.campaign_id); break; case 'spam_complaint': - await campaigns.changeStatusByMessage(contextHelpers.getAdminContext(), message, SubscriptionStatus.COMPLAINED, true); + await campaigns.changeStatusByMessage(contextHelpers.getAdminContext(), message, CampaignMessageStatus.COMPLAINED, true); log.verbose('Sparkpost', 'Marked message %s as complaint', evt.campaign_id); break; case 'link_unsubscribe': - await campaigns.changeStatusByMessage(contextHelpers.getAdminContext(), message, SubscriptionStatus.UNSUBSCRIBED, true); + await campaigns.changeStatusByMessage(contextHelpers.getAdminContext(), message, CampaignMessageStatus.UNSUBSCRIBED, true); log.verbose('Sparkpost', 'Marked message %s as unsubscribed', evt.campaign_id); break; } @@ -134,18 +134,18 @@ router.postAsync('/sendgrid', async (req, res) => { switch (evt.event) { case 'bounce': // https://support.sparkpost.com/customer/portal/articles/1929896 - await campaigns.changeStatusByMessage(contextHelpers.getAdminContext(), message, SubscriptionStatus.BOUNCED, true); + await campaigns.changeStatusByMessage(contextHelpers.getAdminContext(), message, CampaignMessageStatus.BOUNCED, true); log.verbose('Sendgrid', 'Marked message %s as bounced', evt.campaign_id); break; case 'spamreport': - await campaigns.changeStatusByMessage(contextHelpers.getAdminContext(), message, SubscriptionStatus.COMPLAINED, true); + await campaigns.changeStatusByMessage(contextHelpers.getAdminContext(), message, CampaignMessageStatus.COMPLAINED, true); log.verbose('Sendgrid', 'Marked message %s as complaint', evt.campaign_id); break; case 'group_unsubscribe': case 'unsubscribe': - await campaigns.changeStatusByMessage(contextHelpers.getAdminContext(), message, SubscriptionStatus.UNSUBSCRIBED, true); + await campaigns.changeStatusByMessage(contextHelpers.getAdminContext(), message, CampaignMessageStatus.UNSUBSCRIBED, true); log.verbose('Sendgrid', 'Marked message %s as unsubscribed', evt.campaign_id); break; } @@ -167,17 +167,17 @@ router.postAsync('/mailgun', uploads.any(), async (req, res) => { if (message) { switch (evt.event) { case 'bounced': - await campaigns.changeStatusByMessage(contextHelpers.getAdminContext(), message, SubscriptionStatus.BOUNCED, true); + await campaigns.changeStatusByMessage(contextHelpers.getAdminContext(), message, CampaignMessageStatus.BOUNCED, true); log.verbose('Mailgun', 'Marked message %s as bounced', evt.campaign_id); break; case 'complained': - await campaigns.changeStatusByMessage(contextHelpers.getAdminContext(), message, SubscriptionStatus.COMPLAINED, true); + await campaigns.changeStatusByMessage(contextHelpers.getAdminContext(), message, CampaignMessageStatus.COMPLAINED, true); log.verbose('Mailgun', 'Marked message %s as complaint', evt.campaign_id); break; case 'unsubscribed': - await campaigns.changeStatusByMessage(contextHelpers.getAdminContext(), message, SubscriptionStatus.UNSUBSCRIBED, true); + await campaigns.changeStatusByMessage(contextHelpers.getAdminContext(), message, CampaignMessageStatus.UNSUBSCRIBED, true); log.verbose('Mailgun', 'Marked message %s as unsubscribed', evt.campaign_id); break; } @@ -199,7 +199,7 @@ router.postAsync('/zone-mta', async (req, res) => { const message = await campaigns.getMessageByResponseId(req.body.id); if (message) { - await campaigns.changeStatusByMessage(contextHelpers.getAdminContext(), message, SubscriptionStatus.BOUNCED, true); + await campaigns.changeStatusByMessage(contextHelpers.getAdminContext(), message, CampaignMessageStatus.BOUNCED, true); log.verbose('ZoneMTA', 'Marked message (campaign:%s, list:%s, subscription:%s) as bounced', message.campaign, message.list, message.subscription); } } @@ -265,7 +265,7 @@ router.postAsync('/postal', async (req, res) => { if (req.body.payload.message && req.body.payload.message.message_id) { const message = await campaigns.getMessageByResponseId(req.body.payload.message.message_id); if (message) { - await campaigns.changeStatusByMessage(contextHelpers.getAdminContext(), message, SubscriptionStatus.BOUNCED, req.body.payload.status === 'HardFail'); + await campaigns.changeStatusByMessage(contextHelpers.getAdminContext(), message, CampaignMessageStatus.BOUNCED, req.body.payload.status === 'HardFail'); log.verbose('Postal', 'Marked message %s as bounced', req.body.payload.message.message_id); } } @@ -275,7 +275,7 @@ router.postAsync('/postal', async (req, res) => { if (req.body.payload.original_message && req.body.payload.original_message.message_id) { const message = await campaigns.getMessageByResponseId(req.body.payload.original_message.message_id); if (message) { - await campaigns.changeStatusByMessage(contextHelpers.getAdminContext(), message, SubscriptionStatus.BOUNCED, true); + await campaigns.changeStatusByMessage(contextHelpers.getAdminContext(), message, CampaignMessageStatus.BOUNCED, true); log.verbose('Postal', 'Marked message %s as bounced', req.body.payload.original_message.message_id); } } diff --git a/server/services/postfix-bounce-server.js b/server/services/postfix-bounce-server.js index f2596c66..4862ea91 100644 --- a/server/services/postfix-bounce-server.js +++ b/server/services/postfix-bounce-server.js @@ -5,7 +5,7 @@ const config = require('config'); const net = require('net'); const campaigns = require('../models/campaigns'); const contextHelpers = require('../lib/context-helpers'); -const { SubscriptionStatus } = require('../../shared/lists'); +const { CampaignMessageStatus } = require('../../shared/campaigns'); const bluebird = require('bluebird'); const seenIds = new Set(); @@ -66,7 +66,7 @@ async function readNextChunks() { campaigns.updateMessageResponse(contextHelpers.getAdminContext(), message, queued, queuedAs); log.verbose('POSTFIXBOUNCE', 'Successfully changed message queueId to %s', queuedAs); } else { - campaigns.changeStatusByMessage(contextHelpers.getAdminContext(), message, SubscriptionStatus.BOUNCED, true); + campaigns.changeStatusByMessage(contextHelpers.getAdminContext(), message, CampaignMessageStatus.BOUNCED, true); log.verbose('POSTFIXBOUNCE', 'Marked message %s as bounced', queueId); } diff --git a/server/services/sender-master.js b/server/services/sender-master.js index d092e51c..b61e7c7d 100644 --- a/server/services/sender-master.js +++ b/server/services/sender-master.js @@ -142,6 +142,10 @@ function getExpirationThresholds() { [MessageType.SUBSCRIPTION]: { threshold: now - config.queue.retention.subscription * 1000, title: 'subscription and password-related' + }, + [MessageType.API_TRANSACTIONAL]: { + threshold: now - config.queue.retention.apiTransactional * 1000, + title: 'transactional (API)' } }; } @@ -295,33 +299,35 @@ async function workersLoop() { async function processCampaign(campaignId) { const msgQueue = campaignMessageQueue.get(campaignId); + const isCompleted = () => { + if (msgQueue.length > 0) return false; + + let workerRunning = false; + + for (const wa of workAssignment.values()) { + if (wa.type === WorkAssignmentType.CAMPAIGN && wa.campaignId === campaignId) { + workerRunning = true; + } + } + + return !workerRunning; + }; + async function finish(clearMsgQueue, newStatus) { if (clearMsgQueue) { msgQueue.splice(0); } - const isCompleted = () => { - if (msgQueue.length > 0) return false; - - let workerRunning = false; - - for (const wa of workAssignment.values()) { - if (wa.type === WorkAssignmentType.CAMPAIGN && wa.campaignId === campaignId) { - workerRunning = true; - } - } - - return !workerRunning; - }; - while (!isCompleted()) { await notifier.waitFor('workerFinished'); } - campaignMessageQueue.delete(campaignId); + if (newStatus) { + campaignMessageQueue.delete(campaignId); - await knex('campaigns').where('id', campaignId).update({status: newStatus}); - await activityLog.logEntityActivity('campaign', CampaignActivityType.STATUS_CHANGE, campaignId, {status: newStatus}); + await knex('campaigns').where('id', campaignId).update({status: newStatus}); + await activityLog.logEntityActivity('campaign', CampaignActivityType.STATUS_CHANGE, campaignId, {status: newStatus}); + } } try { @@ -364,7 +370,16 @@ async function processCampaign(campaignId) { const subs = await qry; if (subs.length === 0) { - return await finish(false, CampaignStatus.FINISHED); + if (isCompleted()) { + return await finish(false, CampaignStatus.FINISHED); + + } else { + await finish(false); + + // At this point, there might be messages that re-appeared because sending failed. + continue; + } + } for (const sub of subs) { @@ -501,7 +516,7 @@ async function processQueuedBySendConfiguration(sendConfigurationId) { try { while (true) { if (isSendConfigurationPostponed(sendConfigurationId)) { - return finish(true, true); + return await finish(true, true); } let messagesInProcessing = [...msgQueue]; @@ -514,18 +529,17 @@ async function processQueuedBySendConfiguration(sendConfigurationId) { const messageIdsInProcessing = messagesInProcessing.map(x => x.queuedMessage.id); const rows = await knex('queued') - .orderByRaw(`FIELD(type, ${MessageType.TRIGGERED}, ${MessageType.TEST}, ${MessageType.SUBSCRIPTION}) DESC, id ASC`) // This orders messages in the following order MessageType.SUBSCRIPTION, MessageType.TEST and MessageType.TRIGGERED + .orderByRaw(`FIELD(type, ${MessageType.TRIGGERED}, ${MessageType.API_TRANSACTIONAL}, ${MessageType.TEST}, ${MessageType.SUBSCRIPTION}) DESC, id ASC`) // This orders messages in the following order MessageType.SUBSCRIPTION, MessageType.TEST, MessageType.API_TRANSACTIONAL and MessageType.TRIGGERED .where('send_configuration', sendConfigurationId) .whereNotIn('id', messageIdsInProcessing) .limit(retrieveBatchSize); if (rows.length === 0) { if (isCompleted()) { - sendConfigurationMessageQueue.delete(sendConfigurationId); - return; + return await finish(false, true); } else { - finish(false, false); + await finish(false, false); // At this point, there might be new messages in the queued that could belong to us. Thus we have to try again instead for returning. continue; diff --git a/server/services/sender-worker.js b/server/services/sender-worker.js index 447068ec..dd32adad 100644 --- a/server/services/sender-worker.js +++ b/server/services/sender-worker.js @@ -24,7 +24,7 @@ async function processCampaignMessages(campaignId, messages) { for (const msg of messages) { try { - await cs.sendRegularMessage(msg.listId, msg.email); + await cs.sendRegularCampaignMessage(msg.listId, msg.email); log.verbose('Senders', 'Message sent and status updated for %s:%s', msg.listId, msg.email); } catch (err) { diff --git a/server/services/verp-server.js b/server/services/verp-server.js index d264795d..c345786b 100644 --- a/server/services/verp-server.js +++ b/server/services/verp-server.js @@ -6,7 +6,7 @@ const config = require('config'); const {MailerError} = require('../lib/mailers'); const campaigns = require('../models/campaigns'); const contextHelpers = require('../lib/context-helpers'); -const {SubscriptionStatus} = require('../../shared/lists'); +const {CampaignMessageStatus} = require('../../shared/campaigns'); const bluebird = require('bluebird'); const BounceHandler = require('bounce-handler').BounceHandler; @@ -56,7 +56,7 @@ function onData(stream, session, callback) { if (!bounceResult || ['failed', 'transient'].indexOf(bounceResult.action) < 0) { return 'Message accepted'; } else { - await campaigns.changeStatusByMessage(contextHelpers.getAdminContext(), session.message, SubscriptionStatus.BOUNCED, bounceResult.action === 'failed'); + await campaigns.changeStatusByMessage(contextHelpers.getAdminContext(), session.message, CampaignMessageStatus.BOUNCED, bounceResult.action === 'failed'); log.verbose('VERP', 'Marked message (campaign:%s, list:%s, subscription:%s) as unsubscribed', session.message.campaign, session.message.list, session.message.subscription); } }; diff --git a/shared/campaigns.js b/shared/campaigns.js index ccdbd4b7..7a4390ae 100644 --- a/shared/campaigns.js +++ b/shared/campaigns.js @@ -43,6 +43,20 @@ const CampaignStatus = { MAX: 9 }; + +const CampaignMessageStatus = { + MIN: 0, + + SENT: 1, + UNSUBSCRIBED: 2, + BOUNCED: 3, + COMPLAINED: 4, + SENDING: 5, + + MAX: 5 +}; + + const campaignOverridables = ['from_name', 'from_email', 'reply_to']; function getSendConfigurationPermissionRequiredForSend(campaign, sendConfiguration) { @@ -75,5 +89,6 @@ module.exports = { CampaignType, CampaignStatus, campaignOverridables, + CampaignMessageStatus, getSendConfigurationPermissionRequiredForSend }; \ No newline at end of file