mailtrain/services/importer.js
2017-06-22 18:25:13 +02:00

278 lines
9.1 KiB
JavaScript

'use strict';
let log = require('npmlog');
let db = require('../lib/db');
let tools = require('../lib/tools');
let _ = require('../lib/translate')._;
let fields = require('../lib/models/fields');
let subscriptions = require('../lib/models/subscriptions');
let fs = require('fs');
let csvparse = require('csv-parse');
const process_timout = 5 * 1000;
function findUnprocessed(callback) {
db.getConnection((err, connection) => {
if (err) {
return callback(err);
}
let query = 'SELECT * FROM importer WHERE `status`=1 LIMIT 1';
connection.query(query, (err, rows) => {
if (err) {
connection.release();
return callback(err);
}
if (!rows || !rows.length) {
connection.release();
return callback(null, false);
}
let importer = rows[0];
let query = 'UPDATE importer SET `status`=2, `processed`=0 WHERE id=? AND `status`=1 LIMIT 1';
connection.query(query, [importer.id], (err, result) => {
connection.release();
if (err) {
return callback(err);
}
if (!result.affectedRows) {
// check next one
return findUnprocessed(callback);
}
let importer = tools.convertKeys(rows[0]);
try {
importer.mapping = JSON.parse(importer.mapping);
} catch (E) {
importer.mapping = {
columns: [],
mapping: {}
};
}
return callback(null, importer);
});
});
});
}
function processImport(data, callback) {
let parser = csvparse({
comment: '#',
delimiter: data.delimiter
});
let listId = data.list;
fields.list(data.list, (err, fieldList) => {
if (err && !fieldList) {
fieldList = [];
}
let firstRow;
let finished = false;
let inputStream = fs.createReadStream(data.path);
let fieldTypes = {};
fieldList.forEach(field => {
if (field.column) {
fieldTypes[field.column] = field.type;
}
if (field.options) {
field.options.forEach(subField => {
if (subField.column) {
fieldTypes[subField.column] = subField.type;
}
});
}
});
inputStream.on('error', err => {
if (finished) {
return;
}
log.error('Import', err.stack);
finished = true;
return callback(err);
});
parser.on('error', err => {
if (finished) {
return;
}
log.error('Import', err.stack);
finished = true;
return callback(err);
});
let processing = false;
let processRows = () => {
let record = parser.read();
if (record === null) {
processing = false;
return;
}
processing = true;
if (!firstRow) {
firstRow = record;
return setImmediate(processRows);
}
let entry = {};
Object.keys(data.mapping.mapping || {}).forEach(key => {
// TODO: process all data types
if (fieldTypes[key] === 'option') {
entry[key] = ['', '0', 'false', 'no', 'null'].indexOf((record[data.mapping.mapping[key]] || '').toString().toLowerCase().trim()) < 0 ? 1 : 0;
} else if (fieldTypes[key] === 'number') {
entry[key] = Number(record[data.mapping.mapping[key]]) || 0;
} else {
entry[key] = (record[data.mapping.mapping[key]] || '').toString().trim() || null;
}
});
if (!entry.email) {
log.verbose('Import', 'Failed processing row, email missing');
return setImmediate(processRows);
}
function insertToSubscription() {
subscriptions.insert(listId, {
imported: data.id,
status: data.type,
partial: true
}, entry, (err, response) => {
if (err) {
// ignore
log.error('Import', err.stack);
} else if (response.entryId) {
//log.verbose('Import', 'Inserted %s as %s', entry.email, entryId);
}
db.getConnection((err, connection) => {
if (err) {
log.error('Import', err.stack);
return setImmediate(processRows);
}
let query;
if (response.inserted) {
// this record did not exist before, count as new
query = 'UPDATE importer SET `processed`=`processed`+1, `new`=`new`+1 WHERE `id`=? LIMIT 1';
} else {
// it's an existing record
query = 'UPDATE importer SET `processed`=`processed`+1 WHERE `id`=? LIMIT 1';
}
connection.query(query, [data.id], () => {
connection.release();
return setImmediate(processRows);
});
});
});
}
if (data.emailcheck === 1) {
tools.validateEmail(entry.email, true, err => {
if (err) {
let reason = (err.message || '').toString().trim().replace(/^[a-z]Error:\s*/i, '');
log.verbose('Import', 'Failed processing row %s: %s', entry.email, reason);
db.getConnection((err, connection) => {
if (err) {
log.error('Import', err.stack);
return setImmediate(processRows);
}
let query = 'INSERT INTO import_failed (`import`, `email`, `reason`) VALUES(?,?,?)';
connection.query(query, [data.id, entry.email, reason], err => {
if (err) {
connection.release();
return setImmediate(processRows);
}
let query = 'UPDATE importer SET `failed`=`failed`+1 WHERE `id`=? LIMIT 1';
connection.query(query, [data.id], () => {
connection.release();
return setImmediate(processRows);
});
});
});
return;
}
insertToSubscription();
});
} else {
insertToSubscription();
}
};
parser.on('readable', () => {
if (finished || processing) {
return;
}
processRows();
});
parser.on('finish', () => {
if (finished) {
return;
}
finished = true;
callback(null, true);
});
inputStream.pipe(parser);
});
}
let importLoop = () => {
let getNext = () => {
// find an unsent message
findUnprocessed((err, data) => {
if (err) {
log.error('Import', err.stack);
setTimeout(getNext, process_timout);
return;
}
if (!data) {
setTimeout(getNext, process_timout);
return;
}
processImport(data, err => {
let failed = null;
if (err) {
if (err.code === 'ENOENT') {
failed = _('Could not access import file');
} else {
failed = err.message || err;
}
}
db.getConnection((err, connection) => {
if (err) {
log.error('Import', err.stack);
return setTimeout(getNext, process_timout);
}
let query = 'UPDATE importer SET `status`=?, `error`=?, `finished`=NOW() WHERE `id`=? AND `status`=2 LIMIT 1';
connection.query(query, [!failed ? 3 : 4, failed, data.id], () => {
connection.release();
getNext();
});
});
});
});
};
getNext();
};
module.exports = callback => {
importLoop();
setImmediate(callback);
};