{isEdit &&
@@ -201,10 +357,17 @@ export default class CUD extends Component {
}
- {settings}
+ {settingsEdit}
+
+ {mappingEdit &&
+
+ }
+
-
diff --git a/client/src/lists/imports/List.js b/client/src/lists/imports/List.js
index 937e4c7c..240efba6 100644
--- a/client/src/lists/imports/List.js
+++ b/client/src/lists/imports/List.js
@@ -1,15 +1,22 @@
'use strict';
-import React, { Component } from 'react';
+import React, {Component} from 'react';
import PropTypes from 'prop-types';
-import { translate } from 'react-i18next';
-import {requiresAuthenticatedUser, withPageHelpers, Title, Toolbar, NavButton} from '../../lib/page';
-import { withErrorHandling } from '../../lib/error-handling';
-import { Table } from '../../lib/table';
-import { getImportTypes } from './helpers';
+import {translate} from 'react-i18next';
+import {
+ NavButton,
+ requiresAuthenticatedUser,
+ Title,
+ Toolbar,
+ withPageHelpers
+} from '../../lib/page';
+import {withErrorHandling} from '../../lib/error-handling';
+import {Table} from '../../lib/table';
+import {getImportTypes} from './helpers';
import {Icon} from "../../lib/bootstrap-components";
import mailtrainConfig from 'mailtrainConfig';
import moment from "moment";
+import {inProgress} from '../../../../shared/imports';
@translate()
@withPageHelpers
@@ -39,12 +46,19 @@ export default class List extends Component {
const columns = [
{ data: 1, title: t('Name') },
{ data: 2, title: t('Description') },
- { data: 3, title: t('Source'), render: data => this.importTypeLabels[data].label, sortable: false, searchable: false },
- { data: 4, title: t('Status'), render: data => this.importStatusLabels[data].label, sortable: false, searchable: false },
+ { data: 3, title: t('Source'), render: data => this.importTypeLabels[data], sortable: false, searchable: false },
+ { data: 4, title: t('Status'), render: data => this.importStatusLabels[data], sortable: false, searchable: false },
{ data: 5, title: t('Last run'), render: data => moment(data).fromNow() },
{
actions: data => {
const actions = [];
+ const status = data[4];
+
+ let refreshTimeout;
+
+ if (inProgress(status)) {
+ refreshTimeout = 1000;
+ }
if (mailtrainConfig.globalPermissions.includes('setupAutomation') && this.props.list.permissions.includes('manageImports')) {
actions.push({
@@ -53,7 +67,12 @@ export default class List extends Component {
});
}
- return actions;
+ actions.push({
+ label: ,
+ link: `/lists/${this.props.list.id}/imports/${data[0]}/status`
+ });
+
+ return { refreshTimeout, actions };
}
}
];
diff --git a/client/src/lists/imports/RunStatus.js b/client/src/lists/imports/RunStatus.js
new file mode 100644
index 00000000..c73104b5
--- /dev/null
+++ b/client/src/lists/imports/RunStatus.js
@@ -0,0 +1,93 @@
+'use strict';
+
+import React, {Component} from 'react';
+import PropTypes from 'prop-types';
+import {translate} from 'react-i18next';
+import {
+ requiresAuthenticatedUser,
+ Title,
+ withPageHelpers
+} from '../../lib/page';
+import {AlignedRow} from '../../lib/form';
+import {
+ withAsyncErrorHandler,
+ withErrorHandling
+} from '../../lib/error-handling';
+import {getImportTypes} from './helpers';
+import axios from "../../lib/axios";
+import {getUrl} from "../../lib/urls";
+import moment from "moment";
+import {runStatusInProgress} from "../../../../shared/imports";
+
+@translate()
+@withPageHelpers
+@withErrorHandling
+@requiresAuthenticatedUser
+export default class Status extends Component {
+ constructor(props) {
+ super(props);
+
+ this.state = {
+ entity: props.entity
+ };
+
+ const {importTypeLabels, importStatusLabels, runStatusLabels} = getImportTypes(props.t);
+ this.importTypeLabels = importTypeLabels;
+ this.importStatusLabels = importStatusLabels;
+ this.runStatusLabels = runStatusLabels;
+
+ this.refreshTimeoutHandler = ::this.periodicRefreshTask;
+ this.refreshTimeoutId = 0;
+ }
+
+ static propTypes = {
+ entity: PropTypes.object,
+ imprt: PropTypes.object,
+ list: PropTypes.object
+ }
+
+ @withAsyncErrorHandler
+ async refreshEntity() {
+ const resp = await axios.get(getUrl(`rest/import-runs/${this.props.list.id}/${this.props.imprt.id}/${this.props.entity.id}`));
+ this.setState({
+ entity: resp.data
+ });
+ }
+
+ async periodicRefreshTask() {
+ if (runStatusInProgress(this.state.entity.status)) {
+ await this.refreshEntity();
+ this.refreshTimeoutId = setTimeout(this.refreshTimeoutHandler, 2000);
+ }
+ }
+
+ componentDidMount() {
+ this.periodicRefreshTask();
+ }
+
+ componentWillUnmount() {
+ clearTimeout(this.refreshTimeoutId);
+ }
+
+ render() {
+ const t = this.props.t;
+ const entity = this.state.entity;
+ const imprt = this.props.imprt;
+
+ return (
+
+
{t('Import Run Status')}
+
+
{imprt.name}
+
{this.importTypeLabels[imprt.type]}
+
{moment(entity.created).fromNow()}
+ {entity.finished &&
{moment(entity.finished).fromNow()}}
+
{this.runStatusLabels[entity.status]}
+
{entity.processed}
+
{entity.new}
+
{entity.failed}
+ {entity.error &&
{entity.error}
}
+
+ );
+ }
+}
\ No newline at end of file
diff --git a/client/src/lists/imports/Status.js b/client/src/lists/imports/Status.js
new file mode 100644
index 00000000..1af6835c
--- /dev/null
+++ b/client/src/lists/imports/Status.js
@@ -0,0 +1,165 @@
+'use strict';
+
+import React, {Component} from 'react';
+import PropTypes from 'prop-types';
+import {translate} from 'react-i18next';
+import {
+ requiresAuthenticatedUser,
+ Title,
+ withPageHelpers
+} from '../../lib/page';
+import {
+ AlignedRow,
+ ButtonRow,
+ Fieldset
+} from '../../lib/form';
+import {
+ withAsyncErrorHandler,
+ withErrorHandling
+} from '../../lib/error-handling';
+import {getImportTypes} from './helpers';
+import {
+ prepFinishedAndNotInProgress,
+ runInProgress,
+ RunStatus,
+ runStatusInProgress
+} from '../../../../shared/imports';
+import {Table} from "../../lib/table";
+import {
+ Button,
+ Icon
+} from "../../lib/bootstrap-components";
+import axios from "../../lib/axios";
+import {getUrl} from "../../lib/urls";
+import moment from "moment";
+import interoperableErrors from '../../../../shared/interoperable-errors';
+
+@translate()
+@withPageHelpers
+@withErrorHandling
+@requiresAuthenticatedUser
+export default class Status extends Component {
+ constructor(props) {
+ super(props);
+
+ this.state = {
+ entity: props.entity
+ };
+
+ const {importTypeLabels, importStatusLabels, runStatusLabels} = getImportTypes(props.t);
+ this.importTypeLabels = importTypeLabels;
+ this.importStatusLabels = importStatusLabels;
+ this.runStatusLabels = runStatusLabels;
+
+ this.refreshTimeoutHandler = ::this.periodicRefreshTask;
+ this.refreshTimeoutId = 0;
+ }
+
+ static propTypes = {
+ entity: PropTypes.object,
+ list: PropTypes.object
+ }
+
+ @withAsyncErrorHandler
+ async refreshEntity() {
+ const resp = await axios.get(getUrl(`rest/imports/${this.props.list.id}/${this.props.entity.id}`));
+ this.setState({
+ entity: resp.data
+ });
+ }
+
+ async periodicRefreshTask() {
+ // The periodic task runs all the time, so that we don't have to worry about starting/stopping it as a reaction to the buttons.
+ await this.refreshEntity();
+ this.refreshTimeoutId = setTimeout(this.refreshTimeoutHandler, 2000);
+ }
+
+ componentDidMount() {
+ this.periodicRefreshTask();
+ }
+
+ componentWillUnmount() {
+ clearTimeout(this.refreshTimeoutId);
+ }
+
+ async startRunAsync() {
+ try {
+ await axios.post(getUrl(`rest/import-start/${this.props.list.id}/${this.props.entity.id}`));
+ } catch (err) {
+ if (err instanceof interoperableErrors.InvalidStateError) {
+ // Just mask the fact that it's not possible to start anything and refresh instead.
+ } else {
+ throw err;
+ }
+ }
+
+ await this.refreshEntity();
+ }
+
+ async stopRunAsync() {
+ try {
+ await axios.post(getUrl(`rest/import-stop/${this.props.list.id}/${this.props.entity.id}`));
+ } catch (err) {
+ if (err instanceof interoperableErrors.InvalidStateError) {
+ // Just mask the fact that it's not possible to stop anything and refresh instead.
+ } else {
+ throw err;
+ }
+ }
+
+ await this.refreshEntity();
+ }
+
+ render() {
+ const t = this.props.t;
+ const entity = this.state.entity;
+
+ const columns = [
+ { data: 1, title: t('Started'), render: data => moment(data).fromNow() },
+ { data: 2, title: t('Finished'), render: data => data ? moment(data).fromNow() : '' },
+ { data: 3, title: t('Status'), render: data => this.runStatusLabels[data], sortable: false, searchable: false },
+ { data: 4, title: t('Processed') },
+ { data: 5, title: t('New') },
+ { data: 6, title: t('Failed') },
+ {
+ actions: data => {
+ const actions = [];
+ const status = data[3];
+
+ let refreshTimeout;
+
+ if (runStatusInProgress(status)) {
+ refreshTimeout = 1000;
+ }
+
+ actions.push({
+ label: ,
+ link: `/lists/${this.props.list.id}/imports/${this.props.entity.id}/status/${data[0]}`
+ });
+
+ return { refreshTimeout, actions };
+ }
+ }
+ ];
+
+ return (
+
+
{t('Import Status')}
+
+
{entity.name}
+
{this.importTypeLabels[entity.type]}
+
{this.importStatusLabels[entity.status]}
+ {entity.error &&
{entity.error}
}
+
+
+ {prepFinishedAndNotInProgress(entity.status) && }
+ {runInProgress(entity.status) && }
+
+
+
+
{t('Import Runs')}
+
+
+ );
+ }
+}
\ No newline at end of file
diff --git a/client/src/lists/imports/helpers.js b/client/src/lists/imports/helpers.js
index 3aa88ef3..9ef772fd 100644
--- a/client/src/lists/imports/helpers.js
+++ b/client/src/lists/imports/helpers.js
@@ -1,7 +1,7 @@
'use strict';
import React from 'react';
-import {ImportType, ImportStatus} from '../../../../shared/imports';
+import {ImportType, ImportStatus, RunStatus} from '../../../../shared/imports';
export function getImportTypes(t) {
@@ -13,17 +13,27 @@ export function getImportTypes(t) {
const importStatusLabels = {
[ImportStatus.PREP_SCHEDULED]: t('Created'),
[ImportStatus.PREP_RUNNING]: t('Preparing'),
+ [ImportStatus.PREP_STOPPING]: t('Stopping'),
[ImportStatus.PREP_FINISHED]: t('Ready'),
[ImportStatus.PREP_FAILED]: t('Preparation failed'),
[ImportStatus.RUN_SCHEDULED]: t('Scheduled'),
[ImportStatus.RUN_RUNNING]: t('Running'),
+ [ImportStatus.RUN_STOPPING]: t('Stopping'),
[ImportStatus.RUN_FINISHED]: t('Finished'),
[ImportStatus.RUN_FAILED]: t('Failed')
};
+ const runStatusLabels = {
+ [RunStatus.SCHEDULED]: t('Starting'),
+ [RunStatus.RUNNING]: t('Running'),
+ [RunStatus.STOPPING]: t('Stopping'),
+ [RunStatus.FINISHED]: t('Finished')
+ };
+
return {
importStatusLabels,
- importTypeLabels
+ importTypeLabels,
+ runStatusLabels
};
}
diff --git a/client/src/lists/root.js b/client/src/lists/root.js
index de70acda..1d46a04c 100644
--- a/client/src/lists/root.js
+++ b/client/src/lists/root.js
@@ -14,6 +14,8 @@ import SegmentsList from './segments/List';
import SegmentsCUD from './segments/CUD';
import ImportsList from './imports/List';
import ImportsCUD from './imports/CUD';
+import ImportsStatus from './imports/Status';
+import ImportRunsStatus from './imports/RunStatus';
import Share from '../shares/Share';
import TriggersList from './TriggersList';
@@ -140,18 +142,36 @@ function getMenus(t) {
resolve: {
import: params => `rest/imports/${params.listId}/${params.importId}`,
},
- link: params => `/lists/${params.listId}/imports/${params.importId}/edit`,
+ link: params => `/lists/${params.listId}/imports/${params.importId}/status`,
navs: {
':action(edit|delete)': {
title: t('Edit'),
+ resolve: {
+ fieldsGrouped: params => `rest/fields-grouped/${params.listId}`
+ },
link: params => `/lists/${params.listId}/imports/${params.importId}/edit`,
- panelRender: props =>
+ panelRender: props =>
+ },
+ 'status': {
+ title: t('Status'),
+ link: params => `/lists/${params.listId}/imports/${params.importId}/status`,
+ panelRender: props => ,
+ children: {
+ ':importRunId([0-9]+)': {
+ title: resolved => t('Run'),
+ resolve: {
+ importRun: params => `rest/import-runs/${params.listId}/${params.importId}/${params.importRunId}`,
+ },
+ link: params => `/lists/${params.listId}/imports/${params.importId}/status/${params.importRunId}`,
+ panelRender: props =>
+ }
+ }
}
}
},
create: {
title: t('Create'),
- panelRender: props =>
+ panelRender: props =>
}
}
},
diff --git a/client/src/lists/styles.scss b/client/src/lists/styles.scss
new file mode 100644
index 00000000..108536b6
--- /dev/null
+++ b/client/src/lists/styles.scss
@@ -0,0 +1,3 @@
+.mapping {
+ margin-top: 30px;
+}
\ No newline at end of file
diff --git a/client/src/lists/subscriptions/List.js b/client/src/lists/subscriptions/List.js
index b5cfd266..0bcd51bf 100644
--- a/client/src/lists/subscriptions/List.js
+++ b/client/src/lists/subscriptions/List.js
@@ -15,6 +15,7 @@ import {
import {Icon, Button} from "../../lib/bootstrap-components";
import axios from '../../lib/axios';
import {getFieldTypes, getSubscriptionStatusLabels} from './helpers';
+import {getUrl} from "../../lib/urls";
@translate()
@withForm
diff --git a/models/import-runs.js b/models/import-runs.js
new file mode 100644
index 00000000..1c261b7d
--- /dev/null
+++ b/models/import-runs.js
@@ -0,0 +1,49 @@
+'use strict';
+
+const knex = require('../lib/knex');
+const { enforce, filterObject } = require('../lib/helpers');
+const dtHelpers = require('../lib/dt-helpers');
+const interoperableErrors = require('../shared/interoperable-errors');
+const shares = require('./shares');
+
+async function getById(context, listId, importId, id) {
+ return await knex.transaction(async tx => {
+ await shares.enforceEntityPermissionTx(tx, context, 'list', listId, 'viewImports');
+
+ const entity = await tx('import_runs')
+ .innerJoin('imports', 'import_runs.import', 'imports.id')
+ .where({'imports.list': listId, 'imports.id': importId, 'import_runs.id': id})
+ .select('import_runs.id', 'import_runs.import', 'import_runs.status', 'import_runs.new',
+ 'import_runs.failed', 'import_runs.processed', 'import_runs.error', 'import_runs.created', 'import_runs.finished')
+ .first();
+
+ if (!entity) {
+ throw new interoperableErrors.NotFoundError();
+ }
+
+ return entity;
+ });
+}
+
+async function listDTAjax(context, listId, importId, params) {
+ return await knex.transaction(async tx => {
+ await shares.enforceEntityPermissionTx(tx, context, 'list', listId, 'viewImports');
+
+ return await dtHelpers.ajaxListTx(
+ tx,
+ params,
+ builder => builder
+ .from('import_runs')
+ .innerJoin('imports', 'import_runs.import', 'imports.id')
+ .where({'imports.list': listId, 'imports.id': importId}),
+ [ 'import_runs.id', 'import_runs.created', 'import_runs.finished', 'import_runs.status', 'import_runs.processed', 'import_runs.new', 'import_runs.failed']
+ );
+ });
+}
+
+
+
+module.exports = {
+ getById,
+ listDTAjax
+};
\ No newline at end of file
diff --git a/models/imports.js b/models/imports.js
index 078ead37..03a502a7 100644
--- a/models/imports.js
+++ b/models/imports.js
@@ -6,24 +6,43 @@ const { enforce, filterObject } = require('../lib/helpers');
const dtHelpers = require('../lib/dt-helpers');
const interoperableErrors = require('../shared/interoperable-errors');
const shares = require('./shares');
-const {ImportType, ImportStatus, RunStatus} = require('../shared/imports');
+const {ImportType, ImportStatus, RunStatus, prepFinished, prepFinishedAndNotInProgress, runInProgress} = require('../shared/imports');
const fs = require('fs-extra-promise');
const path = require('path');
+const importer = require('../lib/importer');
const filesDir = path.join(__dirname, '..', 'files', 'imports');
-const allowedKeys = new Set(['name', 'description', 'type', 'settings']);
+const allowedKeysCreate = new Set(['name', 'description', 'type', 'settings', 'mapping']);
+const allowedKeysUpdate = new Set(['name', 'description', 'mapping']);
function hash(entity) {
- return hasher.hash(filterObject(entity, allowedKeys));
+ return hasher.hash(filterObject(entity, allowedKeysUpdate));
}
-async function getById(context, listId, id) {
+async function getById(context, listId, id, withSampleRow = false) {
return await knex.transaction(async tx => {
await shares.enforceEntityPermissionTx(tx, context, 'list', listId, 'viewImports');
const entity = await tx('imports').where({list: listId, id}).first();
+
+ if (!entity) {
+ throw new interoperableErrors.NotFoundError();
+ }
+
entity.settings = JSON.parse(entity.settings);
+ entity.mapping = JSON.parse(entity.mapping);
+
+ if (withSampleRow && prepFinished(entity.status)) {
+ if (entity.type === ImportType.CSV_FILE) {
+ const importTable = 'import_file__' + id;
+
+ const row = await tx(importTable).first();
+ delete row.id;
+
+ entity.sampleRow = row;
+ }
+ }
return entity;
});
@@ -49,22 +68,21 @@ async function _validateAndPreprocess(tx, listId, entity, isCreate) {
enforce(entity.type >= ImportType.MIN && entity.type <= ImportType.MAX, 'Invalid import type');
entity.settings = entity.settings || {};
+ entity.mapping = entity.mapping || {};
- if (entity.type === ImportType.CSV_FILE) {
+ if (isCreate && entity.type === ImportType.CSV_FILE) {
entity.settings.csv = entity.settings.csv || {};
- enforce(entity.settings.csv.delimiter.trim(), 'CSV delimiter must not be empty');
+ enforce(entity.settings.csv.delimiter && entity.settings.csv.delimiter.trim(), 'CSV delimiter must not be empty');
}
}
async function create(context, listId, entity, files) {
- return await knex.transaction(async tx => {
+ const res = await knex.transaction(async tx => {
shares.enforceGlobalPermission(context, 'setupAutomation');
await shares.enforceEntityPermissionTx(tx, context, 'list', listId, 'manageImports');
await _validateAndPreprocess(tx, listId, entity, true);
- // FIXME - set status
-
if (entity.type === ImportType.CSV_FILE) {
enforce(files.csvFile, 'File must be included');
const csvFile = files.csvFile[0];
@@ -81,18 +99,22 @@ async function create(context, listId, entity, files) {
}
- const filteredEntity = filterObject(entity, allowedKeys);
+ const filteredEntity = filterObject(entity, allowedKeysCreate);
filteredEntity.list = listId;
filteredEntity.settings = JSON.stringify(filteredEntity.settings);
+ filteredEntity.mapping = JSON.stringify(filteredEntity.mapping);
const ids = await tx('imports').insert(filteredEntity);
const id = ids[0];
return id;
});
+
+ importer.scheduleCheck();
+ return res;
}
-async function updateWithConsistencyCheck(context, listId, entity, files) {
+async function updateWithConsistencyCheck(context, listId, entity) {
await knex.transaction(async tx => {
shares.enforceGlobalPermission(context, 'setupAutomation');
await shares.enforceEntityPermissionTx(tx, context, 'list', listId, 'manageImports');
@@ -102,25 +124,20 @@ async function updateWithConsistencyCheck(context, listId, entity, files) {
throw new interoperableErrors.NotFoundError();
}
- existing.settings = JSON.parse(existing.settings);
+ existing.mapping = JSON.parse(existing.mapping);
const existingHash = hash(existing);
if (existingHash !== entity.originalHash) {
throw new interoperableErrors.ChangedError();
}
+ enforce(prepFinishedAndNotInProgress(existing.status), 'Cannot save updates until preparation or run is finished');
+
enforce(entity.type === existing.type, 'Import type cannot be changed');
await _validateAndPreprocess(tx, listId, entity, false);
- if (entity.type === ImportType.CSV_FILE) {
- entity.settings.csv = existing.settings.csv;
- }
-
- // FIXME - set status
- // FIXME - create CSV import table
-
- const filteredEntity = filterObject(entity, allowedKeys);
+ const filteredEntity = filterObject(entity, allowedKeysUpdate);
filteredEntity.list = listId;
- filteredEntity.settings = JSON.stringify(filteredEntity.settings);
+ filteredEntity.mapping = JSON.stringify(filteredEntity.mapping);
await tx('imports').where({list: listId, id: entity.id}).update(filteredEntity);
});
@@ -134,9 +151,15 @@ async function removeTx(tx, context, listId, id) {
throw new interoperableErrors.NotFoundError();
}
- // FIXME - remove csv import table
+ existing.settings = JSON.parse(existing.settings);
- await tx('import_failed').whereIn('run', function() {this.from('import_runs').select('id').where('import', id)});
+ const filePath = path.join(filesDir, existing.settings.csv.filename);
+ await fs.removeAsync(filePath);
+
+ const importTable = 'import_file__' + id;
+ await knex.schema.dropTableIfExists(importTable);
+
+ await tx('import_failed').whereIn('run', function() {this.from('import_runs').select('id').where('import', id)}).del();
await tx('import_runs').where('import', id).del();
await tx('imports').where({list: listId, id}).del();
}
@@ -154,6 +177,60 @@ async function removeAllByListIdTx(tx, context, listId) {
}
}
+async function start(context, listId, id) {
+ await knex.transaction(async tx => {
+ shares.enforceGlobalPermission(context, 'setupAutomation');
+ await shares.enforceEntityPermissionTx(tx, context, 'list', listId, 'manageImports');
+
+ const entity = await tx('imports').where({list: listId, id}).first();
+ if (!entity) {
+ throw new interoperableErrors.NotFoundError();
+ }
+
+ if (!prepFinishedAndNotInProgress(entity.status)) {
+ throw new interoperableErrors.InvalidStateError('Cannot start until preparation or run is finished');
+ }
+
+ await tx('imports').where({list: listId, id}).update({
+ status: ImportStatus.RUN_SCHEDULED
+ });
+
+ await tx('import_runs').insert({
+ import: id,
+ status: RunStatus.SCHEDULED,
+ mapping: entity.mapping
+ });
+ });
+
+ importer.scheduleCheck();
+}
+
+async function stop(context, listId, id) {
+ await knex.transaction(async tx => {
+ shares.enforceGlobalPermission(context, 'setupAutomation');
+ await shares.enforceEntityPermissionTx(tx, context, 'list', listId, 'manageImports');
+
+ const entity = await tx('imports').where({list: listId, id}).first();
+ if (!entity) {
+ throw new interoperableErrors.NotFoundError();
+ }
+
+ if (!runInProgress(entity.status)) {
+ throw new interoperableErrors.InvalidStateError('No import is currently running');
+ }
+
+ await tx('imports').where({list: listId, id}).update({
+ status: ImportStatus.RUN_STOPPING
+ });
+
+ await tx('import_runs').where('import', id).whereIn('status', [RunStatus.SCHEDULED, RunStatus.RUNNING]).update({
+ status: RunStatus.STOPPING
+ });
+ });
+
+ importer.scheduleCheck();
+}
+
// This is to handle circular dependency with segments.js
module.exports = {
@@ -164,5 +241,7 @@ module.exports = {
create,
updateWithConsistencyCheck,
remove,
- removeAllByListIdTx
+ removeAllByListIdTx,
+ start,
+ stop
};
\ No newline at end of file
diff --git a/package-lock.json b/package-lock.json
index a3f0afe8..508efd3d 100644
--- a/package-lock.json
+++ b/package-lock.json
@@ -1,6 +1,6 @@
{
"name": "mailtrain",
- "version": "1.23.2",
+ "version": "1.24.0",
"lockfileVersion": 1,
"requires": true,
"dependencies": {
@@ -1655,6 +1655,11 @@
"resolved": "https://registry.npmjs.org/double-ended-queue/-/double-ended-queue-2.1.0-0.tgz",
"integrity": "sha1-ED01J/0xUo9AGIEwyEHv3XgmTlw="
},
+ "easy-stack": {
+ "version": "1.0.0",
+ "resolved": "https://registry.npmjs.org/easy-stack/-/easy-stack-1.0.0.tgz",
+ "integrity": "sha1-EskbMIWjfwuqM26UhurEv5Tj54g="
+ },
"ecc-jsbn": {
"version": "0.1.1",
"resolved": "https://registry.npmjs.org/ecc-jsbn/-/ecc-jsbn-0.1.1.tgz",
@@ -1969,6 +1974,11 @@
"resolved": "https://registry.npmjs.org/etag/-/etag-1.8.1.tgz",
"integrity": "sha1-Qa4u62XvpiJorr/qg6x9eSmbCIc="
},
+ "event-pubsub": {
+ "version": "4.3.0",
+ "resolved": "https://registry.npmjs.org/event-pubsub/-/event-pubsub-4.3.0.tgz",
+ "integrity": "sha512-z7IyloorXvKbFx9Bpie2+vMJKKx1fH1EN5yiTfp8CiLOTptSYy1g8H4yDpGlEdshL1PBiFtBHepF2cNsqeEeFQ=="
+ },
"eventemitter2": {
"version": "0.4.14",
"resolved": "https://registry.npmjs.org/eventemitter2/-/eventemitter2-0.4.14.tgz",
@@ -4074,6 +4084,19 @@
"nopt": "~3.0.1"
}
},
+ "js-message": {
+ "version": "1.0.5",
+ "resolved": "https://registry.npmjs.org/js-message/-/js-message-1.0.5.tgz",
+ "integrity": "sha1-IwDSSxrwjondCVvBpMnJz8uJLRU="
+ },
+ "js-queue": {
+ "version": "2.0.0",
+ "resolved": "https://registry.npmjs.org/js-queue/-/js-queue-2.0.0.tgz",
+ "integrity": "sha1-NiITz4YPRo8BJfxslqvBdCUx+Ug=",
+ "requires": {
+ "easy-stack": "^1.0.0"
+ }
+ },
"js-tokens": {
"version": "3.0.2",
"resolved": "https://registry.npmjs.org/js-tokens/-/js-tokens-3.0.2.tgz",
@@ -5624,6 +5647,16 @@
"lodash.get": "^4.4.2"
}
},
+ "node-ipc": {
+ "version": "9.1.1",
+ "resolved": "https://registry.npmjs.org/node-ipc/-/node-ipc-9.1.1.tgz",
+ "integrity": "sha512-FAyICv0sIRJxVp3GW5fzgaf9jwwRQxAKDJlmNFUL5hOy+W4X/I5AypyHoq0DXXbo9o/gt79gj++4cMr4jVWE/w==",
+ "requires": {
+ "event-pubsub": "4.3.0",
+ "js-message": "1.0.5",
+ "js-queue": "2.0.0"
+ }
+ },
"node-localstorage": {
"version": "1.3.0",
"resolved": "https://registry.npmjs.org/node-localstorage/-/node-localstorage-1.3.0.tgz",
diff --git a/package.json b/package.json
index ed7a621a..2a2bd3de 100644
--- a/package.json
+++ b/package.json
@@ -99,6 +99,7 @@
"multiparty": "^4.1.3",
"mysql2": "^1.3.5",
"node-gettext": "^2.0.0-rc.1",
+ "node-ipc": "^9.1.1",
"node-mocks-http": "^1.6.5",
"node-object-hash": "^1.2.0",
"nodeify": "^1.0.1",
diff --git a/routes/rest/import-runs.js b/routes/rest/import-runs.js
new file mode 100644
index 00000000..663bdace
--- /dev/null
+++ b/routes/rest/import-runs.js
@@ -0,0 +1,17 @@
+'use strict';
+
+const passport = require('../../lib/passport');
+const importRuns = require('../../models/import-runs');
+
+const router = require('../../lib/router-async').create();
+
+router.postAsync('/import-runs-table/:listId/:importId', passport.loggedIn, async (req, res) => {
+ return res.json(await importRuns.listDTAjax(req.context, req.params.listId, req.params.importId, req.body));
+});
+
+router.getAsync('/import-runs/:listId/:importId/:runId', passport.loggedIn, async (req, res) => {
+ const entity = await importRuns.getById(req.context, req.params.listId, req.params.importId, req.params.runId);
+ return res.json(entity);
+});
+
+module.exports = router;
\ No newline at end of file
diff --git a/routes/rest/imports.js b/routes/rest/imports.js
index 5331505b..c7490c67 100644
--- a/routes/rest/imports.js
+++ b/routes/rest/imports.js
@@ -19,7 +19,7 @@ router.postAsync('/imports-table/:listId', passport.loggedIn, async (req, res) =
});
router.getAsync('/imports/:listId/:importId', passport.loggedIn, async (req, res) => {
- const entity = await imports.getById(req.context, req.params.listId, req.params.importId);
+ const entity = await imports.getById(req.context, req.params.listId, req.params.importId, true);
entity.hash = imports.hash(entity);
return res.json(entity);
});
@@ -47,5 +47,12 @@ router.deleteAsync('/imports/:listId/:importId', passport.loggedIn, passport.csr
return res.json();
});
+router.postAsync('/import-start/:listId/:importId', passport.loggedIn, passport.csrfProtection, async (req, res) => {
+ return res.json(await imports.start(req.context, req.params.listId, req.params.importId));
+});
+
+router.postAsync('/import-stop/:listId/:importId', passport.loggedIn, passport.csrfProtection, async (req, res) => {
+ return res.json(await imports.stop(req.context, req.params.listId, req.params.importId));
+});
module.exports = router;
\ No newline at end of file
diff --git a/services/importer.js b/services/importer.js
index 16967860..0c8c0835 100644
--- a/services/importer.js
+++ b/services/importer.js
@@ -6,35 +6,39 @@ const log = require('npmlog');
const fsExtra = require('fs-extra-promise');
const {ImportType, ImportStatus, RunStatus} = require('../shared/imports');
const imports = require('../models/imports');
+const { Writable } = require('stream');
const csvparse = require('csv-parse');
const fs = require('fs');
let running = false;
+const maxInsertBatchSize = 100;
function prepareCsv(impt) {
- async function finishWithError(msg, err) {
- if (finished) {
- return;
- }
+ // Processing of CSV intake
+ const filePath = path.join(imports.filesDir, impt.settings.csv.filename);
+ const importTable = 'import_file__' + impt.id;
- finished = true;
+ let finishedWithError = false;
+ let firstRow;
+
+ const finishWithError = async (msg, err) => {
+ finishedWithError = true;
log.error('Importer (CSV)', err.stack);
await knex('imports').where('id', impt.id).update({
status: ImportStatus.PREP_FAILED,
- error: msg + '\n' + err.stack
+ error: msg + '\n' + err.message
});
await fsExtra.removeAsync(filePath);
- }
+ };
- async function finishWithSuccess() {
- if (finished) {
+ const finishWithSuccess = async () => {
+ if (finishedWithError) {
return;
}
- finished = true;
log.info('Importer (CSV)', 'Preparation finished');
await knex('imports').where('id', impt.id).update({
@@ -43,59 +47,87 @@ function prepareCsv(impt) {
});
await fsExtra.removeAsync(filePath);
- }
+ };
- // Processing of CSV intake
- const filePath = path.join(imports.filesDir, impt.settings.csv.filename);
+ const processRows = async (chunks) => {
+ console.log('process row');
+ let insertBatch = [];
+ for (const chunkEntry of chunks) {
+ const record = chunkEntry.chunk;
+ if (!firstRow) {
+ firstRow = true;
+
+ const cols = [];
+ let colsDef = '';
+ for (let idx = 0; idx < record.length; idx++) {
+ const colName = 'column_' + idx;
+ cols.push({
+ column: colName,
+ name: record[idx]
+ });
+
+ colsDef += ' `' + colName + '` text DEFAULT NULL,\n';
+ }
+
+ impt.settings.csv.columns = cols;
+ await knex('imports').where({id: impt.id}).update({settings: JSON.stringify(impt.settings)});
+
+ await knex.schema.raw('CREATE TABLE `' + importTable + '` (\n' +
+ ' `id` int(10) unsigned NOT NULL AUTO_INCREMENT,\n' +
+ colsDef +
+ ' PRIMARY KEY (`id`)\n' +
+ ') ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;\n');
+
+ } else {
+ const dbRecord = {};
+ for (let idx = 0; idx < record.length; idx++) {
+ dbRecord['column_' + idx] = record[idx];
+ }
+
+ insertBatch.push(dbRecord);
+ }
+
+ if (insertBatch.length >= maxInsertBatchSize) {
+ await knex(importTable).insert(insertBatch);
+ insertBatch = [];
+ }
+ }
+
+ if (insertBatch.length > 0) {
+ await knex(importTable).insert(insertBatch);
+ }
+ };
+
+
+ const inputStream = fs.createReadStream(filePath);
const parser = csvparse({
comment: '#',
delimiter: impt.settings.csv.delimiter
});
- const inputStream = fs.createReadStream(filePath);
- let finished;
-
inputStream.on('error', err => finishWithError('Error reading CSV file.', err));
parser.on('error', err => finishWithError('Error parsing CSV file.', err));
- let firstRow;
- let processing = false;
- const processRows = () => {
- const record = parser.read();
- if (record === null) {
- processing = false;
- return;
- }
- processing = true;
-
- if (!firstRow) {
- firstRow = record;
- console.log(record);
- return setImmediate(processRows);
-
- }
-
- console.log(record);
- return setImmediate(processRows);
- };
-
- parser.on('readable', () => {
- if (finished || processing) {
- return;
- }
- processRows();
- });
-
- parser.on('finish', () => {
- finishWithSuccess();
+ const importProcessor = new Writable({
+ write(chunk, encoding, callback) {
+ processRows([{chunk, encoding}]).then(() => callback());
+ },
+ writev(chunks, callback) {
+ processRows(chunks).then(() => callback());
+ },
+ final(callback) {
+ finishWithSuccess().then(() => callback());
+ },
+ objectMode: true
});
+ parser.pipe(importProcessor);
inputStream.pipe(parser);
}
async function getTask() {
- await knex.transaction(async tx => {
+ return await knex.transaction(async tx => {
const impt = await tx('imports').whereIn('status', [ImportStatus.PREP_SCHEDULED, ImportStatus.RUN_SCHEDULED]).orderBy('created', 'asc').first();
if (impt) {
@@ -109,7 +141,7 @@ async function getTask() {
} else {
return null;
}
- })
+ });
}
async function run() {
@@ -132,7 +164,7 @@ process.on('message', msg => {
const type = msg.type;
if (type === 'scheduleCheck') {
- run()
+ run();
}
}
});
@@ -141,3 +173,5 @@ process.send({
type: 'importer-started'
});
+run();
+
diff --git a/setup/knex/migrations/20170506102634_v1_to_v2.js b/setup/knex/migrations/20170506102634_v1_to_v2.js
index 576d0aa5..329800c2 100644
--- a/setup/knex/migrations/20170506102634_v1_to_v2.js
+++ b/setup/knex/migrations/20170506102634_v1_to_v2.js
@@ -1028,6 +1028,7 @@ async function migrateImporter(knex) {
table.integer('type').unsigned().notNullable();
table.integer('status').unsigned().notNullable();
table.text('settings', 'longtext');
+ table.text('mapping', 'longtext');
table.timestamp('last_run');
table.text('error');
table.timestamp('created').defaultTo(knex.fn.now());
@@ -1037,6 +1038,7 @@ async function migrateImporter(knex) {
table.increments('id').primary();
table.integer('import').unsigned().references('imports.id');
table.integer('status').unsigned().notNullable();
+ table.text('mapping', 'longtext');
table.integer('new').defaultTo(0);
table.integer('failed').defaultTo(0);
table.integer('processed').defaultTo(0);
diff --git a/shared/imports.js b/shared/imports.js
index 0f8d6674..09b1341a 100644
--- a/shared/imports.js
+++ b/shared/imports.js
@@ -12,22 +12,60 @@ const ImportType = {
const ImportStatus = {
PREP_SCHEDULED: 0,
PREP_RUNNING: 1,
- PREP_FINISHED: 2,
- PREP_FAILED: 3,
+ PREP_STOPPING: 2,
+ PREP_FINISHED: 3,
+ PREP_FAILED: 4,
- RUN_SCHEDULED: 4,
- RUN_RUNNING: 5,
- RUN_FINISHED: 6,
- RUN_FAILED: 7
+ RUN_SCHEDULED: 5,
+ RUN_RUNNING: 6,
+ RUN_STOPPING: 7,
+ RUN_FINISHED: 8,
+ RUN_FAILED: 9
};
const RunStatus = {
- RUNNING: 0,
- FINISHED: 1
+ SCHEDULED: 0,
+ RUNNING: 1,
+ STOPPING: 2,
+ FINISHED: 3
};
+function prepInProgress(status) {
+ return status === ImportStatus.PREP_SCHEDULED || status === ImportStatus.PREP_RUNNING || status === ImportStatus.PREP_STOPPING;
+}
+
+function runInProgress(status) {
+ return status === ImportStatus.RUN_SCHEDULED || status === ImportStatus.RUN_RUNNING || status === ImportStatus.RUN_STOPPING;
+}
+
+function inProgress(status) {
+ return status === ImportStatus.PREP_SCHEDULED || status === ImportStatus.PREP_RUNNING || status === ImportStatus.PREP_STOPPING ||
+ status === ImportStatus.RUN_SCHEDULED || status === ImportStatus.RUN_RUNNING || status === ImportStatus.RUN_STOPPING;
+}
+
+function prepFinished(status) {
+ return status === ImportStatus.PREP_FINISHED ||
+ status === ImportStatus.RUN_SCHEDULED || status === ImportStatus.RUN_RUNNING || status === ImportStatus.RUN_STOPPING ||
+ status === ImportStatus.RUN_FINISHED || status === ImportStatus.RUN_FAILED;
+}
+
+function prepFinishedAndNotInProgress(status) {
+ return status === ImportStatus.PREP_FINISHED ||
+ status === ImportStatus.RUN_FINISHED || status === ImportStatus.RUN_FAILED;
+}
+
+function runStatusInProgress(status) {
+ return status === RunStatus.SCHEDULED || status === RunStatus.RUNNING || status === RunStatus.STOPPING;
+}
+
module.exports = {
ImportType,
ImportStatus,
- RunStatus
+ RunStatus,
+ prepInProgress,
+ runInProgress,
+ prepFinished,
+ prepFinishedAndNotInProgress,
+ inProgress,
+ runStatusInProgress
};
\ No newline at end of file
diff --git a/shared/interoperable-errors.js b/shared/interoperable-errors.js
index ad5a6a80..62177405 100644
--- a/shared/interoperable-errors.js
+++ b/shared/interoperable-errors.js
@@ -112,6 +112,12 @@ class DependencyPresentError extends InteroperableError {
}
}
+class InvalidStateError extends InteroperableError {
+ constructor(msg, data) {
+ super('InvalidStateError', msg, data);
+ }
+}
+
const errorTypes = {
InteroperableError,
@@ -131,7 +137,8 @@ const errorTypes = {
InvalidConfirmationForSubscriptionError,
InvalidConfirmationForAddressChangeError,
InvalidConfirmationForUnsubscriptionError,
- DependencyPresentError
+ DependencyPresentError,
+ InvalidStateError
};
function deserialize(errorObj) {