From 1448d9e9147e005f1acf95f0ef983f3237f9edbc Mon Sep 17 00:00:00 2001 From: Tomas Bures Date: Thu, 27 Sep 2018 21:32:35 +0200 Subject: [PATCH] Bugfixes in sending campaigns --- app-builder.js | 2 +- index.js | 13 ++++++++++--- lib/campaign-sender.js | 6 +++--- lib/dbcheck.js | 16 ++++++++-------- lib/executor.js | 2 +- lib/feedcheck.js | 2 +- lib/importer.js | 2 +- lib/log.js | 8 ++++++++ lib/mailers.js | 2 +- lib/passport.js | 2 +- lib/privilege-helpers.js | 2 +- lib/report-processor.js | 2 +- lib/senders.js | 8 +++----- lib/subscription-mail-helpers.js | 2 +- lib/translate.js | 2 +- models/campaigns.js | 2 +- models/fields.js | 22 +++++++++++++++++----- models/links.js | 2 +- models/shares.js | 2 +- routes/api.js | 2 +- routes/links.js | 2 +- routes/subscription.js | 2 +- routes/webhooks.js | 2 +- services/executor.js | 2 +- services/feedcheck.js | 2 +- services/importer.js | 2 +- services/postfix-bounce-server.js | 2 +- services/sender-master.js | 22 ++++++++++++++++++---- services/sender-worker.js | 3 ++- services/test-server.js | 2 +- services/triggers.js | 2 +- services/tzupdate.js | 2 +- services/verp-server.js | 2 +- workers/reports/report-processor.js | 2 +- 34 files changed, 95 insertions(+), 55 deletions(-) create mode 100644 lib/log.js diff --git a/app-builder.js b/app-builder.js index 100681a5..0aa1c5d2 100644 --- a/app-builder.js +++ b/app-builder.js @@ -1,7 +1,7 @@ 'use strict'; const config = require('config'); -const log = require('npmlog'); +const log = require('./lib/log'); const express = require('express'); const bodyParser = require('body-parser'); diff --git a/index.js b/index.js index 5c45808c..817027be 100644 --- a/index.js +++ b/index.js @@ -1,7 +1,7 @@ 'use strict'; const config = require('config'); -const log = require('npmlog'); +const log = require('./lib/log'); const appBuilder = require('./app-builder'); const http = require('http'); const triggers = require('./services/triggers'); @@ -29,8 +29,6 @@ if (config.title) { process.title = config.title; } -log.level = config.log.level; - function startHTTPServer(appType, appName, port, callback) { const app = appBuilder.createApp(appType); @@ -82,6 +80,15 @@ dbcheck(err => { // Check if database needs upgrading before starting the server .then(() => shares.regenerateRoleNamesTable()) .then(() => shares.rebuildPermissions()) +/* + .then(() => + testServer(() => { + senders.spawn(() => { + }); + }) + ); +*/ + .then(() => executor.spawn(() => { testServer(() => { diff --git a/lib/campaign-sender.js b/lib/campaign-sender.js index bca46ac8..e144a8da 100644 --- a/lib/campaign-sender.js +++ b/lib/campaign-sender.js @@ -148,7 +148,7 @@ class CampaignSender { return; } - const list = this.listsById.get(list.id); + const list = this.listsById.get(listId); const subscriptionGrouped = await subscriptions.getByEmail(contextHelpers.getAdminContext(), list.id, email); const flds = this.listsFieldsGrouped.get(listId); const campaign = this.campaign; @@ -171,7 +171,7 @@ class CampaignSender { if (!list.listunsubscribe_disabled) { listUnsubscribe = campaign.unsubscribe_url ? tools.formatMessage(campaign, list, subscriptionGrouped, mergeTags, campaign.unsubscribe_url) - : getPublicUrl('/subscription/' + list.cid + '/unsubscribe/' + subscriptionGrouped.subscription.cid); + : getPublicUrl('/subscription/' + list.cid + '/unsubscribe/' + subscriptionGrouped.cid); } const mailer = await mailers.getOrCreateMailer(sendConfiguration.id); @@ -257,7 +257,7 @@ class CampaignSender { await knex('campaign_messages').insert({ campaign: this.campaign.id, list: listId, - subscriptions: subscriptionGrouped.id, + subscription: subscriptionGrouped.id, send_configuration: sendConfiguration.id, status, response, diff --git a/lib/dbcheck.js b/lib/dbcheck.js index 8581ce9f..bfd5d6f5 100644 --- a/lib/dbcheck.js +++ b/lib/dbcheck.js @@ -4,20 +4,20 @@ This module handles Mailtrain database initialization and upgrades */ -let config = require('config'); -let mysql = require('mysql2'); -let log = require('npmlog'); -let fs = require('fs'); -let pathlib = require('path'); -let Handlebars = require('handlebars'); +const config = require('config'); +const mysql = require('mysql2'); +const log = require('./log'); +const fs = require('fs'); +const pathlib = require('path'); +const Handlebars = require('handlebars'); const highestLegacySchemaVersion = 29; -let mysqlConfig = { +const mysqlConfig = { multipleStatements: true }; Object.keys(config.mysql).forEach(key => mysqlConfig[key] = config.mysql[key]); -let db = mysql.createPool(mysqlConfig); +const db = mysql.createPool(mysqlConfig); function listTables(callback) { db.getConnection((err, connection) => { diff --git a/lib/executor.js b/lib/executor.js index 894c1ccd..1a40bd15 100644 --- a/lib/executor.js +++ b/lib/executor.js @@ -1,7 +1,7 @@ 'use strict'; const fork = require('child_process').fork; -const log = require('npmlog'); +const log = require('./log'); const path = require('path'); const requestCallbacks = {}; diff --git a/lib/feedcheck.js b/lib/feedcheck.js index 57a24a90..f202b6c3 100644 --- a/lib/feedcheck.js +++ b/lib/feedcheck.js @@ -1,7 +1,7 @@ 'use strict'; const fork = require('child_process').fork; -const log = require('npmlog'); +const log = require('./log'); const path = require('path'); let feedcheckProcess; diff --git a/lib/importer.js b/lib/importer.js index ef59e421..8ba5f470 100644 --- a/lib/importer.js +++ b/lib/importer.js @@ -2,7 +2,7 @@ const knex = require('./knex'); const fork = require('child_process').fork; -const log = require('npmlog'); +const log = require('./log'); const path = require('path'); const {ImportStatus, RunStatus} = require('../shared/imports'); diff --git a/lib/log.js b/lib/log.js new file mode 100644 index 00000000..9f4e02be --- /dev/null +++ b/lib/log.js @@ -0,0 +1,8 @@ +'use strict'; + +const config = require('config'); +const log = require('npmlog'); + +log.level = config.log.level; + +module.exports = log; \ No newline at end of file diff --git a/lib/mailers.js b/lib/mailers.js index f619cd0d..aa1538f7 100644 --- a/lib/mailers.js +++ b/lib/mailers.js @@ -1,6 +1,6 @@ 'use strict'; -const log = require('npmlog'); +const log = require('./log'); const config = require('config'); const Handlebars = require('handlebars'); diff --git a/lib/passport.js b/lib/passport.js index bb499995..a9213e85 100644 --- a/lib/passport.js +++ b/lib/passport.js @@ -1,7 +1,7 @@ 'use strict'; const config = require('config'); -const log = require('npmlog'); +const log = require('./log'); const _ = require('./translate')._; const util = require('util'); diff --git a/lib/privilege-helpers.js b/lib/privilege-helpers.js index 31bb186b..79b0d388 100644 --- a/lib/privilege-helpers.js +++ b/lib/privilege-helpers.js @@ -1,6 +1,6 @@ 'use strict'; -const log = require('npmlog'); +const log = require('./log'); const config = require('config'); const fs = require('fs'); diff --git a/lib/report-processor.js b/lib/report-processor.js index 67bda8f2..64db10db 100644 --- a/lib/report-processor.js +++ b/lib/report-processor.js @@ -1,6 +1,6 @@ 'use strict'; -const log = require('npmlog'); +const log = require('./log'); const reports = require('../models/reports'); const executor = require('./executor'); const contextHelpers = require('../lib/context-helpers'); diff --git a/lib/senders.js b/lib/senders.js index 94891023..a4000700 100644 --- a/lib/senders.js +++ b/lib/senders.js @@ -1,7 +1,7 @@ 'use strict'; const fork = require('child_process').fork; -const log = require('npmlog'); +const log = require('./log'); const path = require('path'); const knex = require('../lib/knex'); const {CampaignStatus} = require('../shared/campaigns'); @@ -12,10 +12,8 @@ let senderProcess; function spawn(callback) { log.verbose('Senders', 'Spawning master sender process'); - knex.transaction(async tx => { - await tx('campaigns').where('status', CampaignStatus.SENDING).update({status: CampaignStatus.SCHEDULED}); - - }).then(() => { + knex('campaigns').where('status', CampaignStatus.SENDING).update({status: CampaignStatus.SCHEDULED}) + .then(() => { senderProcess = fork(path.join(__dirname, '..', 'services', 'sender-master.js'), [], { cwd: path.join(__dirname, '..'), env: {NODE_ENV: process.env.NODE_ENV} diff --git a/lib/subscription-mail-helpers.js b/lib/subscription-mail-helpers.js index 9ba49825..45856cb7 100644 --- a/lib/subscription-mail-helpers.js +++ b/lib/subscription-mail-helpers.js @@ -1,6 +1,6 @@ 'use strict'; -const log = require('npmlog'); +const log = require('./log'); const fields = require('../models/fields'); const settings = require('../models/settings'); const {getTrustedUrl} = require('./urls'); diff --git a/lib/translate.js b/lib/translate.js index 9abd9a6f..b881d664 100644 --- a/lib/translate.js +++ b/lib/translate.js @@ -6,7 +6,7 @@ const Gettext = require('node-gettext'); const gt = new Gettext(); const fs = require('fs'); const path = require('path'); -const log = require('npmlog'); +const log = require('./log'); const gettextParser = require('gettext-parser'); const fakelang = require('./fakelang'); diff --git a/models/campaigns.js b/models/campaigns.js index f9096128..cbfa3895 100644 --- a/models/campaigns.js +++ b/models/campaigns.js @@ -599,7 +599,7 @@ async function updateMessageResponse(context, message, response, responseId) { }); } -async function getSubscribersQueryGeneratorTx(tx, campaignId, onlyUnsent, batchSize) { +async function getSubscribersQueryGeneratorTx(tx, campaignId, onlyUnsent) { /* This is supposed to produce queries like this: diff --git a/models/fields.js b/models/fields.js index 80a9e897..fcfa09a9 100644 --- a/models/fields.js +++ b/models/fields.js @@ -130,7 +130,7 @@ fieldTypes['checkbox-grouped'] = { cardinality: Cardinality.MULTIPLE, getHbsType: field => 'typeCheckboxGrouped', render: (field, value) => { - const subItems = value.map(col => field.groupedOptions[col].name); + const subItems = (value || []).map(col => field.groupedOptions[col].name); if (field.settings.groupTemplate) { return render(field.settings.groupTemplate, { @@ -149,7 +149,10 @@ fieldTypes['radio-grouped'] = { enumerated: false, cardinality: Cardinality.SINGLE, getHbsType: field => 'typeRadioGrouped', - render: (field, value) => field.groupedOptions[value].name + render: (field, value) => { + const fld = field.groupedOptions[value]; + return fld ? fld.name : ''; + } }; fieldTypes['dropdown-grouped'] = { @@ -159,7 +162,10 @@ fieldTypes['dropdown-grouped'] = { enumerated: false, cardinality: Cardinality.SINGLE, getHbsType: field => 'typeDropdownGrouped', - render: (field, value) => field.groupedOptions[value].name + render: (field, value) => { + const fld = field.groupedOptions[value]; + return fld ? fld.name : ''; + } }; fieldTypes['radio-enum'] = { @@ -173,7 +179,10 @@ fieldTypes['radio-enum'] = { enumerated: true, cardinality: Cardinality.SINGLE, getHbsType: field => 'typeRadioEnum', - render: (field, value) => field.groupedOptions[value].name + render: (field, value) => { + const fld = field.groupedOptions[value]; + return fld ? fld.name : ''; + } }; fieldTypes['dropdown-enum'] = { @@ -187,7 +196,10 @@ fieldTypes['dropdown-enum'] = { enumerated: true, cardinality: Cardinality.SINGLE, getHbsType: field => 'typeDropdownEnum', - render: (field, value) => field.groupedOptions[value].name + render: (field, value) => { + const fld = field.groupedOptions[value]; + return fld ? fld.name : ''; + } }; fieldTypes.option = { diff --git a/models/links.js b/models/links.js index cd9f84f2..d803b6f6 100644 --- a/models/links.js +++ b/models/links.js @@ -1,6 +1,6 @@ 'use strict'; -const log = require('npmlog'); +const log = require('../lib/log'); const knex = require('../lib/knex'); const dtHelpers = require('../lib/dt-helpers'); const shares = require('./shares'); diff --git a/models/shares.js b/models/shares.js index 01b5800f..21f08d78 100644 --- a/models/shares.js +++ b/models/shares.js @@ -6,7 +6,7 @@ const { enforce } = require('../lib/helpers'); const dtHelpers = require('../lib/dt-helpers'); const entitySettings = require('../lib/entity-settings'); const interoperableErrors = require('../shared/interoperable-errors'); -const log = require('npmlog'); +const log = require('../lib/log'); const {getGlobalNamespaceId} = require('../shared/namespaces'); const {getAdminId} = require('../shared/users'); diff --git a/routes/api.js b/routes/api.js index 992d2cff..468d5e4a 100644 --- a/routes/api.js +++ b/routes/api.js @@ -7,7 +7,7 @@ const fields = require('../models/fields'); const { SubscriptionStatus, SubscriptionSource } = require('../shared/lists'); const subscriptions = require('../models/subscriptions'); const confirmations = require('../models/confirmations'); -const log = require('npmlog'); +const log = require('../lib/log'); const router = require('../lib/router-async').create(); const mailHelpers = require('../lib/subscription-mail-helpers'); const interoperableErrors = require('../shared/interoperable-errors'); diff --git a/routes/links.js b/routes/links.js index 506ab882..d3f7654b 100644 --- a/routes/links.js +++ b/routes/links.js @@ -1,6 +1,6 @@ 'use strict'; -const log = require('npmlog'); +const log = require('../lib/log'); const config = require('config'); const router = require('../lib/router-async').create(); const links = require('../models/links'); diff --git a/routes/subscription.js b/routes/subscription.js index 12309ac5..fd39c08d 100644 --- a/routes/subscription.js +++ b/routes/subscription.js @@ -1,6 +1,6 @@ 'use strict'; -const log = require('npmlog'); +const log = require('../lib/log'); const config = require('config'); const router = require('../lib/router-async').create(); const confirmations = require('../models/confirmations'); diff --git a/routes/webhooks.js b/routes/webhooks.js index 9c3ee5bc..0bb17bf1 100644 --- a/routes/webhooks.js +++ b/routes/webhooks.js @@ -7,7 +7,7 @@ const sendConfigurations = require('../models/send-configurations'); const contextHelpers = require('../lib/context-helpers'); const {SubscriptionStatus} = require('../shared/lists'); const {MailerType} = require('../shared/send-configurations'); -const log = require('npmlog'); +const log = require('../lib/log'); const multer = require('multer'); const uploads = multer(); diff --git a/services/executor.js b/services/executor.js index c32b0da0..0c938310 100644 --- a/services/executor.js +++ b/services/executor.js @@ -7,7 +7,7 @@ const reportHelpers = require('../lib/report-helpers'); const fork = require('child_process').fork; const path = require('path'); -const log = require('npmlog'); +const log = require('../lib/log'); const fs = require('fs'); const privilegeHelpers = require('../lib/privilege-helpers'); diff --git a/services/feedcheck.js b/services/feedcheck.js index 792883bf..d0f1fcd5 100644 --- a/services/feedcheck.js +++ b/services/feedcheck.js @@ -1,6 +1,6 @@ 'use strict'; -const log = require('npmlog'); +const log = require('../lib/log'); const knex = require('../lib/knex'); const feedparser = require('feedparser-promised'); const { CampaignType, CampaignStatus, CampaignSource } = require('../shared/campaigns'); diff --git a/services/importer.js b/services/importer.js index 6a1088d5..dd8bfacf 100644 --- a/services/importer.js +++ b/services/importer.js @@ -2,7 +2,7 @@ const knex = require('../lib/knex'); const path = require('path'); -const log = require('npmlog'); +const log = require('../lib/log'); const fsExtra = require('fs-extra-promise'); const {ImportSource, MappingType, ImportStatus, RunStatus} = require('../shared/imports'); const imports = require('../models/imports'); diff --git a/services/postfix-bounce-server.js b/services/postfix-bounce-server.js index 07f916a4..b64ebebb 100644 --- a/services/postfix-bounce-server.js +++ b/services/postfix-bounce-server.js @@ -1,6 +1,6 @@ 'use strict'; -const log = require('npmlog'); +const log = require('../lib/log'); const config = require('config'); const net = require('net'); const campaigns = require('../models/campaigns'); diff --git a/services/sender-master.js b/services/sender-master.js index c17a959e..5204f72e 100644 --- a/services/sender-master.js +++ b/services/sender-master.js @@ -2,7 +2,7 @@ const config = require('config'); const fork = require('child_process').fork; -const log = require('npmlog'); +const log = require('../lib/log'); const path = require('path'); const knex = require('../lib/knex'); const {CampaignStatus, CampaignType} = require('../shared/campaigns'); @@ -24,10 +24,13 @@ const workerBatchSize = 100; const messageQueue = new Map(); // campaignId -> [{listId, email}] const messageQueueCont = new Map(); // campaignId -> next batch callback +const workAssignment = new Map(); // workerId -> { campaignId, subscribers: [{listId, email}] } + let workerSchedulerCont = null; function messagesProcessed(workerId) { + workAssignment.delete(workerId); idleWorkers.push(workerId); if (workerSchedulerCont) { @@ -70,6 +73,7 @@ async function scheduleWorkers() { if (queue.length > 0) { const subscribers = queue.splice(0, workerBatchSize); + workAssignment.set(workerId, {campaignId, subscribers}); if (queue.length === 0 && messageQueueCont.has(campaignId)) { const scheduleMessages = messageQueueCont.get(campaignId); @@ -113,11 +117,21 @@ async function processCampaign(campaignId) { let qryGen; await knex.transaction(async tx => { - qryGen = await campaigns.getSubscribersQueryGeneratorTx(tx, campaignId, true, retrieveBatchSize); + qryGen = await campaigns.getSubscribersQueryGeneratorTx(tx, campaignId, true); }); if (qryGen) { - const qry = qryGen(knex).select(['pending_subscriptions.email', 'campaign_lists.list']); + let subscribersInProcessing = [...msgQueue]; + for (const wa of workAssignment.values()) { + if (wa.campaignId === campaignId) { + subscribersInProcessing = subscribersInProcessing.concat(wa.subscribers); + } + } + + const qry = qryGen(knex) + .whereNotIn('pending_subscriptions.email', subscribersInProcessing.map(x => x.email)) + .select(['pending_subscriptions.email', 'campaign_lists.list']) + .limit(retrieveBatchSize); const subs = await qry; if (subs.length === 0) { @@ -261,7 +275,7 @@ async function init() { }); process.send({ - type: 'sender-started' + type: 'master-sender-started' }); periodicCampaignsCheck(); diff --git a/services/sender-worker.js b/services/sender-worker.js index 60ad050c..96636a6b 100644 --- a/services/sender-worker.js +++ b/services/sender-worker.js @@ -1,7 +1,7 @@ 'use strict'; const config = require('config'); -const log = require('npmlog'); +const log = require('../lib/log'); const mailers = require('../lib/mailers'); const CampaignSender = require('../lib/campaign-sender'); @@ -25,6 +25,7 @@ async function processMessages(campaignId, subscribers) { log.verbose('Senders', 'Message sent and status updated for %s:%s', subData.listId, subData.email); } catch (err) { log.error('Senders', `Sending message to ${subData.listId}:${subData.email} failed with error: ${err.message}`) + log.verbose(err); } } diff --git a/services/test-server.js b/services/test-server.js index 4efed4b2..af5de82e 100644 --- a/services/test-server.js +++ b/services/test-server.js @@ -1,6 +1,6 @@ 'use strict'; -const log = require('npmlog'); +const log = require('../lib/log'); const config = require('config'); const crypto = require('crypto'); const humanize = require('humanize'); diff --git a/services/triggers.js b/services/triggers.js index e53ac2eb..6d27f87b 100644 --- a/services/triggers.js +++ b/services/triggers.js @@ -1,6 +1,6 @@ 'use strict'; -const log = require('npmlog'); +const log = require('../lib/log'); const knex = require('../lib/knex'); const triggers = require('../models/triggers'); const campaigns = require('../models/campaigns'); diff --git a/services/tzupdate.js b/services/tzupdate.js index 855a7616..e9269bba 100644 --- a/services/tzupdate.js +++ b/services/tzupdate.js @@ -10,7 +10,7 @@ const moment = require('moment-timezone'); const knex = require('../lib/knex'); -const log = require('npmlog'); +const log = require('../lib/log'); let lastCheck = false; const timezone_timeout = 60 * 60 * 1000; diff --git a/services/verp-server.js b/services/verp-server.js index 0b399b30..1955dfb3 100644 --- a/services/verp-server.js +++ b/services/verp-server.js @@ -1,7 +1,7 @@ 'use strict'; const { nodeifyFunction, nodeifyPromise } = require('../lib/nodeify'); -const log = require('npmlog'); +const log = require('../lib/log'); const config = require('config'); const {MailerError} = require('../lib/mailers'); const campaigns = require('../models/campaigns'); diff --git a/workers/reports/report-processor.js b/workers/reports/report-processor.js index 579f643f..b489aa1f 100644 --- a/workers/reports/report-processor.js +++ b/workers/reports/report-processor.js @@ -10,7 +10,7 @@ const handlebarsHelpers = require('../../lib/handlebars-helpers'); const _ = require('../../lib/translate')._; const hbs = require('hbs'); const vm = require('vm'); -const log = require('npmlog'); +const log = require('../../lib/log'); const fs = require('fs'); const knex = require('../../lib/knex'); const contextHelpers = require('../../lib/context-helpers');