From c031ea7747013693154669999983f95c73ad6cc3 Mon Sep 17 00:00:00 2001 From: Andris Reinman Date: Tue, 3 May 2016 14:04:46 +0300 Subject: [PATCH] first actually working version of RSS campaigns --- index.js | 33 +++++---- lib/models/campaigns.js | 20 ++++- routes/campaigns.js | 4 +- services/feedcheck.js | 143 ++++++++++++++++++++++++++++++++++++ services/sender.js | 4 +- setup/sql/upgrade-00008.sql | 2 + views/campaigns/view.hbs | 3 + 7 files changed, 187 insertions(+), 22 deletions(-) create mode 100644 services/feedcheck.js diff --git a/index.js b/index.js index 60654265..78de9a72 100644 --- a/index.js +++ b/index.js @@ -13,6 +13,7 @@ let importer = require('./services/importer'); let verpServer = require('./services/verp-server'); let testServer = require('./services/test-server'); let tzupdate = require('./services/tzupdate'); +let feedcheck = require('./services/feedcheck'); let dbcheck = require('./lib/dbcheck'); let port = config.www.port; @@ -76,23 +77,25 @@ server.on('listening', () => { tzupdate(() => { importer(() => { sender(() => { - log.info('Service', 'All services started'); - if (config.group) { - try { - process.setgid(config.group); - log.info('Service', 'Changed group to "%s" (%s)', config.group, process.getgid()); - } catch (E) { - log.info('Service', 'Failed to change group to "%s" (%s)', config.group, E.message); + feedcheck(() => { + log.info('Service', 'All services started'); + if (config.group) { + try { + process.setgid(config.group); + log.info('Service', 'Changed group to "%s" (%s)', config.group, process.getgid()); + } catch (E) { + log.info('Service', 'Failed to change group to "%s" (%s)', config.group, E.message); + } } - } - if (config.user) { - try { - process.setuid(config.user); - log.info('Service', 'Changed user to "%s" (%s)', config.user, process.getuid()); - } catch (E) { - log.info('Service', 'Failed to change user to "%s" (%s)', config.user, E.message); + if (config.user) { + try { + process.setuid(config.user); + log.info('Service', 'Changed user to "%s" (%s)', config.user, process.getuid()); + } catch (E) { + log.info('Service', 'Failed to change user to "%s" (%s)', config.user, E.message); + } } - } + }); }); }); }); diff --git a/lib/models/campaigns.js b/lib/models/campaigns.js index c2f0ccd4..e67244b4 100644 --- a/lib/models/campaigns.js +++ b/lib/models/campaigns.js @@ -199,11 +199,13 @@ module.exports.get = (id, withSegment, callback) => { }); }; -module.exports.create = (campaign, callback) => { +module.exports.create = (campaign, opts, callback) => { campaign = tools.convertKeys(campaign); let name = (campaign.name || '').toString().trim(); + opts = opts || {}; + if (/^\d+:\d+$/.test(campaign.list)) { campaign.segment = Number(campaign.list.split(':').pop()); campaign.list = Number(campaign.list.split(':').shift()); @@ -216,6 +218,13 @@ module.exports.create = (campaign, callback) => { case 'rss': campaign.type = 2; break; + case 'entry': + if (opts.parent) { + campaign.type = 3; + } else { + campaign.type = 1; + } + break; case 'normal': default: campaign.type = 1; @@ -227,8 +236,8 @@ module.exports.create = (campaign, callback) => { return callback(new Error('Campaign Name must be set')); } - if (campaign.type === 2 && !campaign.sourceUrl || !isUrl(campaign.sourceUrl)) { - return callback(new Error('RSS URL must be set')); + if (campaign.type === 2 && (!campaign.sourceUrl || !isUrl(campaign.sourceUrl))) { + return callback(new Error('RSS URL must be set and needs to be a valid URL')); } lists.get(campaign.list, (err, list) => { @@ -247,6 +256,11 @@ module.exports.create = (campaign, callback) => { values.push(5); // inactive } + if (campaign.type === 3) { + keys.push('status', 'parent'); + values.push(2, opts.parent); + } + let create = next => { Object.keys(campaign).forEach(key => { let value = typeof campaign[key] === 'number' ? campaign[key] : (campaign[key] || '').toString().trim(); diff --git a/routes/campaigns.js b/routes/campaigns.js index a2d7e704..e443e9ea 100644 --- a/routes/campaigns.js +++ b/routes/campaigns.js @@ -98,7 +98,7 @@ router.get('/create', passport.csrfProtection, (req, res) => { }); router.post('/create', passport.parseForm, passport.csrfProtection, (req, res) => { - campaigns.create(req.body, (err, id) => { + campaigns.create(req.body, false, (err, id) => { if (err || !id) { req.flash('danger', err && err.message || err || 'Could not create campaign'); return res.redirect('/campaigns/create?' + tools.queryParams(req.body)); @@ -263,7 +263,7 @@ router.get('/view/:id', passport.csrfProtection, (req, res) => { campaign.isInactive = campaign.status === 5; campaign.isActive = campaign.status === 6; - campaign.isNormal = campaign.type === 1; + campaign.isNormal = campaign.type === 1 || campaign.type === 3; campaign.isRss = campaign.type === 2; campaign.isScheduled = campaign.scheduled && campaign.scheduled > new Date(); diff --git a/services/feedcheck.js b/services/feedcheck.js new file mode 100644 index 00000000..dff8e9a3 --- /dev/null +++ b/services/feedcheck.js @@ -0,0 +1,143 @@ +'use strict'; + +let log = require('npmlog'); + +let db = require('../lib/db'); +let tools = require('../lib/tools'); +let feed = require('../lib/feed'); +let campaigns = require('../lib/models/campaigns'); + +function feedLoop() { + + db.getConnection((err, connection) => { + if (err) { + log.error('Feed', err.stack); + return setTimeout(feedLoop, 15 * 1000); + } + + let query = 'SELECT `id`, `source_url`, `from`, `address`, `subject`, `list`, `segment` FROM `campaigns` WHERE `type`=2 AND `status`=6 AND (`last_check` IS NULL OR `last_check`< NOW() - INTERVAL 10 MINUTE) LIMIT 1'; + + connection.query(query, (err, rows) => { + if (err) { + connection.release(); + log.error('Feed', err); + return setTimeout(feedLoop, 15 * 1000); + } + + if (!rows || !rows.length) { + connection.release(); + return setTimeout(feedLoop, 15 * 1000); + } + + let parent = tools.convertKeys(rows[0]); + + let query = 'UPDATE `campaigns` SET `last_check`=NOW() WHERE id=? LIMIT 1'; + connection.query(query, [parent.id], err => { + connection.release(); + if (err) { + log.error('Feed', err); + return setTimeout(feedLoop, 15 * 1000); + } + + log.verbose('Feed', 'Checking feed %s (%s)', parent.sourceUrl, parent.id); + feed.fetch(parent.sourceUrl, (err, entries) => { + if (err) { + log.error('Feed', err); + return setTimeout(feedLoop, 1 * 1000); + } + checkEntries(parent, entries, (err, result) => { + if (err) { + log.error('Feed', err); + } + if (result) { + log.verbose('Feed', 'Added %s new campaigns for %s', result, parent.id); + } + return setTimeout(feedLoop, 1 * 1000); + }); + + }); + }); + }); + }); +} + +function checkEntries(parent, entries, callback) { + let pos = 0; + let added = 0; + let checkNextEntry = () => { + if (pos >= entries.length) { + return callback(null, added); + } + + let entry = entries[pos++]; + if (!entry || !entry.guid) { + return checkNextEntry(); + } + + db.getConnection((err, connection) => { + if (err) { + log.error('Feed', err.stack); + return setTimeout(checkNextEntry, 15 * 1000); + } + + // parent+guid is unique, so the query should fail for existing entries + let query = 'INSERT IGNORE INTO `rss` (`parent`, `guid`, `pubdate`) VALUES (?,?,?)'; + + connection.query(query, [parent.id, entry.guid, entry.date], (err, result) => { + connection.release(); + if (err) { + log.error('Feed', err); + return setTimeout(checkNextEntry, 15 * 1000); + } + if (!result.insertId) { + return setImmediate(checkNextEntry); + } + + let entryId = result.insertId; + + let campaign = { + type: 'entry', + name: entry.title || 'RSS entry ' + (entry.guid.substr(0, 67)), + from: parent.from, + address: parent.address, + subject: entry.title || parent.subject, + list: parent.list, + segment: parent.segment, + html: entry.content + }; + + campaigns.create(campaign, { + parent: parent.id + }, (err, campaignId) => { + if (err) { + log.error('Campaign', err); + return setTimeout(checkNextEntry, 15 * 1000); + } + added++; + db.getConnection((err, connection) => { + if (err) { + log.error('Feed', err.stack); + return setTimeout(checkNextEntry, 15 * 1000); + } + let query = 'UPDATE `rss` SET `campaign`=? WHERE id=? LIMIT 1'; + connection.query(query, [campaignId, entryId], err => { + connection.release(); + if (err) { + log.error('Feed', err.stack); + return setTimeout(checkNextEntry, 15 * 1000); + } + return setImmediate(checkNextEntry); + }); + }); + }); + }); + }); + }; + + checkNextEntry(); +} + +module.exports = callback => { + feedLoop(); + setImmediate(callback); +}; diff --git a/services/sender.js b/services/sender.js index 3ad40b87..d9be0a84 100644 --- a/services/sender.js +++ b/services/sender.js @@ -23,8 +23,8 @@ function findUnsent(callback) { } // Find "normal" campaigns. Ignore RSS and drip campaigns at this point - let query = 'SELECT `id`, `list`, `segment` FROM `campaigns` WHERE `status`=? AND (`scheduled` IS NULL OR `scheduled` <= NOW()) AND `type`=? LIMIT 1'; - connection.query(query, [2, 1], (err, rows) => { + let query = 'SELECT `id`, `list`, `segment` FROM `campaigns` WHERE `status`=? AND (`scheduled` IS NULL OR `scheduled` <= NOW()) AND `type` IN (?, ?) LIMIT 1'; + connection.query(query, [2, 1, 3], (err, rows) => { if (err) { connection.release(); return callback(err); diff --git a/setup/sql/upgrade-00008.sql b/setup/sql/upgrade-00008.sql index 90170ddb..89f3293f 100644 --- a/setup/sql/upgrade-00008.sql +++ b/setup/sql/upgrade-00008.sql @@ -17,6 +17,8 @@ CREATE TABLE `rss` ( ) ENGINE=InnoDB DEFAULT CHARSET=utf8; ALTER TABLE `campaigns` ADD COLUMN `parent` int(11) unsigned DEFAULT NULL AFTER `type`; CREATE INDEX parent_index ON `campaigns` (`parent`); +ALTER TABLE `campaigns` ADD COLUMN `last_check` timestamp NULL DEFAULT NULL AFTER `source_url`; +CREATE INDEX check_index ON `campaigns` (`last_check`); # Footer section LOCK TABLES `settings` WRITE; diff --git a/views/campaigns/view.hbs b/views/campaigns/view.hbs index b81597ca..6fa800d2 100644 --- a/views/campaigns/view.hbs +++ b/views/campaigns/view.hbs @@ -226,6 +226,9 @@
+
+ If a new entry is found from campaign feed a new subcampaign is created of that entry and it will be listed here +