Added router for links. Not tested.

This commit is contained in:
Tomas Bures 2018-09-22 16:21:19 +02:00
parent 92d28daa9e
commit a9e1700dbe
9 changed files with 56 additions and 1399 deletions

View file

@ -26,6 +26,7 @@ const reports = require('./routes/reports');
const subscription = require('./routes/subscription');
const mosaico = require('./routes/mosaico');
const files = require('./routes/files');
const links = require('./routes/links');
const namespacesRest = require('./routes/rest/namespaces');
const sendConfigurationsRest = require('./routes/rest/send-configurations');
@ -215,6 +216,7 @@ function createApp(appType) {
if (appType === AppType.PUBLIC) {
useWith404Fallback('/subscription', subscription);
useWith404Fallback('/links', links);
}
if (appType === AppType.TRUSTED || appType === AppType.SANDBOXED) {

View file

@ -1,5 +1,6 @@
'use strict';
const log = require('npmlog');
const knex = require('../lib/knex');
const dtHelpers = require('../lib/dt-helpers');
const shares = require('./shares');
@ -13,6 +14,7 @@ const uaParser = require('device');
const he = require('he');
const { enforce } = require('../lib/helpers');
const { getTrustedUrl } = require('../lib/urls');
const tools = require('../lib/tools');
const LinkId = {
OPEN: -1,
@ -124,7 +126,7 @@ async function addOrGet(campaignId, url) {
});
}
async function updateLinks(campaign, list, subscription, message) {
async function updateLinks(campaign, list, subscription, mergeTags, message) {
if ((campaign.open_tracking_disabled && campaign.click_tracking_disabled) || !message || !message.trim()) {
// tracking is disabled, do not modify the message
return message;
@ -156,7 +158,9 @@ async function updateLinks(campaign, list, subscription, message) {
const urls = new Map(); // url -> {id, cid} (as returned by add)
for (const url of urlsToBeReplaced) {
const link = await addOrGet(campaign.id, url);
// url might include variables, need to rewrite those just as we do with message content
const expanedUrl = tools.formatMessage(campaign, list, subscription, mergeTags, url);
const link = await addOrGet(campaign.id, expanedUrl);
urls.set(url, link);
}

View file

@ -1,223 +0,0 @@
'use strict';
// FIXME - revisit and rewrite if necessary
let log = require('npmlog');
let db = require('../lib/db');
let tools = require('../lib/tools');
let feed = require('../lib/feed');
let campaigns = require('../lib/models/campaigns');
let _ = require('../lib/translate')._;
let util = require('util');
const feed_timeout = 15 * 1000;
const rss_timeout = 1 * 1000;
const feedparser = require('feedparser-promised');
async function fetch(url) {
const httpOptions = {
uri: 'http://feeds.feedwrench.com/JavaScriptJabber.rss',
headers: {
'user-agent': 'Mailtrain',
'accept': 'text/html,application/xhtml+xml'
}
};
const items = await feedparser.parse(httpOptions);
const entries = [];
for (const item of items) {
const entry = {
title: item.title,
date: item.date || item.pubdate || item.pubDate || new Date(),
guid: item.guid || item.link,
link: item.link,
content: item.description || item.summary,
summary: item.summary || item.description,
image_url: item.image.url
};
entries.push(entry);
}
return entries;
}
function feedLoop() {
db.getConnection((err, connection) => {
if (err) {
log.error('Feed', err.stack);
return setTimeout(feedLoop, feed_timeout);
}
let query = 'SELECT `id`, `source_url`, `from`, `address`, `subject`, `list`, `segment`, `html`, `open_tracking_disabled`, `click_tracking_disabled` FROM `campaigns` WHERE `type`=2 AND `status`=6 AND (`last_check` IS NULL OR `last_check`< NOW() - INTERVAL 10 MINUTE) LIMIT 1';
connection.query(query, (err, rows) => {
connection.release();
if (err) {
log.error('Feed', err);
return setTimeout(feedLoop, feed_timeout);
}
if (!rows || !rows.length) {
return setTimeout(feedLoop, feed_timeout);
}
let parent = tools.convertKeys(rows[0]);
updateRssInfo(parent.id, true, false, () => {
log.verbose('Feed', 'Checking feed %s (%s)', parent.sourceUrl, parent.id);
feed.fetch(parent.sourceUrl, (err, entries) => {
if (err) {
log.error('Feed', err);
return updateRssInfo(parent.id, false, 'Feed error: ' + err.message, () => {
setTimeout(feedLoop, rss_timeout);
});
}
checkEntries(parent, entries, (err, result) => {
let message;
if (err) {
log.error('Feed', err);
message = util.format(_('Feed error: %s'), err.message);
} else if (result) {
log.verbose('Feed', 'Added %s new campaigns for %s', result, parent.id);
message = util.format(_('Found %s new campaign messages from feed'), result);
} else {
message = _('Found nothing new from the feed');
}
return updateRssInfo(parent.id, false, message, () => {
setTimeout(feedLoop, rss_timeout);
});
});
});
});
});
});
}
function updateRssInfo(id, updateCheck, status, callback) {
db.getConnection((err, connection) => {
if (err) {
log.error('Feed', err.stack);
return callback(err);
}
let query;
let values;
if (updateCheck) {
if (status) {
query = 'UPDATE `campaigns` SET `last_check`=NOW(), `check_status`=? WHERE id=? LIMIT 1';
values = [status, id];
} else {
query = 'UPDATE `campaigns` SET `last_check`=NOW() WHERE id=? LIMIT 1';
values = [id];
}
} else {
query = 'UPDATE `campaigns` SET `check_status`=? WHERE id=? LIMIT 1';
values = [status, id];
}
connection.query(query, values, (err, result) => {
connection.release();
if (err) {
log.error('Feed', err);
return callback(err);
}
return callback(null, result.affectedRows);
});
});
}
function checkEntries(parent, entries, callback) {
let pos = 0;
let added = 0;
let checkNextEntry = () => {
if (pos >= entries.length) {
return callback(null, added);
}
let entry = entries[pos++];
if (!entry || !entry.guid) {
return checkNextEntry();
}
db.getConnection((err, connection) => {
if (err) {
log.error('Feed', err.stack);
return setTimeout(checkNextEntry, 15 * 1000);
}
// parent+guid is unique, so the query should fail for existing entries
let query = 'INSERT IGNORE INTO `rss` (`parent`, `guid`, `pubdate`) VALUES (?,?,?)';
connection.query(query, [parent.id, entry.guid, entry.date], (err, result) => {
connection.release();
if (err) {
log.error('Feed', err);
return setTimeout(checkNextEntry, 15 * 1000);
}
if (!result.insertId) {
return setImmediate(checkNextEntry);
}
let entryId = result.insertId;
let html = (parent.html || '').toString().trim();
if (/\[RSS_ENTRY[\w]*\]/i.test(html)) {
html = html.replace(/\[RSS_ENTRY\]/, entry.content); //for backward compatibility
Object.keys(entry).forEach(key => {
html = html.replace(new RegExp('\\[RSS_ENTRY_' + key.toUpperCase() + '\\]', 'g'), entry[key]);
});
} else {
html = entry.content + html;
}
let campaign = {
type: 'entry',
name: entry.title || util.format(_('RSS entry %s'), entry.guid.substr(0, 67)),
from: parent.from,
address: parent.address,
subject: entry.title || parent.subject,
list: parent.segment ? parent.list + ':' + parent.segment : parent.list,
html,
openTrackingDisabled: parent.openTrackingDisabled,
clickTrackingDisabled: parent.clickTrackingDisabled
};
campaigns.create(campaign, {
parent: parent.id
}, (err, campaignId) => {
if (err) {
log.error('Campaign', err);
return setTimeout(checkNextEntry, 15 * 1000);
}
added++;
db.getConnection((err, connection) => {
if (err) {
log.error('Feed', err.stack);
return setTimeout(checkNextEntry, 15 * 1000);
}
let query = 'UPDATE `rss` SET `campaign`=? WHERE id=? LIMIT 1';
connection.query(query, [campaignId, entryId], err => {
connection.release();
if (err) {
log.error('Feed', err.stack);
return setTimeout(checkNextEntry, 15 * 1000);
}
return setImmediate(checkNextEntry);
});
});
});
});
});
};
checkNextEntry();
}
module.exports = callback => {
feedLoop();
setImmediate(callback);
};

View file

@ -1,280 +0,0 @@
'use strict';
// FIXME - revisit and rewrite if necessary
let log = require('npmlog');
let db = require('../lib/db');
let tools = require('../lib/tools');
let _ = require('../lib/translate')._;
let fields = require('../lib/models/fields');
let subscriptions = require('../lib/models/subscriptions');
let fs = require('fs');
let csvparse = require('csv-parse');
const process_timout = 5 * 1000;
function findUnprocessed(callback) {
db.getConnection((err, connection) => {
if (err) {
return callback(err);
}
let query = 'SELECT * FROM importer WHERE `status`=1 LIMIT 1';
connection.query(query, (err, rows) => {
if (err) {
connection.release();
return callback(err);
}
if (!rows || !rows.length) {
connection.release();
return callback(null, false);
}
let importer = rows[0];
let query = 'UPDATE importer SET `status`=2, `processed`=0 WHERE id=? AND `status`=1 LIMIT 1';
connection.query(query, [importer.id], (err, result) => {
connection.release();
if (err) {
return callback(err);
}
if (!result.affectedRows) {
// check next one
return findUnprocessed(callback);
}
let importer = tools.convertKeys(rows[0]);
try {
importer.mapping = JSON.parse(importer.mapping);
} catch (E) {
importer.mapping = {
columns: [],
mapping: {}
};
}
return callback(null, importer);
});
});
});
}
function processImport(data, callback) {
let parser = csvparse({
comment: '#',
delimiter: data.delimiter
});
let listId = data.list;
fields.list(data.list, (err, fieldList) => {
if (err && !fieldList) {
fieldList = [];
}
let firstRow;
let finished = false;
let inputStream = fs.createReadStream(data.path);
let fieldTypes = {};
fieldList.forEach(field => {
if (field.column) {
fieldTypes[field.column] = field.type;
}
if (field.options) {
field.options.forEach(subField => {
if (subField.column) {
fieldTypes[subField.column] = subField.type;
}
});
}
});
inputStream.on('error', err => {
if (finished) {
return;
}
log.error('Import', err.stack);
finished = true;
return callback(err);
});
parser.on('error', err => {
if (finished) {
return;
}
log.error('Import', err.stack);
finished = true;
return callback(err);
});
let processing = false;
let processRows = () => {
let record = parser.read();
if (record === null) {
processing = false;
return;
}
processing = true;
if (!firstRow) {
firstRow = record;
return setImmediate(processRows);
}
let entry = {};
Object.keys(data.mapping.mapping || {}).forEach(key => {
// TODO: process all data types
if (fieldTypes[key] === 'option') {
entry[key] = ['', '0', 'false', 'no', 'null'].indexOf((record[data.mapping.mapping[key]] || '').toString().toLowerCase().trim()) < 0 ? 1 : 0;
} else if (fieldTypes[key] === 'number') {
entry[key] = Number(record[data.mapping.mapping[key]]) || 0;
} else {
entry[key] = (record[data.mapping.mapping[key]] || '').toString().trim() || null;
}
});
if (!entry.email) {
log.verbose('Import', 'Failed processing row, email missing');
return setImmediate(processRows);
}
function insertToSubscription() {
subscriptions.insert(listId, {
imported: data.id,
status: data.type,
partial: true
}, entry, (err, response) => {
if (err) {
// ignore
log.error('Import', err.stack);
} else if (response.entryId) {
//log.verbose('Import', 'Inserted %s as %s', entry.email, entryId);
}
db.getConnection((err, connection) => {
if (err) {
log.error('Import', err.stack);
return setImmediate(processRows);
}
let query;
if (response.inserted) {
// this record did not exist before, count as new
query = 'UPDATE importer SET `processed`=`processed`+1, `new`=`new`+1 WHERE `id`=? LIMIT 1';
} else {
// it's an existing record
query = 'UPDATE importer SET `processed`=`processed`+1 WHERE `id`=? LIMIT 1';
}
connection.query(query, [data.id], () => {
connection.release();
return setImmediate(processRows);
});
});
});
}
if (data.emailcheck === 1) {
tools.validateEmail(entry.email, true, err => {
if (err) {
let reason = (err.message || '').toString().trim().replace(/^[a-z]Error:\s*/i, '');
log.verbose('Import', 'Failed processing row %s: %s', entry.email, reason);
db.getConnection((err, connection) => {
if (err) {
log.error('Import', err.stack);
return setImmediate(processRows);
}
let query = 'INSERT INTO import_failed (`import`, `email`, `reason`) VALUES(?,?,?)';
connection.query(query, [data.id, entry.email, reason], err => {
if (err) {
connection.release();
return setImmediate(processRows);
}
let query = 'UPDATE importer SET `failed`=`failed`+1 WHERE `id`=? LIMIT 1';
connection.query(query, [data.id], () => {
connection.release();
return setImmediate(processRows);
});
});
});
return;
}
insertToSubscription();
});
} else {
insertToSubscription();
}
};
parser.on('readable', () => {
if (finished || processing) {
return;
}
processRows();
});
parser.on('finish', () => {
if (finished) {
return;
}
finished = true;
callback(null, true);
});
inputStream.pipe(parser);
});
}
let importLoop = () => {
let getNext = () => {
// find an unsent message
findUnprocessed((err, data) => {
if (err) {
log.error('Import', err.stack);
setTimeout(getNext, process_timout);
return;
}
if (!data) {
setTimeout(getNext, process_timout);
return;
}
processImport(data, err => {
let failed = null;
if (err) {
if (err.code === 'ENOENT') {
failed = _('Could not access import file');
} else {
failed = err.message || err;
}
}
db.getConnection((err, connection) => {
if (err) {
log.error('Import', err.stack);
return setTimeout(getNext, process_timout);
}
let query = 'UPDATE importer SET `status`=?, `error`=?, `finished`=NOW() WHERE `id`=? AND `status`=2 LIMIT 1';
connection.query(query, [!failed ? 3 : 4, failed, data.id], () => {
connection.release();
getNext();
});
});
});
});
};
getNext();
};
module.exports = callback => {
importLoop();
setImmediate(callback);
};

View file

@ -1,611 +0,0 @@
'use strict';
// FIXME - update/rewrite
const { nodeifyFunction } = require('../lib/nodeify');
const getSettings = nodeifyFunction(require('../models/settings').get);
let log = require('npmlog');
let config = require('config');
let db = require('../lib/db');
let tools = require('../lib/tools');
let mailer = require('../lib/mailers');
let campaigns = require('../lib/models/campaigns');
let segments = require('../lib/models/segments');
let lists = require('../lib/models/lists');
let blacklist = require('../lib/models/blacklist');
let fields = require('../lib/models/fields');
let links = require('../lib/models/links');
let shortid = require('shortid');
let url = require('url');
let htmlToText = require('html-to-text');
let request = require('request');
let libmime = require('libmime');
let _ = require('../lib/translate')._;
let util = require('util');
let attachmentCache = new Map();
let attachmentCacheSize = 0;
const mailing_timeout = 5 * 1000;
function findUnsent(callback) {
let returnUnsent = (row, campaign) => {
db.getConnection((err, connection) => {
if (err) {
return callback(err);
}
let subscription = tools.convertKeys(row);
let query = 'INSERT INTO `campaign__' + campaign.id + '` (list, segment, subscription) VALUES(?, ?,?)';
connection.query(query, [campaign.list, campaign.segment, subscription.id], (err, result) => {
connection.release();
if (err) {
if (err.code === 'ER_DUP_ENTRY') {
// race condition, try next one
return findUnsent(callback);
}
return callback(err);
}
subscription.campaign = campaign.id;
callback(null, {
id: result.insertId,
listId: campaign.list,
campaignId: campaign.id,
subscription
});
});
});
};
// get next subscriber from trigger queue
let checkQueued = () => {
db.getConnection((err, connection) => {
if (err) {
return callback(err);
}
connection.query('SELECT * FROM `queued` ORDER BY `created` ASC LIMIT 1', (err, rows) => {
if (err) {
connection.release();
return callback(err);
}
if (!rows || !rows.length) {
connection.release();
return callback(null, false);
}
let queued = tools.convertKeys(rows[0]);
// delete queued element
connection.query('DELETE FROM `queued` WHERE `campaign`=? AND `list`=? AND `subscriber`=? LIMIT 1', [queued.campaign, queued.list, queued.subscriber], err => {
if (err) {
connection.release();
return callback(err);
}
// get campaign
connection.query('SELECT `id`, `list`, `segment` FROM `campaigns` WHERE `id`=? LIMIT 1', [queued.campaign], (err, rows) => {
if (err) {
connection.release();
return callback(err);
}
if (!rows || !rows.length) {
connection.release();
return callback(null, false);
}
let campaign = tools.convertKeys(rows[0]);
// get subscription
connection.query('SELECT * FROM `subscription__' + queued.list + '` WHERE `id`=? AND `status`=1 LIMIT 1', [queued.subscriber], (err, rows) => {
connection.release();
if (err) {
return callback(err);
}
if (!rows || !rows.length) {
return callback(null, false);
}
return returnUnsent(rows[0], campaign);
});
});
});
});
});
};
db.getFromCache('sender', (err, cached) => {
if (err) {
return callback(err);
}
if (cached) {
return returnUnsent(cached.row, cached.campaign);
}
db.getLock('queue', (err, lock) => {
if (err) {
return callback(err);
}
if (!lock) {
return setTimeout(() => findUnsent(callback), 10 * 1000);
}
// try again to fetch a key from cache, maybe there was some other instance that held the lock
db.getFromCache('sender', (err, cached) => {
if (err) {
return callback(err);
}
if (cached) {
return lock.release(() => {
returnUnsent(cached.row, cached.campaign);
});
}
let done = function () {
lock.release(() => {
callback(...arguments);
});
};
db.getConnection((err, connection) => {
if (err) {
return done(err);
}
// Find "normal" campaigns. Ignore RSS and drip campaigns at this point
let query = 'SELECT `id`, `list`, `segment` FROM `campaigns` WHERE `status`=? AND (`scheduled` IS NULL OR `scheduled` <= NOW()) AND `type` IN (?, ?) LIMIT 1';
connection.query(query, [2, 1, 3], (err, rows) => {
connection.release();
if (err) {
return done(err);
}
if (!rows || !rows.length) {
return checkQueued();
}
let campaign = tools.convertKeys(rows[0]);
let getSegmentQuery = (segmentId, next) => {
segmentId = Number(segmentId);
if (!segmentId) {
return next(null, {
where: '',
values: []
});
}
segments.getQuery(segmentId, 'subscription', next);
};
getSegmentQuery(campaign.segment, (err, queryData) => {
if (err) {
return done(err);
}
db.getConnection((err, connection) => {
if (err) {
return done(err);
}
// TODO: Add support for localized sending time. In this case campaign messages are
// not sent before receiver's local time reaches defined time
// SELECT * FROM subscription__1 LEFT JOIN tzoffset ON tzoffset.tz=subscription__1.tz WHERE NOW() + INTERVAL IFNULL(`offset`,0) MINUTE >= localtime
let query;
let values;
// NOT IN
query = 'SELECT * FROM `subscription__' + campaign.list + '` AS subscription WHERE status=1 ' + (queryData.where ? ' AND (' + queryData.where + ')' : '') + ' AND id NOT IN (SELECT subscription FROM `campaign__' + campaign.id + '` campaign WHERE campaign.list = ? AND campaign.segment = ? AND campaign.subscription = subscription.id) LIMIT 1000';
values = queryData.values.concat([campaign.list, campaign.segment]);
// LEFT JOIN / IS NULL
//query = 'SELECT subscription.* FROM `subscription__' + campaign.list + '` AS subscription LEFT JOIN `campaign__' + campaign.id + '` AS campaign ON campaign.list = ? AND campaign.segment = ? AND campaign.subscription = subscription.id WHERE subscription.status=1 ' + (queryData.where ? 'AND (' + queryData.where + ') ' : '') + 'AND campaign.id IS NULL LIMIT 150';
//values = [campaign.list, campaign.segment].concat(queryData.values);
connection.query(query, values, (err, rows) => {
if (err) {
connection.release();
return done(err);
}
if (!rows || !rows.length) {
// everything already processed for this campaign
connection.query('UPDATE campaigns SET `status`=3, `status_change`=NOW() WHERE id=? AND `status`=? LIMIT 1', [campaign.id, 2], () => {
connection.release();
return done(null, false);
});
return;
}
connection.release();
let pos = 0;
let addToCache = () => {
if (pos >= rows.length) {
lock.release(() => {
findUnsent(callback);
});
return;
}
let row = rows[pos++];
db.addToCache('sender', {
row,
campaign
}, err => {
if (err) {
return done(err);
}
setImmediate(addToCache);
});
};
addToCache();
});
});
});
});
});
});
});
});
}
function getAttachments(campaign, callback) {
campaigns.getAttachments(campaign.id, (err, attachments) => {
if (err) {
return callback(err);
}
if (!attachments) {
return callback(null, []);
}
let response = [];
let pos = 0;
let getNextAttachment = () => {
if (pos >= attachments.length) {
return callback(null, response);
}
let attachment = attachments[pos++];
let aid = campaign.id + ':' + attachment.id;
if (attachmentCache.has(aid)) {
response.push(attachmentCache.get(aid));
return setImmediate(getNextAttachment);
}
campaigns.getAttachment(campaign.id, attachment.id, (err, attachment) => {
if (err) {
return callback(err);
}
if (!attachment || !attachment.content) {
return setImmediate(getNextAttachment);
}
response.push(attachment);
// make sure we do not cache more buffers than 30MB
if (attachmentCacheSize + attachment.content.length > 30 * 1024 * 1024) {
attachmentCacheSize = 0;
attachmentCache.clear();
}
attachmentCache.set(aid, attachment);
attachmentCacheSize += attachment.content.length;
return setImmediate(getNextAttachment);
});
};
getNextAttachment();
});
}
function formatMessage(message, callback) {
campaigns.get(message.campaignId, false, (err, campaign) => {
if (err) {
return callback(err);
}
if (!campaign) {
return callback(new Error(_('Campaign not found')));
}
lists.get(message.listId, (err, list) => {
if (err) {
return callback(err);
}
if (!list) {
return callback(new Error(_('List not found')));
}
settings.list(['serviceUrl', 'verpUse', 'verpHostname', 'xMailer'], (err, configItems) => {
if (err) {
return callback(err);
}
let useVerp = config.verp.enabled && configItems.verpUse && configItems.verpHostname;
let useVerpSenderHeader = useVerp && config.verp.disablesenderheader !== true;
fields.list(list.id, (err, fieldList) => {
if (err) {
return callback(err);
}
message.subscription.mergeTags = {
EMAIL: message.subscription.email,
FIRST_NAME: message.subscription.firstName,
LAST_NAME: message.subscription.lastName,
FULL_NAME: [].concat(message.subscription.firstName || []).concat(message.subscription.lastName || []).join(' ')
};
let encryptionKeys = [];
fields.getRow(fieldList, message.subscription, false, true).forEach(field => {
if (field.mergeTag) {
message.subscription.mergeTags[field.mergeTag] = field.mergeValue || '';
}
if (field.type === 'gpg' && field.value) {
encryptionKeys.push(field.value.trim());
}
if (field.options) {
field.options.forEach(subField => {
if (subField.mergeTag) {
message.subscription.mergeTags[subField.mergeTag] = subField.value && subField.mergeValue || '';
}
});
}
});
let renderAndSend = (html, text, renderTags) => {
links.updateLinks(campaign, list, message.subscription, configItems.serviceUrl, html, (err, html) => {
if (err) {
return callback(err);
}
// replace data: images with embedded attachments
getAttachments(campaign, (err, attachments) => {
if (err) {
return callback(err);
}
html = html.replace(/(<img\b[^>]* src\s*=[\s"']*)(data:[^"'>\s]+)/gi, (match, prefix, dataUri) => {
let cid = shortid.generate() + '-attachments@' + campaign.address.split('@').pop();
attachments.push({
path: dataUri,
cid
});
return prefix + 'cid:' + cid;
});
let campaignAddress = [campaign.cid, list.cid, message.subscription.cid].join('.');
let renderedHtml = renderTags ? tools.formatMessage(configItems.serviceUrl, campaign, list, message.subscription, html, false, true) : html;
let renderedText = (text || '').trim() ? (renderTags ? tools.formatMessage(configItems.serviceUrl, campaign, list, message.subscription, text) : text) : htmlToText.fromString(renderedHtml, {
wordwrap: 130
});
let listUnsubscribe = null;
if (!list.listunsubscribeDisabled) {
listUnsubscribe = campaign.unsubscribe ? tools.formatMessage(configItems.serviceUrl, campaign, list, message.subscription, campaign.unsubscribe) : url.resolve(configItems.serviceUrl, '/subscription/' + list.cid + '/unsubscribe/' + message.subscription.cid);
}
return callback(null, {
from: {
name: campaign.from,
address: campaign.address
},
replyTo: campaign.replyTo,
xMailer: configItems.xMailer ? configItems.xMailer : false,
to: {
name: [].concat(message.subscription.firstName || []).concat(message.subscription.lastName || []).join(' '),
address: message.subscription.email
},
sender: useVerpSenderHeader ? campaignAddress + '@' + configItems.verpHostname : false,
envelope: useVerp ? {
from: campaignAddress + '@' + configItems.verpHostname,
to: message.subscription.email
} : false,
headers: {
'x-fbl': campaignAddress,
// custom header for SparkPost
'x-msys-api': JSON.stringify({
campaign_id: campaignAddress
}),
// custom header for SendGrid
'x-smtpapi': JSON.stringify({
unique_args: {
campaign_id: campaignAddress
}
}),
// custom header for Mailgun
'x-mailgun-variables': JSON.stringify({
campaign_id: campaignAddress
}),
'List-ID': {
prepared: true,
value: libmime.encodeWords(list.name) + ' <' + list.cid + '.' + (url.parse(configItems.serviceUrl).hostname || 'localhost') + '>'
}
},
list: {
unsubscribe: listUnsubscribe
},
subject: tools.formatMessage(configItems.serviceUrl, campaign, list, message.subscription, campaign.subject),
html: renderedHtml,
text: renderedText,
attachments,
encryptionKeys
});
});
});
};
if (campaign.sourceUrl) {
let form = tools.getMessageLinks(configItems.serviceUrl, campaign, list, message.subscription);
Object.keys(message.subscription.mergeTags).forEach(key => {
form[key] = message.subscription.mergeTags[key];
});
request.post({
url: campaign.sourceUrl,
form
}, (err, httpResponse, body) => {
if (err) {
return callback(err);
}
if (httpResponse.statusCode !== 200) {
return callback(new Error(util.format(_('Received status code %s from %s'), httpResponse.statusCode, campaign.sourceUrl)));
}
renderAndSend(body && body.toString(), '', false);
});
} else {
renderAndSend(campaign.htmlPrepared || campaign.html, campaign.text, true);
}
});
});
});
});
}
let sendLoop = () => {
mailers.getMailer(err => {
if (err) {
log.error('Mail', err.stack);
return setTimeout(sendLoop, 10 * 1000);
}
let isThrottled = false;
let getNext = () => {
if (!mailers.transport.isIdle() || isThrottled) {
// only retrieve new messages if there are free slots in the mailers queue
return;
}
isThrottled = true;
mailers.transport.checkThrottling(() => {
isThrottled = false;
// find an unsent message
findUnsent((err, message) => {
if (err) {
log.error('Mail', err.stack);
setTimeout(getNext, mailing_timeout);
return;
}
if (!message) {
setTimeout(getNext, mailing_timeout);
return;
}
// log.verbose('Mail', 'Found new message to be delivered: %s', message.subscription.cid);
// format message to nodemailers message format
formatMessage(message, (err, mail) => {
if (err) {
log.error('Mail', err.stack);
setTimeout(getNext, mailing_timeout);
return;
}
blacklist.isblacklisted(mail.to.address, (err, blacklisted) => {
if (err) {
log.error('Mail', err);
setTimeout(getNext, mailing_timeout);
return;
}
if (!blacklisted) {
let tryCount = 0;
let trySend = () => {
tryCount++;
// send the message
mailers.transport.sendMail(mail, (err, info) => {
if (err) {
log.error('Mail', err.stack);
if (err.responseCode && err.responseCode >= 400 && err.responseCode < 500 && tryCount <= 5) {
// temporary error, try again
return setTimeout(trySend, tryCount * 1000);
}
}
let status = err ? 3 : 1;
let response = err && (err.response || err.message) || info.response || info.messageId;
let responseId = response.split(/\s+/).pop();
db.getConnection((err, connection) => {
if (err) {
log.error('Mail', err.stack);
return;
}
let query = 'UPDATE `campaigns` SET `delivered`=`delivered`+1 ' + (status === 3 ? ', `bounced`=`bounced`+1 ' : '') + ' WHERE id=? LIMIT 1';
connection.query(query, [message.campaignId], err => {
if (err) {
log.error('Mail', err.stack);
}
let query = 'UPDATE `campaign__' + message.campaignId + '` SET status=?, response=?, response_id=?, updated=NOW() WHERE id=? LIMIT 1';
connection.query(query, [status, response, responseId, message.id], err => {
connection.release();
if (err) {
log.error('Mail', err.stack);
} else {
// log.verbose('Mail', 'Message sent and status updated for %s', message.subscription.cid);
}
});
});
});
});
};
setImmediate(trySend);
} else {
db.getConnection((err, connection) => {
if (err) {
log.error('Mail', err);
return;
}
let query = 'UPDATE `campaigns` SET `blacklisted`=`blacklisted`+1 WHERE id=? LIMIT 1';
connection.query(query, [message.campaignId], err => {
if (err) {
log.error('Mail', err);
}
let query = 'UPDATE `campaign__' + message.campaignId + '` SET status=?, response=?, response_id=?, updated=NOW() WHERE id=? LIMIT 1';
connection.query(query, [5, 'blacklisted', 'blacklisted', message.id], err => {
connection.release();
if (err) {
log.error('Mail', err);
}
});
});
});
}
setImmediate(getNext);
});
});
});
});
};
mailers.transport.on('idle', getNext);
setImmediate(getNext);
});
};
sendLoop();
process.on('message', m => {
if (m && m.reload) {
log.info('Sender/' + process.pid, 'Reloading mailers config');
mailers.update();
}
});

View file

@ -1,121 +0,0 @@
'use strict';
// FIXME - revisit and rewrite if necessary
let log = require('npmlog');
let db = require('../lib/db');
let tools = require('../lib/tools');
let triggers = require('../lib/models/triggers');
let _ = require('../lib/translate')._;
let util = require('util');
function triggerLoop() {
checkTrigger((err, triggerId) => {
if (err) {
log.error('Triggers', err);
}
if (triggerId) {
return setImmediate(triggerLoop);
} else {
return setTimeout(triggerLoop, 15 * 1000);
}
});
}
function checkTrigger(callback) {
db.getConnection((err, connection) => {
if (err) {
return callback(err);
}
let query = 'SELECT * FROM `triggers` WHERE `enabled`=1 AND `last_check`<=NOW()-INTERVAL 1 MINUTE ORDER BY `last_check` ASC LIMIT 1';
connection.query(query, (err, rows) => {
if (err) {
connection.release();
return callback(err);
}
if (!rows || !rows.length) {
connection.release();
return callback(null, false);
}
let trigger = tools.convertKeys(rows[0]);
let query = 'UPDATE `triggers` SET `last_check`=NOW() WHERE id=? LIMIT 1';
connection.query(query, [trigger.id], err => {
connection.release();
if (err) {
return callback(err);
}
triggers.getQuery(trigger.id, (err, query) => {
if (err) {
return callback(err);
}
if (!query) {
return callback(new Error(util.format(_('Unknown trigger type %s'), trigger.id)));
}
trigger.query = query;
fireTrigger(trigger, callback);
});
});
});
});
}
function fireTrigger(trigger, callback) {
db.getConnection((err, connection) => {
if (err) {
return callback(err);
}
connection.query(trigger.query, (err, rows) => {
if (err) {
connection.release();
return callback(err);
}
if (!rows || !rows.length) {
connection.release();
return callback(null, trigger.id);
}
let pos = 0;
let insertNext = () => {
if (pos >= rows.length) {
connection.release();
return callback(null, trigger.id);
}
let subscriber = rows[pos++].id;
let query = 'INSERT INTO `trigger__' + trigger.id + '` (`list`, `segment`, `subscription`) VALUES (?,?,?)';
let values = [trigger.list, trigger.segment, subscriber];
connection.query(query, values, (err, result) => {
if (err && err.code !== 'ER_DUP_ENTRY') {
connection.release();
return callback(err);
}
if (!result.affectedRows) {
return setImmediate(insertNext);
}
log.verbose('Triggers', 'Triggered %s (%s) for subscriber %s', trigger.name, trigger.id, subscriber);
let query = 'INSERT INTO `queued` (`campaign`, `list`, `subscriber`, `source`) VALUES (?,?,?,?)';
let values = [trigger.destCampaign, trigger.list, subscriber, 'trigger ' + trigger.id];
connection.query(query, values, err => {
if (err && err.code !== 'ER_DUP_ENTRY') {
connection.release();
return callback(err);
}
// update counter
let query = 'UPDATE `triggers` SET `count`=`count`+1 WHERE id=?';
let values = [trigger.id];
connection.query(query, values, () => setImmediate(insertNext));
});
});
};
insertNext();
});
});
}
module.exports = callback => {
triggerLoop();
setImmediate(callback);
};

View file

@ -1,160 +0,0 @@
'use strict';
// FIXME - port for the new campaigns model
const { nodeifyFunction } = require('../lib/nodeify');
const log = require('npmlog');
const config = require('config');
const verpHelpers = require('../lib/verp-helpers');
const campaigns = require('../models/campaigns');
const BounceHandler = require('bounce-handler').BounceHandler;
const SMTPServer = require('smtp-server').SMTPServer;
async function onRcptTo(address, session) {
const user = address.address.split('@').shift();
const host = address.address.split('@').pop();
if (host !== configItems.verpHostname || !/^[a-z0-9_-]+\.[a-z0-9_-]+\.[a-z0-9_-]+$/i.test(user)) {
err = new Error('Unknown user ' + address.address);
err.responseCode = 510;
return callback(err);
}
campaigns.findMailByCampaign(user, (err, message) => {
if (err) {
err = new Error('Failed to load user data');
err.responseCode = 421;
return callback(err);
}
if (!message) {
err = new Error('Unknown user ' + address.address);
err.responseCode = 510;
return callback(err);
}
session.campaignId = user;
session.message = message;
log.verbose('VERP', 'Incoming message for Campaign %s, List %s, Subscription %s', message.campaign, message.list, message.subscription);
callback();
});
});
}
async function onData(stream, session) {
let chunks = [];
let chunklen = 0;
stream.on('data', chunk => {
if (!chunk || !chunk.length || chunklen > 60 * 1024) {
return;
}
chunks.push(chunk);
chunklen += chunk.length;
});
stream.on('end', () => {
let body = Buffer.concat(chunks, chunklen).toString();
let bh = new BounceHandler();
let bounceResult;
try {
bounceResult = [].concat(bh.parse_email(body) || []).shift();
} catch (E) {
log.error('Bounce', 'Failed parsing bounce message');
log.error('Bounce', JSON.stringify(body));
}
if (!bounceResult || ['failed', 'transient'].indexOf(bounceResult.action) < 0) {
return callback(null, 'Message accepted');
} else {
campaigns.updateMessage(session.message, 'bounced', bounceResult.action === 'failed', (err, updated) => {
if (err) {
log.error('VERP', 'Failed updating message: %s', err);
} else if (updated) {
log.verbose('VERP', 'Marked message %s as unsubscribed', session.campaignId);
}
callback(null, 'Message accepted');
});
}
});
}
// Setup server
const server = new SMTPServer({
// log to console
logger: false,
banner: 'Mailtrain VERP bouncer',
disabledCommands: ['AUTH', 'STARTTLS'],
onRcptTo: nodeifyFunction(onRcptTo),
onData: nodeifyFunction(onData)
});
module.exports = callback => {
if (!config.verp.enabled) {
return setImmediate(callback);
}
let started = false;
server.on('error', err => {
const port = config.verp.port;
const bind = typeof port === 'string' ? 'Pipe ' + port : 'Port ' + port;
switch (err.code) {
case 'EACCES':
log.error('VERP', '%s requires elevated privileges', bind);
break;
case 'EADDRINUSE':
log.error('VERP', '%s is already in use', bind);
break;
case 'ECONNRESET': // Usually happens when a client does not disconnect cleanly
case 'EPIPE': // Remote connection was closed before the server attempted to send data
default:
log.error('VERP', err);
}
if (!started) {
started = true;
return callback(err);
}
});
let hosts;
if (typeof config.verp.host === 'string' && config.verp.host) {
hosts = config.verp.host.trim().split(',').map(host => host.trim()).filter(host => !!host);
if (hosts.indexOf('*') >= 0 || hosts.indexOf('all') >= 0) {
hosts = [false];
}
} else {
hosts = [false];
}
let pos = 0;
const startNextHost = () => {
if (pos >= hosts.length) {
started = true;
return setImmediate(callback);
}
let host = hosts[pos++];
server.listen(config.verp.port, host, () => {
if (started) {
return server.close();
}
log.info('VERP', 'Server listening on %s:%s', host || '*', config.verp.port);
setImmediate(startNextHost);
});
};
startNextHost();
};

47
routes/links.js Normal file
View file

@ -0,0 +1,47 @@
'use strict';
const log = require('npmlog');
const config = require('config');
const router = require('../lib/router-async').create();
const links = require('../models/links');
const trackImg = new Buffer('R0lGODlhAQABAIAAAAAAAP///yH5BAEAAAAALAAAAAABAAEAAAIBRAA7', 'base64');
router.getAsync('/:campaign/:list/:subscription', async (req, res) => {
res.writeHead(200, {
'Content-Type': 'image/gif',
'Content-Length': trackImg.length
});
await links.countLink(req.ip, req.headers['user-agent'], req.params.campaign, req.params.list, req.params.subscription, links.LinkId.OPEN);
res.end(trackImg);
});
router.getAsync('/:campaign/:list/:subscription/:link', async (req, res) => {
const notFound = () => {
res.status(404);
return res.render('archive/view', {
layout: 'archive/layout',
message: _('Oops, we couldn\'t find a link for the URL you clicked'),
campaign: {
subject: 'Error 404'
}
});
};
const link = await links.resolve(req.params.link);
if (link) {
await links.countLink(req.ip, req.headers['user-agent'], req.params.campaign, req.params.list, req.params.subscription, link.id);
// In Mailtrain v1 we would do the URL expansion here based on merge tags. We don't do it here anymore. Instead, the URLs are expanded when message is sent out (in links.updateLinks)
return res.redirect(url);
} else {
log.error('Redirect', 'Unresolved URL: <%s>', req.url);
return notFound();
}
});
module.exports = router;

View file

@ -68,7 +68,6 @@ class CampaignSender {
}
const subscriptionGrouped = await subscriptions.getByEmail(contextHelpers.getAdminContext(), listId, email);
console.log(subscriptionGrouped);
const flds = this.listsFieldsGrouped.get(listId);
const campaign = this.campaign;
@ -119,7 +118,7 @@ class CampaignSender {
renderTags = true;
}
html = await links.updateLinks(campaign, list, subscriptionGrouped, html);
html = await links.updateLinks(campaign, list, subscriptionGrouped, mergeTags, html);
const attachments = this.attachments.slice();
// replace data: images with embedded attachments