2018-09-02 12:59:02 +00:00
'use strict' ;
2018-09-18 08:30:13 +00:00
const config = require ( 'config' ) ;
2018-09-27 19:32:35 +00:00
const log = require ( '../lib/log' ) ;
2018-09-02 12:59:02 +00:00
const mailers = require ( '../lib/mailers' ) ;
2019-06-29 21:19:56 +00:00
const messageSender = require ( '../lib/message-sender' ) ;
2019-05-25 19:57:11 +00:00
require ( '../lib/fork' ) ;
2018-09-02 12:59:02 +00:00
const workerId = Number . parseInt ( process . argv [ 2 ] ) ;
let running = false ;
2019-06-25 05:18:06 +00:00
async function processCampaignMessages ( campaignId , messages ) {
2018-09-02 12:59:02 +00:00
if ( running ) {
log . error ( 'Senders' , ` Worker ${ workerId } assigned work while working ` ) ;
return ;
}
running = true ;
2019-06-30 08:47:09 +00:00
const cs = new messageSender . MessageSender ( ) ;
2019-06-25 05:18:06 +00:00
await cs . initByCampaignId ( campaignId ) ;
2018-09-18 08:30:13 +00:00
2019-06-29 21:19:56 +00:00
let withErrors = false ;
2019-06-30 08:47:09 +00:00
for ( const msg of messages ) {
2018-09-18 08:30:13 +00:00
try {
2019-06-30 08:47:09 +00:00
await cs . sendRegularMessage ( msg . listId , msg . email ) ;
2018-11-20 22:41:10 +00:00
2019-06-30 08:47:09 +00:00
log . verbose ( 'Senders' , 'Message sent and status updated for %s:%s' , msg . listId , msg . email ) ;
2019-06-25 05:18:06 +00:00
} catch ( err ) {
2019-06-29 21:19:56 +00:00
if ( err instanceof mailers . SendConfigurationError ) {
2019-06-30 08:47:09 +00:00
log . error ( 'Senders' , ` Sending message to ${ msg . listId } : ${ msg . email } failed with error: ${ err . message } . Will retry the message if within retention interval. ` ) ;
2019-06-29 21:19:56 +00:00
withErrors = true ;
break ;
} else {
2019-06-30 08:47:09 +00:00
log . error ( 'Senders' , ` Sending message to ${ msg . listId } : ${ msg . email } failed with error: ${ err . message } . ` ) ;
2019-06-29 21:19:56 +00:00
log . verbose ( err . stack ) ;
}
2019-06-25 05:18:06 +00:00
}
}
running = false ;
2019-06-29 21:19:56 +00:00
sendToMaster ( 'messages-processed' , { withErrors } ) ;
2019-06-25 05:18:06 +00:00
}
async function processQueuedMessages ( sendConfigurationId , messages ) {
if ( running ) {
log . error ( 'Senders' , ` Worker ${ workerId } assigned work while working ` ) ;
return ;
}
running = true ;
2018-11-20 22:41:10 +00:00
2019-06-29 21:19:56 +00:00
let withErrors = false ;
2019-06-30 08:47:09 +00:00
for ( const msg of messages ) {
const queuedMessage = msg . queuedMessage ;
const msgData = queuedMessage . data ;
let target = '' ;
if ( msgData . listId && msgData . subscriptionId ) {
target = ` ${ msgData . listId } : ${ msgData . subscriptionId } ` ;
} else if ( msgData . to ) {
if ( msgData . to . name && msgData . to . address ) {
target = ` ${ msgData . to . name } < ${ msgData . to . address } > ` ;
} else if ( msgData . to . address ) {
target = msgData . to . address ;
} else {
target = msgData . to . toString ( ) ;
}
}
2019-06-25 05:18:06 +00:00
try {
2019-06-29 21:19:56 +00:00
await messageSender . sendQueuedMessage ( queuedMessage ) ;
2019-06-30 08:47:09 +00:00
log . verbose ( 'Senders' , ` Message sent and status updated for ${ target } ` ) ;
2018-09-18 08:30:13 +00:00
} catch ( err ) {
2019-06-29 21:19:56 +00:00
if ( err instanceof mailers . SendConfigurationError ) {
2019-06-30 08:47:09 +00:00
log . error ( 'Senders' , ` Sending message to ${ target } failed with error: ${ err . message } . Will retry the message if within retention interval. ` ) ;
2019-06-29 21:19:56 +00:00
withErrors = true ;
break ;
} else {
2019-06-30 08:47:09 +00:00
log . error ( 'Senders' , ` Sending message to ${ target } failed with error: ${ err . message } . Dropping the message. ` ) ;
2019-06-29 21:19:56 +00:00
log . verbose ( err . stack ) ;
2019-06-30 08:47:09 +00:00
try {
await messageSender . dropQueuedMessage ( queuedMessage ) ;
} catch ( err ) {
log . error ( err . stack ) ;
}
2019-06-29 21:19:56 +00:00
}
2018-09-18 08:30:13 +00:00
}
}
2018-09-02 12:59:02 +00:00
running = false ;
2018-09-09 22:55:44 +00:00
2019-06-29 21:19:56 +00:00
sendToMaster ( 'messages-processed' , { withErrors } ) ;
2018-09-02 12:59:02 +00:00
}
2019-06-29 21:19:56 +00:00
function sendToMaster ( msgType , data ) {
2018-09-02 12:59:02 +00:00
process . send ( {
2019-06-29 21:19:56 +00:00
type : msgType ,
data
2018-09-02 12:59:02 +00:00
} ) ;
}
process . on ( 'message' , msg => {
if ( msg ) {
const type = msg . type ;
2018-09-09 22:55:44 +00:00
if ( type === 'reload-config' ) {
2018-09-02 12:59:02 +00:00
mailers . invalidateMailer ( msg . data . sendConfigurationId ) ;
2019-06-25 05:18:06 +00:00
} else if ( type === 'process-campaign-messages' ) {
2018-09-09 22:55:44 +00:00
// noinspection JSIgnoredPromiseFromCall
2019-06-25 05:18:06 +00:00
processCampaignMessages ( msg . data . campaignId , msg . data . messages )
2018-09-02 12:59:02 +00:00
2019-06-25 05:18:06 +00:00
} else if ( type === 'process-queued-messages' ) {
// noinspection JSIgnoredPromiseFromCall
processQueuedMessages ( msg . data . sendConfigurationId , msg . data . messages )
}
2018-09-02 12:59:02 +00:00
}
} ) ;
2018-12-21 18:09:18 +00:00
if ( config . title ) {
process . title = config . title + ': sender/worker ' + workerId ;
}
2018-09-02 12:59:02 +00:00
sendToMaster ( 'worker-started' ) ;