- Refactoring of the mail sending part. Mail queue (table 'queued') is now used also for all test emails.

- More options how to send test emails.
- Fixed problems with pausing a campaign (#593)
- Started rework of transactional sender of templates (#606), however this contains functionality regression at the moment because it does not interpret templates as HBS. It needs HBS option for templates as described in https://github.com/Mailtrain-org/mailtrain/issues/611#issuecomment-502345227

TODO:
- detect sending errors connected to not able to contact the mailer and pause/retry campaing and queued sending - don't mark the recipients as BOUNCED
- add FAILED campaign state and fall into it if sending to campaign consistently fails (i.e. the error with sending is not temporary)
- if the same happends for queued email, delete the message
This commit is contained in:
Tomas Bures 2019-06-25 07:18:06 +02:00
parent ff66a6c39e
commit 30b361290b
42 changed files with 1366 additions and 786 deletions

View file

@ -21,215 +21,163 @@ const htmlToText = require('html-to-text');
const {getPublicUrl} = require('./urls');
const blacklist = require('../models/blacklist');
const libmime = require('libmime');
const shares = require('../models/shares');
const { enforce } = require('./helpers');
const MessageType = {
REGULAR: 0,
TRIGGERED: 1,
TEST: 2
};
class CampaignSender {
constructor() {
}
static async testSend(context, listCid, subscriptionCid, campaignId, sendConfigurationId, html, text) {
let sendConfiguration, list, fieldsGrouped, campaign, subscriptionGrouped, useVerp, useVerpSenderHeader, mergeTags, attachments;
await knex.transaction(async tx => {
sendConfiguration = await sendConfigurations.getByIdTx(tx, contextHelpers.getAdminContext(), sendConfigurationId, false, true);
list = await lists.getByCidTx(tx, context, listCid);
fieldsGrouped = await fields.listGroupedTx(tx, list.id);
useVerp = config.verp.enabled && sendConfiguration.verp_hostname;
useVerpSenderHeader = useVerp && !sendConfiguration.verp_disable_sender_header;
subscriptionGrouped = await subscriptions.getByCid(context, list.id, subscriptionCid);
mergeTags = fields.getMergeTags(fieldsGrouped, subscriptionGrouped);
if (campaignId) {
campaign = await campaigns.getByIdTx(tx, context, campaignId, false, campaigns.Content.WITHOUT_SOURCE_CUSTOM);
await campaigns.enforceSendPermissionTx(tx, context, campaign);
} else {
await shares.enforceEntityPermissionTx(tx, context, 'sendConfiguration', sendConfigurationId, 'sendWithoutOverrides');
// This is to fake the campaign for getMessageLinks, which is called inside formatMessage
campaign = {
cid: '[CAMPAIGN_ID]'
};
}
});
const encryptionKeys = [];
for (const fld of fieldsGrouped) {
if (fld.type === 'gpg' && mergeTags[fld.key]) {
encryptionKeys.push(mergeTags[fld.key].trim());
}
}
attachments = [];
// replace data: images with embedded attachments
html = html.replace(/(<img\b[^>]* src\s*=[\s"']*)(data:[^"'>\s]+)/gi, (match, prefix, dataUri) => {
const cid = shortid.generate() + '-attachments';
attachments.push({
path: dataUri,
cid
});
return prefix + 'cid:' + cid;
});
html = tools.formatMessage(campaign, list, subscriptionGrouped, mergeTags, html, true);
text = (text || '').trim()
? tools.formatMessage(campaign, list, subscriptionGrouped, mergeTags, text)
: htmlToText.fromString(html, {wordwrap: 130});
const mailer = await mailers.getOrCreateMailer(sendConfiguration.id);
const getOverridable = key => {
return sendConfiguration[key];
};
const campaignAddress = [campaign.cid, list.cid, subscriptionGrouped.cid].join('.');
const mail = {
from: {
name: getOverridable('from_name'),
address: getOverridable('from_email')
},
replyTo: getOverridable('reply_to'),
xMailer: sendConfiguration.x_mailer ? sendConfiguration.x_mailer : false,
to: {
name: list.to_name === null ? undefined : tools.formatMessage(campaign, list, subscriptionGrouped, mergeTags, list.to_name, false),
address: subscriptionGrouped.email
},
sender: useVerpSenderHeader ? campaignAddress + '@' + sendConfiguration.verp_hostname : false,
envelope: useVerp ? {
from: campaignAddress + '@' + sendConfiguration.verp_hostname,
to: subscriptionGrouped.email
} : false,
headers: {
'x-fbl': campaignAddress,
// custom header for SparkPost
'x-msys-api': JSON.stringify({
campaign_id: campaignAddress
}),
// custom header for SendGrid
'x-smtpapi': JSON.stringify({
unique_args: {
campaign_id: campaignAddress
}
}),
// custom header for Mailgun
'x-mailgun-variables': JSON.stringify({
campaign_id: campaignAddress
}),
'List-ID': {
prepared: true,
value: libmime.encodeWords(list.name) + ' <' + list.cid + '.' + getPublicUrl() + '>'
}
},
list: {
unsubscribe: null
},
subject: tools.formatMessage(campaign, list, subscriptionGrouped, mergeTags, getOverridable('subject'), false),
html,
text,
attachments,
encryptionKeys
};
let response;
try {
const info = await mailer.sendMassMail(mail);
response = info.response || info.messageId;
} catch (err) {
response = err.response || err.message;
}
return response;
}
async init(settings) {
/*
settings is one of:
- campaignCid / campaignId
or
- sendConfiguration, listId, attachments, html, text, subject
*/
async _init(settings) {
this.listsById = new Map(); // listId -> list
this.listsByCid = new Map(); // listCid -> list
this.listsFieldsGrouped = new Map(); // listId -> fieldsGrouped
this.attachments = [];
await knex.transaction(async tx => {
if (settings.campaignCid) {
this.campaign = await campaigns.rawGetByTx(tx, 'cid', settings.campaignCid);
} else {
} else if (settings.campaignId) {
this.campaign = await campaigns.rawGetByTx(tx, 'id', settings.campaignId);
} else {
// We are not within scope of a campaign (i.e. templates in MessageType.TEST message)
// This is to fake the campaign for getMessageLinks, which is called inside formatMessage
this.campaign = {
cid: '[CAMPAIGN_ID]',
from_name_override: null,
from_email_override: null,
reply_to_override: null
};
}
const campaign = this.campaign;
this.sendConfiguration = await sendConfigurations.getByIdTx(tx, contextHelpers.getAdminContext(), campaign.send_configuration, false, true);
for (const listSpec of campaign.lists) {
const list = await lists.getByIdTx(tx, contextHelpers.getAdminContext(), listSpec.list);
this.listsById.set(list.id, list);
this.listsByCid.set(list.cid, list);
this.listsFieldsGrouped.set(list.id, await fields.listGroupedTx(tx, list.id));
}
if (campaign.source === CampaignSource.TEMPLATE) {
this.template = await templates.getByIdTx(tx, contextHelpers.getAdminContext(), campaign.data.sourceTemplate, false);
}
const attachments = await files.listTx(tx, contextHelpers.getAdminContext(), 'campaign', 'attachment', campaign.id);
for (const attachment of attachments) {
this.attachments.push({
filename: attachment.originalname,
path: files.getFilePath('campaign', 'attachment', campaign.id, attachment.filename)
});
if (settings.sendConfigurationId) {
this.sendConfiguration = await sendConfigurations.getByIdTx(tx, contextHelpers.getAdminContext(), settings.sendConfigurationId, false, true);
} else if (this.campaign.send_configuration) {
this.sendConfiguration = await sendConfigurations.getByIdTx(tx, contextHelpers.getAdminContext(), this.campaign.send_configuration, false, true);
} else {
enforce(false);
}
this.useVerp = config.verp.enabled && this.sendConfiguration.verp_hostname;
this.useVerpSenderHeader = this.useVerp && !this.sendConfiguration.verp_disable_sender_header;
if (settings.listId) {
const list = await lists.getByIdTx(tx, contextHelpers.getAdminContext(), settings.listId);
this.listsById.set(list.id, list);
this.listsByCid.set(list.cid, list);
this.listsFieldsGrouped.set(list.id, await fields.listGroupedTx(tx, list.id));
} else if (this.campaign.lists) {
for (const listSpec of this.campaign.lists) {
const list = await lists.getByIdTx(tx, contextHelpers.getAdminContext(), listSpec.list);
this.listsById.set(list.id, list);
this.listsByCid.set(list.cid, list);
this.listsFieldsGrouped.set(list.id, await fields.listGroupedTx(tx, list.id));
}
} else {
enforce(false);
}
if (settings.attachments) {
this.attachments = settings.attachments;
} else if (this.campaign.id) {
const attachments = await files.listTx(tx, contextHelpers.getAdminContext(), 'campaign', 'attachment', this.campaign.id);
this.attachments = [];
for (const attachment of attachments) {
this.attachments.push({
filename: attachment.originalname,
path: files.getFilePath('campaign', 'attachment', this.campaign.id, attachment.filename)
});
}
} else {
this.attachments = [];
}
if (settings.html !== undefined) {
this.html = settings.html;
this.text = settings.text;
} else if (this.campaign.source === CampaignSource.TEMPLATE) {
this.template = await templates.getByIdTx(tx, contextHelpers.getAdminContext(), this.campaign.data.sourceTemplate, false);
}
if (settings.subject !== undefined) {
this.subject = settings.subject;
} else if (this.campaign.subject !== undefined) {
this.subject = this.campaign.subject;
} else {
enforce(false);
}
});
}
async _getMessage(campaign, list, subscriptionGrouped, mergeTags, replaceDataImgs) {
async _getMessage(list, subscriptionGrouped, mergeTags, replaceDataImgs) {
let html = '';
let text = '';
let renderTags = false;
const campaign = this.campaign;
if (campaign.source === CampaignSource.URL) {
const form = tools.getMessageLinks(campaign, list, subscriptionGrouped);
for (const key in mergeTags) {
form[key] = mergeTags[key];
}
const response = await request.post({
uri: campaign.sourceUrl,
form,
resolveWithFullResponse: true
});
if (response.statusCode !== 200) {
throw new Error(`Received status code ${httpResponse.statusCode} from ${campaign.sourceUrl}`);
}
html = response.body;
text = '';
if (this.renderedHtml !== undefined) {
html = this.renderedHtml;
text = this.renderedText;
renderTags = false;
} else if (campaign.source === CampaignSource.CUSTOM || campaign.source === CampaignSource.CUSTOM_FROM_CAMPAIGN || campaign.source === CampaignSource.CUSTOM_FROM_TEMPLATE) {
html = campaign.data.sourceCustom.html;
text = campaign.data.sourceCustom.text;
} else if (this.html !== undefined) {
html = this.html;
text = this.text;
renderTags = true;
} else if (campaign.source === CampaignSource.TEMPLATE) {
const template = this.template;
html = template.html;
text = template.text;
renderTags = true;
} else {
if (campaign.source === CampaignSource.URL) {
const form = tools.getMessageLinks(campaign, list, subscriptionGrouped);
for (const key in mergeTags) {
form[key] = mergeTags[key];
}
const response = await request.post({
uri: campaign.sourceUrl,
form,
resolveWithFullResponse: true
});
if (response.statusCode !== 200) {
throw new Error(`Received status code ${httpResponse.statusCode} from ${campaign.sourceUrl}`);
}
html = response.body;
text = '';
renderTags = false;
} else if (campaign.source === CampaignSource.CUSTOM || campaign.source === CampaignSource.CUSTOM_FROM_CAMPAIGN || campaign.source === CampaignSource.CUSTOM_FROM_TEMPLATE) {
html = campaign.data.sourceCustom.html;
text = campaign.data.sourceCustom.text;
renderTags = true;
} else if (campaign.source === CampaignSource.TEMPLATE) {
const template = this.template;
html = template.html;
text = template.text;
renderTags = true;
}
html = await links.updateLinks(campaign, list, subscriptionGrouped, mergeTags, html);
}
html = await links.updateLinks(campaign, list, subscriptionGrouped, mergeTags, html);
const attachments = this.attachments.slice();
if (replaceDataImgs) {
// replace data: images with embedded attachments
@ -273,6 +221,14 @@ class CampaignSender {
return tags;
}
async initByCampaignCid(campaignCid) {
await this._init({campaignCid});
}
async initByCampaignId(campaignId) {
await this._init({campaignId});
}
async getMessage(listCid, subscriptionCid) {
const list = this.listsByCid.get(listCid);
const subscriptionGrouped = await subscriptions.getByCid(contextHelpers.getAdminContext(), list.id, subscriptionCid);
@ -280,20 +236,36 @@ class CampaignSender {
const campaign = this.campaign;
const mergeTags = fields.getMergeTags(flds, subscriptionGrouped, this._getExtraTags(campaign));
return await this._getMessage(campaign, list, subscriptionGrouped, mergeTags, false);
return await this._getMessage(list, subscriptionGrouped, mergeTags, false);
}
async sendMessageByEmail(listId, email) {
const subscriptionGrouped = await subscriptions.getByEmail(contextHelpers.getAdminContext(), listId, email);
await this._sendMessage(listId, subscriptionGrouped);
}
async sendMessageBySubscriptionId(listId, subscriptionId) {
const subscriptionGrouped = await subscriptions.getById(contextHelpers.getAdminContext(), listId, subscriptionId);
await this._sendMessage(listId, subscriptionGrouped);
}
/*
subData is one of:
- queuedMessage
or
- email, listId
*/
async _sendMessage(subData) {
let msgType;
let subscriptionGrouped;
let listId;
if (subData.queuedMessage) {
const queuedMessage = subData.queuedMessage;
msgType = queuedMessage.type;
listId = queuedMessage.list;
subscriptionGrouped = await subscriptions.getById(contextHelpers.getAdminContext(), listId, queuedMessage.subscription);
} else {
enforce(subData.email);
enforce(subData.listId);
msgType = MessageType.REGULAR;
listId = subData.listId;
subscriptionGrouped = await subscriptions.getByEmail(contextHelpers.getAdminContext(), listId, subData.email);
}
async _sendMessage(listId, subscriptionGrouped) {
const email = subscriptionGrouped.email;
if (await blacklist.isBlacklisted(email)) {
@ -315,7 +287,7 @@ class CampaignSender {
const sendConfiguration = this.sendConfiguration;
const {html, text, attachments} = await this._getMessage(campaign, list, subscriptionGrouped, mergeTags, true);
const {html, text, attachments} = await this._getMessage(list, subscriptionGrouped, mergeTags, true);
const campaignAddress = [campaign.cid, list.cid, subscriptionGrouped.cid].join('.');
@ -380,7 +352,7 @@ class CampaignSender {
list: {
unsubscribe: listUnsubscribe
},
subject: tools.formatMessage(campaign, list, subscriptionGrouped, mergeTags, getOverridable('subject'), false),
subject: tools.formatMessage(campaign, list, subscriptionGrouped, mergeTags, this.subject, false),
html,
text,
@ -435,17 +407,47 @@ class CampaignSender {
}
await knex('campaigns').where('id', campaign.id).increment('delivered');
if (msgType === MessageType.REGULAR || msgType === MessageType.TRIGGERED) {
await knex('campaigns').where('id', campaign.id).increment('delivered');
}
} catch (err) {
console.log(err);
/*
{ Error: connect ECONNREFUSED 127.0.0.1:55871
at TCPConnectWrap.afterConnect [as oncomplete] (net.js:1097:14)
cause:
{ Error: connect ECONNREFUSED 127.0.0.1:55871
at TCPConnectWrap.afterConnect [as oncomplete] (net.js:1097:14)
stack:
'Error: connect ECONNREFUSED 127.0.0.1:55871\n at TCPConnectWrap.afterConnect [as oncomplete] (net.js:1097:14)',
errno: 'ECONNREFUSED',
code: 'ECONNECTION',
syscall: 'connect',
address: '127.0.0.1',
port: 55871,
command: 'CONN' },
isOperational: true,
errno: 'ECONNREFUSED',
code: 'ECONNECTION',
syscall: 'connect',
address: '127.0.0.1',
port: 55871,
command: 'CONN' }
*/
status = SubscriptionStatus.BOUNCED;
response = err.response || err.message;
await knex('campaigns').where('id', campaign.id).increment('delivered').increment('bounced');
if (msgType === MessageType.REGULAR || msgType === MessageType.TRIGGERED) {
await knex('campaigns').where('id', campaign.id).increment('delivered').increment('bounced');
}
}
const now = new Date();
if (campaign.type === CampaignType.REGULAR || campaign.type === CampaignType.RSS_ENTRY) {
if (msgType === MessageType.REGULAR) {
await knex('campaign_messages').insert({
campaign: this.campaign.id,
list: list.id,
@ -457,16 +459,64 @@ class CampaignSender {
updated: now
});
} else if (campaign.type = CampaignType.TRIGGERED) {
} else if (msgType === MessageType.TRIGGERED || msgType === MessageType.TEST) {
if (subData.queuedMessage.data.attachments) {
for (const attachment of subData.queuedMessage.data.attachments) {
try {
// We ignore any errors here because we already sent the message. Thus we have to mark it as completed to avoid sending it again.
await knex.transaction(async tx => {
await files.unlockTx(tx, 'campaign', 'attachment', attachment.id);
});
} catch (err) {
log.error('CampaignSender', `Error when unlocking attachment ${attachment.id} for ${listId}:${subscriptionGrouped.email} (queuedId: ${subData.queuedMessage.id})`);
log.verbose(err.stack);
}
}
}
await knex('queued')
.where({
campaign: this.campaign.id,
list: list.id,
subscription: subscriptionGrouped.id
})
.where({id: subData.queuedMessage.id})
.del();
}
}
async sendRegularMessage(listId, email) {
await this._sendMessage({listId, email});
}
}
module.exports = CampaignSender;
CampaignSender.sendQueuedMessage = async (queuedMessage) => {
const msgData = queuedMessage.data;
const cs = new CampaignSender();
await cs._init({
campaignId: msgData.campaignId,
listId: queuedMessage.list,
sendConfigurationId: queuedMessage.send_configuration,
attachments: msgData.attachments,
html: msgData.html,
text: msgData.text,
subject: msgData.subject
});
await cs._sendMessage({queuedMessage});
};
CampaignSender.queueMessageTx = async (tx, sendConfigurationId, listId, subscriptionId, messageType, messageData) => {
if (messageData.attachments) {
for (const attachment of messageData.attachments) {
await files.lockTx(tx,'campaign', 'attachment', attachment.id);
}
}
await tx('queued').insert({
send_configuration: sendConfigurationId,
list: listId,
subscription: subscriptionId,
type: messageType,
data: JSON.stringify(messageData)
});
};
module.exports.CampaignSender = CampaignSender;
module.exports.MessageType = MessageType;

View file

@ -4,7 +4,6 @@ const knex = require('./knex');
const interoperableErrors = require('../../shared/interoperable-errors');
const entitySettings = require('./entity-settings');
const shares = require('../models/shares');
const { enforce } = require('./helpers');
const defaultNoOfDependenciesReported = 20;
@ -21,7 +20,7 @@ async function ensureNoDependencies(tx, context, id, depSpecs) {
if (depSpec.query) {
rows = await depSpec.query(tx).limit(defaultNoOfDependenciesReported + 1);
} else if (depSpec.column) {
rows = await tx(entityType.entitiesTable).where(depSpec.column, id).select(['id', 'name']).limit(defaultNoOfDependenciesReported + 1);
rows = await tx(entityType.entitiesTable).where(depSpec.column, id).forShare().select(['id', 'name']).limit(defaultNoOfDependenciesReported + 1);
} else if (depSpec.rows) {
rows = await depSpec.rows(tx, defaultNoOfDependenciesReported + 1)
}

View file

@ -44,6 +44,7 @@ const entityTypes = {
},
attachment: {
table: 'files_campaign_attachment',
inUseTable: 'files_campaign_attachment_usage',
permissions: {
view: 'viewAttachments',
manage: 'manageAttachments'

View file

@ -28,11 +28,11 @@ function filterObject(obj, allowedKeys) {
return result;
}
function castToInteger(id) {
function castToInteger(id, msg) {
const val = parseInt(id);
if (!Number.isInteger(val)) {
throw new Error('Invalid id');
throw new Error(msg || 'Invalid id');
}
return val;

View file

@ -94,14 +94,18 @@ async function _sendMail(transport, mail, template) {
return await trySendAsync();
}
async function _sendTransactionalMail(transport, mail, template) {
const sendConfiguration = transport.mailer.sendConfiguration;
async function _sendTransactionalMail(transport, mail) {
if (!mail.headers) {
mail.headers = {};
}
mail.headers['X-Sending-Zone'] = 'transactional';
return await _sendMail(transport, mail);
}
async function _sendTransactionalMailBasedOnTemplate(transport, mail, template) {
const sendConfiguration = transport.mailer.sendConfiguration;
mail.from = {
name: sendConfiguration.from_name,
address: sendConfiguration.from_email
@ -129,7 +133,7 @@ async function _sendTransactionalMail(transport, mail, template) {
});
}
return await _sendMail(transport, mail);
return await _sendTransactionalMail(transport, mail);
}
async function _createTransport(sendConfiguration) {
@ -263,7 +267,8 @@ async function _createTransport(sendConfiguration) {
transport.mailer = {
sendConfiguration,
throttleWait: bluebird.promisify(throttleWait),
sendTransactionalMail: async (mail, template) => await _sendTransactionalMail(transport, mail, template),
sendTransactionalMail: async (mail, template) => await _sendTransactionalMail(transport, mail),
sendTransactionalMailBasedOnTemplate: async (mail, template) => await _sendTransactionalMailBasedOnTemplate(transport, mail, template),
sendMassMail: async (mail, template) => await _sendMail(transport, mail)
};

View file

@ -139,7 +139,7 @@ async function _sendMail(list, email, template, locale, subjectKey, relativeUrls
try {
if (list.send_configuration) {
const mailer = await mailers.getOrCreateMailer(list.send_configuration);
await mailer.sendTransactionalMail({
await mailer.sendTransactionalMailBasedOnTemplate({
to: {
name: getDisplayName(flds, subscription),
address: email

View file

@ -1,87 +0,0 @@
'use strict';
const mailers = require('./mailers');
const tools = require('./tools');
const templates = require('../models/templates');
const { getMergeTagsForBases } = require('../../shared/templates');
const { getTrustedUrl, getSandboxUrl, getPublicUrl } = require('../lib/urls');
class TemplateSender {
constructor(options) {
this.defaultOptions = {
maxMails: 100,
...options
};
}
async send(params) {
const options = { ...this.defaultOptions, ...params };
this._validateMailOptions(options);
const [mailer, template] = await Promise.all([
mailers.getOrCreateMailer(
options.sendConfigurationId
),
templates.getById(
options.context,
options.templateId,
false
)
]);
const variables = {
EMAIL: options.email,
...getMergeTagsForBases(getTrustedUrl(), getSandboxUrl(), getPublicUrl()),
...options.variables
};
const html = tools.formatTemplate(
template.html,
null,
variables,
true
);
const subject = tools.formatTemplate(
options.subject || template.description || template.name,
variables
);
return mailer.sendTransactionalMail(
{
to: options.email,
subject
},
{
html: { template: html },
data: options.data,
locale: options.locale
}
);
}
_validateMailOptions(options) {
let { context, email, locale, templateId } = options;
if (!templateId) {
throw new Error('Missing templateId');
}
if (!context) {
throw new Error('Missing context');
}
if (!email || email.length === 0) {
throw new Error('Missing email');
}
if (typeof email === 'string') {
email = email.split(',');
}
if (email.length > options.maxMails) {
throw new Error(
`Cannot send more than ${options.maxMails} emails at once`
);
}
if (!locale) {
throw new Error('Missing locale');
}
}
}
module.exports = TemplateSender;