Triggers ported. Not tested yet.

This commit is contained in:
Tomas Bures 2018-09-22 15:59:05 +02:00
parent 907d548e02
commit 92d28daa9e
7 changed files with 206 additions and 31 deletions

View file

@ -4,12 +4,12 @@ const config = require('config');
const log = require('npmlog');
const appBuilder = require('./app-builder');
const http = require('http');
//const triggers = require('./services/triggers');
const triggers = require('./services/triggers');
const importer = require('./lib/importer');
const feedcheck = require('./lib/feedcheck');
const verpServer = require('./services/verp-server');
const testServer = require('./services/test-server');
//const postfixBounceServer = require('./services/postfix-bounce-server');
const postfixBounceServer = require('./services/postfix-bounce-server');
const tzupdate = require('./services/tzupdate');
const dbcheck = require('./lib/dbcheck');
const senders = require('./lib/senders');
@ -95,14 +95,14 @@ dbcheck(err => { // Check if database needs upgrading before starting the server
importer.spawn(() => {
feedcheck.spawn(() => {
senders.spawn(() => {
//triggers(() => {
triggers.start();
postfixBounceServer(async () => {
(async () => {
await reportProcessor.init();
log.info('Service', 'All services started');
})();
});
//});
});
});
});

View file

@ -624,7 +624,7 @@ async function getSubscribersQueryGeneratorTx(tx, campaignId, onlyUnsent, batchS
.leftJoin(
function () {
return this.from('campaign_messages')
.where('campaign_messages.campaign', cpgList.campaign)
.where('campaign_messages.campaign', campaignId)
.where('campaign_messages.list', cpgList.list)
.as('related_campaign_messages');
},

View file

@ -168,8 +168,8 @@ async function updateLinks(campaign, list, subscription, message) {
}
}
module.exports.LinkId = LinkId;
module.exports.resolve = resolve;
module.exports.countLink = countLink;
module.exports.addOrGet = addOrGet;
module.exports.updateLinks = updateLinks;
module.exports.updateLinks = updateLinks;

View file

@ -81,13 +81,6 @@ async function create(context, campaignId, entity) {
const ids = await tx('triggers').insert(filteredEntity);
const id = ids[0];
await knex.schema.raw('CREATE TABLE `trigger__' + id + '` (\n' +
' `list` int(11) unsigned NOT NULL,\n' +
' `subscription` int(11) unsigned NOT NULL,\n' +
' `created` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,\n' +
' PRIMARY KEY (`list`,`subscription`)\n' +
') ENGINE=InnoDB DEFAULT CHARSET=utf8;\n');
return id;
});
}
@ -116,9 +109,13 @@ async function updateWithConsistencyCheck(context, campaignId, entity) {
async function removeTx(tx, context, campaignId, id) {
await shares.enforceEntityPermissionTx(tx, context, 'campaign', campaignId, 'manageTriggers');
await tx('triggers').where({campaign: campaignId, id}).del();
const existing = await tx('triggers').where({campaign: campaignId, id}).first();
if (!existing) {
throw new interoperableErrors.NotFoundError();
}
await knex.schema.dropTableIfExists('trigger__' + id);
await tx('trigger_messages').where({trigger: id}).del();
await tx('triggers').where('id', id).del();
}
async function remove(context, campaignId, id) {

View file

@ -25,21 +25,6 @@ let libmime = require('libmime');
const workerId = Number.parseInt(process.argv[2]);
let running = false;
/*
const knex = require('../lib/knex');
const path = require('path');
const log = require('npmlog');
const fsExtra = require('fs-extra-promise');
const {ImportSource, MappingType, ImportStatus, RunStatus} = require('../shared/imports');
const imports = require('../models/imports');
const fields = require('../models/fields');
const { Writable } = require('stream');
const { cleanupFromPost, enforce } = require('../lib/helpers');
const tools = require('../lib/tools');
const shares = require('../models/shares');
const _ = require('../lib/translate')._;
*/
class CampaignSender {
constructor(campaignId) {
this.campaignId = campaignId;

180
services/triggers.js Normal file
View file

@ -0,0 +1,180 @@
'use strict';
const log = require('npmlog');
const knex = require('../lib/knex');
const triggers = require('../models/triggers');
const campaigns = require('../models/campaigns');
const subscriptions = require('../models/subscriptions');
const segments = require('../models/segments');
const { Entity, Event } = require('../shared/triggers');
const { SubscriptionStatus } = require('../shared/lists');
const links = require('../models/links');
const contextHelpers = require('../lib/context-helpers');
const triggerCheckPeriod = 15 * 1000;
const triggerFirePeriod = 60 * 1000;
async function start() {
while (true) {
const fired = await knex.transaction(async tx => {
const currentTs = new Date();
const trigger = await tx('triggers').where('enabled', true).andWhere('last_check', '<', currentTs - triggerFirePeriod).orderBy('last_check', 'asc').first();
if (!trigger) {
return false;
}
const campaign = campaigns.getByIdTx(tx, contextHelpers.getAdminContext(), trigger.campaign, false);
for (const cpgList of campaign.lists) {
const addSegmentQuery = cpgList.segment ? await segments.getQueryGeneratorTx(tx, cpgList.list, cpgList.segment) : () => {
};
const subsTable = subscriptions.getSubscriptionTableName(cpgList.list);
let sqlQry = knex.from(subsTable)
.leftJoin(
function () {
return this.from('trigger_messages')
.where('trigger_messages.campaign', campaign.id)
.where('trigger_messages.list', cpgList.list)
.as('related_trigger_messages');
},
'related_trigger_messages.subscription', subsTable + '.id'
)
.where(function () {
addSegmentQuery(this);
})
.whereNotNull('related_trigger_messages.id') // This means only those where the trigger has not fired yet somewhen in the past
.select(subsTable + '.id');
let column;
if (trigger.entity === Entity.SUBSCRIPTION) {
column = subsTable + '.' + trigger.event;
} else if (trigger.entity === Entity.CAMPAIGN) {
if (trigger.event === Event[Entity.CAMPAIGN].DELIVERED) {
sqlQry = sqlQry.innerJoin(
function () {
return this.from('campaign_messages')
.where('campaign_messages.campaign', campaign.id)
.where('campaign_messages.list', cpgList.list)
.as('campaign_messages');
}, 'campaign_messages.subscription', subsTable + '.id');
column = 'campaign_messages.created';
} else if (trigger.event === Event[Entity.CAMPAIGN].OPENED) {
sqlQry = sqlQry.innerJoin(
function () {
return this.from('campaign_links')
.where('campaign_links.campaign', campaign.id)
.where('campaign_links.list', cpgList.list)
.where('campaign_links.link', links.LinkId.OPEN)
.as('campaign_links');
}, 'campaign_links', 'campaign_links.subscription', subsTable + '.id');
column = 'campaign_links.created';
} else if (trigger.event === Event[Entity.CAMPAIGN].CLICKED) {
sqlQry = sqlQry.innerJoin(
function () {
return this.from('campaign_links')
.where('campaign_links.campaign', campaign.id)
.where('campaign_links.list', cpgList.list)
.where('campaign_links.link', links.LinkId.GENERAL_CLICK)
.as('campaign_links');
}, 'campaign_links', 'campaign_links.subscription', subsTable + '.id');
column = 'campaign_links.created';
} else if (trigger.event === Event[Entity.CAMPAIGN].NOT_OPENED) {
sqlQry = sqlQry.innerJoin(
function () {
return this.from('campaign_messages')
.where('campaign_messages.campaign', campaign.id)
.where('campaign_messages.list', cpgList.list)
.as('campaign_messages');
}, 'campaign_messages.subscription', subsTable + '.id')
.whereNotExists(function () {
return this
.select('*')
.from('campaign_links')
.whereRaw(`campaign_links.subscription = ${subsTable}.id`)
.where('campaign_links.campaign', campaign.id)
.where('campaign_links.list', cpgList.list)
.where('campaign_links.link', links.LinkId.OPEN);
});
column = 'campaign_messages.created';
} else if (trigger.event === Event[Entity.CAMPAIGN].NOT_CLICKED) {
sqlQry = sqlQry.innerJoin(
function () {
return this.from('campaign_messages')
.where('campaign_messages.campaign', campaign.id)
.where('campaign_messages.list', cpgList.list)
.as('campaign_messages');
}, 'campaign_messages.subscription', subsTable + '.id')
.whereNotExists(function () {
return this
.select('*')
.from('campaign_links')
.whereRaw(`campaign_links.subscription = ${subsTable}.id`)
.where('campaign_links.campaign', campaign.id)
.where('campaign_links.list', cpgList.list)
.where('campaign_links.link', links.LinkId.GENERAL_CLICK);
});
column = 'campaign_messages.created';
}
sqlQry = sqlQry.where(column, '<=', currentTs - trigger.seconds_after);
if (trigger.last_check !== null) {
sqlQry = sqlQry.where(column, '>', trigger.last_check);
}
}
const subscribers = await sqlQry;
for (const subscriber of subscribers) {
await tx('trigger_messages').insert({
campaign: campaign.id,
list: cpgList.list,
subscription: subscriber.id
});
await tx('queued').insert({
campaign: campaign.id,
list: cpgList.list,
subscription: subscriber.id,
trigger: trigger.id
});
await tx('triggers').increment('count').where('id', trigger.id);
log.verbose('Triggers', `Triggered ${trigger.name} (${trigger.id}) for subscriber ${subscriber.id}`);
}
}
await tx('triggers').update('last_check', currentTs).where('id', trigger.id);
return true;
});
if (!fired) {
const nextCycle = new Promise(resolve => {
setTimeout(resolve, triggerCheckPeriod);
});
await nextCycle;
}
}
}
module.exports.start = start;

View file

@ -1054,6 +1054,11 @@ async function migrateAttachments(knex) {
}
async function migrateTriggers(knex) {
await knex.schema.table('queued', table => {
table.renameColumn('subscriber', 'subscription');
table.renameColumn('source', 'trigger');
});
await knex.schema.table('triggers', table => {
table.renameColumn('rule', 'entity');
table.renameColumn('column', 'event');
@ -1086,6 +1091,14 @@ async function migrateTriggers(knex) {
table.dropColumn('segment');
});
await knex.schema.raw('CREATE TABLE `trigger_messages` (\n' +
' `trigger` int(10) unsigned NOT NULL,\n' +
' `list` int(11) unsigned NOT NULL,\n' +
' `subscription` int(11) unsigned NOT NULL,\n' +
' `created` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,\n' +
' PRIMARY KEY (`trigger`, `list`,`subscription`)\n' +
') ENGINE=InnoDB DEFAULT CHARSET=utf8;\n');
await knex.schema.dropTableIfExists('trigger');
}