From cd798b5af73b1593b532e9fccdf26637705dc85c Mon Sep 17 00:00:00 2001
From: Tomas Bures
Date: Mon, 6 Aug 2018 20:24:51 +0530
Subject: [PATCH] Preparation of merge with master

---
 client/src/campaigns/CUD.js                  |   4 +
 client/src/lists/imports/CUD.js              |  25 +-
 client/src/lists/imports/helpers.js          |  12 +-
 client/src/send-configurations/CUD.js        |   2 +
 index.js                                     |  11 +-
 lib/executor.js                              |   2 +-
 lib/importer.js                              |  54 +++
 models/campaigns.js                          |   2 +-
 models/imports.js                            |  11 +-
 models/lists.js                              |   5 +-
 models/send-configurations.js                |   2 +-
 models/subscriptions.js                      |  34 +-
 {lib => obsolete/lib}/senders.js             |   0
 {services => obsolete/services}/feedcheck.js |   0
 obsolete/services/importer.js                | 280 +++++++++++++
 .../services}/postfix-bounce-server.js       |   0
 {services => obsolete/services}/sender.js    |   0
 {services => obsolete/services}/triggers.js  |   0
 .../services}/verp-server.js                 |   0
 routes/api.js                                |   4 +-
 routes/rest/subscriptions.js                 |   5 +-
 routes/subscription.js                       |   4 +-
 services/importer.js                         | 369 ++++++------------
 .../migrations/20170506102634_v1_to_v2.js    |  44 ++-
 shared/imports.js                            |  13 +-
 shared/lists.js                              |   9 +
 26 files changed, 607 insertions(+), 285 deletions(-)
 create mode 100644 lib/importer.js
 rename {lib => obsolete/lib}/senders.js (100%)
 rename {services => obsolete/services}/feedcheck.js (100%)
 create mode 100644 obsolete/services/importer.js
 rename {services => obsolete/services}/postfix-bounce-server.js (100%)
 rename {services => obsolete/services}/sender.js (100%)
 rename {services => obsolete/services}/triggers.js (100%)
 rename {services => obsolete/services}/verp-server.js (100%)

diff --git a/client/src/campaigns/CUD.js b/client/src/campaigns/CUD.js
index b4469854..ad0f81b4 100644
--- a/client/src/campaigns/CUD.js
+++ b/client/src/campaigns/CUD.js
@@ -188,6 +188,8 @@ export default class CUD extends Component {
                 click_tracking_disabled: false,
                 open_trackings_disabled: false,
 
+                unsubscribe_url: '',
+
                 source: CampaignSource.TEMPLATE,
 
                 // This is for CampaignSource.TEMPLATE and CampaignSource.CUSTOM_FROM_TEMPLATE
@@ -531,6 +533,8 @@ export default class CUD extends Component {
 
                 {sendSettings}
 
+                <InputField id="unsubscribe_url" label={t('Custom unsubscribe URL')}/>
+
diff --git a/client/src/lists/imports/CUD.js b/client/src/lists/imports/CUD.js
index faba4c82..0f949d89 100644
--- a/client/src/lists/imports/CUD.js
+++ b/client/src/lists/imports/CUD.js
@@ -63,6 +63,7 @@ export default class CUD extends Component {
 
             if (data.type === ImportType.CSV_FILE) {
                 data.csvFileName = data.settings.csv.originalname;
+                data.csvDelimiter = data.settings.csv.delimiter;
             }
         });
 
@@ -71,7 +72,8 @@ export default class CUD extends Component {
             name: '',
             description: '',
             type: ImportType.CSV_FILE,
-            csvFileName: ''
+            csvFileName: '',
+            csvDelimiter: ','
         });
     }
 }
@@ -93,6 +95,10 @@ export default class CUD extends Component {
 
             if (!this.csvFile || this.csvFile.files.length === 0) {
                 state.setIn(['csvFileName', 'error'], t('File must be selected'));
             }
+
+            if (!state.getIn(['csvDelimiter', 'value']).trim()) {
+                state.setIn(['csvDelimiter', 'error'], t('CSV delimiter must not be empty'));
+            }
         }
     }
@@ -119,7 +125,12 @@ export default class CUD extends Component {
         const formData = new FormData();
         if (!isEdit && data.type === ImportType.CSV_FILE) {
+            data.settings.csv = {};
             formData.append('csvFile', this.csvFile.files[0]);
+            data.settings.csv.delimiter = data.csvDelimiter.trim();
+
+            delete data.csvFile;
+            delete data.csvDelimiter;
         }
         formData.append('entity', JSON.stringify(data));
 
@@ -151,9 +162,17 @@ export default class CUD extends Component {
         let settings = null;
         if (type === ImportType.CSV_FILE) {
             if (isEdit) {
-                settings = <StaticField id="csvFileName" label={t('File')}>{this.getFormValue('csvFileName')}</StaticField>;
+                settings =
+                    <div>
+                        <StaticField id="csvFileName" label={t('File')}>{this.getFormValue('csvFileName')}</StaticField>
+                        <StaticField id="csvDelimiter" label={t('CSV delimiter')}>{this.getFormValue('csvDelimiter')}</StaticField>
+                    </div>;
             } else {
-                settings = <StaticField id="csvFileName" label={t('File')}><input ref={node => this.csvFile = node} type="file" onChange={::this.onFileSelected}/></StaticField>;
+                settings =
+                    <div>
+                        <StaticField id="csvFileName" label={t('File')}><input ref={node => this.csvFile = node} type="file" onChange={::this.onFileSelected}/></StaticField>
+                        <InputField id="csvDelimiter" label={t('CSV delimiter')}/>
+                    </div>;
             }
         }
 
diff --git a/client/src/lists/imports/helpers.js b/client/src/lists/imports/helpers.js
index 96dfe3a3..3aa88ef3 100644
--- a/client/src/lists/imports/helpers.js
+++ b/client/src/lists/imports/helpers.js
@@ -11,10 +11,14 @@ export function getImportTypes(t) {
     };
 
     const importStatusLabels = {
-        [ImportStatus.NOT_READY]: t('Preparing'),
-        [ImportStatus.RUNNING]: t('Running'),
-        [ImportStatus.SCHEDULED]: t('Scheduled'),
-        [ImportStatus.FINISHED]: t('Finished')
+        [ImportStatus.PREP_SCHEDULED]: t('Created'),
+        [ImportStatus.PREP_RUNNING]: t('Preparing'),
+        [ImportStatus.PREP_FINISHED]: t('Ready'),
+        [ImportStatus.PREP_FAILED]: t('Preparation failed'),
+        [ImportStatus.RUN_SCHEDULED]: t('Scheduled'),
+        [ImportStatus.RUN_RUNNING]: t('Running'),
+        [ImportStatus.RUN_FINISHED]: t('Finished'),
+        [ImportStatus.RUN_FAILED]: t('Failed')
     };
 
     return {
diff --git a/client/src/send-configurations/CUD.js b/client/src/send-configurations/CUD.js
index 1f9630b9..69579839 100644
--- a/client/src/send-configurations/CUD.js
+++ b/client/src/send-configurations/CUD.js
@@ -91,6 +91,7 @@ export default class CUD extends Component {
                 subject_overridable: false,
                 verpEnabled: false,
                 verp_hostname: '',
+                x_mailer: '',
                 mailer_type: MailerType.ZONE_MTA,
                 ...this.mailerTypes[MailerType.ZONE_MTA].initData()
             });
@@ -200,6 +201,7 @@ export default class CUD extends Component {
 
+                    <InputField id="x_mailer" label={t('X-Mailer')}/>
 
                     {mailerForm}
 
diff --git a/index.js b/index.js
index 5e4a945d..044fda92 100644
--- a/index.js
+++ b/index.js
@@ -5,7 +5,7 @@ const log = require('npmlog');
 const appBuilder = require('./app-builder');
 const http = require('http');
 //const triggers = require('./services/triggers');
-// const importer = require('./services/importer');
+const importer = require('./lib/importer');
 // const verpServer = require('./services/verp-server');
 const testServer = require('./services/test-server');
 //const postfixBounceServer = require('./services/postfix-bounce-server');
@@ -89,20 +89,21 @@ dbcheck(err => { // Check if database needs upgrading before starting the server
         privilegeHelpers.dropRootPrivileges();
         tzupdate.start();
-        //importer(() => {
+
+        importer.spawn(() => {
             //triggers(() => {
                 //senders.spawn(() => {
                     //feedcheck(() => {
                         //postfixBounceServer(async () => {
-        (async () => {
+                            (async () => {
                                 await reportProcessor.init();
                                 log.info('Service', 'All services started');
-        })();
+                            })();
                         //});
                     //});
                 //});
             //});
-        //});
+        });
     });
 });
 //});
diff --git a/lib/executor.js b/lib/executor.js
index 1cf1d959..3fa1cdf3 100644
--- a/lib/executor.js
+++ b/lib/executor.js
@@ -27,7 +27,7 @@ function spawn(callback) {
             if (msg.type === 'process-started') {
                 let requestCallback = requestCallbacks[msg.tid];
                 if (requestCallback && requestCallback.startedCallback) {
-                    requestCallback.startedCallback(msg.tid);
+                    requestCallback.startedCallback(msg.tid, );
                 }
 
             } else if (msg.type === 'process-failed') {
diff --git a/lib/importer.js b/lib/importer.js
new file mode 100644
index 00000000..266ec761
--- /dev/null
+++ b/lib/importer.js
@@ -0,0 +1,54 @@
+'use strict';
+
+const knex = require('./knex');
+const fork = require('child_process').fork;
+const log = require('npmlog');
+const path = require('path');
+const {ImportStatus} = require('../shared/imports');
+
+let messageTid = 0;
+let importerProcess;
+
+module.exports = {
+    spawn,
+    scheduleCheck
+};
+
+function spawn(callback) {
+    log.info('Importer', 'Spawning importer process.');
+
+    knex.transaction(async tx => {
+        await tx('imports').where('status', ImportStatus.PREP_RUNNING).update({status: ImportStatus.PREP_SCHEDULED});
+        await tx('imports').where('status', ImportStatus.RUN_RUNNING).update({status: ImportStatus.RUN_SCHEDULED});
+
+    }).then(() => {
+        importerProcess = fork(path.join(__dirname, '..', 'services', 'importer.js'), [], {
+            cwd: path.join(__dirname, '..'),
+            env: {NODE_ENV: process.env.NODE_ENV}
+        });
+
+        importerProcess.on('message', msg => {
+            if (msg) {
+                if (msg.type === 'importer-started') {
+                    log.info('Importer', 'Importer process started.');
+                    return callback();
+                }
+            }
+        });
+
+        importerProcess.on('close', (code, signal) => {
+            log.info('Importer', 'Importer process exited with code %s signal %s.', code, signal);
+        });
+    });
+}
+
+function scheduleCheck() {
+    importerProcess.send({
+        type: 'scheduleCheck',
+        tid: messageTid
+    });
+
+    messageTid++;
+}
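The spawn/scheduleCheck pair above is the entire parent-side API: the master process forks services/importer.js once at startup, requeues any imports left mid-flight, waits for an 'importer-started' message, and later pokes the worker with fire-and-forget 'scheduleCheck' messages. The protocol is small enough to drive from a scratch script; a minimal sketch (standalone, not part of this patch):

    // sketch.js — exercises the IPC protocol used by lib/importer.js
    const { fork } = require('child_process');
    const path = require('path');

    const child = fork(path.join(__dirname, 'services', 'importer.js'), [], {
        cwd: __dirname,
        env: {NODE_ENV: process.env.NODE_ENV}
    });

    child.on('message', msg => {
        if (msg && msg.type === 'importer-started') {
            // Worker is ready — ask it to scan the imports table for scheduled work.
            child.send({type: 'scheduleCheck', tid: 0});
        }
    });
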
diff --git a/models/campaigns.js b/models/campaigns.js
index bf44b5b8..4fe2f5da 100644
--- a/models/campaigns.js
+++ b/models/campaigns.js
@@ -16,7 +16,7 @@ const sendConfigurations = require('./send-configurations');
 const triggers = require('./triggers');
 
 const allowedKeysCommon = ['name', 'description', 'list', 'segment', 'namespace',
-    'send_configuration', 'from_name_override', 'from_email_override', 'reply_to_override', 'subject_override', 'data', 'click_tracking_disabled', 'open_tracking_disabled'];
+    'send_configuration', 'from_name_override', 'from_email_override', 'reply_to_override', 'subject_override', 'data', 'click_tracking_disabled', 'open_tracking_disabled', 'unsubscribe_url'];
 
 const allowedKeysCreate = new Set(['type', 'source', ...allowedKeysCommon]);
 const allowedKeysUpdate = new Set([...allowedKeysCommon]);
diff --git a/models/imports.js b/models/imports.js
index ad055feb..078ead37 100644
--- a/models/imports.js
+++ b/models/imports.js
@@ -49,6 +49,11 @@ async function _validateAndPreprocess(tx, listId, entity, isCreate) {
     enforce(entity.type >= ImportType.MIN && entity.type <= ImportType.MAX, 'Invalid import type');
 
     entity.settings = entity.settings || {};
+
+    if (entity.type === ImportType.CSV_FILE) {
+        entity.settings.csv = entity.settings.csv || {};
+        enforce(entity.settings.csv.delimiter.trim(), 'CSV delimiter must not be empty');
+    }
 }
 
 async function create(context, listId, entity, files) {
@@ -68,10 +73,11 @@ async function create(context, listId, entity, files) {
 
         entity.settings.csv = {
             originalname: csvFile.originalname,
-            filename: csvFile.filename
+            filename: csvFile.filename,
+            delimiter: entity.settings.csv.delimiter
         };
 
-        entity.status = ImportStatus.NOT_READY;
+        entity.status = ImportStatus.PREP_SCHEDULED;
     }
 
@@ -151,6 +157,7 @@ async function removeAllByListIdTx(tx, context, listId) {
 
 // This is to handle circular dependency with segments.js
 module.exports = {
+    filesDir,
     hash,
     getById,
     listDTAjax,
diff --git a/models/lists.js b/models/lists.js
index dc4cd265..9d136502 100644
--- a/models/lists.js
+++ b/models/lists.js
@@ -98,11 +98,12 @@ async function create(context, entity) {
             await knex.schema.raw('CREATE TABLE `subscription__' + id + '` (\n' +
                 '  `id` int(10) unsigned NOT NULL AUTO_INCREMENT,\n' +
                 '  `cid` varchar(255) CHARACTER SET ascii NOT NULL,\n' +
-                '  `email` varchar(255) CHARACTER SET utf8 NOT NULL DEFAULT \'\',\n' +
+                '  `email` varchar(255) CHARACTER SET utf8 NOT NULL,\n' +
+                '  `hash_email` varchar(255) CHARACTER SET ascii NOT NULL,\n' +
+                '  `source_email` int(10) unsigned,\n' + // This references imports if the source is an import, 0 means some import in version 1, NULL if the source is via subscription or edit of the subscription
                 '  `opt_in_ip` varchar(100) DEFAULT NULL,\n' +
                 '  `opt_in_country` varchar(2) DEFAULT NULL,\n' +
                 '  `tz` varchar(100) CHARACTER SET ascii DEFAULT NULL,\n' +
-                '  `imported` int(11) unsigned DEFAULT NULL,\n' +
                 '  `status` tinyint(4) unsigned NOT NULL DEFAULT \'1\',\n' +
                 '  `is_test` tinyint(4) unsigned NOT NULL DEFAULT \'0\',\n' +
                 '  `status_change` timestamp NULL DEFAULT NULL,\n' +
diff --git a/models/send-configurations.js b/models/send-configurations.js
index 20b49c66..7c1d79c6 100644
--- a/models/send-configurations.js
+++ b/models/send-configurations.js
@@ -10,7 +10,7 @@ const namespaceHelpers = require('../lib/namespace-helpers');
 const {MailerType, getSystemSendConfigurationId} = require('../shared/send-configurations');
 const contextHelpers = require('../lib/context-helpers');
 
-const allowedKeys = new Set(['name', 'description', 'from_email', 'from_email_overridable', 'from_name', 'from_name_overridable', 'reply_to', 'reply_to_overridable', 'subject', 'subject_overridable', 'verp_hostname', 'mailer_type', 'mailer_settings', 'namespace']);
+const allowedKeys = new Set(['name', 'description', 'from_email', 'from_email_overridable', 'from_name', 'from_name_overridable', 'reply_to', 'reply_to_overridable', 'subject', 'subject_overridable', 'x_mailer', 'verp_hostname', 'mailer_type', 'mailer_settings', 'namespace']);
 
 const allowedMailerTypes = new Set(Object.values(MailerType));
diff --git a/models/subscriptions.js b/models/subscriptions.js
index d1d10f9c..97affdb1 100644
--- a/models/subscriptions.js
+++ b/models/subscriptions.js
@@ -425,6 +425,32 @@ async function _validateAndPreprocess(tx, listId, groupedFieldsMap, entity, meta
     }
 }
 
+function updateSourcesAndHash(subscription, source, groupedFieldsMap) {
+    if ('email' in subscription) {
+        subscription.hash_email = crypto.createHash('sha512').update(subscription.email).digest("base64");
+        subscription.source_email = source;
+    }
+
+    for (const fldKey in groupedFieldsMap) {
+        const fld = groupedFieldsMap[fldKey];
+
+        const fieldType = fields.getFieldType(fld.type);
+        if (fieldType.grouped) {
+            for (const optionKey in fld.groupedOptions) {
+                const option = fld.groupedOptions[optionKey];
+
+                if (option.column in subscription) {
+                    subscription['source_' + option.column] = source;
+                }
+            }
+        } else {
+            if (fldKey in subscription) {
+                subscription['source_' + fldKey] = source;
+            }
+        }
+    }
+}
+
 async function _update(tx, listId, existing, filteredEntity) {
     if ('status' in filteredEntity) {
         if (existing.status !== filteredEntity.status) {
@@ -464,7 +490,7 @@ async function _create(tx, listId, filteredEntity) {
     If it is unsubscribed and meta.updateOfUnsubscribedAllowed, the existing subscription is changed based on the provided data.
     If meta.updateAllowed is true, it updates even an active subscription.
 */
-async function create(context, listId, entity, meta /* meta is provided when called from /confirm/subscribe/:cid */) {
+async function create(context, listId, entity, source, meta /* meta is provided when called from /confirm/subscribe/:cid */) {
     return await knex.transaction(async tx => {
         await shares.enforceEntityPermissionTx(tx, context, 'list', listId, 'manageSubscriptions');
 
@@ -478,6 +504,8 @@ async function create(context, listId, entity, meta /* meta is provided when cal
 
         ungroupSubscription(groupedFieldsMap, filteredEntity);
 
+        updateSourcesAndHash(filteredEntity, source, groupedFieldsMap);
+
         filteredEntity.opt_in_ip = meta && meta.ip;
         filteredEntity.opt_in_country = meta && meta.country;
         filteredEntity.imported = meta && !!meta.imported;
@@ -498,7 +526,7 @@ async function create(context, listId, entity, meta /* meta is provided when cal
     });
 }
 
-async function updateWithConsistencyCheck(context, listId, entity) {
+async function updateWithConsistencyCheck(context, listId, entity, source) {
     await knex.transaction(async tx => {
         await shares.enforceEntityPermissionTx(tx, context, 'list', listId, 'manageSubscriptions');
 
@@ -523,6 +551,8 @@ async function updateWithConsistencyCheck(context, listId, entity) {
 
         ungroupSubscription(groupedFieldsMap, filteredEntity);
 
+        updateSourcesAndHash(filteredEntity, source, groupedFieldsMap);
+
         await _update(tx, listId, existing, filteredEntity);
     });
 }
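updateSourcesAndHash is the bookkeeping core of the new source-tracking scheme: alongside every written value it records where that value came from (a `source_` + column entry), and for email it additionally stores a SHA-512/base64 digest in hash_email. The hashing itself is plain Node crypto; a standalone sketch of what ends up in a subscription row (merge_first_name is an invented example field):

    const crypto = require('crypto');

    // Matches the patch: SHA-512 of the address, base64-encoded, kept in `hash_email`.
    function hashEmail(email) {
        return crypto.createHash('sha512').update(email).digest('base64');
    }

    const SOURCE_API = -3; // SubscriptionSource.API from shared/lists.js

    const subscription = {email: 'test@example.com', merge_first_name: 'Test'};
    subscription.hash_email = hashEmail(subscription.email);
    subscription.source_email = SOURCE_API;
    subscription.source_merge_first_name = SOURCE_API; // one source_* entry per field column
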
diff --git a/lib/senders.js b/obsolete/lib/senders.js
similarity index 100%
rename from lib/senders.js
rename to obsolete/lib/senders.js
diff --git a/services/feedcheck.js b/obsolete/services/feedcheck.js
similarity index 100%
rename from services/feedcheck.js
rename to obsolete/services/feedcheck.js
diff --git a/obsolete/services/importer.js b/obsolete/services/importer.js
new file mode 100644
index 00000000..fc8d13a2
--- /dev/null
+++ b/obsolete/services/importer.js
@@ -0,0 +1,280 @@
+'use strict';
+
+// FIXME - revisit and rewrite if necessary
+
+let log = require('npmlog');
+
+let db = require('../lib/db');
+let tools = require('../lib/tools');
+let _ = require('../lib/translate')._;
+
+let fields = require('../lib/models/fields');
+let subscriptions = require('../lib/models/subscriptions');
+let fs = require('fs');
+let csvparse = require('csv-parse');
+
+const process_timout = 5 * 1000;
+
+function findUnprocessed(callback) {
+    db.getConnection((err, connection) => {
+        if (err) {
+            return callback(err);
+        }
+
+        let query = 'SELECT * FROM importer WHERE `status`=1 LIMIT 1';
+        connection.query(query, (err, rows) => {
+            if (err) {
+                connection.release();
+                return callback(err);
+            }
+
+            if (!rows || !rows.length) {
+                connection.release();
+                return callback(null, false);
+            }
+
+            let importer = rows[0];
+
+            let query = 'UPDATE importer SET `status`=2, `processed`=0 WHERE id=? AND `status`=1 LIMIT 1';
+            connection.query(query, [importer.id], (err, result) => {
+                connection.release();
+                if (err) {
+                    return callback(err);
+                }
+                if (!result.affectedRows) {
+                    // check next one
+                    return findUnprocessed(callback);
+                }
+
+                let importer = tools.convertKeys(rows[0]);
+                try {
+                    importer.mapping = JSON.parse(importer.mapping);
+                } catch (E) {
+                    importer.mapping = {
+                        columns: [],
+                        mapping: {}
+                    };
+                }
+
+                return callback(null, importer);
+            });
+        });
+    });
+}
+
+function processImport(data, callback) {
+    let parser = csvparse({
+        comment: '#',
+        delimiter: data.delimiter
+    });
+
+    let listId = data.list;
+
+    fields.list(data.list, (err, fieldList) => {
+        if (err && !fieldList) {
+            fieldList = [];
+        }
+
+        let firstRow;
+        let finished = false;
+        let inputStream = fs.createReadStream(data.path);
+        let fieldTypes = {};
+
+        fieldList.forEach(field => {
+            if (field.column) {
+                fieldTypes[field.column] = field.type;
+            }
+            if (field.options) {
+                field.options.forEach(subField => {
+                    if (subField.column) {
+                        fieldTypes[subField.column] = subField.type;
+                    }
+                });
+            }
+        });
+
+        inputStream.on('error', err => {
+            if (finished) {
+                return;
+            }
+            log.error('Import', err.stack);
+            finished = true;
+            return callback(err);
+        });
+
+        parser.on('error', err => {
+            if (finished) {
+                return;
+            }
+            log.error('Import', err.stack);
+            finished = true;
+            return callback(err);
+        });
+
+        let processing = false;
+        let processRows = () => {
+            let record = parser.read();
+            if (record === null) {
+                processing = false;
+                return;
+            }
+            processing = true;
+
+            if (!firstRow) {
+                firstRow = record;
+                return setImmediate(processRows);
+            }
+
+            let entry = {};
+            Object.keys(data.mapping.mapping || {}).forEach(key => {
+                // TODO: process all data types
+                if (fieldTypes[key] === 'option') {
+                    entry[key] = ['', '0', 'false', 'no', 'null'].indexOf((record[data.mapping.mapping[key]] || '').toString().toLowerCase().trim()) < 0 ? 1 : 0;
+                } else if (fieldTypes[key] === 'number') {
+                    entry[key] = Number(record[data.mapping.mapping[key]]) || 0;
+                } else {
+                    entry[key] = (record[data.mapping.mapping[key]] || '').toString().trim() || null;
+                }
+            });
+
+            if (!entry.email) {
+                log.verbose('Import', 'Failed processing row, email missing');
+                return setImmediate(processRows);
+            }
+
+            function insertToSubscription() {
+                subscriptions.insert(listId, {
+                    imported: data.id,
+                    status: data.type,
+                    partial: true
+                }, entry, (err, response) => {
+                    if (err) {
+                        // ignore
+                        log.error('Import', err.stack);
+                    } else if (response.entryId) {
+                        //log.verbose('Import', 'Inserted %s as %s', entry.email, entryId);
+                    }
+
+                    db.getConnection((err, connection) => {
+                        if (err) {
+                            log.error('Import', err.stack);
+                            return setImmediate(processRows);
+                        }
+
+                        let query;
+                        if (response.inserted) {
+                            // this record did not exist before, count as new
+                            query = 'UPDATE importer SET `processed`=`processed`+1, `new`=`new`+1 WHERE `id`=? LIMIT 1';
+                        } else {
+                            // it's an existing record
+                            query = 'UPDATE importer SET `processed`=`processed`+1 WHERE `id`=? LIMIT 1';
+                        }
+
+                        connection.query(query, [data.id], () => {
+                            connection.release();
+                            return setImmediate(processRows);
+                        });
+                    });
+                });
+            }
+
+            if (data.emailcheck === 1) {
+                tools.validateEmail(entry.email, true, err => {
+                    if (err) {
+                        let reason = (err.message || '').toString().trim().replace(/^[a-z]Error:\s*/i, '');
+                        log.verbose('Import', 'Failed processing row %s: %s', entry.email, reason);
+                        db.getConnection((err, connection) => {
+                            if (err) {
+                                log.error('Import', err.stack);
+                                return setImmediate(processRows);
+                            }
+
+                            let query = 'INSERT INTO import_failed (`import`, `email`, `reason`) VALUES(?,?,?)';
+                            connection.query(query, [data.id, entry.email, reason], err => {
+                                if (err) {
+                                    connection.release();
+                                    return setImmediate(processRows);
+                                }
+                                let query = 'UPDATE importer SET `failed`=`failed`+1 WHERE `id`=? LIMIT 1';
+                                connection.query(query, [data.id], () => {
+                                    connection.release();
+                                    return setImmediate(processRows);
+                                });
+                            });
+                        });
+                        return;
+                    }
+                    insertToSubscription();
+                });
+            } else {
+                insertToSubscription();
+            }
+        };
+
+        parser.on('readable', () => {
+            if (finished || processing) {
+                return;
+            }
+            processRows();
+        });
+
+        parser.on('finish', () => {
+            if (finished) {
+                return;
+            }
+            finished = true;
+            callback(null, true);
+        });
+
+        inputStream.pipe(parser);
+    });
+}
+
+let importLoop = () => {
+    let getNext = () => {
+        // find an unsent message
+        findUnprocessed((err, data) => {
+            if (err) {
+                log.error('Import', err.stack);
+                setTimeout(getNext, process_timout);
+                return;
+            }
+            if (!data) {
+                setTimeout(getNext, process_timout);
+                return;
+            }
+
+            processImport(data, err => {
+                let failed = null;
+                if (err) {
+                    if (err.code === 'ENOENT') {
+                        failed = _('Could not access import file');
+                    } else {
+                        failed = err.message || err;
+                    }
+                }
+
+                db.getConnection((err, connection) => {
+                    if (err) {
+                        log.error('Import', err.stack);
+                        return setTimeout(getNext, process_timout);
+                    }
+
+                    let query = 'UPDATE importer SET `status`=?, `error`=?, `finished`=NOW() WHERE `id`=? AND `status`=2 LIMIT 1';
+
+                    connection.query(query, [!failed ? 3 : 4, failed, data.id], () => {
+                        connection.release();
+
+                        getNext();
+                    });
+                });
+            });
+        });
+    };
+    getNext();
+};
+
+module.exports = callback => {
+    importLoop();
+    setImmediate(callback);
+};
diff --git a/services/postfix-bounce-server.js b/obsolete/services/postfix-bounce-server.js
similarity index 100%
rename from services/postfix-bounce-server.js
rename to obsolete/services/postfix-bounce-server.js
diff --git a/services/sender.js b/obsolete/services/sender.js
similarity index 100%
rename from services/sender.js
rename to obsolete/services/sender.js
diff --git a/services/triggers.js b/obsolete/services/triggers.js
similarity index 100%
rename from services/triggers.js
rename to obsolete/services/triggers.js
diff --git a/services/verp-server.js b/obsolete/services/verp-server.js
similarity index 100%
rename from services/verp-server.js
rename to obsolete/services/verp-server.js
diff --git a/routes/api.js b/routes/api.js
index a0320a89..88e4a0e7 100644
--- a/routes/api.js
+++ b/routes/api.js
@@ -4,7 +4,7 @@ const lists = require('../models/lists');
 const tools = require('../lib/tools');
 const blacklist = require('../models/blacklist');
 const fields = require('../models/fields');
-const { SubscriptionStatus } = require('../shared/lists');
+const { SubscriptionStatus, SubscriptionSource } = require('../shared/lists');
 const subscriptions = require('../models/subscriptions');
 const confirmations = require('../models/confirmations');
 const log = require('npmlog');
@@ -77,7 +77,7 @@ router.postAsync('/subscribe/:listCid', passport.loggedIn, async (req, res) => {
         subscribeIfNoExisting: true
     };
 
-    await subscriptions.create(req.context, list.id, subscription, meta);
+    await subscriptions.create(req.context, list.id, subscription, SubscriptionSource.API, meta);
 
     res.status(200);
     res.json({
diff --git a/routes/rest/subscriptions.js b/routes/rest/subscriptions.js
index 81c75e73..586f6a3b 100644
--- a/routes/rest/subscriptions.js
+++ b/routes/rest/subscriptions.js
@@ -2,6 +2,7 @@
 
 const passport = require('../../lib/passport');
 const subscriptions = require('../../models/subscriptions');
+const { SubscriptionSource } = require('../../shared/lists');
 
 const router = require('../../lib/router-async').create();
 
@@ -17,14 +18,14 @@ router.getAsync('/subscriptions/:listId/:subscriptionId', passport.loggedIn, asy
 });
 
 router.postAsync('/subscriptions/:listId', passport.loggedIn, passport.csrfProtection, async (req, res) => {
-    return res.json(await subscriptions.create(req.context, req.params.listId, req.body));
+    return res.json(await subscriptions.create(req.context, req.params.listId, req.body, SubscriptionSource.ADMIN_FORM));
 });
 
 router.putAsync('/subscriptions/:listId/:subscriptionId', passport.loggedIn, passport.csrfProtection, async (req, res) => {
     const entity = req.body;
     entity.id = parseInt(req.params.subscriptionId);
 
-    await subscriptions.updateWithConsistencyCheck(req.context, req.params.listId, entity);
+    await subscriptions.updateWithConsistencyCheck(req.context, req.params.listId, entity, SubscriptionSource.ADMIN_FORM);
 
     return res.json();
 });
diff --git a/routes/subscription.js b/routes/subscription.js
index 3ae130d0..17465bef 100644
--- a/routes/subscription.js
+++ b/routes/subscription.js
@@ -15,7 +15,7 @@ const forms = require('../models/forms');
 const {getTrustedUrl} = require('../lib/urls');
 const bluebird = require('bluebird');
 
-const { SubscriptionStatus } = require('../shared/lists');
+const { SubscriptionStatus, SubscriptionSource } = require('../shared/lists');
 const openpgp = require('openpgp');
 const cors = require('cors');
 
@@ -115,7 +115,7 @@ router.getAsync('/confirm/subscribe/:cid', async (req, res) => {
     subscription.status = SubscriptionStatus.SUBSCRIBED;
 
     try {
-        await subscriptions.create(contextHelpers.getAdminContext(), confirmation.list, subscription, meta);
+        await subscriptions.create(contextHelpers.getAdminContext(), confirmation.list, subscription, SubscriptionSource.SUBSCRIPTION_FORM, meta);
     } catch (err) {
         if (err instanceof interoperableErrors.DuplicitEmailError) {
             throw new interoperableErrors.DuplicitEmailError('Subscription already present'); // This is here to provide some meaningful error message.
diff --git a/services/importer.js b/services/importer.js
index fc8d13a2..16967860 100644
--- a/services/importer.js
+++ b/services/importer.js
@@ -1,280 +1,143 @@
 'use strict';
 
-// FIXME - revisit and rewrite if necessary
+const knex = require('../lib/knex');
+const path = require('path');
+const log = require('npmlog');
+const fsExtra = require('fs-extra-promise');
+const {ImportType, ImportStatus, RunStatus} = require('../shared/imports');
+const imports = require('../models/imports');
 
-let log = require('npmlog');
+const csvparse = require('csv-parse');
+const fs = require('fs');
 
-let db = require('../lib/db');
-let tools = require('../lib/tools');
-let _ = require('../lib/translate')._;
+let running = false;
 
-let fields = require('../lib/models/fields');
-let subscriptions = require('../lib/models/subscriptions');
-let fs = require('fs');
-let csvparse = require('csv-parse');
-
-const process_timout = 5 * 1000;
-
-function findUnprocessed(callback) {
-    db.getConnection((err, connection) => {
-        if (err) {
-            return callback(err);
+function prepareCsv(impt) {
+    async function finishWithError(msg, err) {
+        if (finished) {
+            return;
         }
 
-        let query = 'SELECT * FROM importer WHERE `status`=1 LIMIT 1';
-        connection.query(query, (err, rows) => {
-            if (err) {
-                connection.release();
-                return callback(err);
-            }
+        finished = true;
+        log.error('Importer (CSV)', err.stack);
 
-            if (!rows || !rows.length) {
-                connection.release();
-                return callback(null, false);
-            }
-
-            let importer = rows[0];
-
-            let query = 'UPDATE importer SET `status`=2, `processed`=0 WHERE id=? AND `status`=1 LIMIT 1';
-            connection.query(query, [importer.id], (err, result) => {
-                connection.release();
-                if (err) {
-                    return callback(err);
-                }
-                if (!result.affectedRows) {
-                    // check next one
-                    return findUnprocessed(callback);
-                }
-
-                let importer = tools.convertKeys(rows[0]);
-                try {
-                    importer.mapping = JSON.parse(importer.mapping);
-                } catch (E) {
-                    importer.mapping = {
-                        columns: [],
-                        mapping: {}
-                    };
-                }
-
-                return callback(null, importer);
-            });
+        await knex('imports').where('id', impt.id).update({
+            status: ImportStatus.PREP_FAILED,
+            error: msg + '\n' + err.stack
         });
-    });
-}
 
-function processImport(data, callback) {
-    let parser = csvparse({
+        await fsExtra.removeAsync(filePath);
+    }
+
+    async function finishWithSuccess() {
+        if (finished) {
+            return;
+        }
+
+        finished = true;
+        log.info('Importer (CSV)', 'Preparation finished');
+
+        await knex('imports').where('id', impt.id).update({
+            status: ImportStatus.PREP_FINISHED,
+            error: null
+        });
+
+        await fsExtra.removeAsync(filePath);
+    }
+
+    // Processing of CSV intake
+    const filePath = path.join(imports.filesDir, impt.settings.csv.filename);
+
+    const parser = csvparse({
         comment: '#',
-        delimiter: data.delimiter
+        delimiter: impt.settings.csv.delimiter
     });
 
-    let listId = data.list;
+    const inputStream = fs.createReadStream(filePath);
+    let finished;
+
+    inputStream.on('error', err => finishWithError('Error reading CSV file.', err));
+    parser.on('error', err => finishWithError('Error parsing CSV file.', err));
+
+    let firstRow;
+    let processing = false;
+    const processRows = () => {
+        const record = parser.read();
+        if (record === null) {
+            processing = false;
+            return;
+        }
+        processing = true;
+
+        if (!firstRow) {
+            firstRow = record;
+            console.log(record);
+            return setImmediate(processRows);
-    fields.list(data.list, (err, fieldList) => {
-        if (err && !fieldList) {
-            fieldList = [];
         }
 
-        let firstRow;
-        let finished = false;
-        let inputStream = fs.createReadStream(data.path);
-        let fieldTypes = {};
+        console.log(record);
+        return setImmediate(processRows);
+    };
 
-        fieldList.forEach(field => {
-            if (field.column) {
-                fieldTypes[field.column] = field.type;
-            }
-            if (field.options) {
-                field.options.forEach(subField => {
-                    if (subField.column) {
-                        fieldTypes[subField.column] = subField.type;
-                    }
-                });
-            }
-        });
-
-        inputStream.on('error', err => {
-            if (finished) {
-                return;
-            }
-            log.error('Import', err.stack);
-            finished = true;
-            return callback(err);
-        });
-
-        parser.on('error', err => {
-            if (finished) {
-                return;
-            }
-            log.error('Import', err.stack);
-            finished = true;
-            return callback(err);
-        });
-
-        let processing = false;
-        let processRows = () => {
-            let record = parser.read();
-            if (record === null) {
-                processing = false;
-                return;
-            }
-            processing = true;
-
-            if (!firstRow) {
-                firstRow = record;
-                return setImmediate(processRows);
-            }
-
-            let entry = {};
-            Object.keys(data.mapping.mapping || {}).forEach(key => {
-                // TODO: process all data types
-                if (fieldTypes[key] === 'option') {
-                    entry[key] = ['', '0', 'false', 'no', 'null'].indexOf((record[data.mapping.mapping[key]] || '').toString().toLowerCase().trim()) < 0 ? 1 : 0;
-                } else if (fieldTypes[key] === 'number') {
-                    entry[key] = Number(record[data.mapping.mapping[key]]) || 0;
-                } else {
-                    entry[key] = (record[data.mapping.mapping[key]] || '').toString().trim() || null;
-                }
-            });
-
-            if (!entry.email) {
-                log.verbose('Import', 'Failed processing row, email missing');
-                return setImmediate(processRows);
-            }
-
-            function insertToSubscription() {
-                subscriptions.insert(listId, {
-                    imported: data.id,
-                    status: data.type,
-                    partial: true
-                }, entry, (err, response) => {
-                    if (err) {
-                        // ignore
-                        log.error('Import', err.stack);
-                    } else if (response.entryId) {
-                        //log.verbose('Import', 'Inserted %s as %s', entry.email, entryId);
-                    }
-
-                    db.getConnection((err, connection) => {
-                        if (err) {
-                            log.error('Import', err.stack);
-                            return setImmediate(processRows);
-                        }
-
-                        let query;
-                        if (response.inserted) {
-                            // this record did not exist before, count as new
-                            query = 'UPDATE importer SET `processed`=`processed`+1, `new`=`new`+1 WHERE `id`=? LIMIT 1';
-                        } else {
-                            // it's an existing record
-                            query = 'UPDATE importer SET `processed`=`processed`+1 WHERE `id`=? LIMIT 1';
-                        }
-
-                        connection.query(query, [data.id], () => {
-                            connection.release();
-                            return setImmediate(processRows);
-                        });
-                    });
-                });
-            }
-
-            if (data.emailcheck === 1) {
-                tools.validateEmail(entry.email, true, err => {
-                    if (err) {
-                        let reason = (err.message || '').toString().trim().replace(/^[a-z]Error:\s*/i, '');
-                        log.verbose('Import', 'Failed processing row %s: %s', entry.email, reason);
-                        db.getConnection((err, connection) => {
-                            if (err) {
-                                log.error('Import', err.stack);
-                                return setImmediate(processRows);
-                            }
-
-                            let query = 'INSERT INTO import_failed (`import`, `email`, `reason`) VALUES(?,?,?)';
-                            connection.query(query, [data.id, entry.email, reason], err => {
-                                if (err) {
-                                    connection.release();
-                                    return setImmediate(processRows);
-                                }
-                                let query = 'UPDATE importer SET `failed`=`failed`+1 WHERE `id`=? LIMIT 1';
-                                connection.query(query, [data.id], () => {
-                                    connection.release();
-                                    return setImmediate(processRows);
-                                });
-                            });
-                        });
-                        return;
-                    }
-                    insertToSubscription();
-                });
-            } else {
-                insertToSubscription();
-            }
-        };
-
-        parser.on('readable', () => {
-            if (finished || processing) {
-                return;
-            }
-            processRows();
-        });
-
-        parser.on('finish', () => {
-            if (finished) {
-                return;
-            }
-            finished = true;
-            callback(null, true);
-        });
-
-        inputStream.pipe(parser);
+    parser.on('readable', () => {
+        if (finished || processing) {
+            return;
+        }
+        processRows();
     });
+
+    parser.on('finish', () => {
+        finishWithSuccess();
+    });
+
+    inputStream.pipe(parser);
 }
 
-let importLoop = () => {
-    let getNext = () => {
-        // find an unsent message
-        findUnprocessed((err, data) => {
-            if (err) {
-                log.error('Import', err.stack);
-                setTimeout(getNext, process_timout);
-                return;
-            }
-            if (!data) {
-                setTimeout(getNext, process_timout);
-                return;
+async function getTask() {
+    return await knex.transaction(async tx => {
+        const impt = await tx('imports').whereIn('status', [ImportStatus.PREP_SCHEDULED, ImportStatus.RUN_SCHEDULED]).orderBy('created', 'asc').first();
+
+        if (impt) {
+            impt.settings = JSON.parse(impt.settings);
+
+            if (impt.type === ImportType.CSV_FILE && impt.status === ImportStatus.PREP_SCHEDULED) {
+                await tx('imports').where('id', impt.id).update('status', ImportStatus.PREP_RUNNING);
+                return () => prepareCsv(impt);
             }
+        } else {
+            return null;
+        }
+    });
+}
 
-            processImport(data, err => {
-                let failed = null;
-                if (err) {
-                    if (err.code === 'ENOENT') {
-                        failed = _('Could not access import file');
-                    } else {
-                        failed = err.message || err;
-                    }
-                }
+async function run() {
+    if (running) {
+        return;
+    }
 
-                db.getConnection((err, connection) => {
-                    if (err) {
-                        log.error('Import', err.stack);
-                        return setTimeout(getNext, process_timout);
-                    }
+    running = true;
 
-                    let query = 'UPDATE importer SET `status`=?, `error`=?, `finished`=NOW() WHERE `id`=? AND `status`=2 LIMIT 1';
+    let task;
+    while ((task = await getTask()) != null) {
+        task();
+    }
 
-                    connection.query(query, [!failed ? 3 : 4, failed, data.id], () => {
-                        connection.release();
+    running = false;
+}
 
-                        getNext();
-                    });
-                });
-            });
-        });
-    };
-    getNext();
-};
+process.on('message', msg => {
+    if (msg) {
+        const type = msg.type;
+
+        if (type === 'scheduleCheck') {
+            run();
+        }
+    }
+});
+
+process.send({
+    type: 'importer-started'
+});
 
-module.exports = callback => {
-    importLoop();
-    setImmediate(callback);
-};
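The rewrite replaces v1's single status integer (1–4) with an explicit two-phase lifecycle: preparation of the uploaded CSV, then a separate run. Only the preparation half is implemented above; the row-handling of the run half is still stubbed out (the console.log placeholders). A sketch of the transitions this encodes — the PREP_FINISHED → RUN_SCHEDULED step is an assumption about how the eventual UI will start a run:

    const {ImportStatus} = require('./shared/imports');

    const transitions = {
        [ImportStatus.PREP_SCHEDULED]: [ImportStatus.PREP_RUNNING],   // picked up by getTask()
        [ImportStatus.PREP_RUNNING]:   [ImportStatus.PREP_FINISHED,   // finishWithSuccess()
                                        ImportStatus.PREP_FAILED,     // finishWithError()
                                        ImportStatus.PREP_SCHEDULED], // re-queued by spawn() after a crash
        [ImportStatus.PREP_FINISHED]:  [ImportStatus.RUN_SCHEDULED],  // assumed: user starts the run
        [ImportStatus.RUN_SCHEDULED]:  [ImportStatus.RUN_RUNNING],    // intended run phase (not implemented yet)
        [ImportStatus.RUN_RUNNING]:    [ImportStatus.RUN_FINISHED,
                                        ImportStatus.RUN_FAILED,
                                        ImportStatus.RUN_SCHEDULED]   // re-queued by spawn() after a crash
    };
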
diff --git a/setup/knex/migrations/20170506102634_v1_to_v2.js b/setup/knex/migrations/20170506102634_v1_to_v2.js
index d3bae5b9..576d0aa5 100644
--- a/setup/knex/migrations/20170506102634_v1_to_v2.js
+++ b/setup/knex/migrations/20170506102634_v1_to_v2.js
@@ -8,6 +8,8 @@ const shareableEntityTypes = ['list', 'custom_form', 'template', 'campaign', 're
 const { MailerType, getSystemSendConfigurationId } = require('../../../shared/send-configurations');
 const { enforce } = require('../../../lib/helpers');
 const { EntityVals: TriggerEntityVals, EventVals: TriggerEventVals } = require('../../../shared/triggers');
+const { SubscriptionSource } = require('../../../shared/lists');
+const crypto = require('crypto');
 
 const entityTypesWithFiles = {
     campaign: {
@@ -72,7 +73,7 @@ async function migrateBase(knex) {
     // Original Mailtrain migration is executed before this one. So here we check that the original migration
     // ended where it should have and we take it from here.
     const row = await knex('settings').where({key: 'db_schema_version'}).first('value');
-    if (!row || Number(row.value) !== 29) {
+    if (!row || Number(row.value) !== 33) {
         throw new Error('Unsupported DB schema version: ' + row.value);
     }
 
@@ -148,6 +149,7 @@ async function migrateBase(knex) {
 
         .raw('ALTER TABLE `triggers` MODIFY `id` int unsigned not null auto_increment')
         .raw('ALTER TABLE `triggers` MODIFY `list` int unsigned not null')
+        .raw('ALTER TABLE `triggers` MODIFY `segment` int unsigned not null')
         .raw('ALTER TABLE `triggers` MODIFY `source_campaign` int unsigned default null')
         .raw('ALTER TABLE `triggers` MODIFY `dest_campaign` int unsigned default null')
 
@@ -231,6 +233,40 @@ async function migrateUsers(knex) {
 
 async function migrateSubscriptions(knex) {
     await knex.schema.dropTableIfExists('subscription');
+
+    const lists = await knex('lists');
+    for (const list of lists) {
+        await knex.schema.raw('ALTER TABLE `subscription__' + list.id + '` ADD `source_email` int(10) unsigned DEFAULT NULL');
+        await knex.schema.raw('ALTER TABLE `subscription__' + list.id + '` ADD `hash_email` varchar(255) CHARACTER SET ascii');
+
+        const fields = await knex('custom_fields').where('list', list.id);
+        for (const field of fields) {
+            if (field.column != null) {
+                await knex.schema.raw('ALTER TABLE `subscription__' + list.id + '` ADD `source_' + field.column + '` int(11) DEFAULT NULL');
+            }
+        }
+
+        const subscriptionsStream = knex('subscription__' + list.id).stream();
+        let subscription;
+        while ((subscription = subscriptionsStream.read()) != null) {
+            subscription.hash_email = crypto.createHash('sha512').update(subscription.email).digest("base64");
+            subscription.source_email = subscription.imported ? SubscriptionSource.IMPORTED_V1 : SubscriptionSource.NOT_IMPORTED_V1;
+            for (const field of fields) {
+                if (field.column != null) {
+                    subscription['source_' + field.column] = subscription.imported ? SubscriptionSource.IMPORTED_V1 : SubscriptionSource.NOT_IMPORTED_V1;
+                }
+            }
+
+            await knex('subscription__' + list.id).where('id', subscription.id).update(subscription);
+        }
+
+        await knex.schema.raw('ALTER TABLE `subscription__' + list.id + '` MODIFY `hash_email` varchar(255) CHARACTER SET ascii NOT NULL');
+
+        await knex.schema.table('subscription__' + list.id, table => {
+            table.dropColumn('imported');
+        });
+    }
 }
 
 async function migrateCustomForms(knex) {
@@ -626,6 +662,7 @@ async function migrateSettings(knex) {
         table.text('mailer_settings', 'longtext');
         table.timestamp('created').defaultTo(knex.fn.now());
         table.integer('namespace').unsigned().references('namespaces.id');
+        table.string('x_mailer');
     });
 
     await knex.schema.table('lists', table => {
@@ -695,6 +732,7 @@ async function migrateSettings(knex) {
         verp_hostname: settings.verpUse ? settings.verpHostname : null,
         mailer_type,
         mailer_settings: JSON.stringify(mailer_settings),
+        x_mailer: settings.x_mailer,
         namespace: getGlobalNamespaceId()
     });
 
@@ -917,6 +955,8 @@ async function migrateCampaigns(knex) {
         table.renameColumn('address', 'from_email_override');
         table.renameColumn('reply_to', 'reply_to_override');
         table.renameColumn('subject', 'subject_override');
+        table.renameColumn('unsubscribe', 'unsubscribe_url');
+
         // Remove the default value
         table.integer('send_configuration').unsigned().notNullable().alter();
 
@@ -970,6 +1010,7 @@ async function migrateTriggers(knex) {
     await knex.schema.table('triggers', table => {
         table.dropForeign('list', 'triggers_ibfk_1');
         table.dropColumn('list');
+        table.dropColumn('segment');
     });
 
     await knex.schema.dropTableIfExists('trigger');
@@ -988,6 +1029,7 @@ async function migrateImporter(knex) {
         table.integer('status').unsigned().notNullable();
         table.text('settings', 'longtext');
         table.timestamp('last_run');
+        table.text('error');
         table.timestamp('created').defaultTo(knex.fn.now());
     });
 
diff --git a/shared/imports.js b/shared/imports.js
index 487555f9..0f8d6674 100644
--- a/shared/imports.js
+++ b/shared/imports.js
@@ -10,10 +10,15 @@ const ImportType = {
 };
 
 const ImportStatus = {
-    NOT_READY: 0,
-    SCHEDULED: 1,
-    RUNNING: 2,
-    FINISHED: 3
+    PREP_SCHEDULED: 0,
+    PREP_RUNNING: 1,
+    PREP_FINISHED: 2,
+    PREP_FAILED: 3,
+
+    RUN_SCHEDULED: 4,
+    RUN_RUNNING: 5,
+    RUN_FINISHED: 6,
+    RUN_FAILED: 7
 };
 
 const RunStatus = {
diff --git a/shared/lists.js b/shared/lists.js
index d80421a7..74421058 100644
--- a/shared/lists.js
+++ b/shared/lists.js
@@ -23,6 +23,14 @@ const SubscriptionStatus = {
     MAX: 4
 };
 
+const SubscriptionSource = {
+    ADMIN_FORM: -1,
+    SUBSCRIPTION_FORM: -2,
+    API: -3,
+    NOT_IMPORTED_V1: -4,
+    IMPORTED_V1: -5
+};
+
 function getFieldKey(field) {
     return field.column || 'grouped_' + field.id;
 }
@@ -30,5 +38,6 @@ function getFieldKey(field) {
 module.exports = {
     UnsubscriptionMode,
     SubscriptionStatus,
+    SubscriptionSource,
     getFieldKey
 };
\ No newline at end of file
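Because source_email now holds either a positive imports-table id or one of the negative SubscriptionSource constants, organic signups and bulk-imported addresses can be told apart without a join. A sketch under that convention (knex, as elsewhere in the patch; the helper itself is not part of this change):

    const knex = require('./lib/knex');
    const { SubscriptionSource } = require('./shared/lists');

    // Subscribers brought in by any import: v2 imports store their import id (> 0),
    // while the migration collapses v1-era imports to the IMPORTED_V1 marker.
    async function importedSubscribers(listId) {
        return await knex('subscription__' + listId)
            .where('source_email', '>', 0)
            .orWhere('source_email', SubscriptionSource.IMPORTED_V1);
    }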