2018-12-21 19:09:18 +01:00

396 lines
14 KiB

'use strict';
const knex = require('../lib/knex');
const hasher = require('node-object-hash')();
const { enforce, filterObject } = require('../lib/helpers');
const dtHelpers = require('../lib/dt-helpers');
const interoperableErrors = require('../../shared/interoperable-errors');
const fields = require('./fields');
const namespaceHelpers = require('../lib/namespace-helpers');
const shares = require('./shares');
const reportHelpers = require('../lib/report-helpers');
const fs = require('fs-extra-promise');
const contextHelpers = require('../lib/context-helpers');
const {LinkId} = require('./links');
const subscriptions = require('./subscriptions');
const {Readable} = require('stream');
const ReportState = require('../../shared/reports').ReportState;
// Whitelist of report entity keys handed to filterObject() before an entity is hashed
// and before it is written to the `reports` table (insert and update paths below).
const allowedKeys = new Set(['name', 'description', 'report_template', 'params', 'namespace']);
/**
 * Computes the consistency hash of a report entity.
 *
 * Only the keys listed in `allowedKeys` take part in the hash, so fields that are
 * not whitelisted do not influence the result.
 *
 * @param {Object} entity - report entity (or any object carrying the whitelisted keys)
 * @returns {*} whatever `hasher.hash` produces for the filtered entity
 */
function hash(entity) {
    return hasher.hash(filterObject(entity, allowedKeys));
}
/**
 * Loads a report joined with its report template inside a knex transaction.
 *
 * Requires the 'view' permission on the report. The `user_fields` and `params`
 * columns are JSON-decoded before the entity is returned.
 *
 * NOTE(review): in this copy several column-name literals are empty strings (''),
 * and the closing braces of the `if`, the transaction callback and the function are
 * not present. The text looks truncated; restore it from the original source.
 * NOTE(review): `entity` is used as a single row below, but no row-selecting call is
 * visible on the query chain - confirm against the original.
 *
 * @param context - passed through to the `shares` permission helpers
 * @param id - id of the report
 * @param withPermissions - when truthy (default), `entity.permissions` is filled from shares.getPermissionsTx
 * @returns the report entity
 */
async function getByIdWithTemplate(context, id, withPermissions = true) {
return await knex.transaction(async tx => {
await shares.enforceEntityPermissionTx(tx, context, 'report', id, 'view');
const entity = await tx('reports')
.where('', id)
.innerJoin('report_templates', 'reports.report_template', '')
.select(['', '', 'reports.description', 'reports.report_template', 'reports.params', 'reports.state', 'reports.namespace', 'report_templates.user_fields', 'report_templates.mime_type', 'report_templates.hbs', 'report_templates.js'])
// Both columns are stored as JSON text and are decoded in place.
entity.user_fields = JSON.parse(entity.user_fields);
entity.params = JSON.parse(entity.params);
if (withPermissions) {
entity.permissions = await shares.getPermissionsTx(tx, context, 'report', id);
return entity;
/**
 * Lists reports through dtHelpers.ajaxListWithPermissions.
 *
 * The listing requires the 'view' operation on both the report and its report
 * template, and joins `report_templates` and `namespaces` onto `reports`.
 *
 * NOTE(review): the argument list is damaged in this copy - `context` and `params`
 * are not visibly passed, a comma is missing after the second permission spec,
 * several column literals are empty (''), and the closing parentheses/braces are
 * absent. Restore from the original source.
 *
 * @param context - caller context (presumably forwarded to the helper; not visible here)
 * @param params - listing request parameters (presumably forwarded to the helper; not visible here)
 */
async function listDTAjax(context, params) {
return await dtHelpers.ajaxListWithPermissions(
{ entityTypeId: 'report', requiredOperations: ['view'] },
{ entityTypeId: 'reportTemplate', requiredOperations: ['view'] }
builder => builder.from('reports')
.innerJoin('report_templates', 'reports.report_template', '')
.innerJoin('namespaces', '', 'reports.namespace'),
'', '', '', 'reports.description',
'reports.last_run', '', 'reports.state', 'report_templates.mime_type'
/**
 * Creates a report and starts the report processor for it.
 *
 * Requires 'createReport' on the target namespace and 'execute' on the report
 * template. `entity.params` is replaced in place by its JSON string before the
 * whitelisted keys are inserted into `reports`.
 *
 * NOTE(review): the closing `});` of the transaction callback and the closing brace
 * of the function are missing in this copy, so it is not visible whether
 * reportProcessor.start() runs inside or after the transaction - confirm against the
 * original source.
 *
 * @param context - passed through to the `shares` permission helpers
 * @param entity - report data; mutated (`params` is stringified)
 * @returns the id of the inserted report
 */
async function create(context, entity) {
let id;
await knex.transaction(async tx => {
await shares.enforceEntityPermissionTx(tx, context, 'namespace', entity.namespace, 'createReport');
await shares.enforceEntityPermissionTx(tx, context, 'reportTemplate', entity.report_template, 'execute');
await namespaceHelpers.validateEntity(tx, entity);
entity.params = JSON.stringify(entity.params);
const ids = await tx('reports').insert(filterObject(entity, allowedKeys));
// The first element of the insert result is used as the new report id.
id = ids[0];
await shares.rebuildPermissionsTx(tx, { entityTypeId: 'report', entityId: id });
// Required lazily rather than at module top (the update path notes a cyclic dependency for the same require).
const reportProcessor = require('../lib/report-processor');
await reportProcessor.start(id);
return id;
/**
 * Updates a report after verifying it has not changed since the caller loaded it.
 *
 * Requires 'edit' on the report and 'execute' on the report template. The stored row
 * is hashed (with `params` JSON-decoded) and compared with `entity.originalHash`.
 * On success the whitelisted keys are written, the state is reset to
 * ReportState.SCHEDULED, permissions are rebuilt and the report processor is started.
 *
 * NOTE(review): several statements are cut mid-expression in this copy (the id
 * argument is missing in the permission check, in both `.where('id', ...)` calls, in
 * `entityId:` and in `reportProcessor.start(`), and all closing braces are absent.
 * The code below is not valid JavaScript as shown; restore it from the original source.
 *
 * @param context - passed through to the permission and namespace helpers
 * @param entity - updated report data including `originalHash`; mutated (`params` is stringified)
 * @throws interoperableErrors.NotFoundError when no existing row is found
 * @throws interoperableErrors.ChangedError when the stored hash differs from `entity.originalHash`
 */
async function updateWithConsistencyCheck(context, entity) {
await knex.transaction(async tx => {
await shares.enforceEntityPermissionTx(tx, context, 'report',, 'edit');
await shares.enforceEntityPermissionTx(tx, context, 'reportTemplate', entity.report_template, 'execute');
const existing = await tx('reports').where('id',;
if (!existing) {
throw new interoperableErrors.NotFoundError();
// Decode params so the stored row hashes the same way as the entity the client received.
existing.params = JSON.parse(existing.params);
const existingHash = hash(existing);
if (existingHash !== entity.originalHash) {
throw new interoperableErrors.ChangedError();
await namespaceHelpers.validateEntity(tx, entity);
await namespaceHelpers.validateMove(context, entity, existing, 'report', 'createReport', 'delete');
entity.params = JSON.stringify(entity.params);
const filteredUpdates = filterObject(entity, allowedKeys);
// Every successful update reschedules the report.
filteredUpdates.state = ReportState.SCHEDULED;
await tx('reports').where('id',;
await shares.rebuildPermissionsTx(tx, { entityTypeId: 'report', entityId: });
// This require is here to avoid cyclic dependency
const reportProcessor = require('../lib/report-processor');
await reportProcessor.start(;
/**
 * Deletes a report and its generated files using an existing transaction.
 *
 * Requires the 'delete' permission on the report. The report row is loaded first
 * because the file paths are derived from it; the content file and the output file
 * are removed before the row itself is deleted.
 *
 * @param tx - knex transaction to run the queries in
 * @param context - passed through to shares.enforceEntityPermissionTx
 * @param id - id of the report to delete
 * @returns {Promise<void>}
 */
async function removeTx(tx, context, id) {
    await shares.enforceEntityPermissionTx(tx, context, 'report', id, 'delete');

    const report = await tx('reports').where('id', id).first();

    await fs.removeAsync(reportHelpers.getReportContentFile(report));
    await fs.removeAsync(reportHelpers.getReportOutputFile(report));

    await tx('reports').where('id', id).del();
}
/**
 * Deletes a report in its own transaction by delegating to removeTx().
 *
 * @param context - passed through to removeTx
 * @param id - id of the report to delete
 * @returns {Promise<void>}
 */
async function remove(context, id) {
    await knex.transaction(async tx => {
        await removeTx(tx, context, id);
    });
}
/**
 * Updates the given columns of one report row. No permission check is performed here.
 *
 * @param id - id of the report to update
 * @param {Object} fields - column/value pairs passed to knex `update`
 * @returns the result of the knex update query
 */
async function updateFields(id, fields) {
    return await knex('reports').where('id', id).update(fields);
}
/**
 * Fetches reports that are in the given state.
 *
 * @param state - value matched against the `state` column
 * @param limit - value passed to knex `limit`
 * @returns the result of the knex select query
 */
async function listByState(state, limit) {
    return await knex('reports').where('state', state).limit(limit);
}
/**
 * Moves every report that is in `oldState` to `newState`.
 *
 * @param oldState - state to match
 * @param newState - state to assign
 * @returns the result of the knex update query
 */
async function bulkChangeState(oldState, newState) {
    return await knex('reports').where('state', oldState).update('state', newState);
}
/**
 * Builds a statistics query over the subscription tables of all lists of a campaign
 * and returns either its rows or a stream.
 *
 * Visible flow: (1) collect, per list, a mapping 'field:<key>' -> 'subscriptions.<column>';
 * (2) per list, build a select on `subscription__<listId>` left-joined with
 * `campaign_messages` and `campaign_links`, optionally refined by `listQryFn`;
 * (3) run a single query, or a UNION ALL of the per-list queries, optionally wrapped
 * by `unionQryFn`.
 *
 * NOTE(review): this copy is heavily truncated - closing braces are absent, several
 * string literals are empty (''), and some statements are clearly missing (nothing
 * is ever added to `assignedFlds`, `selFields` or `subsQrys` in the visible text, and
 * L168-169 are cut mid-expression). Restore from the original source before editing.
 *
 * @param campaign - object with a `lists` iterable whose items carry a `list` id
 * @param select - array of field keys to select; the item '*' expands to all mapped fields
 * @param unionQryFn - optional; receives a knex query over the combined sub-select
 * @param listQryFn - optional; applied to each per-list query
 * @param asStream - when truthy a stream is returned, otherwise rows (or [] when there are no queries)
 */
async function _getCampaignStatistics(campaign, select, unionQryFn, listQryFn, asStream) {
const subsQrys = [];
// Filled from the first list; later lists only overwrite keys that already exist
// (the intent appears to be keeping fields common to all lists).
const commonFieldsMapping = {};
let firstIteration = true;
for (const cpgList of campaign.lists) {
const cpgListId = cpgList.list;
// NOTE(review): subsTable is not used in the visible code of this loop.
const subsTable = subscriptions.getSubscriptionTableName(cpgListId);
// Field definitions are read with the admin context, i.e. not with the caller's context.
const flds = await fields.list(contextHelpers.getAdminContext(), cpgListId);
const assignedFlds = new Set();
for (const fld of flds) {
/* Dropdown and checkbox groups have field.column == null
For the time being, we don't group options and we don't expand enums. We just provide it as it is in the DB. */
if (fld.column) {
const fldKey = 'field:' + fld.key.toLowerCase();
if (firstIteration || commonFieldsMapping[fldKey]) {
commonFieldsMapping[fldKey] = 'subscriptions.' + fld.column;
// Drop mappings that were not (re)assigned for the current list.
for (const fldKey in commonFieldsMapping) {
if (!assignedFlds.has(fldKey)) {
delete commonFieldsMapping[fldKey];
firstIteration = false;
// Second pass: build one query per list.
for (const cpgList of campaign.lists) {
const cpgListId = cpgList.list;
// NOTE(review): subsTable is unused here as well; the table name is built inline below.
const subsTable = subscriptions.getSubscriptionTableName(cpgListId);
// Select keys mapped either to a column name (string) or to a raw SQL expression ({raw: ...}).
const campaignFieldsMapping = {
'list:id': {raw: knex.raw('?', [cpgListId])},
'tracker:count': {raw: 'COALESCE(`campaign_links`.`count`, 0)'},
'tracker:country': '',
'tracker:deviceType': 'campaign_links.device_type',
'tracker:status': 'campaign_messages.status',
'subscription:status': 'subscriptions.status',
'subscription:id': '',
'subscription:cid': 'subscriptions.cid',
'subscription:email': ''
// NOTE(review): the contents of fieldsMapping are missing in this copy.
const fieldsMapping = {
// Turns a select key into an aliased column or an aliased raw expression.
const getSelField = item => {
const itemMapping = fieldsMapping[item];
if (typeof itemMapping === 'string') {
return fieldsMapping[item] + ' AS ' + item;
} else if (itemMapping.raw) {
return knex.raw(fieldsMapping[item].raw + ' AS `' + item + '`');
let selFields = [];
for (let idx = 0; idx < select.length; idx++) {
const item = select[idx];
if (item in fieldsMapping) {
} else if (item === '*') {
selFields = selFields.concat(Object.keys(fieldsMapping).map(entry => getSelField(entry)));
} else {
let query = knex(`subscription__${cpgListId} AS subscriptions`)
.leftJoin('campaign_messages', {
'campaign_messages.subscription': '',
'campaign_messages.list': knex.raw('?', [cpgListId])
.leftJoin('campaign_links', {
'campaign_links.subscription': '',
'campaign_links.list': knex.raw('?', [cpgListId])
if (listQryFn) {
query = listQryFn(query);
if (subsQrys.length > 0) {
let subsSql, subsBindings;
// Wraps the combined SQL as a sub-select for unionQryFn, or runs it as raw SQL when no unionQryFn is given.
const applyUnionQryFn = (subsSql, subsBindings) => {
if (unionQryFn) {
return unionQryFn(
knex.from(function() {
return knex.raw('(' + subsSql + ')', subsBindings);
} else {
return knex.raw(subsSql, subsBindings);
// A single per-list query needs no UNION.
if (subsQrys.length === 1) {
subsSql = subsQrys[0].sql;
subsBindings = subsQrys[0].bindings;
if (asStream) {
return await applyUnionQryFn(subsSql, subsBindings).stream();
} else {
return await applyUnionQryFn(subsSql, subsBindings);
} else {
// NOTE(review): the next two lines are truncated (the object being mapped over is missing).
subsSql = => '(' + qry.sql + ')').join(' UNION ALL ');
subsBindings = Array.prototype.concat( => qry.bindings));
if (asStream) {
return applyUnionQryFn(subsSql, subsBindings).stream();
} else {
const res = await applyUnionQryFn(subsSql, subsBindings);
if (res[0] && Array.isArray(res[0])) {
return res[0]; // UNION ALL generates an array with result and schema
} else {
return res;
} else {
// No per-list queries: return an object-mode Readable or an empty array.
if (asStream) {
const result = new Readable({
objectMode: true,
return result;
} else {
return [];
/**
 * Statistics restricted to "open" tracking: delegates to _getCampaignStatistics with
 * a per-list filter that keeps rows where a column is NULL or equals LinkId.OPEN.
 *
 * NOTE(review): the leading call arguments, the column names in the filter (empty ''
 * literals) and all closing braces are missing in this copy - restore from the
 * original source.
 *
 * @param campaign - forwarded to _getCampaignStatistics (not visible in this copy)
 * @param select - forwarded to _getCampaignStatistics (not visible in this copy)
 * @param unionQryFn - forwarded to _getCampaignStatistics (not visible in this copy)
 * @param listQryFn - optional per-list refinement; defaults to the identity function
 * @param asStream - presumably forwarded as the streaming flag; the argument is not visible here
 */
async function _getCampaignOpenStatistics(campaign, select, unionQryFn, listQryFn, asStream) {
// Default to a pass-through so the wrapper below can always call listQryFn.
if (!listQryFn) {
listQryFn = qry => qry;
return await _getCampaignStatistics(
qry => listQryFn(
qry.where(function() {
this.whereNull('').orWhere('', LinkId.OPEN)
/**
 * Statistics restricted to general clicks: delegates to _getCampaignStatistics with
 * a per-list filter that keeps rows where a column is NULL or equals LinkId.GENERAL_CLICK.
 *
 * NOTE(review): unlike _getCampaignOpenStatistics, this signature declares no
 * `asStream` parameter, although the public wrappers in this file pass a fifth
 * argument - confirm whether the streaming flag is honoured.
 * NOTE(review): the leading call arguments, the column names in the filter (empty ''
 * literals) and all closing braces are missing in this copy - restore from the
 * original source.
 *
 * @param campaign - forwarded to _getCampaignStatistics (not visible in this copy)
 * @param select - forwarded to _getCampaignStatistics (not visible in this copy)
 * @param unionQryFn - forwarded to _getCampaignStatistics (not visible in this copy)
 * @param listQryFn - optional per-list refinement; defaults to the identity function
 */
async function _getCampaignClickStatistics(campaign, select, unionQryFn, listQryFn) {
// Default to a pass-through so the wrapper below can always call listQryFn.
if (!listQryFn) {
listQryFn = qry => qry;
return await _getCampaignStatistics(
qry => listQryFn(
qry.where(function() {
this.whereNull('').orWhere('', LinkId.GENERAL_CLICK)
/**
 * Statistics restricted to clicks on individual links: delegates to
 * _getCampaignStatistics with a per-list filter that keeps rows where a column is
 * NULL or greater than LinkId.GENERAL_CLICK.
 *
 * NOTE(review): unlike _getCampaignOpenStatistics, this signature declares no
 * `asStream` parameter, although the public wrappers in this file pass a fifth
 * argument - confirm whether the streaming flag is honoured.
 * NOTE(review): the leading call arguments, the column names in the filter (empty ''
 * literals) and all closing braces are missing in this copy - restore from the
 * original source.
 *
 * @param campaign - forwarded to _getCampaignStatistics (not visible in this copy)
 * @param select - forwarded to _getCampaignStatistics (not visible in this copy)
 * @param unionQryFn - forwarded to _getCampaignStatistics (not visible in this copy)
 * @param listQryFn - optional per-list refinement; defaults to the identity function
 */
async function _getCampaignLinkClickStatistics(campaign, select, unionQryFn, listQryFn) {
// Default to a pass-through so the wrapper below can always call listQryFn.
if (!listQryFn) {
listQryFn = qry => qry;
return await _getCampaignStatistics(
qry => listQryFn(
qry.where(function() {
this.whereNull('').orWhere('', '>', LinkId.GENERAL_CLICK)
/**
 * Open statistics of a campaign, non-streaming variant.
 * Calls _getCampaignOpenStatistics with `false` as the fifth (asStream) argument.
 *
 * @param campaign - forwarded unchanged
 * @param select - forwarded unchanged
 * @param unionQryFn - forwarded unchanged
 * @param listQryFn - forwarded unchanged
 * @returns whatever _getCampaignOpenStatistics resolves to
 */
async function getCampaignOpenStatistics(campaign, select, unionQryFn, listQryFn) {
    return await _getCampaignOpenStatistics(campaign, select, unionQryFn, listQryFn, false);
}
/**
 * Open statistics of a campaign, streaming variant.
 * Calls _getCampaignOpenStatistics with `true` as the fifth (asStream) argument.
 *
 * @param campaign - forwarded unchanged
 * @param select - forwarded unchanged
 * @param unionQryFn - forwarded unchanged
 * @param listQryFn - forwarded unchanged
 * @returns whatever _getCampaignOpenStatistics resolves to
 */
async function getCampaignOpenStatisticsStream(campaign, select, unionQryFn, listQryFn) {
    return await _getCampaignOpenStatistics(campaign, select, unionQryFn, listQryFn, true);
}
/**
 * General-click statistics of a campaign, non-streaming variant.
 * Calls _getCampaignClickStatistics with `false` as the fifth argument.
 *
 * NOTE(review): the callee's visible signature declares only four parameters, so
 * confirm that the fifth (streaming) argument is actually honoured.
 *
 * @param campaign - forwarded unchanged
 * @param select - forwarded unchanged
 * @param unionQryFn - forwarded unchanged
 * @param listQryFn - forwarded unchanged
 * @returns whatever _getCampaignClickStatistics resolves to
 */
async function getCampaignClickStatistics(campaign, select, unionQryFn, listQryFn) {
    return await _getCampaignClickStatistics(campaign, select, unionQryFn, listQryFn, false);
}
/**
 * General-click statistics of a campaign, streaming variant.
 * Calls _getCampaignClickStatistics with `true` as the fifth argument.
 *
 * NOTE(review): the callee's visible signature declares only four parameters, so
 * confirm that the fifth (streaming) argument is actually honoured.
 *
 * @param campaign - forwarded unchanged
 * @param select - forwarded unchanged
 * @param unionQryFn - forwarded unchanged
 * @param listQryFn - forwarded unchanged
 * @returns whatever _getCampaignClickStatistics resolves to
 */
async function getCampaignClickStatisticsStream(campaign, select, unionQryFn, listQryFn) {
    return await _getCampaignClickStatistics(campaign, select, unionQryFn, listQryFn, true);
}
/**
 * Per-link click statistics of a campaign, non-streaming variant.
 * Calls _getCampaignLinkClickStatistics with `false` as the fifth argument.
 *
 * NOTE(review): the callee's visible signature declares only four parameters, so
 * confirm that the fifth (streaming) argument is actually honoured.
 *
 * @param campaign - forwarded unchanged
 * @param select - forwarded unchanged
 * @param unionQryFn - forwarded unchanged
 * @param listQryFn - forwarded unchanged
 * @returns whatever _getCampaignLinkClickStatistics resolves to
 */
async function getCampaignLinkClickStatistics(campaign, select, unionQryFn, listQryFn) {
    return await _getCampaignLinkClickStatistics(campaign, select, unionQryFn, listQryFn, false);
}
/**
 * Per-link click statistics of a campaign, streaming variant.
 * Calls _getCampaignLinkClickStatistics with `true` as the fifth argument.
 *
 * NOTE(review): the callee's visible signature declares only four parameters, so
 * confirm that the fifth (streaming) argument is actually honoured.
 *
 * @param campaign - forwarded unchanged
 * @param select - forwarded unchanged
 * @param unionQryFn - forwarded unchanged
 * @param listQryFn - forwarded unchanged
 * @returns whatever _getCampaignLinkClickStatistics resolves to
 */
async function getCampaignLinkClickStatisticsStream(campaign, select, unionQryFn, listQryFn) {
    return await _getCampaignLinkClickStatistics(campaign, select, unionQryFn, listQryFn, true);
}
// Public API of the reports model. Properties are added to the existing exports
// object (it is not replaced), in the same order as before.
Object.assign(module.exports, {
    ReportState,
    hash,
    getByIdWithTemplate,
    listDTAjax,
    create,
    updateWithConsistencyCheck,
    remove,
    updateFields,
    listByState,
    bulkChangeState,
    getCampaignOpenStatistics,
    getCampaignClickStatistics,
    getCampaignLinkClickStatistics,
    getCampaignOpenStatisticsStream,
    getCampaignClickStatisticsStream,
    getCampaignLinkClickStatisticsStream
});