'use strict';

const knex = require('../lib/knex');
const hasher = require('node-object-hash')();
const { enforce, filterObject } = require('../lib/helpers');
const dtHelpers = require('../lib/dt-helpers');
const interoperableErrors = require('../../shared/interoperable-errors');
const fields = require('./fields');
const namespaceHelpers = require('../lib/namespace-helpers');
const shares = require('./shares');
const reportHelpers = require('../lib/report-helpers');
const fs = require('fs-extra-promise');
const contextHelpers = require('../lib/context-helpers');
const {LinkId} = require('./links');
const subscriptions = require('./subscriptions');
const {Readable} = require('stream');

const ReportState = require('../../shared/reports').ReportState;

// The only report columns that clients may set; also the keys that take part in hash().
const allowedKeys = new Set(['name', 'description', 'report_template', 'params', 'namespace']);

/**
 * Hashes the client-editable part of a report (the keys in allowedKeys).
 * updateWithConsistencyCheck() compares this value with entity.originalHash
 * to detect concurrent modifications.
 *
 * @param {Object} entity - report entity
 * @returns {string} hash of the filtered entity
 */
function hash(entity) {
    return hasher.hash(filterObject(entity, allowedKeys));
}

/**
 * Loads a report joined with selected columns of its report template.
 * Requires the 'view' permission on the report. The JSON-encoded columns
 * `user_fields` and `params` are parsed before the entity is returned.
 *
 * @param {Object} context - request context used for permission checks
 * @param {number} id - report id
 * @param {boolean} [withPermissions=true] - when true, entity.permissions is populated
 * @returns {Promise<Object>} the report entity
 */
async function getByIdWithTemplate(context, id, withPermissions = true) {
    return await knex.transaction(async tx => {
        await shares.enforceEntityPermissionTx(tx, context, 'report', id, 'view');

        const entity = await tx('reports')
            .where('reports.id', id)
            .innerJoin('report_templates', 'reports.report_template', 'report_templates.id')
            .select([
                'reports.id',
                'reports.name',
                'reports.description',
                'reports.report_template',
                'reports.params',
                'reports.state',
                'reports.namespace',
                'report_templates.user_fields',
                'report_templates.mime_type',
                'report_templates.hbs',
                'report_templates.js'
            ])
            .first();

        entity.user_fields = JSON.parse(entity.user_fields);
        entity.params = JSON.parse(entity.params);

        if (withPermissions) {
            entity.permissions = await shares.getPermissionsTx(tx, context, 'report', id);
        }

        return entity;
    });
}

/**
 * Server-side DataTables listing of reports. Requires 'view' on both the
 * report and its report template.
 *
 * @param {Object} context - request context used for permission checks
 * @param {Object} params - DataTables request parameters
 * @returns {Promise<Object>} whatever dtHelpers.ajaxListWithPermissions returns
 */
async function listDTAjax(context, params) {
    return await dtHelpers.ajaxListWithPermissions(
        context,
        [
            { entityTypeId: 'report', requiredOperations: ['view'] },
            { entityTypeId: 'reportTemplate', requiredOperations: ['view'] }
        ],
        params,
        builder => builder.from('reports')
            .innerJoin('report_templates', 'reports.report_template', 'report_templates.id')
            .innerJoin('namespaces', 'namespaces.id', 'reports.namespace'),
        [
            'reports.id',
            'reports.name',
            'report_templates.name',
            'reports.description',
            'reports.last_run',
            'namespaces.name',
            'reports.state',
            'report_templates.mime_type'
        ]
    );
}

/**
 * Creates a report and starts its processing.
 * Requires 'createReport' on the target namespace and 'execute' on the
 * report template. Note that entity.params is replaced in place by its JSON
 * string representation.
 *
 * @param {Object} context - request context used for permission checks
 * @param {Object} entity - report to insert; only keys in allowedKeys are stored
 * @returns {Promise<number>} id of the newly created report
 */
async function create(context, entity) {
    let id;

    await knex.transaction(async tx => {
        await shares.enforceEntityPermissionTx(tx, context, 'namespace', entity.namespace, 'createReport');
        await shares.enforceEntityPermissionTx(tx, context, 'reportTemplate', entity.report_template, 'execute');

        await namespaceHelpers.validateEntity(tx, entity);

        entity.params = JSON.stringify(entity.params);

        const ids = await tx('reports').insert(filterObject(entity, allowedKeys));
        id = ids[0];

        await shares.rebuildPermissionsTx(tx, { entityTypeId: 'report', entityId: id });
    });

    // This require is here to avoid cyclic dependency
    const reportProcessor = require('../lib/report-processor');
    await reportProcessor.start(id);

    return id;
}

/**
 * Updates a report if it has not changed since the client loaded it, then
 * reschedules and restarts its processing.
 * Requires 'edit' on the report and 'execute' on the report template. Note
 * that entity.params is replaced in place by its JSON string representation.
 *
 * @param {Object} context - request context used for permission checks
 * @param {Object} entity - updated report; must carry `id` and `originalHash`
 * @throws {interoperableErrors.NotFoundError} when the report does not exist
 * @throws {interoperableErrors.ChangedError} when the stored report no longer matches originalHash
 */
async function updateWithConsistencyCheck(context, entity) {
    await knex.transaction(async tx => {
        await shares.enforceEntityPermissionTx(tx, context, 'report', entity.id, 'edit');
        await shares.enforceEntityPermissionTx(tx, context, 'reportTemplate', entity.report_template, 'execute');

        const existing = await tx('reports').where('id', entity.id).first();
        if (!existing) {
            throw new interoperableErrors.NotFoundError();
        }

        // The hash is computed over parsed params, so parse before hashing.
        existing.params = JSON.parse(existing.params);

        const existingHash = hash(existing);
        if (existingHash !== entity.originalHash) {
            throw new interoperableErrors.ChangedError();
        }

        await namespaceHelpers.validateEntity(tx, entity);
        await namespaceHelpers.validateMove(context, entity, existing, 'report', 'createReport', 'delete');

        entity.params = JSON.stringify(entity.params);

        const filteredUpdates = filterObject(entity, allowedKeys);
        filteredUpdates.state = ReportState.SCHEDULED;

        await tx('reports').where('id', entity.id).update(filteredUpdates);

        await shares.rebuildPermissionsTx(tx, { entityTypeId: 'report', entityId: entity.id });
    });

    // This require is here to avoid cyclic dependency
    const reportProcessor = require('../lib/report-processor');
    await reportProcessor.start(entity.id);
}

/**
 * Deletes a report row together with its content and output files, inside
 * the supplied transaction. Requires 'delete' on the report.
 *
 * @param {Object} tx - knex transaction
 * @param {Object} context - request context used for permission checks
 * @param {number} id - report id
 */
async function removeTx(tx, context, id) {
    await shares.enforceEntityPermissionTx(tx, context, 'report', id, 'delete');

    const report = await tx('reports').where('id', id).first();

    await fs.removeAsync(reportHelpers.getReportContentFile(report));
    await fs.removeAsync(reportHelpers.getReportOutputFile(report));

    await tx('reports').where('id', id).del();
}

/**
 * Deletes a report in its own transaction; see removeTx().
 *
 * @param {Object} context - request context used for permission checks
 * @param {number} id - report id
 */
async function remove(context, id) {
    await knex.transaction(async tx => {
        await removeTx(tx, context, id);
    });
}

/**
 * Updates arbitrary columns of a report. Performs no permission check and
 * no key filtering.
 *
 * @param {number} id - report id
 * @param {Object} fields - column/value pairs to write
 * @returns {Promise<number>} result of the knex update
 */
async function updateFields(id, fields) {
    return await knex('reports').where('id', id).update(fields);
}

/**
 * Lists reports that are in the given state.
 *
 * @param {*} state - a ReportState value
 * @param {number} limit - maximum number of rows to return
 * @returns {Promise<Object[]>} matching report rows
 */
async function listByState(state, limit) {
    return await knex('reports').where('state', state).limit(limit);
}

/**
 * Moves every report in `oldState` to `newState`.
 *
 * @param {*} oldState - a ReportState value to match
 * @param {*} newState - a ReportState value to set
 * @returns {Promise<number>} result of the knex update
 */
async function bulkChangeState(oldState, newState) {
    return await knex('reports').where('state', oldState).update('state', newState);
}

/**
 * Builds one statistics query per list of the campaign, combines them with
 * UNION ALL and runs the result.
 *
 * @param {Object} campaign - campaign whose `lists` array holds objects with a `list` id
 * @param {Array} select - items to select; a key of the fields mapping is translated to its
 *        column, '*' expands to all mapped fields, anything else is passed to knex as is
 * @param {Function} [unionQryFn] - receives a knex builder selecting from the union and
 *        returns the query to run
 * @param {Function} [listQryFn] - receives each per-list query and returns the modified query
 * @param {boolean} asStream - when truthy a stream is returned instead of the rows
 * @returns {Promise<Array|Object>} result rows, or a readable stream when asStream is truthy
 */
async function _getCampaignStatistics(campaign, select, unionQryFn, listQryFn, asStream) {
    const subsQrys = [];
    const commonFieldsMapping = {};

    // Pass 1: keep only the custom fields (by lower-cased key) present in every list.
    // NOTE(review): the stored column is overwritten on each iteration, so the column of the
    // last list is used for all lists - confirm columns are identical across lists.
    let firstIteration = true;
    for (const cpgList of campaign.lists) {
        const cpgListId = cpgList.list;

        const flds = await fields.list(contextHelpers.getAdminContext(), cpgListId);

        const assignedFlds = new Set();

        for (const fld of flds) {
            /* Dropdown and checkbox groups have field.column == null
               For the time being, we don't group options and we don't expand enums. We just provide it as it is in the DB. */
            if (fld.column) {
                const fldKey = 'field:' + fld.key.toLowerCase();

                if (firstIteration || commonFieldsMapping[fldKey]) {
                    commonFieldsMapping[fldKey] = 'subscriptions.' + fld.column;
                    assignedFlds.add(fldKey);
                }
            }
        }

        // Drop the fields that this list does not have.
        for (const fldKey of Object.keys(commonFieldsMapping)) {
            if (!assignedFlds.has(fldKey)) {
                delete commonFieldsMapping[fldKey];
            }
        }

        firstIteration = false;
    }

    // Pass 2: build the per-list query.
    for (const cpgList of campaign.lists) {
        const cpgListId = cpgList.list;

        const campaignFieldsMapping = {
            'list:id': {raw: knex.raw('?', [cpgListId])},
            'tracker:count': {raw: 'COALESCE(`campaign_links`.`count`, 0)'},
            'tracker:country': 'campaign_links.country',
            'tracker:deviceType': 'campaign_links.device_type',
            'tracker:status': 'campaign_messages.status',
            'subscription:status': 'subscriptions.status',
            'subscription:id': 'subscriptions.id',
            'subscription:cid': 'subscriptions.cid',
            'subscription:email': 'subscriptions.email'
        };

        const fieldsMapping = {
            ...commonFieldsMapping,
            ...campaignFieldsMapping
        };

        // Translates a mapping key into an aliased select expression.
        const getSelField = item => {
            const itemMapping = fieldsMapping[item];
            if (typeof itemMapping === 'string') {
                return itemMapping + ' AS ' + item;
            } else if (itemMapping.raw) {
                return knex.raw(itemMapping.raw + ' AS `' + item + '`');
            }
        };

        let selFields = [];
        for (const item of select) {
            if (item in fieldsMapping) {
                selFields.push(getSelField(item));
            } else if (item === '*') {
                selFields = selFields.concat(Object.keys(fieldsMapping).map(entry => getSelField(entry)));
            } else {
                selFields.push(item);
            }
        }

        let query = knex(`subscription__${cpgListId} AS subscriptions`)
            .leftJoin('campaign_messages', {
                'campaign_messages.subscription': 'subscriptions.id',
                'campaign_messages.list': knex.raw('?', [cpgListId])
            })
            .leftJoin('campaign_links', {
                'campaign_links.subscription': 'subscriptions.id',
                'campaign_links.list': knex.raw('?', [cpgListId])
            })
            .select(selFields);

        if (listQryFn) {
            query = listQryFn(query);
        }

        subsQrys.push(query.toSQL().toNative());
    }

    if (subsQrys.length === 0) {
        if (asStream) {
            // No lists: hand back an object stream that ends immediately.
            const result = new Readable({
                objectMode: true,
            });
            result.push(null);
            return result;
        } else {
            return [];
        }
    }

    let subsSql;
    let subsBindings;

    if (subsQrys.length === 1) {
        subsSql = subsQrys[0].sql;
        subsBindings = subsQrys[0].bindings;
    } else {
        subsSql = subsQrys.map(qry => '(' + qry.sql + ')').join(' UNION ALL ');
        subsBindings = [].concat(...subsQrys.map(qry => qry.bindings));
    }

    // Wraps the combined SQL either for the caller-supplied outer query or as a plain raw query.
    const buildFinalQuery = () => {
        if (unionQryFn) {
            return unionQryFn(
                knex.from(function() {
                    return knex.raw('(' + subsSql + ')', subsBindings);
                })
            );
        } else {
            return knex.raw(subsSql, subsBindings);
        }
    };

    if (asStream) {
        return buildFinalQuery().stream();
    }

    const res = await buildFinalQuery();
    if (res[0] && Array.isArray(res[0])) {
        return res[0]; // UNION ALL generates an array with result and schema
    } else {
        return res;
    }
}

/**
 * Statistics restricted to rows with no campaign link or with the OPEN link.
 * Parameters are the same as for _getCampaignStatistics().
 */
async function _getCampaignOpenStatistics(campaign, select, unionQryFn, listQryFn, asStream) {
    const innerQryFn = listQryFn || (qry => qry);

    return await _getCampaignStatistics(
        campaign,
        select,
        unionQryFn,
        qry => innerQryFn(
            qry.where(function() {
                this.whereNull('campaign_links.link').orWhere('campaign_links.link', LinkId.OPEN);
            })
        ),
        asStream
    );
}

/**
 * Statistics restricted to rows with no campaign link or with the GENERAL_CLICK link.
 * Parameters are the same as for _getCampaignStatistics().
 *
 * `asStream` has to be a declared parameter: the public wrappers pass it as the fifth
 * argument, and without the declaration the reference below is an unresolved identifier.
 */
async function _getCampaignClickStatistics(campaign, select, unionQryFn, listQryFn, asStream) {
    const innerQryFn = listQryFn || (qry => qry);

    return await _getCampaignStatistics(
        campaign,
        select,
        unionQryFn,
        qry => innerQryFn(
            qry.where(function() {
                this.whereNull('campaign_links.link').orWhere('campaign_links.link', LinkId.GENERAL_CLICK);
            })
        ),
        asStream
    );
}

/**
 * Statistics restricted to rows with no campaign link or with a link id greater than
 * GENERAL_CLICK. Parameters are the same as for _getCampaignStatistics().
 *
 * `asStream` has to be a declared parameter: the public wrappers pass it as the fifth
 * argument, and without the declaration the reference below is an unresolved identifier.
 */
async function _getCampaignLinkClickStatistics(campaign, select, unionQryFn, listQryFn, asStream) {
    const innerQryFn = listQryFn || (qry => qry);

    return await _getCampaignStatistics(
        campaign,
        select,
        unionQryFn,
        qry => innerQryFn(
            qry.where(function() {
                this.whereNull('campaign_links.link').orWhere('campaign_links.link', '>', LinkId.GENERAL_CLICK);
            })
        ),
        asStream
    );
}

/** Open statistics returned as rows; see _getCampaignStatistics() for the parameters. */
async function getCampaignOpenStatistics(campaign, select, unionQryFn, listQryFn) {
    return await _getCampaignOpenStatistics(campaign, select, unionQryFn, listQryFn, false);
}

/** Open statistics returned as a stream; see _getCampaignStatistics() for the parameters. */
async function getCampaignOpenStatisticsStream(campaign, select, unionQryFn, listQryFn) {
    return await _getCampaignOpenStatistics(campaign, select, unionQryFn, listQryFn, true);
}

/** Click statistics returned as rows; see _getCampaignStatistics() for the parameters. */
async function getCampaignClickStatistics(campaign, select, unionQryFn, listQryFn) {
    return await _getCampaignClickStatistics(campaign, select, unionQryFn, listQryFn, false);
}

/** Click statistics returned as a stream; see _getCampaignStatistics() for the parameters. */
async function getCampaignClickStatisticsStream(campaign, select, unionQryFn, listQryFn) {
    return await _getCampaignClickStatistics(campaign, select, unionQryFn, listQryFn, true);
}

/** Per-link click statistics returned as rows; see _getCampaignStatistics() for the parameters. */
async function getCampaignLinkClickStatistics(campaign, select, unionQryFn, listQryFn) {
    return await _getCampaignLinkClickStatistics(campaign, select, unionQryFn, listQryFn, false);
}

/** Per-link click statistics returned as a stream; see _getCampaignStatistics() for the parameters. */
async function getCampaignLinkClickStatisticsStream(campaign, select, unionQryFn, listQryFn) {
    return await _getCampaignLinkClickStatistics(campaign, select, unionQryFn, listQryFn, true);
}

module.exports.ReportState = ReportState;
module.exports.hash = hash;
module.exports.getByIdWithTemplate = getByIdWithTemplate;
module.exports.listDTAjax = listDTAjax;
module.exports.create = create;
module.exports.updateWithConsistencyCheck = updateWithConsistencyCheck;
module.exports.remove = remove;
module.exports.updateFields = updateFields;
module.exports.listByState = listByState;
module.exports.bulkChangeState = bulkChangeState;
module.exports.getCampaignOpenStatistics = getCampaignOpenStatistics;
module.exports.getCampaignClickStatistics = getCampaignClickStatistics;
module.exports.getCampaignLinkClickStatistics = getCampaignLinkClickStatistics;
module.exports.getCampaignOpenStatisticsStream = getCampaignOpenStatisticsStream;
module.exports.getCampaignClickStatisticsStream = getCampaignClickStatisticsStream;
module.exports.getCampaignLinkClickStatisticsStream = getCampaignLinkClickStatisticsStream;