// mailtrain/services/importer.js
'use strict';
2018-08-06 14:54:51 +00:00
const knex = require('../lib/knex');
const path = require('path');
const log = require('npmlog');
const fsExtra = require('fs-extra-promise');
const {ImportSource, MappingType, ImportStatus, RunStatus} = require('../shared/imports');
2018-08-06 14:54:51 +00:00
const imports = require('../models/imports');
const { Writable } = require('stream');
const { enforce } = require('../lib/helpers');
2018-08-06 14:54:51 +00:00
const csvparse = require('csv-parse');
const fs = require('fs');
let running = false;
const maxInsertBatchSize = 100;
2018-08-06 14:54:51 +00:00
/**
 * Prepares a CSV import by streaming the uploaded file through the CSV parser and
 * copying its rows into a staging table named `import_file__<impt.id>`.
 *
 * The first parsed record is treated as the header: its values are stored as
 * `impt.settings.csv.columns` (persisted to the `imports` row) and one `text` column
 * per header cell (`column_0`, `column_1`, ...) is created in the staging table.
 * Every following record is inserted into the staging table in batches.
 *
 * On completion the `imports` row is set to PREP_FINISHED; on any read, parse or
 * database failure it is set to PREP_FAILED with the error message. In both cases the
 * uploaded file is removed.
 *
 * The function only wires the streams together and returns immediately; the work
 * happens asynchronously and the outcome is reported solely through the `imports` row.
 *
 * @param {Object} impt - Import record with `id` and parsed `settings`
 *     (`settings.csv.filename` and `settings.csv.delimiter` are read here).
 * @returns {undefined}
 */
function prepareCsv(impt) {
    // Processing of CSV intake
    const filePath = path.join(imports.filesDir, impt.settings.csv.filename);
    const importTable = 'import_file__' + impt.id;

    let finishedWithError = false;
    let headerProcessed = false;

    const inputStream = fs.createReadStream(filePath);
    const parser = csvparse({
        comment: '#',
        delimiter: impt.settings.csv.delimiter
    });

    // Records the failure on the import. Only the first failure is recorded; later ones
    // (e.g. follow-up errors from the other streams) are ignored.
    const finishWithError = async (msg, err) => {
        if (finishedWithError) {
            return;
        }
        finishedWithError = true;

        log.error('Importer (CSV)', err.stack);

        // Stop reading; nothing downstream will consume the data any more.
        inputStream.destroy();

        try {
            await knex('imports').where('id', impt.id).update({
                status: ImportStatus.PREP_FAILED,
                error: msg + '\n' + err.message
            });

            await fsExtra.removeAsync(filePath);
        } catch (cleanupErr) {
            // Nobody awaits this function, so a failure here must not escape as an
            // unhandled rejection.
            log.error('Importer (CSV)', cleanupErr.stack);
        }
    };

    // Marks the import as prepared, unless an error has already been recorded.
    const finishWithSuccess = async () => {
        if (finishedWithError) {
            return;
        }

        log.info('Importer (CSV)', 'Preparation finished');

        await knex('imports').where('id', impt.id).update({
            status: ImportStatus.PREP_FINISHED,
            error: null
        });

        await fsExtra.removeAsync(filePath);
    };

    // Stores the header names in the import settings and creates the staging table.
    const processHeader = async record => {
        const cols = [];
        let colsDef = '';

        for (let idx = 0; idx < record.length; idx++) {
            const colName = 'column_' + idx;
            cols.push({
                column: colName,
                name: record[idx]
            });
            colsDef += ' `' + colName + '` text DEFAULT NULL,\n';
        }

        impt.settings.csv.columns = cols;
        await knex('imports').where({id: impt.id}).update({settings: JSON.stringify(impt.settings)});

        await knex.schema.raw('CREATE TABLE `' + importTable + '` (\n' +
            ' `id` int(10) unsigned NOT NULL AUTO_INCREMENT,\n' +
            colsDef +
            ' PRIMARY KEY (`id`)\n' +
            ') ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;\n');
    };

    // Consumes a list of `{chunk}` entries, where each chunk is one parsed CSV record.
    const processRows = async chunks => {
        let insertBatch = [];

        for (const chunkEntry of chunks) {
            const record = chunkEntry.chunk;

            if (!headerProcessed) {
                headerProcessed = true;
                await processHeader(record);
            } else {
                const dbRecord = {};
                for (let idx = 0; idx < record.length; idx++) {
                    dbRecord['column_' + idx] = record[idx];
                }
                insertBatch.push(dbRecord);
            }

            if (insertBatch.length >= maxInsertBatchSize) {
                await knex(importTable).insert(insertBatch);
                insertBatch = [];
            }
        }

        if (insertBatch.length > 0) {
            await knex(importTable).insert(insertBatch);
        }
    };

    // Rejections are handed to the stream callback so that they surface as an 'error'
    // event on the writable instead of leaving the stream waiting forever.
    const importProcessor = new Writable({
        write(chunk, encoding, callback) {
            processRows([{chunk, encoding}]).then(() => callback(), callback);
        },
        writev(chunks, callback) {
            processRows(chunks).then(() => callback(), callback);
        },
        final(callback) {
            finishWithSuccess().then(() => callback(), callback);
        },
        objectMode: true
    });

    inputStream.on('error', err => finishWithError('Error reading CSV file.', err));
    parser.on('error', err => finishWithError('Error parsing CSV file.', err));
    importProcessor.on('error', err => finishWithError('Error processing CSV file.', err));

    parser.pipe(importProcessor);
    inputStream.pipe(parser);
}
2016-04-04 12:36:30 +00:00
/**
 * Works through all scheduled runs of the given import, oldest first.
 *
 * Each run found in SCHEDULED state is switched to RUNNING and then straight to
 * FINISHED (no per-run work is performed between the two updates). Once no scheduled
 * run is left, the import itself is marked RUN_FINISHED.
 *
 * @param {Object} impt - Import record; only `impt.id` is read.
 * @returns {Promise<void>}
 */
async function basicSubscribe(impt) {
    // Oldest run of this import that is still waiting to be processed (if any).
    const nextScheduledRun = () => knex('import_runs')
        .where('import', impt.id)
        .whereIn('status', [RunStatus.SCHEDULED])
        .orderBy('created', 'asc')
        .first();

    const setRunStatus = (runId, status) => knex('import_runs')
        .where('id', runId)
        .update({status});

    for (let imptRun = await nextScheduledRun(); imptRun; imptRun = await nextScheduledRun()) {
        await setRunStatus(imptRun.id, RunStatus.RUNNING);
        await setRunStatus(imptRun.id, RunStatus.FINISHED);
    }

    await knex('imports').where('id', impt.id).update({
        status: ImportStatus.RUN_FINISHED
    });
}
/**
 * Placeholder for the "basic unsubscribe" import run. It is not implemented yet:
 * the body is empty, so the returned promise resolves immediately and no data is read
 * or written.
 *
 * NOTE(review): getTask() switches the import to RUN_RUNNING before invoking this
 * function and nothing here moves it on, so such an import presumably stays in
 * RUN_RUNNING -- confirm once this is implemented.
 *
 * @param {Object} impt - Import record (currently unused).
 * @returns {Promise<void>}
 */
async function basicUnsubscribe(impt) {
    // FIXME: implement the unsubscribe run.
}
2018-08-06 14:54:51 +00:00
/**
 * Picks the oldest import that is waiting for preparation or for a run and claims it.
 *
 * Inside a single transaction the import row is looked up, its JSON `settings` are
 * parsed, and its status is advanced to the matching *_RUNNING state. The returned
 * function starts the actual work when called.
 *
 * @returns {Promise<Function|null|undefined>} A zero-argument function that starts the
 *     task; `null` when no import is scheduled; `undefined` when the oldest scheduled
 *     import matches none of the supported source / mapping combinations (in that
 *     case its status is left untouched).
 */
async function getTask() {
    return await knex.transaction(async tx => {
        const impt = await tx('imports')
            .whereIn('status', [ImportStatus.PREP_SCHEDULED, ImportStatus.RUN_SCHEDULED])
            .orderBy('created', 'asc')
            .first();

        if (!impt) {
            return null;
        }

        impt.settings = JSON.parse(impt.settings);

        const markAs = status => tx('imports').where('id', impt.id).update('status', status);

        if (impt.source === ImportSource.CSV_FILE && impt.status === ImportStatus.PREP_SCHEDULED) {
            await markAs(ImportStatus.PREP_RUNNING);
            return () => prepareCsv(impt);
        }

        if (impt.status === ImportStatus.RUN_SCHEDULED && impt.settings.mappingType === MappingType.BASIC_SUBSCRIBE) {
            await markAs(ImportStatus.RUN_RUNNING);
            return () => basicSubscribe(impt);
        }

        if (impt.status === ImportStatus.RUN_SCHEDULED && impt.settings.mappingType === MappingType.BASIC_UNSUBSCRIBE) {
            await markAs(ImportStatus.RUN_RUNNING);
            return () => basicUnsubscribe(impt);
        }

        // No supported combination matched: fall through and yield undefined.
    });
}
2016-04-04 12:36:30 +00:00
2018-08-06 14:54:51 +00:00
/**
 * Drains the queue of scheduled imports.
 *
 * Repeatedly asks getTask() for the next claimed import and starts it. Tasks are
 * started without being awaited, so the loop moves on to the next scheduled import
 * right away. Only one drain loop is active at a time: calls made while a loop is
 * already running return immediately.
 *
 * Failures are logged rather than thrown, and the `running` guard is always released,
 * so a transient error (e.g. a failed database query) cannot block later calls.
 *
 * @returns {Promise<void>}
 */
async function run() {
    if (running) {
        return;
    }

    running = true;

    try {
        let task;
        while ((task = await getTask()) != null) {
            try {
                const pending = task();

                // Async tasks are not awaited here, so attach a handler to keep a
                // failure from turning into an unhandled rejection.
                if (pending && typeof pending.catch === 'function') {
                    pending.catch(err => log.error('Importer', err.stack));
                }
            } catch (err) {
                // A task that throws synchronously must not stop the remaining tasks.
                log.error('Importer', err.stack);
            }
        }
    } catch (err) {
        log.error('Importer', err.stack);
    } finally {
        running = false;
    }
}
2018-08-06 14:54:51 +00:00
// Re-check the import queue whenever a 'scheduleCheck' message arrives over the
// process IPC channel. Rejections are logged so that they never become unhandled.
process.on('message', msg => {
    if (msg && msg.type === 'scheduleCheck') {
        run().catch(err => log.error('Importer', err.stack));
    }
});

// Announce over the IPC channel that the importer is up.
process.send({
    type: 'importer-started'
});

// Pick up anything that was already scheduled before this process started.
run().catch(err => log.error('Importer', err.stack));