Initial import
This commit is contained in:
commit
54fa30701e
278 changed files with 37868 additions and 0 deletions
237
services/importer.js
Normal file
237
services/importer.js
Normal file
|
|
@ -0,0 +1,237 @@
|
|||
'use strict';
|
||||
|
||||
let log = require('npmlog');
|
||||
|
||||
let db = require('../lib/db');
|
||||
let tools = require('../lib/tools');
|
||||
|
||||
let fields = require('../lib/models/fields');
|
||||
let subscriptions = require('../lib/models/subscriptions');
|
||||
let fs = require('fs');
|
||||
let csvparse = require('csv-parse');
|
||||
|
||||
function findUnprocessed(callback) {
|
||||
db.getConnection((err, connection) => {
|
||||
if (err) {
|
||||
return callback(err);
|
||||
}
|
||||
|
||||
let query = 'SELECT * FROM importer WHERE `status`=1 LIMIT 1';
|
||||
connection.query(query, (err, rows) => {
|
||||
if (err) {
|
||||
connection.release();
|
||||
return callback(err);
|
||||
}
|
||||
|
||||
if (!rows || !rows.length) {
|
||||
connection.release();
|
||||
return callback(null, false);
|
||||
}
|
||||
|
||||
let importer = rows[0];
|
||||
|
||||
let query = 'UPDATE importer SET `status`=2, `processed`=0 WHERE id=? AND `status`=1 LIMIT 1';
|
||||
connection.query(query, [importer.id], (err, result) => {
|
||||
connection.release();
|
||||
if (err) {
|
||||
return callback(err);
|
||||
}
|
||||
if (!result.affectedRows) {
|
||||
// check next one
|
||||
return findUnprocessed(callback);
|
||||
}
|
||||
|
||||
let importer = tools.convertKeys(rows[0]);
|
||||
try {
|
||||
importer.mapping = JSON.parse(importer.mapping);
|
||||
} catch (E) {
|
||||
importer.mapping = {
|
||||
columns: [],
|
||||
mapping: {}
|
||||
};
|
||||
}
|
||||
|
||||
return callback(null, importer);
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
function processImport(data, callback) {
|
||||
let parser = csvparse({
|
||||
comment: '#',
|
||||
delimiter: data.delimiter
|
||||
});
|
||||
|
||||
let listId = data.list;
|
||||
|
||||
fields.list(data.list, (err, fieldList) => {
|
||||
if (err && !fieldList) {
|
||||
fieldList = [];
|
||||
}
|
||||
|
||||
let firstRow;
|
||||
let finished = false;
|
||||
let inputStream = fs.createReadStream(data.path);
|
||||
let fieldTypes = {};
|
||||
|
||||
fieldList.forEach(field => {
|
||||
if (field.column) {
|
||||
fieldTypes[field.column] = field.type;
|
||||
}
|
||||
if (field.options) {
|
||||
field.options.forEach(subField => {
|
||||
if (subField.column) {
|
||||
fieldTypes[subField.column] = subField.type;
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
inputStream.on('error', err => {
|
||||
if (finished) {
|
||||
return;
|
||||
}
|
||||
log.error('Import', err.stack);
|
||||
finished = true;
|
||||
return callback(err);
|
||||
});
|
||||
|
||||
parser.on('error', err => {
|
||||
if (finished) {
|
||||
return;
|
||||
}
|
||||
log.error('Import', err.stack);
|
||||
finished = true;
|
||||
return callback(err);
|
||||
});
|
||||
|
||||
let processing = false;
|
||||
let processRows = () => {
|
||||
let record = parser.read();
|
||||
if (record === null) {
|
||||
processing = false;
|
||||
return;
|
||||
}
|
||||
processing = true;
|
||||
|
||||
if (!firstRow) {
|
||||
firstRow = record;
|
||||
return setImmediate(processRows);
|
||||
}
|
||||
|
||||
let entry = {};
|
||||
Object.keys(data.mapping.mapping || {}).forEach(key => {
|
||||
// TODO: process all data types
|
||||
if (fieldTypes[key] === 'option') {
|
||||
entry[key] = ['', '0', 'false', 'no', 'null'].indexOf((record[data.mapping.mapping[key]] || '').toString().toLowerCase().trim()) < 0 ? 1 : 0;
|
||||
} else if (fieldTypes[key] === 'number') {
|
||||
entry[key] = Number(record[data.mapping.mapping[key]]) || 0;
|
||||
} else {
|
||||
entry[key] = (record[data.mapping.mapping[key]] || '').toString().trim() || null;
|
||||
}
|
||||
});
|
||||
|
||||
if (!entry.email) {
|
||||
log.verbose('Import', 'Failed processing row, email missing');
|
||||
return setImmediate(processRows);
|
||||
}
|
||||
|
||||
tools.validateEmail(entry.email, true, err => {
|
||||
if (err) {
|
||||
log.verbose('Import', 'Failed processing row %s: %s', entry.email, err.message);
|
||||
return setImmediate(processRows);
|
||||
}
|
||||
|
||||
subscriptions.insert(listId, {
|
||||
imported: data.id,
|
||||
status: data.type
|
||||
}, entry, (err, entryId) => {
|
||||
if (err) {
|
||||
// ignore
|
||||
log.error('Import', err.stack);
|
||||
} else if (entryId) {
|
||||
//log.verbose('Import', 'Inserted %s as %s', entry.email, entryId);
|
||||
}
|
||||
|
||||
db.getConnection((err, connection) => {
|
||||
if (err) {
|
||||
log.error('Import', err.stack);
|
||||
return setImmediate(processRows);
|
||||
}
|
||||
|
||||
let query = 'UPDATE importer SET `processed`=`processed`+1 WHERE `id`=? LIMIT 1';
|
||||
connection.query(query, [data.id], () => {
|
||||
connection.release();
|
||||
return setImmediate(processRows);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
});
|
||||
};
|
||||
|
||||
parser.on('readable', () => {
|
||||
if (finished || processing) {
|
||||
return;
|
||||
}
|
||||
processRows();
|
||||
});
|
||||
|
||||
parser.on('finish', () => {
|
||||
if (finished) {
|
||||
return;
|
||||
}
|
||||
finished = true;
|
||||
callback(null, true);
|
||||
});
|
||||
|
||||
inputStream.pipe(parser);
|
||||
});
|
||||
}
|
||||
|
||||
let importLoop = () => {
|
||||
let getNext = () => {
|
||||
// find an unsent message
|
||||
findUnprocessed((err, data) => {
|
||||
if (err) {
|
||||
log.error('Import', err.stack);
|
||||
setTimeout(getNext, 5 * 1000);
|
||||
return;
|
||||
}
|
||||
if (!data) {
|
||||
setTimeout(getNext, 5 * 1000);
|
||||
return;
|
||||
}
|
||||
|
||||
processImport(data, err => {
|
||||
let failed = null;
|
||||
if (err) {
|
||||
if (err.code === 'ENOENT') {
|
||||
failed = 'Could not access import file';
|
||||
} else {
|
||||
failed = err.message || err;
|
||||
}
|
||||
}
|
||||
|
||||
db.getConnection((err, connection) => {
|
||||
if (err) {
|
||||
log.error('Import', err.stack);
|
||||
return setTimeout(getNext, 5 * 1000);
|
||||
}
|
||||
|
||||
let query = 'UPDATE importer SET `status`=?, `error`=?, `finished`=NOW() WHERE `id`=? AND `status`=2 LIMIT 1';
|
||||
|
||||
connection.query(query, [!failed ? 3 : 4, failed, data.id], () => {
|
||||
connection.release();
|
||||
|
||||
getNext();
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
};
|
||||
getNext();
|
||||
};
|
||||
|
||||
importLoop();
|
||||
286
services/sender.js
Normal file
286
services/sender.js
Normal file
|
|
@ -0,0 +1,286 @@
|
|||
'use strict';
|
||||
|
||||
let log = require('npmlog');
|
||||
|
||||
let db = require('../lib/db');
|
||||
let tools = require('../lib/tools');
|
||||
let mailer = require('../lib/mailer');
|
||||
let campaigns = require('../lib/models/campaigns');
|
||||
let segments = require('../lib/models/segments');
|
||||
let lists = require('../lib/models/lists');
|
||||
let fields = require('../lib/models/fields');
|
||||
let settings = require('../lib/models/settings');
|
||||
let links = require('../lib/models/links');
|
||||
let shortid = require('shortid');
|
||||
let url = require('url');
|
||||
|
||||
function findUnsent(callback) {
|
||||
db.getConnection((err, connection) => {
|
||||
if (err) {
|
||||
return callback(err);
|
||||
}
|
||||
|
||||
let query = 'SELECT id, list, segment FROM campaigns WHERE status=? LIMIT 1';
|
||||
connection.query(query, [2], (err, rows) => {
|
||||
if (err) {
|
||||
connection.release();
|
||||
return callback(err);
|
||||
}
|
||||
if (!rows || !rows.length) {
|
||||
connection.release();
|
||||
return callback(null, false);
|
||||
}
|
||||
|
||||
let campaign = tools.convertKeys(rows[0]);
|
||||
|
||||
let getSegmentQuery = (segmentId, next) => {
|
||||
segmentId = Number(segmentId);
|
||||
if (!segmentId) {
|
||||
return next(null, {
|
||||
where: '',
|
||||
values: []
|
||||
});
|
||||
}
|
||||
segments.getQuery(segmentId, next);
|
||||
};
|
||||
|
||||
getSegmentQuery(campaign.segment, (err, queryData) => {
|
||||
if (err) {
|
||||
return callback(err);
|
||||
}
|
||||
|
||||
let tryNext = () => {
|
||||
let query = 'SELECT * FROM `subscription__' + campaign.list + '` subscription WHERE status=1 ' + (queryData.where ? ' AND (' + queryData.where + ')' : '') + ' AND NOT EXISTS (SELECT 1 FROM `campaign__' + campaign.id + '` campaign WHERE campaign.list = ? AND campaign.segment = ? AND campaign.subscription = subscription.id) LIMIT 1';
|
||||
|
||||
connection.query(query, queryData.values.concat([campaign.list, campaign.segment]), (err, rows) => {
|
||||
if (err) {
|
||||
connection.release();
|
||||
return callback(err);
|
||||
}
|
||||
|
||||
if (!rows || !rows.length) {
|
||||
// everything already processed for this campaign
|
||||
return connection.query('UPDATE campaigns SET `status`=3, `status_change`=NOW() WHERE id=? LIMIT 1', [campaign.id], () => {
|
||||
connection.release();
|
||||
return callback(null, false);
|
||||
});
|
||||
}
|
||||
|
||||
let subscription = tools.convertKeys(rows[0]);
|
||||
let query = 'INSERT INTO `campaign__' + campaign.id + '` (list, segment, subscription) VALUES(?, ?,?)';
|
||||
connection.query(query, [campaign.list, campaign.segment, subscription.id], (err, result) => {
|
||||
if (err) {
|
||||
if (err.code === 'ER_DUP_ENTRY') {
|
||||
// race condition, try next one
|
||||
return tryNext();
|
||||
}
|
||||
connection.release();
|
||||
return callback(err);
|
||||
}
|
||||
connection.release();
|
||||
subscription.campaign = campaign.id;
|
||||
callback(null, {
|
||||
id: result.insertId,
|
||||
listId: campaign.list,
|
||||
campaignId: campaign.id,
|
||||
subscription
|
||||
});
|
||||
});
|
||||
});
|
||||
};
|
||||
tryNext();
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
function formatMessage(message, callback) {
|
||||
campaigns.get(message.campaignId, false, (err, campaign) => {
|
||||
if (err) {
|
||||
return callback(err);
|
||||
}
|
||||
if (!campaign) {
|
||||
return callback(new Error('Campaign not found'));
|
||||
}
|
||||
|
||||
lists.get(message.listId, (err, list) => {
|
||||
if (err) {
|
||||
return callback(err);
|
||||
}
|
||||
if (!list) {
|
||||
return callback(new Error('List not found'));
|
||||
}
|
||||
|
||||
settings.get('serviceUrl', (err, serviceUrl) => {
|
||||
if (err) {
|
||||
return callback(err);
|
||||
}
|
||||
|
||||
fields.list(list.id, (err, fieldList) => {
|
||||
if (err) {
|
||||
return callback(err);
|
||||
}
|
||||
|
||||
message.subscription.mergeTags = {
|
||||
EMAIL: message.subscription.email,
|
||||
FIRST_NAME: message.subscription.firstName,
|
||||
LAST_NAME: message.subscription.lastName,
|
||||
FULL_NAME: [].concat(message.subscription.firstName || []).concat(message.subscription.lastName || []).join(' ')
|
||||
};
|
||||
|
||||
fields.getRow(fieldList, message.subscription, true, true).forEach(field => {
|
||||
if (field.mergeTag) {
|
||||
message.subscription.mergeTags[field.mergeTag] = field.mergeValue || '';
|
||||
}
|
||||
if (field.options) {
|
||||
field.options.forEach(subField => {
|
||||
if (subField.mergeTag) {
|
||||
message.subscription.mergeTags[subField.mergeTag] = subField.mergeValue || '';
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
links.updateLinks(campaign, list, message.subscription, serviceUrl, campaign.html, (err, html) => {
|
||||
if (err) {
|
||||
return callback(err);
|
||||
}
|
||||
|
||||
// replace data: images with embedded attachments
|
||||
let attachments = [];
|
||||
html = html.replace(/(<img\b[^>]* src\s*=[\s"']*)(data:[^"'>\s]+)/gi, (match, prefix, dataUri) => {
|
||||
let cid = shortid.generate() + '-attachments@' + campaign.address.split('@').pop();
|
||||
attachments.push({
|
||||
path: dataUri,
|
||||
cid
|
||||
});
|
||||
return prefix + 'cid:' + cid;
|
||||
});
|
||||
|
||||
return callback(null, {
|
||||
from: {
|
||||
name: campaign.from,
|
||||
address: campaign.address
|
||||
},
|
||||
xMailer: 'Mailtrain Mailer (+http://mailtrain.org)',
|
||||
to: {
|
||||
name: [].concat(message.subscription.firstName || []).concat(message.subscription.lastName || []).join(' '),
|
||||
address: message.subscription.email
|
||||
},
|
||||
headers: {
|
||||
'x-fbl': [campaign.cid, list.cid, message.subscription.cid].join('.'),
|
||||
// custom header for SparkPost
|
||||
'x-msys-api': JSON.stringify({
|
||||
campaign_id: [campaign.cid, list.cid, message.subscription.cid].join('.')
|
||||
}),
|
||||
// custom header for SendGrid
|
||||
'x-smtpapi': JSON.stringify({
|
||||
unique_args: {
|
||||
campaign_id: [campaign.cid, list.cid, message.subscription.cid].join('.')
|
||||
}
|
||||
}),
|
||||
// custom header for Mailgun
|
||||
'x-mailgun-variables': JSON.stringify({
|
||||
campaign_id: [campaign.cid, list.cid, message.subscription.cid].join('.')
|
||||
}),
|
||||
'List-ID': {
|
||||
prepared: true,
|
||||
value: '"' + list.name.replace(/[^a-z0-9\s'.,\-]/g, '').trim() + '" <' + list.cid + '.' + (url.parse(serviceUrl).hostname || 'localhost') + '>'
|
||||
}
|
||||
},
|
||||
list: {
|
||||
unsubscribe: url.resolve(serviceUrl, '/subscription/' + list.cid + '/unsubscribe/' + message.subscription.cid + '?auto=yes')
|
||||
},
|
||||
subject: tools.formatMessage(serviceUrl, campaign, list, message.subscription, campaign.subject),
|
||||
html: tools.formatMessage(serviceUrl, campaign, list, message.subscription, html),
|
||||
text: tools.formatMessage(serviceUrl, campaign, list, message.subscription, campaign.text),
|
||||
|
||||
attachments
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
let sendLoop = () => {
|
||||
mailer.getMailer(err => {
|
||||
if (err) {
|
||||
log.error('Mail', err.stack);
|
||||
return setTimeout(sendLoop, 10 * 1000);
|
||||
}
|
||||
|
||||
let getNext = () => {
|
||||
if (!mailer.transport.isIdle()) {
|
||||
// only retrieve new messages if there are free slots in the mailer queue
|
||||
return;
|
||||
}
|
||||
|
||||
// find an unsent message
|
||||
findUnsent((err, message) => {
|
||||
if (err) {
|
||||
log.error('Mail', err.stack);
|
||||
setTimeout(getNext, 5 * 1000);
|
||||
return;
|
||||
}
|
||||
if (!message) {
|
||||
setTimeout(getNext, 5 * 1000);
|
||||
return;
|
||||
}
|
||||
|
||||
//log.verbose('Mail', 'Found new message to be delivered: %s', message.subscription.cid);
|
||||
|
||||
// format message to nodemailer message format
|
||||
formatMessage(message, (err, mail) => {
|
||||
if (err) {
|
||||
log.error('Mail', err.stack);
|
||||
setTimeout(getNext, 5 * 1000);
|
||||
return;
|
||||
}
|
||||
|
||||
// send the message
|
||||
mailer.transport.sendMail(mail, (err, info) => {
|
||||
if (err) {
|
||||
log.error('Mail', err.stack);
|
||||
}
|
||||
|
||||
let status = err ? 2 : 1;
|
||||
let response = err && (err.response || err.message) || info.response;
|
||||
|
||||
let responseId = response.split(/\s+/).pop();
|
||||
|
||||
db.getConnection((err, connection) => {
|
||||
if (err) {
|
||||
log.error('Mail', err.stack);
|
||||
return;
|
||||
}
|
||||
|
||||
let query = 'UPDATE `campaigns` SET `delivered`=`delivered`+1 ' + (status === 2 ? ', `bounced`=`bounced`+1 ' : '') + ' WHERE id=? LIMIT 1';
|
||||
connection.query(query, [message.campaignId], err => {
|
||||
if (err) {
|
||||
log.error('Mail', err.stack);
|
||||
}
|
||||
|
||||
let query = 'UPDATE `campaign__' + message.campaignId + '` SET status=?, response=?, response_id=?, updated=NOW() WHERE id=? LIMIT 1';
|
||||
connection.query(query, [status, response, responseId, message.id], err => {
|
||||
connection.release();
|
||||
if (err) {
|
||||
log.error('Mail', err.stack);
|
||||
} else {
|
||||
// log.verbose('Mail', 'Message sent and status updated for %s', message.subscription.cid);
|
||||
}
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
setImmediate(getNext);
|
||||
});
|
||||
});
|
||||
};
|
||||
|
||||
mailer.transport.on('idle', getNext);
|
||||
});
|
||||
};
|
||||
|
||||
sendLoop();
|
||||
104
services/testserver.js
Normal file
104
services/testserver.js
Normal file
|
|
@ -0,0 +1,104 @@
|
|||
'use strict';
|
||||
|
||||
let log = require('npmlog');
|
||||
let config = require('config');
|
||||
let crypto = require('crypto');
|
||||
|
||||
// Replace '../lib/smtp-server' with 'smtp-server' when running this script outside this directory
|
||||
let SMTPServer = require('smtp-server').SMTPServer;
|
||||
|
||||
// Setup server
|
||||
let server = new SMTPServer({
|
||||
|
||||
// log to console
|
||||
logger: false,
|
||||
|
||||
// not required but nice-to-have
|
||||
banner: 'Welcome to My Awesome SMTP Server',
|
||||
|
||||
// disable STARTTLS to allow authentication in clear text mode
|
||||
disabledCommands: ['STARTTLS'],
|
||||
|
||||
// By default only PLAIN and LOGIN are enabled
|
||||
authMethods: ['PLAIN', 'LOGIN', 'CRAM-MD5'],
|
||||
|
||||
// Accept messages up to 10 MB
|
||||
size: 10 * 1024 * 1024,
|
||||
|
||||
// Setup authentication
|
||||
// Allow only users with username 'testuser' and password 'testpass'
|
||||
onAuth: (auth, session, callback) => {
|
||||
let username = 'testuser';
|
||||
let password = 'testpass';
|
||||
|
||||
// check username and password
|
||||
if (auth.username === username &&
|
||||
(
|
||||
auth.method === 'CRAM-MD5' ?
|
||||
auth.validatePassword(password) : // if cram-md5, validate challenge response
|
||||
auth.password === password // for other methods match plaintext passwords
|
||||
)
|
||||
) {
|
||||
return callback(null, {
|
||||
user: 'userdata' // value could be an user id, or an user object etc. This value can be accessed from session.user afterwards
|
||||
});
|
||||
}
|
||||
|
||||
return callback(new Error('Authentication failed'));
|
||||
},
|
||||
|
||||
// Validate MAIL FROM envelope address. Example allows all addresses that do not start with 'deny'
|
||||
// If this method is not set, all addresses are allowed
|
||||
onMailFrom: (address, session, callback) => {
|
||||
if (/^deny/i.test(address.address)) {
|
||||
return callback(new Error('Not accepted'));
|
||||
}
|
||||
callback();
|
||||
},
|
||||
|
||||
// Validate RCPT TO envelope address. Example allows all addresses that do not start with 'deny'
|
||||
// If this method is not set, all addresses are allowed
|
||||
onRcptTo: (address, session, callback) => {
|
||||
let err;
|
||||
|
||||
if (/^deny/i.test(address.address)) {
|
||||
return callback(new Error('Not accepted'));
|
||||
}
|
||||
|
||||
// Reject messages larger than 100 bytes to an over-quota user
|
||||
if (/^full/i.test(address.address) && Number(session.envelope.mailFrom.args.SIZE) > 100) {
|
||||
err = new Error('Insufficient channel storage: ' + address.address);
|
||||
err.responseCode = 452;
|
||||
return callback(err);
|
||||
}
|
||||
|
||||
callback();
|
||||
},
|
||||
|
||||
// Handle message stream
|
||||
onData: (stream, session, callback) => {
|
||||
let hash = crypto.createHash('md5');
|
||||
stream.on('data', chunk => {
|
||||
hash.update(chunk);
|
||||
});
|
||||
stream.on('end', () => {
|
||||
let err;
|
||||
if (stream.sizeExceeded) {
|
||||
err = new Error('Error: message exceeds fixed maximum message size 10 MB');
|
||||
err.responseCode = 552;
|
||||
return callback(err);
|
||||
}
|
||||
callback(null, 'Message queued as ' + hash.digest('hex')); // accept the message once the stream is ended
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
server.on('error', err => {
|
||||
log.error('TESTSERV', err.stack);
|
||||
});
|
||||
|
||||
if (config.testserver.enabled) {
|
||||
server.listen(config.testserver.port, () => {
|
||||
log.info('TESTSERV', 'Server listening on port %s', config.testserver.port);
|
||||
});
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue