first actually working version of RSS campaigns
This commit is contained in:
parent
fd0e75da27
commit
c031ea7747
7 changed files with 187 additions and 22 deletions
3
index.js
3
index.js
|
@ -13,6 +13,7 @@ let importer = require('./services/importer');
|
|||
let verpServer = require('./services/verp-server');
|
||||
let testServer = require('./services/test-server');
|
||||
let tzupdate = require('./services/tzupdate');
|
||||
let feedcheck = require('./services/feedcheck');
|
||||
let dbcheck = require('./lib/dbcheck');
|
||||
|
||||
let port = config.www.port;
|
||||
|
@ -76,6 +77,7 @@ server.on('listening', () => {
|
|||
tzupdate(() => {
|
||||
importer(() => {
|
||||
sender(() => {
|
||||
feedcheck(() => {
|
||||
log.info('Service', 'All services started');
|
||||
if (config.group) {
|
||||
try {
|
||||
|
@ -98,4 +100,5 @@ server.on('listening', () => {
|
|||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
@ -199,11 +199,13 @@ module.exports.get = (id, withSegment, callback) => {
|
|||
});
|
||||
};
|
||||
|
||||
module.exports.create = (campaign, callback) => {
|
||||
module.exports.create = (campaign, opts, callback) => {
|
||||
|
||||
campaign = tools.convertKeys(campaign);
|
||||
let name = (campaign.name || '').toString().trim();
|
||||
|
||||
opts = opts || {};
|
||||
|
||||
if (/^\d+:\d+$/.test(campaign.list)) {
|
||||
campaign.segment = Number(campaign.list.split(':').pop());
|
||||
campaign.list = Number(campaign.list.split(':').shift());
|
||||
|
@ -216,6 +218,13 @@ module.exports.create = (campaign, callback) => {
|
|||
case 'rss':
|
||||
campaign.type = 2;
|
||||
break;
|
||||
case 'entry':
|
||||
if (opts.parent) {
|
||||
campaign.type = 3;
|
||||
} else {
|
||||
campaign.type = 1;
|
||||
}
|
||||
break;
|
||||
case 'normal':
|
||||
default:
|
||||
campaign.type = 1;
|
||||
|
@ -227,8 +236,8 @@ module.exports.create = (campaign, callback) => {
|
|||
return callback(new Error('Campaign Name must be set'));
|
||||
}
|
||||
|
||||
if (campaign.type === 2 && !campaign.sourceUrl || !isUrl(campaign.sourceUrl)) {
|
||||
return callback(new Error('RSS URL must be set'));
|
||||
if (campaign.type === 2 && (!campaign.sourceUrl || !isUrl(campaign.sourceUrl))) {
|
||||
return callback(new Error('RSS URL must be set and needs to be a valid URL'));
|
||||
}
|
||||
|
||||
lists.get(campaign.list, (err, list) => {
|
||||
|
@ -247,6 +256,11 @@ module.exports.create = (campaign, callback) => {
|
|||
values.push(5); // inactive
|
||||
}
|
||||
|
||||
if (campaign.type === 3) {
|
||||
keys.push('status', 'parent');
|
||||
values.push(2, opts.parent);
|
||||
}
|
||||
|
||||
let create = next => {
|
||||
Object.keys(campaign).forEach(key => {
|
||||
let value = typeof campaign[key] === 'number' ? campaign[key] : (campaign[key] || '').toString().trim();
|
||||
|
|
|
@ -98,7 +98,7 @@ router.get('/create', passport.csrfProtection, (req, res) => {
|
|||
});
|
||||
|
||||
router.post('/create', passport.parseForm, passport.csrfProtection, (req, res) => {
|
||||
campaigns.create(req.body, (err, id) => {
|
||||
campaigns.create(req.body, false, (err, id) => {
|
||||
if (err || !id) {
|
||||
req.flash('danger', err && err.message || err || 'Could not create campaign');
|
||||
return res.redirect('/campaigns/create?' + tools.queryParams(req.body));
|
||||
|
@ -263,7 +263,7 @@ router.get('/view/:id', passport.csrfProtection, (req, res) => {
|
|||
campaign.isInactive = campaign.status === 5;
|
||||
campaign.isActive = campaign.status === 6;
|
||||
|
||||
campaign.isNormal = campaign.type === 1;
|
||||
campaign.isNormal = campaign.type === 1 || campaign.type === 3;
|
||||
campaign.isRss = campaign.type === 2;
|
||||
|
||||
campaign.isScheduled = campaign.scheduled && campaign.scheduled > new Date();
|
||||
|
|
143
services/feedcheck.js
Normal file
143
services/feedcheck.js
Normal file
|
@ -0,0 +1,143 @@
|
|||
'use strict';
|
||||
|
||||
let log = require('npmlog');
|
||||
|
||||
let db = require('../lib/db');
|
||||
let tools = require('../lib/tools');
|
||||
let feed = require('../lib/feed');
|
||||
let campaigns = require('../lib/models/campaigns');
|
||||
|
||||
function feedLoop() {
|
||||
|
||||
db.getConnection((err, connection) => {
|
||||
if (err) {
|
||||
log.error('Feed', err.stack);
|
||||
return setTimeout(feedLoop, 15 * 1000);
|
||||
}
|
||||
|
||||
let query = 'SELECT `id`, `source_url`, `from`, `address`, `subject`, `list`, `segment` FROM `campaigns` WHERE `type`=2 AND `status`=6 AND (`last_check` IS NULL OR `last_check`< NOW() - INTERVAL 10 MINUTE) LIMIT 1';
|
||||
|
||||
connection.query(query, (err, rows) => {
|
||||
if (err) {
|
||||
connection.release();
|
||||
log.error('Feed', err);
|
||||
return setTimeout(feedLoop, 15 * 1000);
|
||||
}
|
||||
|
||||
if (!rows || !rows.length) {
|
||||
connection.release();
|
||||
return setTimeout(feedLoop, 15 * 1000);
|
||||
}
|
||||
|
||||
let parent = tools.convertKeys(rows[0]);
|
||||
|
||||
let query = 'UPDATE `campaigns` SET `last_check`=NOW() WHERE id=? LIMIT 1';
|
||||
connection.query(query, [parent.id], err => {
|
||||
connection.release();
|
||||
if (err) {
|
||||
log.error('Feed', err);
|
||||
return setTimeout(feedLoop, 15 * 1000);
|
||||
}
|
||||
|
||||
log.verbose('Feed', 'Checking feed %s (%s)', parent.sourceUrl, parent.id);
|
||||
feed.fetch(parent.sourceUrl, (err, entries) => {
|
||||
if (err) {
|
||||
log.error('Feed', err);
|
||||
return setTimeout(feedLoop, 1 * 1000);
|
||||
}
|
||||
checkEntries(parent, entries, (err, result) => {
|
||||
if (err) {
|
||||
log.error('Feed', err);
|
||||
}
|
||||
if (result) {
|
||||
log.verbose('Feed', 'Added %s new campaigns for %s', result, parent.id);
|
||||
}
|
||||
return setTimeout(feedLoop, 1 * 1000);
|
||||
});
|
||||
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
function checkEntries(parent, entries, callback) {
|
||||
let pos = 0;
|
||||
let added = 0;
|
||||
let checkNextEntry = () => {
|
||||
if (pos >= entries.length) {
|
||||
return callback(null, added);
|
||||
}
|
||||
|
||||
let entry = entries[pos++];
|
||||
if (!entry || !entry.guid) {
|
||||
return checkNextEntry();
|
||||
}
|
||||
|
||||
db.getConnection((err, connection) => {
|
||||
if (err) {
|
||||
log.error('Feed', err.stack);
|
||||
return setTimeout(checkNextEntry, 15 * 1000);
|
||||
}
|
||||
|
||||
// parent+guid is unique, so the query should fail for existing entries
|
||||
let query = 'INSERT IGNORE INTO `rss` (`parent`, `guid`, `pubdate`) VALUES (?,?,?)';
|
||||
|
||||
connection.query(query, [parent.id, entry.guid, entry.date], (err, result) => {
|
||||
connection.release();
|
||||
if (err) {
|
||||
log.error('Feed', err);
|
||||
return setTimeout(checkNextEntry, 15 * 1000);
|
||||
}
|
||||
if (!result.insertId) {
|
||||
return setImmediate(checkNextEntry);
|
||||
}
|
||||
|
||||
let entryId = result.insertId;
|
||||
|
||||
let campaign = {
|
||||
type: 'entry',
|
||||
name: entry.title || 'RSS entry ' + (entry.guid.substr(0, 67)),
|
||||
from: parent.from,
|
||||
address: parent.address,
|
||||
subject: entry.title || parent.subject,
|
||||
list: parent.list,
|
||||
segment: parent.segment,
|
||||
html: entry.content
|
||||
};
|
||||
|
||||
campaigns.create(campaign, {
|
||||
parent: parent.id
|
||||
}, (err, campaignId) => {
|
||||
if (err) {
|
||||
log.error('Campaign', err);
|
||||
return setTimeout(checkNextEntry, 15 * 1000);
|
||||
}
|
||||
added++;
|
||||
db.getConnection((err, connection) => {
|
||||
if (err) {
|
||||
log.error('Feed', err.stack);
|
||||
return setTimeout(checkNextEntry, 15 * 1000);
|
||||
}
|
||||
let query = 'UPDATE `rss` SET `campaign`=? WHERE id=? LIMIT 1';
|
||||
connection.query(query, [campaignId, entryId], err => {
|
||||
connection.release();
|
||||
if (err) {
|
||||
log.error('Feed', err.stack);
|
||||
return setTimeout(checkNextEntry, 15 * 1000);
|
||||
}
|
||||
return setImmediate(checkNextEntry);
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
};
|
||||
|
||||
checkNextEntry();
|
||||
}
|
||||
|
||||
module.exports = callback => {
|
||||
feedLoop();
|
||||
setImmediate(callback);
|
||||
};
|
|
@ -23,8 +23,8 @@ function findUnsent(callback) {
|
|||
}
|
||||
|
||||
// Find "normal" campaigns. Ignore RSS and drip campaigns at this point
|
||||
let query = 'SELECT `id`, `list`, `segment` FROM `campaigns` WHERE `status`=? AND (`scheduled` IS NULL OR `scheduled` <= NOW()) AND `type`=? LIMIT 1';
|
||||
connection.query(query, [2, 1], (err, rows) => {
|
||||
let query = 'SELECT `id`, `list`, `segment` FROM `campaigns` WHERE `status`=? AND (`scheduled` IS NULL OR `scheduled` <= NOW()) AND `type` IN (?, ?) LIMIT 1';
|
||||
connection.query(query, [2, 1, 3], (err, rows) => {
|
||||
if (err) {
|
||||
connection.release();
|
||||
return callback(err);
|
||||
|
|
|
@ -17,6 +17,8 @@ CREATE TABLE `rss` (
|
|||
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
|
||||
ALTER TABLE `campaigns` ADD COLUMN `parent` int(11) unsigned DEFAULT NULL AFTER `type`;
|
||||
CREATE INDEX parent_index ON `campaigns` (`parent`);
|
||||
ALTER TABLE `campaigns` ADD COLUMN `last_check` timestamp NULL DEFAULT NULL AFTER `source_url`;
|
||||
CREATE INDEX check_index ON `campaigns` (`last_check`);
|
||||
|
||||
# Footer section
|
||||
LOCK TABLES `settings` WRITE;
|
||||
|
|
|
@ -226,6 +226,9 @@
|
|||
</div>
|
||||
|
||||
<div class="table-responsive">
|
||||
<div class="well text-info">
|
||||
If a new entry is found from campaign feed a new subcampaign is created of that entry and it will be listed here
|
||||
</div>
|
||||
<table data-topic-url="/campaigns" data-sort-column="4" data-sort-order="desc" class="table table-bordered table-hover data-table-ajax display nowrap" data-topic-args="parent={{id}}" width="100%" data-row-sort="0,1,0,1,1,0">
|
||||
<thead>
|
||||
<th class="col-md-1">
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue