2016-04-04 12:36:30 +00:00
'use strict' ;
let log = require ( 'npmlog' ) ;
2016-04-11 03:26:20 +00:00
let config = require ( 'config' ) ;
2016-04-04 12:36:30 +00:00
let db = require ( '../lib/db' ) ;
let tools = require ( '../lib/tools' ) ;
let mailer = require ( '../lib/mailer' ) ;
let campaigns = require ( '../lib/models/campaigns' ) ;
let segments = require ( '../lib/models/segments' ) ;
let lists = require ( '../lib/models/lists' ) ;
let fields = require ( '../lib/models/fields' ) ;
let settings = require ( '../lib/models/settings' ) ;
let links = require ( '../lib/models/links' ) ;
let shortid = require ( 'shortid' ) ;
let url = require ( 'url' ) ;
2016-04-26 16:07:07 +00:00
let htmlToText = require ( 'html-to-text' ) ;
let request = require ( 'request' ) ;
2016-05-25 20:58:17 +00:00
let caches = require ( '../lib/caches' ) ;
2016-08-10 17:45:29 +00:00
let libmime = require ( 'libmime' ) ;
2016-05-25 15:01:39 +00:00
2016-04-04 12:36:30 +00:00
function findUnsent ( callback ) {
2016-05-25 15:01:39 +00:00
let returnUnsent = ( row , campaign ) => {
db . getConnection ( ( err , connection ) => {
if ( err ) {
return callback ( err ) ;
}
let subscription = tools . convertKeys ( row ) ;
let query = 'INSERT INTO `campaign__' + campaign . id + '` (list, segment, subscription) VALUES(?, ?,?)' ;
connection . query ( query , [ campaign . list , campaign . segment , subscription . id ] , ( err , result ) => {
connection . release ( ) ;
if ( err ) {
if ( err . code === 'ER_DUP_ENTRY' ) {
// race condition, try next one
return findUnsent ( callback ) ;
}
return callback ( err ) ;
}
subscription . campaign = campaign . id ;
callback ( null , {
id : result . insertId ,
listId : campaign . list ,
campaignId : campaign . id ,
subscription
} ) ;
} ) ;
} ) ;
} ;
2016-06-03 10:15:33 +00:00
// get next subscriber from trigger queue
let checkQueued = ( ) => {
db . getConnection ( ( err , connection ) => {
if ( err ) {
return callback ( err ) ;
}
connection . query ( 'SELECT * FROM `queued` ORDER BY `created` ASC LIMIT 1' , ( err , rows ) => {
if ( err ) {
connection . release ( ) ;
return callback ( err ) ;
}
if ( ! rows || ! rows . length ) {
connection . release ( ) ;
return callback ( null , false ) ;
}
let queued = tools . convertKeys ( rows [ 0 ] ) ;
// delete queued element
connection . query ( 'DELETE FROM `queued` WHERE `campaign`=? AND `list`=? AND `subscriber`=? LIMIT 1' , [ queued . campaign , queued . list , queued . subscriber ] , err => {
if ( err ) {
connection . release ( ) ;
return callback ( err ) ;
}
// get campaign
connection . query ( 'SELECT `id`, `list`, `segment` FROM `campaigns` WHERE `id`=? LIMIT 1' , [ queued . campaign ] , ( err , rows ) => {
if ( err ) {
connection . release ( ) ;
return callback ( err ) ;
}
if ( ! rows || ! rows . length ) {
connection . release ( ) ;
return callback ( null , false ) ;
}
let campaign = tools . convertKeys ( rows [ 0 ] ) ;
// get subscription
connection . query ( 'SELECT * FROM `subscription__' + queued . list + '` WHERE `id`=? AND `status`=1 LIMIT 1' , [ queued . subscriber ] , ( err , rows ) => {
connection . release ( ) ;
if ( err ) {
return callback ( err ) ;
}
if ( ! rows || ! rows . length ) {
return callback ( null , false ) ;
}
return returnUnsent ( rows [ 0 ] , campaign ) ;
} ) ;
} ) ;
} ) ;
} ) ;
} ) ;
} ;
2016-05-25 20:58:17 +00:00
if ( caches . cache . has ( 'sender queue' ) ) {
let cached = caches . shift ( 'sender queue' ) ;
2016-05-25 15:01:39 +00:00
return returnUnsent ( cached . row , cached . campaign ) ;
}
2016-04-04 12:36:30 +00:00
db . getConnection ( ( err , connection ) => {
if ( err ) {
return callback ( err ) ;
}
2016-04-30 15:19:48 +00:00
// Find "normal" campaigns. Ignore RSS and drip campaigns at this point
2016-05-03 11:04:46 +00:00
let query = 'SELECT `id`, `list`, `segment` FROM `campaigns` WHERE `status`=? AND (`scheduled` IS NULL OR `scheduled` <= NOW()) AND `type` IN (?, ?) LIMIT 1' ;
connection . query ( query , [ 2 , 1 , 3 ] , ( err , rows ) => {
2016-05-25 15:01:39 +00:00
connection . release ( ) ;
2016-04-04 12:36:30 +00:00
if ( err ) {
return callback ( err ) ;
}
if ( ! rows || ! rows . length ) {
2016-06-03 10:15:33 +00:00
return checkQueued ( ) ;
2016-04-04 12:36:30 +00:00
}
let campaign = tools . convertKeys ( rows [ 0 ] ) ;
let getSegmentQuery = ( segmentId , next ) => {
segmentId = Number ( segmentId ) ;
if ( ! segmentId ) {
return next ( null , {
where : '' ,
values : [ ]
} ) ;
}
2016-05-26 09:53:12 +00:00
2016-05-25 15:01:39 +00:00
segments . getQuery ( segmentId , 'subscription' , next ) ;
2016-04-04 12:36:30 +00:00
} ;
getSegmentQuery ( campaign . segment , ( err , queryData ) => {
if ( err ) {
return callback ( err ) ;
}
2016-05-25 15:01:39 +00:00
db . getConnection ( ( err , connection ) => {
if ( err ) {
return callback ( err ) ;
}
2016-04-29 18:47:23 +00:00
// TODO: Add support for localized sending time. In this case campaign messages are
// not sent before receiver's local time reaches defined time
// SELECT * FROM subscription__1 LEFT JOIN tzoffset ON tzoffset.tz=subscription__1.tz WHERE NOW() + INTERVAL IFNULL(`offset`,0) MINUTE >= localtime
2016-05-25 15:01:39 +00:00
let query ;
let values ;
// NOT IN
2016-05-26 09:53:12 +00:00
query = 'SELECT * FROM `subscription__' + campaign . list + '` AS subscription WHERE status=1 ' + ( queryData . where ? ' AND (' + queryData . where + ')' : '' ) + ' AND id NOT IN (SELECT subscription FROM `campaign__' + campaign . id + '` campaign WHERE campaign.list = ? AND campaign.segment = ? AND campaign.subscription = subscription.id) LIMIT 150' ;
2016-05-25 15:01:39 +00:00
values = queryData . values . concat ( [ campaign . list , campaign . segment ] ) ;
2016-04-04 12:36:30 +00:00
2016-05-25 15:01:39 +00:00
// LEFT JOIN / IS NULL
2016-05-26 09:53:12 +00:00
//query = 'SELECT subscription.* FROM `subscription__' + campaign.list + '` AS subscription LEFT JOIN `campaign__' + campaign.id + '` AS campaign ON campaign.list = ? AND campaign.segment = ? AND campaign.subscription = subscription.id WHERE subscription.status=1 ' + (queryData.where ? 'AND (' + queryData.where + ') ' : '') + 'AND campaign.id IS NULL LIMIT 150';
2016-05-25 15:01:39 +00:00
//values = [campaign.list, campaign.segment].concat(queryData.values);
connection . query ( query , values , ( err , rows ) => {
2016-05-26 09:53:12 +00:00
2016-04-04 12:36:30 +00:00
if ( err ) {
connection . release ( ) ;
return callback ( err ) ;
}
if ( ! rows || ! rows . length ) {
// everything already processed for this campaign
2016-06-22 12:25:36 +00:00
connection . query ( 'UPDATE campaigns SET `status`=3, `status_change`=NOW() WHERE id=? LIMIT 1' , [ campaign . id ] , ( ) => {
2016-04-04 12:36:30 +00:00
connection . release ( ) ;
return callback ( null , false ) ;
} ) ;
2016-06-22 12:25:36 +00:00
return ;
2016-04-04 12:36:30 +00:00
}
2016-05-25 15:01:39 +00:00
connection . release ( ) ;
2016-04-04 12:36:30 +00:00
2016-05-25 15:01:39 +00:00
rows . forEach ( row => {
2016-05-25 20:58:17 +00:00
caches . push ( 'sender queue' , {
2016-05-25 15:01:39 +00:00
row ,
campaign
2016-04-04 12:36:30 +00:00
} ) ;
} ) ;
2016-05-25 15:01:39 +00:00
return findUnsent ( callback ) ;
2016-04-04 12:36:30 +00:00
} ) ;
2016-05-25 15:01:39 +00:00
} ) ;
2016-04-04 12:36:30 +00:00
} ) ;
} ) ;
} ) ;
}
function formatMessage ( message , callback ) {
campaigns . get ( message . campaignId , false , ( err , campaign ) => {
if ( err ) {
return callback ( err ) ;
}
if ( ! campaign ) {
return callback ( new Error ( 'Campaign not found' ) ) ;
}
lists . get ( message . listId , ( err , list ) => {
if ( err ) {
return callback ( err ) ;
}
if ( ! list ) {
return callback ( new Error ( 'List not found' ) ) ;
}
2016-04-11 03:26:20 +00:00
settings . list ( [ 'serviceUrl' , 'verpUse' , 'verpHostname' ] , ( err , configItems ) => {
2016-04-04 12:36:30 +00:00
if ( err ) {
return callback ( err ) ;
}
2016-04-11 03:26:20 +00:00
let useVerp = config . verp . enabled && configItems . verpUse && configItems . verpHostname ;
2016-04-04 12:36:30 +00:00
fields . list ( list . id , ( err , fieldList ) => {
if ( err ) {
return callback ( err ) ;
}
message . subscription . mergeTags = {
EMAIL : message . subscription . email ,
FIRST _NAME : message . subscription . firstName ,
LAST _NAME : message . subscription . lastName ,
FULL _NAME : [ ] . concat ( message . subscription . firstName || [ ] ) . concat ( message . subscription . lastName || [ ] ) . join ( ' ' )
} ;
2016-04-16 17:11:10 +00:00
let encryptionKeys = [ ] ;
2016-04-04 12:36:30 +00:00
fields . getRow ( fieldList , message . subscription , true , true ) . forEach ( field => {
if ( field . mergeTag ) {
message . subscription . mergeTags [ field . mergeTag ] = field . mergeValue || '' ;
}
2016-04-16 17:11:10 +00:00
if ( field . type === 'gpg' && field . value ) {
encryptionKeys . push ( field . value . trim ( ) ) ;
}
2016-04-04 12:36:30 +00:00
if ( field . options ) {
field . options . forEach ( subField => {
if ( subField . mergeTag ) {
2016-05-13 12:32:29 +00:00
message . subscription . mergeTags [ subField . mergeTag ] = subField . value && subField . mergeValue || '' ;
2016-04-04 12:36:30 +00:00
}
} ) ;
}
} ) ;
2016-04-26 16:07:07 +00:00
let renderAndSend = ( html , text , renderTags ) => {
links . updateLinks ( campaign , list , message . subscription , configItems . serviceUrl , html , ( err , html ) => {
if ( err ) {
return callback ( err ) ;
}
2016-04-04 12:36:30 +00:00
2016-04-26 16:07:07 +00:00
// replace data: images with embedded attachments
let attachments = [ ] ;
html = html . replace ( /(<img\b[^>]* src\s*=[\s"']*)(data:[^"'>\s]+)/gi , ( match , prefix , dataUri ) => {
let cid = shortid . generate ( ) + '-attachments@' + campaign . address . split ( '@' ) . pop ( ) ;
attachments . push ( {
path : dataUri ,
cid
} ) ;
return prefix + 'cid:' + cid ;
2016-04-04 12:36:30 +00:00
} ) ;
2016-04-26 16:07:07 +00:00
let campaignAddress = [ campaign . cid , list . cid , message . subscription . cid ] . join ( '.' ) ;
let renderedHtml = renderTags ? tools . formatMessage ( configItems . serviceUrl , campaign , list , message . subscription , html ) : html ;
let renderedText = ( text || '' ) . trim ( ) ? ( renderTags ? tools . formatMessage ( configItems . serviceUrl , campaign , list , message . subscription , text ) : text ) : htmlToText . fromString ( renderedHtml , {
wordwrap : 130
} ) ;
return callback ( null , {
from : {
name : campaign . from ,
address : campaign . address
} ,
xMailer : 'Mailtrain Mailer (+https://mailtrain.org)' ,
to : {
name : [ ] . concat ( message . subscription . firstName || [ ] ) . concat ( message . subscription . lastName || [ ] ) . join ( ' ' ) ,
address : message . subscription . email
} ,
sender : useVerp ? campaignAddress + '@' + configItems . verpHostname : false ,
envelope : useVerp ? {
from : campaignAddress + '@' + configItems . verpHostname ,
to : message . subscription . email
} : false ,
headers : {
'x-fbl' : campaignAddress ,
// custom header for SparkPost
'x-msys-api' : JSON . stringify ( {
2016-04-11 03:26:20 +00:00
campaign _id : campaignAddress
2016-04-26 16:07:07 +00:00
} ) ,
// custom header for SendGrid
'x-smtpapi' : JSON . stringify ( {
unique _args : {
campaign _id : campaignAddress
}
} ) ,
// custom header for Mailgun
'x-mailgun-variables' : JSON . stringify ( {
campaign _id : campaignAddress
} ) ,
'List-ID' : {
prepared : true ,
2016-08-10 17:45:29 +00:00
value : libmime . encodeWords ( list . name ) + ' <' + list . cid + '.' + ( url . parse ( configItems . serviceUrl ) . hostname || 'localhost' ) + '>'
2016-04-04 12:36:30 +00:00
}
2016-04-26 16:07:07 +00:00
} ,
list : {
unsubscribe : url . resolve ( configItems . serviceUrl , '/subscription/' + list . cid + '/unsubscribe/' + message . subscription . cid + '?auto=yes' )
} ,
subject : tools . formatMessage ( configItems . serviceUrl , campaign , list , message . subscription , campaign . subject ) ,
html : renderedHtml ,
text : renderedText ,
attachments ,
encryptionKeys
} ) ;
2016-04-04 12:36:30 +00:00
} ) ;
2016-04-26 16:07:07 +00:00
} ;
2016-04-30 15:19:48 +00:00
if ( campaign . sourceUrl ) {
2016-04-26 17:29:57 +00:00
let form = tools . getMessageLinks ( configItems . serviceUrl , campaign , list , message . subscription ) ;
Object . keys ( message . subscription . mergeTags ) . forEach ( key => {
form [ key ] = message . subscription . mergeTags [ key ] ;
} ) ;
2016-05-26 09:53:12 +00:00
2016-04-26 16:07:07 +00:00
request . post ( {
2016-04-30 15:19:48 +00:00
url : campaign . sourceUrl ,
2016-04-26 17:29:57 +00:00
form
2016-04-26 16:07:07 +00:00
} , ( err , httpResponse , body ) => {
if ( err ) {
return callback ( err ) ;
}
if ( httpResponse . statusCode !== 200 ) {
2016-04-30 15:19:48 +00:00
return callback ( new Error ( 'Received status code ' + httpResponse . statusCode + ' from ' + campaign . sourceUrl ) ) ;
2016-04-26 16:07:07 +00:00
}
renderAndSend ( body && body . toString ( ) , '' , false ) ;
} ) ;
} else {
2016-05-03 16:21:01 +00:00
renderAndSend ( campaign . htmlPrepared || campaign . html , campaign . text , true ) ;
2016-04-26 16:07:07 +00:00
}
2016-04-04 12:36:30 +00:00
} ) ;
} ) ;
} ) ;
} ) ;
}
let sendLoop = ( ) => {
mailer . getMailer ( err => {
if ( err ) {
log . error ( 'Mail' , err . stack ) ;
return setTimeout ( sendLoop , 10 * 1000 ) ;
}
let getNext = ( ) => {
if ( ! mailer . transport . isIdle ( ) ) {
// only retrieve new messages if there are free slots in the mailer queue
return ;
}
// find an unsent message
findUnsent ( ( err , message ) => {
if ( err ) {
log . error ( 'Mail' , err . stack ) ;
setTimeout ( getNext , 5 * 1000 ) ;
return ;
}
if ( ! message ) {
setTimeout ( getNext , 5 * 1000 ) ;
return ;
}
//log.verbose('Mail', 'Found new message to be delivered: %s', message.subscription.cid);
// format message to nodemailer message format
formatMessage ( message , ( err , mail ) => {
if ( err ) {
log . error ( 'Mail' , err . stack ) ;
setTimeout ( getNext , 5 * 1000 ) ;
return ;
}
2016-05-25 15:01:39 +00:00
let tryCount = 0 ;
let trySend = ( ) => {
tryCount ++ ;
2016-04-04 12:36:30 +00:00
2016-05-25 15:01:39 +00:00
// send the message
mailer . transport . sendMail ( mail , ( err , info ) => {
2016-04-04 12:36:30 +00:00
if ( err ) {
log . error ( 'Mail' , err . stack ) ;
2016-05-25 15:01:39 +00:00
if ( err . responseCode && err . responseCode >= 400 && err . responseCode < 500 && tryCount <= 5 ) {
// temporary error, try again
return setTimeout ( trySend , tryCount * 1000 ) ;
}
2016-04-04 12:36:30 +00:00
}
2016-05-25 15:01:39 +00:00
let status = err ? 2 : 1 ;
let response = err && ( err . response || err . message ) || info . response ;
let responseId = response . split ( /\s+/ ) . pop ( ) ;
db . getConnection ( ( err , connection ) => {
2016-04-04 12:36:30 +00:00
if ( err ) {
log . error ( 'Mail' , err . stack ) ;
2016-05-25 15:01:39 +00:00
return ;
2016-04-04 12:36:30 +00:00
}
2016-05-25 15:01:39 +00:00
let query = 'UPDATE `campaigns` SET `delivered`=`delivered`+1 ' + ( status === 2 ? ', `bounced`=`bounced`+1 ' : '' ) + ' WHERE id=? LIMIT 1' ;
connection . query ( query , [ message . campaignId ] , err => {
2016-04-04 12:36:30 +00:00
if ( err ) {
log . error ( 'Mail' , err . stack ) ;
}
2016-05-25 15:01:39 +00:00
let query = 'UPDATE `campaign__' + message . campaignId + '` SET status=?, response=?, response_id=?, updated=NOW() WHERE id=? LIMIT 1' ;
connection . query ( query , [ status , response , responseId , message . id ] , err => {
connection . release ( ) ;
if ( err ) {
log . error ( 'Mail' , err . stack ) ;
} else {
// log.verbose('Mail', 'Message sent and status updated for %s', message.subscription.cid);
}
} ) ;
2016-04-04 12:36:30 +00:00
} ) ;
} ) ;
} ) ;
2016-05-25 15:01:39 +00:00
} ;
setImmediate ( trySend ) ;
2016-07-05 16:31:57 +00:00
setImmediate ( ( ) => mailer . transport . checkThrottling ( getNext ) ) ;
2016-04-04 12:36:30 +00:00
} ) ;
} ) ;
} ;
2016-07-05 16:31:57 +00:00
mailer . transport . on ( 'idle' , ( ) => mailer . transport . checkThrottling ( getNext ) ) ;
2016-04-04 12:36:30 +00:00
} ) ;
} ;
2016-04-16 04:40:59 +00:00
module . exports = callback => {
sendLoop ( ) ;
setImmediate ( callback ) ;
} ;