Updated packages to remove vulnerabilities reported by npm

Implementation of feedcheck - not tested though
This commit is contained in:
Tomas Bures 2018-09-02 14:59:02 +02:00
parent d74806dde3
commit 130c953d94
21 changed files with 4945 additions and 2142 deletions

View file

@ -12,7 +12,6 @@ const favicon = require('serve-favicon');
const logger = require('morgan');
const cookieParser = require('cookie-parser');
const session = require('express-session');
const RedisStore = require('connect-redis')(session);
const flash = require('connect-flash');
const hbs = require('hbs');
const handlebarsHelpers = require('./lib/handlebars-helpers');
@ -152,7 +151,6 @@ function createApp(trusted) {
app.use(cookieParser());
app.use(session({
store: config.redis.enabled ? new RedisStore(config.redis) : false,
secret: config.www.secret,
saveUninitialized: false,
resave: false

View file

@ -79,15 +79,6 @@ mysql:
# The timezone configured on the MySQL server. This can be 'local', 'Z', or an offset in the form +HH:MM or -HH:MM
timezone: local
redis:
# enable to use Redis session cache or disable if Redis is not installed
enabled: false
host: localhost
port: 6379
db: 5
# Uncomment if your Redis installation requires a password
#password:
verp:
# Enable to start an MX server that detects bounced messages using VERP
# In most cases you do not want to use it

View file

@ -6,13 +6,13 @@ const appBuilder = require('./app-builder');
const http = require('http');
//const triggers = require('./services/triggers');
const importer = require('./lib/importer');
const feedcheck = require('./lib/feedcheck');
// const verpServer = require('./services/verp-server');
const testServer = require('./services/test-server');
//const postfixBounceServer = require('./services/postfix-bounce-server');
const tzupdate = require('./services/tzupdate');
//const feedcheck = require('./services/feedcheck');
const dbcheck = require('./lib/dbcheck');
//const senders = require('./lib/senders');
const senders = require('./lib/senders');
const reportProcessor = require('./lib/report-processor');
const executor = require('./lib/executor');
const privilegeHelpers = require('./lib/privilege-helpers');
@ -91,18 +91,18 @@ dbcheck(err => { // Check if database needs upgrading before starting the server
tzupdate.start();
importer.spawn(() => {
//triggers(() => {
//senders.spawn(() => {
//feedcheck(() => {
//postfixBounceServer(async () => {
(async () => {
await reportProcessor.init();
log.info('Service', 'All services started');
})();
//});
feedcheck.spawn(() => {
senders.spawn(() => {
//triggers(() => {
//postfixBounceServer(async () => {
(async () => {
await reportProcessor.init();
log.info('Service', 'All services started');
})();
//});
//});
//});
//});
});
});
});
});
});

View file

@ -15,7 +15,7 @@ module.exports = {
};
function spawn(callback) {
log.info('Executor', 'Spawning executor process.');
log.info('Executor', 'Spawning executor process');
executorProcess = fork(path.join(__dirname, '..', 'services', 'executor.js'), [], {
cwd: path.join(__dirname, '..'),
@ -54,7 +54,7 @@ function spawn(callback) {
});
executorProcess.on('close', (code, signal) => {
log.info('Executor', 'Executor process exited with code %s signal %s.', code, signal);
log.info('Executor', 'Executor process exited with code %s signal %s', code, signal);
});
}

View file

@ -1,68 +0,0 @@
'use strict';
let FeedParser = require('feedparser');
let request = require('request');
let _ = require('./translate')._;
let util = require('util');
module.exports.fetch = (url, callback) => {
let req = request(url);
let feedparser = new FeedParser();
let returned = false;
let entries = [];
req.setHeader('user-agent', 'Mailtrain');
req.setHeader('accept', 'text/html,application/xhtml+xml');
req.on('error', err => {
if (returned) {
return;
}
returned = true;
callback(err);
});
req.on('response', res => {
if (returned) {
return;
}
if (res.statusCode !== 200) {
return req.emit('error', new Error(util.format(_('Bad status code %s'), res.statusCode)));
}
req.pipe(feedparser);
});
feedparser.on('error', err => {
if (returned) {
return;
}
returned = true;
callback(err);
});
feedparser.on('readable', () => {
let item;
while ((item = feedparser.read())) {
let entry = {
title: item.title,
date: item.date || item.pubdate || item.pubDate || new Date(),
guid: item.guid || item.link,
link: item.link,
content: item.description || item.summary,
summary: item.summary || item.description,
image_url: item.image.url
};
entries.push(entry);
}
});
feedparser.on('end', () => {
if (returned) {
return;
}
returned = true;
callback(null, entries);
});
};

33
lib/feedcheck.js Normal file
View file

@ -0,0 +1,33 @@
'use strict';
const fork = require('child_process').fork;
const log = require('npmlog');
const path = require('path');
let feedcheckProcess;
module.exports = {
spawn
};
function spawn(callback) {
log.info('Feed', 'Spawning feedcheck process');
feedcheckProcess = fork(path.join(__dirname, '..', 'services', 'feedcheck.js'), [], {
cwd: path.join(__dirname, '..'),
env: {NODE_ENV: process.env.NODE_ENV}
});
feedcheckProcess.on('message', msg => {
if (msg) {
if (msg.type === 'feedcheck-started') {
log.info('Feed', 'Feedcheck process started');
return callback();
}
}
});
feedcheckProcess.on('close', (code, signal) => {
log.error('Feed', 'Feedcheck process exited with code %s signal %s', code, signal);
});
}

View file

@ -15,7 +15,7 @@ module.exports = {
};
function spawn(callback) {
log.info('Importer', 'Spawning importer process.');
log.info('Importer', 'Spawning importer process');
knex.transaction(async tx => {
await tx('imports').where('status', ImportStatus.PREP_RUNNING).update({status: ImportStatus.PREP_SCHEDULED});
@ -36,14 +36,14 @@ function spawn(callback) {
importerProcess.on('message', msg => {
if (msg) {
if (msg.type === 'importer-started') {
log.info('Importer', 'Importer process started.');
log.info('Importer', 'Importer process started');
return callback();
}
}
});
importerProcess.on('close', (code, signal) => {
log.info('Importer', 'Importer process exited with code %s signal %s.', code, signal);
log.error('Importer', 'Importer process exited with code %s signal %s', code, signal);
});
});
}

View file

@ -48,8 +48,8 @@ async function getOrCreateMailer(sendConfigurationId) {
return transport.mailer;
}
function invalidateMailer(sendConfiguration) {
transports.delete(sendConfiguration.id);
function invalidateMailer(sendConfigurationId) {
transports.delete(sendConfigurationId);
}
@ -230,7 +230,5 @@ async function _createTransport(sendConfiguration) {
return transport;
}
module.exports = {
getOrCreateMailer,
invalidateMailer
};
module.exports.getOrCreateMailer = getOrCreateMailer;
module.exports.invalidateMailer = invalidateMailer;

58
lib/senders.js Normal file
View file

@ -0,0 +1,58 @@
'use strict';
const fork = require('child_process').fork;
const log = require('npmlog');
const path = require('path');
let messageTid = 0;
let senderProcess;
function spawn(callback) {
log.info('Senders', 'Spawning master sender process');
senderProcess = fork(path.join(__dirname, '..', 'services', 'sender-master.js'), [], {
cwd: path.join(__dirname, '..'),
env: {NODE_ENV: process.env.NODE_ENV}
});
senderProcess.on('message', msg => {
if (msg) {
if (msg.type === 'master-sender-started') {
log.info('Senders', 'Master sender process started');
return callback();
}
}
});
senderProcess.on('close', (code, signal) => {
log.error('Senders', 'Master sender process exited with code %s signal %s', code, signal);
});
}
function scheduleCheck() {
senderProcess.send({
type: 'scheduleCheck',
tid: messageTid
});
messageTid++;
}
function reloadConfig(sendConfigurationId) {
senderProcess.send({
type: 'reloadConfig',
data: {
sendConfigurationId
},
tid: messageTid
});
messageTid++;
}
module.exports = {
spawn,
scheduleCheck,
reloadConfig
};

View file

@ -24,7 +24,8 @@ const allowedKeysUpdate = new Set([...allowedKeysCommon]);
const Content = {
ALL: 0,
WITHOUT_SOURCE_CUSTOM: 1,
ONLY_SOURCE_CUSTOM: 2
ONLY_SOURCE_CUSTOM: 2,
RSS_ENTRY: 3
};
function hash(entity, content) {
@ -118,11 +119,13 @@ async function getById(context, id, withPermissions = true, content = Content.AL
}
async function _validateAndPreprocess(tx, context, entity, isCreate, content) {
if (content === Content.ALL || content === Content.WITHOUT_SOURCE_CUSTOM) {
if (content === Content.ALL || content === Content.WITHOUT_SOURCE_CUSTOM || content === Content.RSS_ENTRY) {
await namespaceHelpers.validateEntity(tx, entity);
if (isCreate) {
enforce(entity.type === CampaignType.REGULAR || entity.type === CampaignType.RSS || entity.type === CampaignType.TRIGGERED, 'Unknown campaign type');
enforce(entity.type === CampaignType.REGULAR || entity.type === CampaignType.RSS || entity.type === CampaignType.TRIGGERED ||
(content === Content.RSS_ENTRY && entity.type === CampaignType.RSS_ENTRY),
'Unknown campaign type');
if (entity.source === CampaignSource.TEMPLATE || entity.source === CampaignSource.CUSTOM_FROM_TEMPLATE) {
await shares.enforceEntityPermissionTx(tx, context, 'template', entity.data.sourceTemplate, 'view');
@ -172,7 +175,7 @@ function convertFileURLs(sourceCustom, fromEntityType, fromEntityId, toEntityTyp
}
}
async function create(context, entity) {
async function _createTx(tx, context, entity, content) {
return await knex.transaction(async tx => {
await shares.enforceEntityPermissionTx(tx, context, 'namespace', entity.namespace, 'createCampaign');
@ -204,7 +207,7 @@ async function create(context, entity) {
entity.data.sourceCustom = sourceCampaign.data.sourceCustom;
}
await _validateAndPreprocess(tx, context, entity, true, Content.ALL);
await _validateAndPreprocess(tx, context, entity, true, content);
const filteredEntity = filterObject(entity, allowedKeysCreate);
filteredEntity.cid = shortid.generate();
@ -262,6 +265,16 @@ async function create(context, entity) {
});
}
async function create(context, entity) {
return await knex.transaction(async tx => {
return await _createTx(tx, context, entity, Content.ALL);
});
}
async function createRssTx(tx, context, entity) {
return await _createTx(tx, context, entity, Content.RSS_ENTRY);
}
async function updateWithConsistencyCheck(context, entity, content) {
await knex.transaction(async tx => {
await shares.enforceEntityPermissionTx(tx, context, 'campaign', entity.id, 'edit');
@ -336,6 +349,7 @@ Object.assign(module.exports, {
getByIdTx,
getById,
create,
createRssTx,
updateWithConsistencyCheck,
remove,
enforceSendPermissionTx

View file

@ -9,6 +9,8 @@ const shares = require('./shares');
const namespaceHelpers = require('../lib/namespace-helpers');
const {MailerType, getSystemSendConfigurationId} = require('../shared/send-configurations');
const contextHelpers = require('../lib/context-helpers');
const mailers = require('../lib/mailers');
const senders = require('../lib/senders');
const allowedKeys = new Set(['name', 'description', 'from_email', 'from_email_overridable', 'from_name', 'from_name_overridable', 'reply_to', 'reply_to_overridable', 'subject', 'subject_overridable', 'x_mailer', 'verp_hostname', 'mailer_type', 'mailer_settings', 'namespace']);
@ -107,8 +109,8 @@ async function updateWithConsistencyCheck(context, entity) {
await shares.rebuildPermissionsTx(tx, { entityTypeId: 'sendConfiguration', entityId: entity.id });
});
// FIXME - recreate respective mailer, notify senders to recreate the mailer
mailers.invalidateMailer(entity.id);
senders.reloadConfig(entity.id);
}
async function remove(context, id) {
@ -119,9 +121,9 @@ async function remove(context, id) {
await knex.transaction(async tx => {
await shares.enforceEntityPermissionTx(tx, context, 'sendConfiguration', id, 'delete');
// FIXME - delete send configuration assignment in campaigns
await tx('lists').update({send_configuration: null}).where('send_configuration', id);
// If any campaign with the send configuration exists, this fails due to sql foreign key
await tx('send_configurations').where('id', id).del();
});
}

View file

@ -14,6 +14,37 @@ let util = require('util');
const feed_timeout = 15 * 1000;
const rss_timeout = 1 * 1000;
const feedparser = require('feedparser-promised');
/**
 * Fetches an RSS/Atom feed and normalizes its items into campaign entries.
 *
 * @param {string} url - Feed URL to fetch.
 * @returns {Promise<Array<Object>>} entries with title, date, guid, link,
 *          content, summary and image_url fields.
 */
async function fetch(url) {
    const httpOptions = {
        // FIX: use the supplied url — the original hard-coded a leftover test
        // feed ('http://feeds.feedwrench.com/JavaScriptJabber.rss').
        uri: url,
        headers: {
            'user-agent': 'Mailtrain',
            'accept': 'text/html,application/xhtml+xml'
        }
    };

    const items = await feedparser.parse(httpOptions);

    const entries = [];
    for (const item of items) {
        const entry = {
            title: item.title,
            // Fall back through the date aliases feedparser may populate.
            date: item.date || item.pubdate || item.pubDate || new Date(),
            guid: item.guid || item.link,
            link: item.link,
            content: item.description || item.summary,
            summary: item.summary || item.description,
            // FIX: guard against items without an image object.
            image_url: item.image ? item.image.url : null
        };

        entries.push(entry);
    }

    return entries;
}
function feedLoop() {
db.getConnection((err, connection) => {

6396
package-lock.json generated

File diff suppressed because it is too large Load diff

View file

@ -31,13 +31,13 @@
"babel-eslint": "^8.1.2",
"chai": "^4.1.2",
"eslint-config-nodemailer": "^1.2.0",
"grunt": "^1.0.1",
"grunt": "^1.0.3",
"grunt-cli": "^1.2.0",
"grunt-contrib-nodeunit": "^1.0.0",
"grunt-contrib-nodeunit": "^2.0.0",
"grunt-eslint": "^20.1.0",
"jsxgettext-andris": "^0.9.0-patch.1",
"lodash": "^4.17.4",
"mocha": "^3.5.3",
"lodash": "^4.17.10",
"mocha": "^5.2.0",
"phantomjs-prebuilt": "^2.1.15",
"selenium-webdriver": "^3.5.0",
"url-pattern": "^1.0.3"
@ -46,8 +46,7 @@
"posix": "^4.1.1"
},
"dependencies": {
"async": "^2.5.0",
"aws-sdk": "^2.176.0",
"aws-sdk": "^2.307.0",
"bcrypt-nodejs": "0.0.3",
"bluebird": "^3.5.0",
"body-parser": "^1.18.2",
@ -55,7 +54,6 @@
"compression": "^1.7.0",
"config": "^1.29.0",
"connect-flash": "^0.1.1",
"connect-redis": "^3.3.0",
"cookie-parser": "^1.4.3",
"cors": "^2.8.4",
"crypto": "^1.0.1",
@ -68,7 +66,7 @@
"express": "^4.15.5",
"express-session": "^1.15.5",
"faker": "^4.1.0",
"feedparser": "^2.2.7",
"feedparser-promised": "^1.5.0",
"fs-extra": "^4.0.2",
"fs-extra-promise": "^1.0.1",
"geoip-ultralight": "^0.1.5",
@ -79,24 +77,18 @@
"he": "^1.1.1",
"html-to-text": "^3.3.0",
"humanize": "0.0.9",
"is-url": "^1.2.2",
"isemail": "^2.2.1",
"jquery-file-upload-middleware": "^0.1.8",
"jsdom": "^9.12.0",
"json-stringify-date": "^0.1.4",
"juice": "^4.1.1",
"knex": "^0.13.0",
"juice": "^4.3.2",
"knex": "^0.15.2",
"libmime": "^3.1.0",
"mailparser": "^2.0.5",
"marked": "^0.3.9",
"memory-cache": "^0.2.0",
"mjml": "^4.0.5",
"mkdirp": "^0.5.1",
"mjml": "^4.1.2",
"moment": "^2.18.1",
"moment-timezone": "^0.5.13",
"morgan": "^1.8.2",
"multer": "^1.3.0",
"multiparty": "^4.1.3",
"mysql2": "^1.3.5",
"node-gettext": "^2.0.0-rc.1",
"node-ipc": "^9.1.1",
@ -106,20 +98,16 @@
"nodemailer": "^4.1.1",
"nodemailer-openpgp": "^1.1.0",
"npmlog": "^4.1.2",
"object-hash": "^1.1.8",
"openpgp": "^2.6.1",
"passport": "^0.4.0",
"passport-local": "^1.0.0",
"premailer-api": "^1.0.4",
"redfour": "^1.0.2",
"redis": "^2.8.0",
"request": "^2.82.0",
"request": "^2.88.0",
"request-promise": "^4.2.2",
"serve-favicon": "^2.4.4",
"shortid": "^2.2.8",
"slugify": "^1.2.8",
"smtp-server": "^3.1.0",
"striptags": "^3.1.0",
"toml": "^2.3.3",
"try-require": "^1.2.1"
}

View file

@ -26,25 +26,25 @@ function spawnProcess(tid, executable, args, outFile, errFile, cwd, uid, gid) {
fs.open(outFile, 'w', (err, outFd) => {
if (err) {
log.error('Executor', err);
reportFail('Cannot create standard output file.');
reportFail('Cannot create standard output file');
return;
}
fs.open(errFile, 'w', (err, errFd) => {
if (err) {
log.error('Executor', err);
reportFail('Cannot create standard error file.');
reportFail('Cannot create standard error file');
return;
}
privilegeHelpers.ensureMailtrainOwner(outFile, err => {
if (err) {
log.warn('Executor', 'Cannot change owner of output file of process tid:%s.', tid);
log.warn('Executor', 'Cannot change owner of output file of process tid:%s', tid);
}
privilegeHelpers.ensureMailtrainOwner(errFile, err => {
if (err) {
log.warn('Executor', 'Cannot change owner of error output file of process tid:%s.', tid);
log.warn('Executor', 'Cannot change owner of error output file of process tid:%s', tid);
}
const options = {
@ -60,15 +60,15 @@ function spawnProcess(tid, executable, args, outFile, errFile, cwd, uid, gid) {
try {
child = fork(executable, args, options);
} catch (err) {
log.error('Executor', 'Cannot start process with tid:%s.', tid);
reportFail('Cannot start process.');
log.error('Executor', 'Cannot start process with tid:%s', tid);
reportFail('Cannot start process');
return;
}
const pid = child.pid;
processes[tid] = child;
log.info('Executor', 'Process started with tid:%s pid:%s.', tid, pid);
log.info('Executor', 'Process started with tid:%s pid:%s', tid, pid);
process.send({
type: 'process-started',
tid
@ -77,7 +77,7 @@ function spawnProcess(tid, executable, args, outFile, errFile, cwd, uid, gid) {
child.on('close', (code, signal) => {
delete processes[tid];
log.info('Executor', 'Process tid:%s pid:%s exited with code %s signal %s.', tid, pid, code, signal);
log.info('Executor', 'Process tid:%s pid:%s exited with code %s signal %s', tid, pid, code, signal);
fs.close(outFd, err => {
if (err) {

161
services/feedcheck.js Normal file
View file

@ -0,0 +1,161 @@
'use strict';
const log = require('npmlog');
const knex = require('../lib/knex');
const feedparser = require('feedparser-promised');
const { CampaignType, CampaignStatus, CampaignSource } = require('../shared/campaigns');
const util = require('util');
const campaigns = require('../models/campaigns');
const contextHelpers = require('../lib/context-helpers');
const _ = require('../lib/translate')._;
// Minimum age of a campaign's last_check before its feed is re-fetched.
const feedCheckInterval = 10 * 60 * 1000;
// Delay between successive polls of the campaigns table.
const dbCheckInterval = 60 * 1000;
// Guards run() against overlapping executions.
let running = false;
/**
 * Fetches an RSS/Atom feed and normalizes its items into campaign entries.
 *
 * @param {string} url - Feed URL to fetch.
 * @returns {Promise<Array<Object>>} entries with title, date, guid, link,
 *          content, summary and image_url fields.
 */
async function fetch(url) {
    const httpOptions = {
        // FIX: use the supplied url — the original hard-coded a leftover test
        // feed ('http://feeds.feedwrench.com/JavaScriptJabber.rss'), so every
        // RSS campaign would have fetched the same feed.
        uri: url,
        headers: {
            'user-agent': 'Mailtrain',
            'accept': 'text/html,application/xhtml+xml'
        }
    };

    const items = await feedparser.parse(httpOptions);

    const entries = [];
    for (const item of items) {
        const entry = {
            title: item.title,
            // Fall back through the date aliases feedparser may populate.
            date: item.date || item.pubdate || item.pubDate || new Date(),
            guid: item.guid || item.link,
            link: item.link,
            content: item.description || item.summary,
            summary: item.summary || item.description,
            // FIX: guard against items without an image object.
            image_url: item.image ? item.image.url : null
        };

        entries.push(entry);
    }

    return entries;
}
// Polls the DB for due RSS campaigns, fetches their feeds and creates an
// RSS_ENTRY child campaign for every feed item not seen before. Re-arms
// itself via setTimeout; the `running` flag prevents overlapping runs.
async function run() {
if (running) {
return;
}
running = true;
let rssCampaign;
// Process one due campaign at a time: active RSS campaigns whose last_check
// is unset or older than feedCheckInterval.
while (rssCampaign = await knex('campaigns')
.where('type', CampaignType.RSS)
.where('status', CampaignStatus.ACTIVE)
.where(qry => qry.whereNull('last_check').orWhere('last_check', '<', new Date(Date.now() - feedCheckInterval)))
// 'SELECT `id`, `source_url`, `from`, `address`, `subject`, `list`, `segment`, `html`, `open_tracking_disabled`, `click_tracking_disabled`
.first()) {
let checkStatus = null;
try {
// `data` is stored as a JSON string; it carries feedUrl and checkStatus.
rssCampaign.data = JSON.parse(rssCampaign.data);
const entries = await fetch(rssCampaign.data.feedUrl);
let added = 0;
for (const entry of entries) {
// NOTE(review): entryId is never assigned or used — dead local.
let entryId = null;
await knex.transaction(async tx => {
// Deduplicate by feed guid: only unseen entries become campaigns.
const existingEntry = await tx('rss').where({
parent: rssCampaign.id,
guid: entry.guid
}).first();
if (!existingEntry) {
const campaignData = {};
let source = rssCampaign.source;
// Custom content is inherited by referencing the parent campaign;
// all other sources copy the parent's data wholesale.
if (source === CampaignSource.CUSTOM_FROM_TEMPLATE || source === CampaignSource.CUSTOM) {
source = CampaignSource.CUSTOM_FROM_CAMPAIGN;
campaignData.sourceCampaign = rssCampaign.id;
} else {
Object.assign(campaignData, rssCampaign.data);
}
campaignData.rssEntry = entry;
// The child campaign mirrors the parent's audience and override settings.
const campaign = {
type: CampaignType.RSS_ENTRY,
source,
name: entry.title || `RSS entry ${entry.guid.substr(0, 67)}`,
list: rssCampaign.list,
segment: rssCampaign.segment,
namespace: rssCampaign.namespace,
send_configuration: rssCampaign.send_configuration,
from_name_override: rssCampaign.from_name_override,
from_email_override: rssCampaign.from_email_override,
reply_to_override: rssCampaign.reply_to_override,
subject_override: rssCampaign.subject_override,
data: JSON.stringify(campaignData),
click_tracking_disabled: rssCampaign.click_tracking_disabled,
open_tracking_disabled: rssCampaign.open_tracking_disabled,
unsubscribe_url: rssCampaign.unsubscribe_url
};
const ids = await campaigns.createRssTx(tx, contextHelpers.getAdminContext(), campaign);
const campaignId = ids[0];
// Record the guid so the same feed item is never imported twice.
await tx('rss').insert({
parent: rssCampaign.id,
campaign: campaignId,
guid: entry.guid,
pubdate: entry.date,
});
added += 1;
}
});
}
if (added > 0) {
checkStatus = util.format(_('Found %s new campaign messages from feed'), added);
log.verbose('Feed', `Added ${added} new campaigns for ${rssCampaign.id}`);
} else {
checkStatus = _('Found nothing new from the feed');
}
// Persist the human-readable check status and bump last_check so this
// campaign is not picked up again until feedCheckInterval elapses.
rssCampaign.data.checkStatus = checkStatus;
await knex('campaigns').where('id', rssCampaign.id).update({
last_check: Date.now(),
data: JSON.stringify(rssCampaign.data)
});
} catch (err) {
// A failing feed must not stall the loop: store the error as the
// check status and move on to the next due campaign.
log.error('Feed', err.message);
rssCampaign.data.checkStatus = err.message;
await knex('campaigns').where('id', rssCampaign.id).update({
last_check: Date.now(),
data: JSON.stringify(rssCampaign.data)
});
}
}
running = false;
setTimeout(run, dbCheckInterval);
}
// Tell the parent (lib/feedcheck.js) that the service is ready.
process.send({
type: 'feedcheck-started'
});
run();

111
services/sender-master.js Normal file
View file

@ -0,0 +1,111 @@
'use strict';
const fork = require('child_process').fork;
const log = require('npmlog');
const path = require('path');
// Monotonic id attached to every message sent to a worker.
let messageTid = 0;
// workerId -> forked sender-worker child process.
let workerProcesses = new Map();
const numOfWorkerProcesses = 5;
// Guards run() against re-entrant execution.
let running = false;
/*
const knex = require('../lib/knex');
const path = require('path');
const log = require('npmlog');
const fsExtra = require('fs-extra-promise');
const {ImportSource, MappingType, ImportStatus, RunStatus} = require('../shared/imports');
const imports = require('../models/imports');
const fields = require('../models/fields');
const subscriptions = require('../models/subscriptions');
const { Writable } = require('stream');
const { cleanupFromPost, enforce } = require('../lib/helpers');
const contextHelpers = require('../lib/context-helpers');
const tools = require('../lib/tools');
const shares = require('../models/shares');
const _ = require('../lib/translate')._;
*/
// Forks one sender-worker.js child identified by workerId, registers it in
// workerProcesses, and resolves once the worker reports that it has started.
async function spawnWorker(workerId) {
    return await new Promise((resolve, reject) => {
        log.info('Senders', `Spawning worker process ${workerId}`);

        const workerProcess = fork(path.join(__dirname, 'sender-worker.js'), [workerId], {
            cwd: path.join(__dirname, '..'),
            env: {NODE_ENV: process.env.NODE_ENV}
        });

        workerProcess.on('message', msg => {
            if (msg && msg.type === 'worker-started') {
                log.info('Senders', `Worker process ${workerId} started`);
                return resolve();
            }
        });

        workerProcess.on('close', (code, signal) => {
            log.error('Senders', `Worker process ${workerId} exited with code %s signal %s`, code, signal);
        });

        workerProcesses.set(workerId, workerProcess);
    });
}
// Main send-loop placeholder. The `running` guard prevents re-entrant runs;
// the actual queue processing is not implemented yet (see FIXME).
async function run() {
if (running) {
return;
}
running = true;
// FIXME
running = false;
}
// Delivers a typed, tid-tagged message to the worker child process
// registered under workerId.
function sendToWorker(workerId, msgType, data) {
    const message = {
        type: msgType,
        data,
        tid: messageTid
    };
    messageTid += 1;
    workerProcesses.get(workerId).send(message);
}
/**
 * Boots the pool of sender workers, wires up control messages arriving from
 * the parent process (lib/senders.js) and starts the main send loop.
 */
async function init() {
    // Spawn all workers in parallel and wait until each reports readiness.
    const spawnWorkerFutures = [];
    for (let workerId = 0; workerId < numOfWorkerProcesses; workerId++) {
        spawnWorkerFutures.push(spawnWorker(workerId));
    }
    await Promise.all(spawnWorkerFutures);

    process.on('message', msg => {
        if (msg) {
            const type = msg.type;

            if (type === 'scheduleCheck') {
                // FIXME
            } else if (type === 'reloadConfig') {
                // FIX: forward to each iterated worker — the original iterated
                // `workerProcesses.keys()` into `worker` but then sent every
                // message to `workerId` (the loop-final spawn counter).
                for (const workerId of workerProcesses.keys()) {
                    sendToWorker(workerId, 'reloadConfig', msg.data);
                }
            }
        }
    });

    // FIX: lib/senders.js waits for 'master-sender-started'; the original
    // announced 'sender-started', so spawn()'s callback never fired and
    // service startup hung.
    process.send({
        type: 'master-sender-started'
    });

    run();
}

init();

61
services/sender-worker.js Normal file
View file

@ -0,0 +1,61 @@
'use strict';
const log = require('npmlog');
const mailers = require('../lib/mailers');
// Worker index passed by sender-master.js as the first fork argument.
const workerId = Number.parseInt(process.argv[2]);
// Guards sendMail() against overlapping assignments.
let running = false;
/*
const knex = require('../lib/knex');
const path = require('path');
const log = require('npmlog');
const fsExtra = require('fs-extra-promise');
const {ImportSource, MappingType, ImportStatus, RunStatus} = require('../shared/imports');
const imports = require('../models/imports');
const fields = require('../models/fields');
const subscriptions = require('../models/subscriptions');
const { Writable } = require('stream');
const { cleanupFromPost, enforce } = require('../lib/helpers');
const contextHelpers = require('../lib/context-helpers');
const tools = require('../lib/tools');
const shares = require('../models/shares');
const _ = require('../lib/translate')._;
*/
// Processes one send-mail assignment. Refuses overlapping work: if a previous
// assignment is still in flight, log the conflict and bail out.
async function sendMail() {
    if (running) {
        log.error('Senders', `Worker ${workerId} assigned work while working`);
        return;
    }

    running = true;
    // FIXME
    running = false;
}
// Notifies the master process with a bare typed message (no payload).
function sendToMaster(msgType) {
    process.send({type: msgType});
}
// Dispatch control messages arriving from sender-master.js.
process.on('message', msg => {
if (msg) {
const type = msg.type;
if (type === 'reloadConfig') {
// Drop the cached mailer so it is rebuilt with the new configuration.
mailers.invalidateMailer(msg.data.sendConfigurationId);
} else if (type === 'sendMail') {
// FIXME
}
}
});
// Announce readiness so spawnWorker()'s promise in the master resolves.
sendToMaster('worker-started');

View file

@ -838,7 +838,7 @@ async function migrateCampaigns(knex) {
OK | id | int(10) unsigned | NO | PRI | NULL | auto_increment |
OK | cid | varchar(255) | NO | UNI | NULL | |
OK | type | tinyint(4) unsigned | NO | MUL | 1 | |
OK | parent | int(10) unsigned | YES | MUL | NULL | |
X | parent | int(10) unsigned | YES | MUL | NULL | |
OK | name | varchar(255) | NO | MUL | | |
OK | description | text | YES | | NULL | |
OK | list | int(10) unsigned | NO | | NULL | |
@ -880,9 +880,10 @@ async function migrateCampaigns(knex) {
+-------------------------+---------------------+------+-----+-------------------+----------------+
list - we will probably need some strategy how to consistently treat stats when list/segment changes
parent - used only for campaign type RSS
last_check - used only for campaign type RSS
scheduled - used only for campaign type NORMAL
parent - discarded because it duplicates the info in table `rss`. `rss` can be used to establish a db link between RSS campaign and its entries
*/
await knex.schema.table('campaigns', table => {
@ -898,7 +899,7 @@ async function migrateCampaigns(knex) {
for (const campaign of campaigns) {
const data = {};
if (campaign.type === CampaignType.REGULAR || campaign.type === CampaignType.RSS_ENTRY || campaign.type === CampaignType.REGULAR || campaign.type === CampaignType.TRIGGERED) {
if (campaign.type === CampaignType.REGULAR || campaign.type === CampaignType.RSS || campaign.type === CampaignType.RSS_ENTRY || campaign.type === CampaignType.TRIGGERED) {
if (campaign.template) {
let editorType = campaign.editor_name;
const editorData = JSON.parse(campaign.editor_data || '{}');
@ -930,12 +931,11 @@ async function migrateCampaigns(knex) {
campaign.source = CampaignSource.URL;
data.sourceUrl = campaign.source_url;
}
}
} else if (campaign.type === CampaignType.RSS) {
campaign.source = CampaignSource.RSS;
if (campaign.type === CampaignType.RSS) {
data.feedUrl = campaign.source_url;
data.checkStatus = campaign.checkStatus;
data.checkStatus = campaign.check_status;
}
campaign.data = JSON.stringify(data);

View file

@ -8,9 +8,8 @@ const CampaignSource = {
CUSTOM_FROM_TEMPLATE: 3,
CUSTOM_FROM_CAMPAIGN: 4,
URL: 5,
RSS: 6,
MAX: 6
MAX: 5
};
const CampaignType = {

View file

@ -5,20 +5,20 @@
"requires": true,
"dependencies": {
"moment": {
"version": "2.20.1",
"resolved": "https://registry.npmjs.org/moment/-/moment-2.20.1.tgz",
"integrity": "sha512-Yh9y73JRljxW5QxN08Fner68eFLxM5ynNOAw2LbIB1YAGeQzZT8QFSUvkAz609Zf+IHhhaUxqZK8dG3W/+HEvg=="
"version": "2.22.2",
"resolved": "https://registry.npmjs.org/moment/-/moment-2.22.2.tgz",
"integrity": "sha1-PCV/mDn8DpP/UxSWMiOeuQeD/2Y="
},
"moment-timezone": {
"version": "0.5.14",
"resolved": "https://registry.npmjs.org/moment-timezone/-/moment-timezone-0.5.14.tgz",
"integrity": "sha1-TrOP+VOLgBCLpGekWPPtQmjM/LE=",
"version": "0.5.21",
"resolved": "https://registry.npmjs.org/moment-timezone/-/moment-timezone-0.5.21.tgz",
"integrity": "sha512-j96bAh4otsgj3lKydm3K7kdtA3iKf2m6MY2iSYCzCm5a1zmHo1g+aK3068dDEeocLZQIS9kU8bsdQHLqEvgW0A==",
"requires": {
"moment": ">= 2.9.0"
}
},
"owasp-password-strength-test": {
"version": "github:bures/owasp-password-strength-test#50bfcf0035b1468b9d03a00eaf561d4fed4973eb",
"version": "github:bures/owasp-password-strength-test#043711c4ee78899963ef51a56e8b5e395fd1c361",
"from": "github:bures/owasp-password-strength-test"
}
}