updated transactional messages

This commit is contained in:
Andris Reinman 2016-05-04 11:27:46 +03:00
parent 4f2d66c30c
commit 2486f7b9d8
7 changed files with 353 additions and 46 deletions

View file

@ -18,49 +18,80 @@ function feedLoop() {
let query = 'SELECT `id`, `source_url`, `from`, `address`, `subject`, `list`, `segment` FROM `campaigns` WHERE `type`=2 AND `status`=6 AND (`last_check` IS NULL OR `last_check`< NOW() - INTERVAL 10 MINUTE) LIMIT 1';
connection.query(query, (err, rows) => {
connection.release();
if (err) {
connection.release();
log.error('Feed', err);
return setTimeout(feedLoop, 15 * 1000);
}
if (!rows || !rows.length) {
connection.release();
return setTimeout(feedLoop, 15 * 1000);
}
let parent = tools.convertKeys(rows[0]);
let query = 'UPDATE `campaigns` SET `last_check`=NOW() WHERE id=? LIMIT 1';
connection.query(query, [parent.id], err => {
connection.release();
if (err) {
log.error('Feed', err);
return setTimeout(feedLoop, 15 * 1000);
}
updateRssInfo(parent.id, true, false, () => {
log.verbose('Feed', 'Checking feed %s (%s)', parent.sourceUrl, parent.id);
feed.fetch(parent.sourceUrl, (err, entries) => {
if (err) {
log.error('Feed', err);
return setTimeout(feedLoop, 1 * 1000);
return updateRssInfo(parent.id, false, 'Feed error: ' + err.message, () => {
setTimeout(feedLoop, 1 * 1000);
});
}
checkEntries(parent, entries, (err, result) => {
let message;
if (err) {
log.error('Feed', err);
}
if (result) {
message = 'Feed error: ' + err.message;
} else if (result) {
log.verbose('Feed', 'Added %s new campaigns for %s', result, parent.id);
message = 'Found ' + result + ' new campaing messages from feed';
} else {
message = 'Found nothing new from the feed';
}
return setTimeout(feedLoop, 1 * 1000);
return updateRssInfo(parent.id, false, message, () => {
setTimeout(feedLoop, 1 * 1000);
});
});
});
});
});
});
}
function updateRssInfo(id, updateCheck, status, callback) {
db.getConnection((err, connection) => {
if (err) {
log.error('Feed', err.stack);
return callback(err);
}
let query;
let values;
if (updateCheck) {
if (status) {
query = 'UPDATE `campaigns` SET `last_check`=NOW(), `check_status`=? WHERE id=? LIMIT 1';
values = [status, id];
} else {
query = 'UPDATE `campaigns` SET `last_check`=NOW() WHERE id=? LIMIT 1';
values = [id];
}
} else {
query = 'UPDATE `campaigns` SET `check_status`=? WHERE id=? LIMIT 1';
values = [status, id];
}
connection.query(query, values, (err, result) => {
connection.release();
if (err) {
log.error('Feed', err);
return callback(err);
}
return callback(null, result.affectedRows);
});
});
}
function checkEntries(parent, entries, callback) {
let pos = 0;
let added = 0;