Basic import seems to work

Tomas Bures 2018-09-01 21:29:10 +02:00
parent 16519c5353
commit d74806dde3
21 changed files with 555 additions and 749 deletions


@@ -6,14 +6,22 @@ const log = require('npmlog');
const fsExtra = require('fs-extra-promise');
const {ImportSource, MappingType, ImportStatus, RunStatus} = require('../shared/imports');
const imports = require('../models/imports');
const fields = require('../models/fields');
const subscriptions = require('../models/subscriptions');
const { Writable } = require('stream');
- const { enforce } = require('../lib/helpers');
+ const { cleanupFromPost, enforce } = require('../lib/helpers');
const contextHelpers = require('../lib/context-helpers');
const tools = require('../lib/tools');
const shares = require('../models/shares');
const _ = require('../lib/translate')._;
const csvparse = require('csv-parse');
const fs = require('fs');
let running = false;
- const maxInsertBatchSize = 100;
+ const maxPrepareBatchSize = 100;
+ const maxImportBatchSize = 10;
function prepareCsv(impt) {
// Processing of CSV intake
@@ -71,6 +79,8 @@ function prepareCsv(impt) {
}
impt.settings.csv.columns = cols;
+ impt.settings.sourceTable = importTable;
+ await knex('imports').where({id: impt.id}).update({settings: JSON.stringify(impt.settings)});
await knex.schema.raw('CREATE TABLE `' + importTable + '` (\n' +
@@ -88,7 +98,7 @@ function prepareCsv(impt) {
insertBatch.push(dbRecord);
}
- if (insertBatch.length >= maxInsertBatchSize) {
+ if (insertBatch.length >= maxPrepareBatchSize) {
await knex(importTable).insert(insertBatch);
insertBatch = [];
}
@@ -126,27 +136,220 @@ function prepareCsv(impt) {
inputStream.pipe(parser);
}
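
The prepare phase above streams the uploaded CSV through csv-parse and stages the rows into a dedicated import table in batches. A minimal sketch of that batch-flush pattern, assuming the csv-parse v2-style require used in the commit; flushBatch stands in for the knex insert into the staging table, and the CSV options are illustrative:

    const csvparse = require('csv-parse');
    const { Writable } = require('stream');

    const maxPrepareBatchSize = 100;

    // Buffers parsed rows and hands them to flushBatch in chunks, with a final
    // flush for the remainder once the parser signals end of input.
    function batchingSink(flushBatch) {
        let batch = [];
        return new Writable({
            objectMode: true,
            write(row, encoding, callback) {
                batch.push(row);
                if (batch.length >= maxPrepareBatchSize) {
                    const rows = batch;
                    batch = [];
                    flushBatch(rows).then(() => callback(), callback);
                } else {
                    callback();
                }
            },
            final(callback) {
                if (batch.length === 0) {
                    return callback();
                }
                flushBatch(batch).then(() => callback(), callback);
            }
        });
    }

    // Usage: in the importer the flush would be awaiting knex(importTable).insert(rows).
    const parser = csvparse({ columns: true });
    const sink = batchingSink(async rows => console.log(`flushing ${rows.length} rows`));
    process.stdin.pipe(parser).pipe(sink);
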
- async function basicSubscribe(impt) {
- let imptRun;
- while (imptRun = await knex('import_runs').where('import', impt.id).whereIn('status', [RunStatus.SCHEDULED]).orderBy('created', 'asc').first()) {
- await knex('import_runs').where('id', imptRun.id).update({
- status: RunStatus.RUNNING
- });
async function _execImportRun(impt, handlers) {
// FIXME - handle STOPPING
try {
let imptRun;
// It should not really happen that there is more than one scheduled run for an import, but to be on the safe side we process them in a loop.
while (imptRun = await knex('import_runs').where('import', impt.id).whereIn('status', [RunStatus.SCHEDULED]).orderBy('created', 'asc').first()) {
try {
imptRun.mapping = JSON.parse(imptRun.mapping) || {};
log.info('Importer', `Starting import run ${impt.id}.${imptRun.id}`);
await knex('import_runs').where('id', imptRun.id).update({
status: RunStatus.RUNNING
});
const importTable = impt.settings.sourceTable;
const flds = await fields.list(contextHelpers.getAdminContext(), impt.list);
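// Resume support: an interrupted run picks up from last_id and the counters continue from their stored values.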
let lastId = imptRun.last_id || 0;
let countNew = imptRun.new || 0;
let countProcessed = imptRun.processed || 0;
let countFailed = imptRun.failed || 0;
while (true) {
const rows = await knex(importTable).orderBy('id', 'asc').where('id', '>', lastId).limit(maxImportBatchSize);
log.verbose('Importer', `Processing run ${impt.id}.${imptRun.id} with id > ${lastId} ... ${rows.length} entries`);
if (rows.length === 0) {
break;
}
const subscrs = [];
const unsubscrs = [];
const failures = [];
// Process the whole batch in parallel; this helps if processSourceRow ever does a DNS check on the email, because those checks then run concurrently.
await Promise.all(rows.map(row => handlers.processSourceRow(impt, imptRun, flds, row, subscrs, unsubscrs, failures)));
lastId = rows[rows.length - 1].id;
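// All writes for this batch share one transaction, so last_id and the counters only advance if the whole batch commits.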
await knex.transaction(async tx => {
const groupedFieldsMap = await subscriptions.getGroupedFieldsMap(tx, impt.list);
let newRows = 0;
for (const subscr of subscrs) {
const meta = {
updateAllowed: true,
updateOfUnsubscribedAllowed: true,
subscribeIfNoExisting: true
};
try {
await subscriptions.createTxWithGroupedFieldsMap(tx, contextHelpers.getAdminContext(), impt.list, groupedFieldsMap, subscr, impt.id, meta);
if (!meta.existing) {
newRows += 1;
}
} catch (err) {
failures.push({
run: imptRun.id,
source_id: subscr.source_id,
email: subscr.email,
reason: err.message
});
}
}
for (const unsubscr of unsubscrs) {
try {
await subscriptions.unsubscribeByEmailAndGetTx(tx, contextHelpers.getAdminContext(), impt.list, unsubscr.email);
} catch (err) {
failures.push({
run: imptRun.id,
source_id: unsubscr.source_id,
email: unsubscr.email,
reason: err.message
});
}
}
countProcessed += rows.length;
countNew += newRows;
countFailed += failures.length;
if (failures.length > 0) {
await tx('import_failed').insert(failures);
}
await tx('import_runs').where('id', imptRun.id).update({
last_id: lastId,
new: countNew,
failed: countFailed,
processed: countProcessed
});
});
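// A stop request (RunStatus.STOPPING) is honoured between batches, never in the middle of a transaction.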
const imptRunStatus = await knex('import_runs').where('id', imptRun.id).select(['status']).first();
if (imptRunStatus.status === RunStatus.STOPPING) {
throw new Error('Aborted');
}
}
await knex('import_runs').where('id', imptRun.id).update({
status: RunStatus.FINISHED,
error: null,
finished: new Date()
});
log.info('Importer', `Import run ${impt.id}.${imptRun.id} finished`);
} catch (err) {
await knex('import_runs').where('id', imptRun.id).update({
status: RunStatus.FAILED,
error: err.message,
finished: new Date()
});
throw new Error(_('Last run failed'));
}
}
await knex('imports').where('id', impt.id).update({
last_run: new Date(),
error: null,
status: ImportStatus.RUN_FINISHED
});
- await knex('import_runs').where('id', imptRun.id).update({
- status: RunStatus.FINISHED
- });
} catch (err) {
await knex('imports').where('id', impt.id).update({
last_run: new Date(),
error: err.message,
status: ImportStatus.RUN_FAILED
});
}
}
- await knex('imports').where('id', impt.id).update({
- status: ImportStatus.RUN_FINISHED
- });
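
The engine above walks the staging table in primary-key order, maxImportBatchSize rows at a time, persisting last_id with each committed batch so that a crashed or stopped run can resume. A minimal sketch of that keyset-pagination loop, with an in-memory array standing in for the knex query (the query shape matches the commit; the data source is illustrative):

    const maxImportBatchSize = 10;
    const stagingRows = Array.from({ length: 25 }, (_, i) => ({ id: i + 1, email: `user${i + 1}@example.com` }));

    // Simulates: knex(importTable).orderBy('id', 'asc').where('id', '>', lastId).limit(maxImportBatchSize)
    async function fetchBatch(lastId) {
        return stagingRows.filter(row => row.id > lastId).slice(0, maxImportBatchSize);
    }

    async function runImport(lastId = 0) {
        while (true) {
            const rows = await fetchBatch(lastId);
            if (rows.length === 0) {
                break; // nothing beyond lastId, the run is complete
            }
            // ... process the batch, then persist progress together with the writes ...
            lastId = rows[rows.length - 1].id;
            console.log(`processed up to id ${lastId}`);
        }
    }

    runImport();
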
async function basicSubscribe(impt) {
const handlers = {
processSourceRow: async (impt, imptRun, flds, row, subscriptions, unsubscriptions, failures) => {
const mappingFields = imptRun.mapping.fields || {};
const mappingSettings = imptRun.mapping.settings || {};
const convRow = {};
for (const col in mappingFields) {
const fldMapping = mappingFields[col];
if (fldMapping && fldMapping.column) {
convRow[col] = row[fldMapping.column];
}
}
const subscription = fields.fromImport(impt.list, flds, convRow);
const email = cleanupFromPost(convRow.email);
let errorMsg;
if (!email) {
errorMsg = _('Missing email');
}
if (mappingSettings.checkEmails) {
const emailErr = await tools.validateEmail(email);
if (emailErr) {
errorMsg = tools.validateEmailGetMessage(emailErr, email);
}
}
if (!errorMsg) {
subscription.email = email;
subscription.source_id = row.id;
subscriptions.push(subscription);
} else {
failures.push({
run: imptRun.id,
source_id: row.id,
email: email,
reason: errorMsg
});
}
}
};
return await _execImportRun(impt, handlers);
}
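
basicSubscribe and basicUnsubscribe differ only in how a staged row is translated into work items; everything else (claiming runs, batching, transactions, bookkeeping) lives in the shared engine, which receives the per-type policy as a handlers object. A stripped-down sketch of that strategy pattern (names and data illustrative):

    // The engine is generic; each import type plugs in its own processSourceRow.
    async function execRun(rows, handlers) {
        const subscriptions = [];
        const failures = [];
        await Promise.all(rows.map(row => handlers.processSourceRow(row, subscriptions, failures)));
        return { subscriptions, failures };
    }

    const subscribeHandlers = {
        async processSourceRow(row, subscriptions, failures) {
            if (!row.email) {
                failures.push({ source_id: row.id, reason: 'Missing email' });
            } else {
                subscriptions.push({ source_id: row.id, email: row.email });
            }
        }
    };

    execRun([{ id: 1, email: 'a@example.com' }, { id: 2 }], subscribeHandlers)
        .then(result => console.log(JSON.stringify(result)));
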
async function basicUnsubscribe(impt) {
// FIXME
const handlers = {
processSourceRow: async (impt, imptRun, flds, row, subscriptions, unsubscriptions, failures) => {
const emailCol = imptRun.mapping.fields.email.column;
const email = cleanupFromPost(row[emailCol]);
let errorMsg;
if (!email) {
errorMsg = _('Missing email');
}
if (!errorMsg) {
unsubscriptions.push({
source_id: row.id,
email
});
} else {
failures.push({
run: imptRun.id,
source_id: row.id,
email: email,
reason: errorMsg
});
}
}
};
return await _execImportRun(impt, handlers);
}
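
The FIXME above flags the unguarded access to imptRun.mapping.fields.email.column, which throws a TypeError if a run was saved before the email mapping was chosen. A defensive lookup might look like this (a sketch, not the commit's code):

    // Tolerates a partially configured mapping instead of crashing the run.
    function getEmailColumn(mapping) {
        const fields = (mapping && mapping.fields) || {};
        return fields.email && fields.email.column;
    }

    console.log(getEmailColumn({ fields: {} }));                              // undefined rather than a TypeError
    console.log(getEmailColumn({ fields: { email: { column: 'col_1' } } }));  // 'col_1'
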
@@ -154,17 +357,17 @@ async function getTask() {
const impt = await tx('imports').whereIn('status', [ImportStatus.PREP_SCHEDULED, ImportStatus.RUN_SCHEDULED]).orderBy('created', 'asc').first();
if (impt) {
- impt.settings = JSON.parse(impt.settings);
+ impt.settings = JSON.parse(impt.settings) || {};
if (impt.source === ImportSource.CSV_FILE && impt.status === ImportStatus.PREP_SCHEDULED) {
await tx('imports').where('id', impt.id).update('status', ImportStatus.PREP_RUNNING);
return () => prepareCsv(impt);
- } else if (impt.status === ImportStatus.RUN_SCHEDULED && impt.settings.mappingType === MappingType.BASIC_SUBSCRIBE) {
+ } else if (impt.status === ImportStatus.RUN_SCHEDULED && impt.mapping_type === MappingType.BASIC_SUBSCRIBE) {
await tx('imports').where('id', impt.id).update('status', ImportStatus.RUN_RUNNING);
return () => basicSubscribe(impt);
- } else if (impt.status === ImportStatus.RUN_SCHEDULED && impt.settings.mappingType === MappingType.BASIC_UNSUBSCRIBE) {
+ } else if (impt.status === ImportStatus.RUN_SCHEDULED && impt.mapping_type === MappingType.BASIC_UNSUBSCRIBE) {
await tx('imports').where('id', impt.id).update('status', ImportStatus.RUN_RUNNING);
return () => basicUnsubscribe(impt);
}
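
getTask claims one scheduled import inside a transaction, flips its status to the corresponding RUNNING state so concurrent pollers skip it, and returns a thunk so the caller can execute the work outside the transaction. A minimal sketch of that claim-and-dispatch shape (in-memory job list and statuses illustrative):

    // Mark the job as running in the same step that selects it, then hand back
    // a closure; the worker loop runs it after the "transaction" has committed.
    async function getTaskSketch(jobs) {
        const job = jobs.find(j => j.status === 'SCHEDULED');
        if (job) {
            job.status = 'RUNNING';   // in the importer: tx('imports').where('id', ...).update('status', ...)
            return () => console.log(`running import ${job.id}`);
        }
        return null;
    }

    (async () => {
        const jobs = [{ id: 1, status: 'SCHEDULED' }];
        const task = await getTaskSketch(jobs);
        if (task) {
            task();
        }
    })();
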