Preparation of merge with master

This commit is contained in:
Tomas Bures 2018-08-06 20:24:51 +05:30
parent 6648028270
commit cd798b5af7
26 changed files with 607 additions and 285 deletions


@@ -1,280 +1,143 @@
'use strict';
// FIXME - revisit and rewrite if necessary
const knex = require('../lib/knex');
const path = require('path');
const log = require('npmlog');
const fsExtra = require('fs-extra-promise');
const {ImportType, ImportStatus, RunStatus} = require('../shared/imports');
const imports = require('../models/imports');
const csvparse = require('csv-parse');
const fs = require('fs');

let running = false;

function prepareCsv(impt) {
    async function finishWithError(msg, err) {
        if (finished) {
            return;
        }

        finished = true;
        log.error('Importer (CSV)', err.stack);

        await knex('imports').where('id', impt.id).update({
            status: ImportStatus.PREP_FAILED,
            error: msg + '\n' + err.stack
        });

        await fsExtra.removeAsync(filePath);
    }

    async function finishWithSuccess() {
        if (finished) {
            return;
        }

        finished = true;
        log.info('Importer (CSV)', 'Preparation finished');

        await knex('imports').where('id', impt.id).update({
            status: ImportStatus.PREP_FINISHED,
            error: null
        });

        await fsExtra.removeAsync(filePath);
    }

    // Processing of CSV intake
    const filePath = path.join(imports.filesDir, impt.settings.csv.filename);

    const parser = csvparse({
        comment: '#',
        delimiter: impt.settings.csv.delimiter
    });

    const inputStream = fs.createReadStream(filePath);
    let finished;

    inputStream.on('error', err => finishWithError('Error reading CSV file.', err));
    parser.on('error', err => finishWithError('Error parsing CSV file.', err));

    let firstRow;

    let processing = false;
    const processRows = () => {
        const record = parser.read();
        if (record === null) {
            processing = false;
            return;
        }

        processing = true;

        if (!firstRow) {
            // The first record is the CSV header row; mapping it to subscription
            // fields is still to be done (see the FIXME above).
            firstRow = record;
            console.log(record);
            return setImmediate(processRows);
        }

        console.log(record);
        return setImmediate(processRows);
    };

    parser.on('readable', () => {
        if (finished || processing) {
            return;
        }

        processRows();
    });

    parser.on('finish', () => {
        finishWithSuccess();
    });

    inputStream.pipe(parser);
}
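
Aside, not part of the file: the readable/read()/finish flow above is the standard streaming interface of csv-parse. A minimal standalone sketch of the same pattern follows; the file name, delimiter, and csv-parse version are illustrative assumptions, not anything this commit defines.

const fs = require('fs');
const csvparse = require('csv-parse');

// Parser options mirror the ones above: '#' starts a comment line, the delimiter is configurable.
const parser = csvparse({ comment: '#', delimiter: ';' });

parser.on('readable', () => {
    let record;
    while ((record = parser.read()) !== null) {
        console.log(record); // each record is an array of column values
    }
});

parser.on('error', err => console.error(err.message));
parser.on('finish', () => console.log('CSV fully parsed'));

fs.createReadStream('sample.csv').pipe(parser); // 'sample.csv' is an illustrative path
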
async function getTask() {
    // Return the transaction's result so that run() receives the scheduled task (or null).
    return await knex.transaction(async tx => {
        const impt = await tx('imports').whereIn('status', [ImportStatus.PREP_SCHEDULED, ImportStatus.RUN_SCHEDULED]).orderBy('created', 'asc').first();

        if (impt) {
            impt.settings = JSON.parse(impt.settings);

            if (impt.type === ImportType.CSV_FILE && impt.status === ImportStatus.PREP_SCHEDULED) {
                await tx('imports').where('id', impt.id).update('status', ImportStatus.PREP_RUNNING);
                return () => prepareCsv(impt);
            }
        } else {
            return null;
        }
    });
}

async function run() {
    if (running) {
        return;
    }

    running = true;

    let task;
    while ((task = await getTask()) != null) {
        task();
    }

    running = false;
}

process.on('message', msg => {
    if (msg) {
        const type = msg.type;

        if (type === 'scheduleCheck') {
            run();
        }
    }
});

process.send({
    type: 'importer-started'
});
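
Aside, not part of the file: the worker relies on process.send/process.on for its lifecycle, so a parent process is expected to fork it and send scheduleCheck messages whenever an import is scheduled. A hypothetical parent-side sketch; the module path and the trigger point are assumptions rather than anything this commit defines.

const fork = require('child_process').fork;

// Fork the importer worker; the path is an assumption for illustration.
const child = fork('services/importer.js');

child.on('message', msg => {
    if (msg && msg.type === 'importer-started') {
        // Worker is up; ask it to look for scheduled imports right away.
        child.send({ type: 'scheduleCheck' });
    }
});

// Call this whenever a new row lands in the imports table, e.g. after a CSV upload.
function scheduleCheck() {
    child.send({ type: 'scheduleCheck' });
}
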