mailtrain/services/importer.js

241 lines
7.2 KiB
JavaScript
Raw Normal View History

2016-04-04 12:36:30 +00:00
'use strict';
let log = require('npmlog');
let db = require('../lib/db');
let tools = require('../lib/tools');
let fields = require('../lib/models/fields');
let subscriptions = require('../lib/models/subscriptions');
let fs = require('fs');
let csvparse = require('csv-parse');
function findUnprocessed(callback) {
db.getConnection((err, connection) => {
if (err) {
return callback(err);
}
let query = 'SELECT * FROM importer WHERE `status`=1 LIMIT 1';
connection.query(query, (err, rows) => {
if (err) {
connection.release();
return callback(err);
}
if (!rows || !rows.length) {
connection.release();
return callback(null, false);
}
let importer = rows[0];
let query = 'UPDATE importer SET `status`=2, `processed`=0 WHERE id=? AND `status`=1 LIMIT 1';
connection.query(query, [importer.id], (err, result) => {
connection.release();
if (err) {
return callback(err);
}
if (!result.affectedRows) {
// check next one
return findUnprocessed(callback);
}
let importer = tools.convertKeys(rows[0]);
try {
importer.mapping = JSON.parse(importer.mapping);
} catch (E) {
importer.mapping = {
columns: [],
mapping: {}
};
}
return callback(null, importer);
});
});
});
}
function processImport(data, callback) {
let parser = csvparse({
comment: '#',
delimiter: data.delimiter
});
let listId = data.list;
fields.list(data.list, (err, fieldList) => {
if (err && !fieldList) {
fieldList = [];
}
let firstRow;
let finished = false;
let inputStream = fs.createReadStream(data.path);
let fieldTypes = {};
fieldList.forEach(field => {
if (field.column) {
fieldTypes[field.column] = field.type;
}
if (field.options) {
field.options.forEach(subField => {
if (subField.column) {
fieldTypes[subField.column] = subField.type;
}
});
}
});
inputStream.on('error', err => {
if (finished) {
return;
}
log.error('Import', err.stack);
finished = true;
return callback(err);
});
parser.on('error', err => {
if (finished) {
return;
}
log.error('Import', err.stack);
finished = true;
return callback(err);
});
let processing = false;
let processRows = () => {
let record = parser.read();
if (record === null) {
processing = false;
return;
}
processing = true;
if (!firstRow) {
firstRow = record;
return setImmediate(processRows);
}
let entry = {};
Object.keys(data.mapping.mapping || {}).forEach(key => {
// TODO: process all data types
if (fieldTypes[key] === 'option') {
entry[key] = ['', '0', 'false', 'no', 'null'].indexOf((record[data.mapping.mapping[key]] || '').toString().toLowerCase().trim()) < 0 ? 1 : 0;
} else if (fieldTypes[key] === 'number') {
entry[key] = Number(record[data.mapping.mapping[key]]) || 0;
} else {
entry[key] = (record[data.mapping.mapping[key]] || '').toString().trim() || null;
}
});
if (!entry.email) {
log.verbose('Import', 'Failed processing row, email missing');
return setImmediate(processRows);
}
tools.validateEmail(entry.email, true, err => {
if (err) {
log.verbose('Import', 'Failed processing row %s: %s', entry.email, err.message);
return setImmediate(processRows);
}
subscriptions.insert(listId, {
imported: data.id,
status: data.type
}, entry, (err, entryId) => {
if (err) {
// ignore
log.error('Import', err.stack);
} else if (entryId) {
//log.verbose('Import', 'Inserted %s as %s', entry.email, entryId);
}
db.getConnection((err, connection) => {
if (err) {
log.error('Import', err.stack);
return setImmediate(processRows);
}
let query = 'UPDATE importer SET `processed`=`processed`+1 WHERE `id`=? LIMIT 1';
connection.query(query, [data.id], () => {
connection.release();
return setImmediate(processRows);
});
});
});
});
};
parser.on('readable', () => {
if (finished || processing) {
return;
}
processRows();
});
parser.on('finish', () => {
if (finished) {
return;
}
finished = true;
callback(null, true);
});
inputStream.pipe(parser);
});
}
let importLoop = () => {
let getNext = () => {
// find an unsent message
findUnprocessed((err, data) => {
if (err) {
log.error('Import', err.stack);
setTimeout(getNext, 5 * 1000);
return;
}
if (!data) {
setTimeout(getNext, 5 * 1000);
return;
}
processImport(data, err => {
let failed = null;
if (err) {
if (err.code === 'ENOENT') {
failed = 'Could not access import file';
} else {
failed = err.message || err;
}
}
db.getConnection((err, connection) => {
if (err) {
log.error('Import', err.stack);
return setTimeout(getNext, 5 * 1000);
}
let query = 'UPDATE importer SET `status`=?, `error`=?, `finished`=NOW() WHERE `id`=? AND `status`=2 LIMIT 1';
connection.query(query, [!failed ? 3 : 4, failed, data.id], () => {
connection.release();
getNext();
});
});
});
});
};
getNext();
};
module.exports = callback => {
importLoop();
setImmediate(callback);
};