'use strict';

const config = require('config');
const knex = require('../lib/knex');
const path = require('path');
const log = require('../lib/log');
const fsExtra = require('fs-extra-promise');
const { ImportSource, MappingType, ImportStatus, RunStatus } = require('../../shared/imports');
const imports = require('../models/imports');
const fields = require('../models/fields');
const subscriptions = require('../models/subscriptions');
const { Writable } = require('stream');
const { cleanupFromPost, enforce } = require('../lib/helpers');
const contextHelpers = require('../lib/context-helpers');
const tools = require('../lib/tools');
const shares = require('../models/shares');
const { tLog } = require('../lib/translate');
const { ListActivityType } = require('../../shared/activity-log');
const activityLog = require('../lib/activity-log');

const csvparse = require('csv-parse');
const fs = require('fs');

let running = false;

const maxPrepareBatchSize = 100;
const maxImportBatchSize = 10;

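// Prepares a CSV import: streams the uploaded file into a per-import staging table
// named import_file__<import id>. The first CSV record is taken as the header and
// defines the staging columns; the remaining records are batch-inserted. The uploaded
// file is removed once preparation finishes or fails.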
function prepareCsv(impt) {
    // Processing of CSV intake
    const filePath = path.join(imports.filesDir, impt.settings.csv.filename);
    const importTable = 'import_file__' + impt.id;

    let finishedWithError = false;
    let firstRow;

    const finishWithError = async (msg, err) => {
        finishedWithError = true;
        log.error('Importer (CSV)', err.stack);

        await knex('imports').where('id', impt.id).update({
            status: ImportStatus.PREP_FAILED,
            error: msg + '\n' + err.message
        });

        await activityLog.logEntityActivity('list', ListActivityType.IMPORT_STATUS_CHANGE, impt.list, {importId: impt.id, importStatus: ImportStatus.PREP_FAILED});

        await fsExtra.removeAsync(filePath);
    };

    const finishWithSuccess = async () => {
        if (finishedWithError) {
            return;
        }

        log.info('Importer (CSV)', 'Preparation finished');

        await knex('imports').where('id', impt.id).update({
            status: ImportStatus.PREP_FINISHED,
            error: null
        });

        await activityLog.logEntityActivity('list', ListActivityType.IMPORT_STATUS_CHANGE, impt.list, {importId: impt.id, importStatus: ImportStatus.PREP_FINISHED});

        await fsExtra.removeAsync(filePath);
    };

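    // Consumes parsed CSV records. The first record becomes the staging table's column
    // definitions; every following record is buffered and flushed to the table in batches
    // of at most maxPrepareBatchSize rows.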
    const processRows = async (chunks) => {
        let insertBatch = [];

        for (const chunkEntry of chunks) {
            const record = chunkEntry.chunk;

            if (!firstRow) {
                firstRow = true;

                const cols = [];
                let colsDef = '';
                for (let idx = 0; idx < record.length; idx++) {
                    const colName = 'column_' + idx;
                    cols.push({
                        column: colName,
                        name: record[idx]
                    });

                    colsDef += ' `' + colName + '` text DEFAULT NULL,\n';
                }

                impt.settings.csv.columns = cols;
                impt.settings.sourceTable = importTable;
                await knex('imports').where({id: impt.id}).update({settings: JSON.stringify(impt.settings)});

                await knex.schema.raw('CREATE TABLE `' + importTable + '` (\n' +
                    ' `id` int(10) unsigned NOT NULL AUTO_INCREMENT,\n' +
                    colsDef +
                    ' PRIMARY KEY (`id`)\n' +
                    ') ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;\n');

            } else {
                const dbRecord = {};
                for (let idx = 0; idx < record.length; idx++) {
                    dbRecord['column_' + idx] = record[idx];
                }

                insertBatch.push(dbRecord);
            }

            if (insertBatch.length >= maxPrepareBatchSize) {
                await knex(importTable).insert(insertBatch);
                insertBatch = [];
            }
        }

        if (insertBatch.length > 0) {
            await knex(importTable).insert(insertBatch);
        }
    };

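    // Wire up the pipeline: file read stream -> CSV parser -> importProcessor, an
    // object-mode Writable that stores the parsed records via processRows.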
    const inputStream = fs.createReadStream(filePath);

    const parser = csvparse({
        comment: '#',
        delimiter: impt.settings.csv.delimiter
    });

    inputStream.on('error', err => finishWithError('Error reading CSV file.', err));
    parser.on('error', err => finishWithError('Error parsing CSV file.', err));

    const importProcessor = new Writable({
        write(chunk, encoding, callback) {
            processRows([{chunk, encoding}]).then(() => callback());
        },
        writev(chunks, callback) {
            processRows(chunks).then(() => callback());
        },
        final(callback) {
            finishWithSuccess().then(() => callback());
        },
        objectMode: true
    });

    parser.pipe(importProcessor);
    inputStream.pipe(parser);
}

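// Shared executor for import runs. Picks up the scheduled runs of the given import one
// by one, walks the staging table in batches of maxImportBatchSize rows, lets the supplied
// handlers turn staging rows into subscriptions/unsubscriptions, and records progress and
// failures in import_runs and import_failed.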
async function _execImportRun(impt, handlers) {
    try {
        let imptRun;

        // There should never be more than one run scheduled for an import. However, to be
        // on the safe side, we keep processing scheduled runs in a loop until none is left.
        while (imptRun = await knex('import_runs').where('import', impt.id).whereIn('status', [RunStatus.SCHEDULED]).orderBy('created', 'asc').first()) {
            try {
                imptRun.mapping = JSON.parse(imptRun.mapping) || {};

                log.info('Importer', `Starting BASIC_SUBSCRIBE run ${impt.id}.${imptRun.id}`);
                await knex('import_runs').where('id', imptRun.id).update({
                    status: RunStatus.RUNNING
                });

                const importTable = impt.settings.sourceTable;
                const flds = await fields.list(contextHelpers.getAdminContext(), impt.list);

                let lastId = imptRun.last_id || 0;
                let countNew = imptRun.new || 0;
                let countProcessed = imptRun.processed || 0;
                let countFailed = imptRun.failed || 0;

                while (true) {
                    const rows = await knex(importTable).orderBy('id', 'asc').where('id', '>', lastId).limit(maxImportBatchSize);
                    log.verbose('Importer', `Processing run ${impt.id}.${imptRun.id} with id > ${lastId} ... ${rows.length} entries`);

                    if (rows.length === 0) {
                        break;
                    }

                    const subscrs = [];
                    const unsubscrs = [];
                    const failures = [];

                    // Process the whole batch in parallel. This helps when processSourceRow performs
                    // a DNS check, because all the checks then run at the same time.
                    await Promise.all(rows.map(row => handlers.processSourceRow(impt, imptRun, flds, row, subscrs, unsubscrs, failures)));

                    lastId = rows[rows.length - 1].id;

                    await knex.transaction(async tx => {
                        const groupedFieldsMap = await subscriptions.getGroupedFieldsMapTx(tx, impt.list);

                        let newRows = 0;

                        for (const subscr of subscrs) {
                            const meta = {
                                updateAllowed: true,
                                updateOfUnsubscribedAllowed: true,
                                subscribeIfNoExisting: true
                            };

                            try {
                                await subscriptions.createTxWithGroupedFieldsMap(tx, contextHelpers.getAdminContext(), impt.list, groupedFieldsMap, subscr, impt.id, meta);
                                if (!meta.existing) {
                                    newRows += 1;
                                }
                            } catch (err) {
                                failures.push({
                                    run: imptRun.id,
                                    source_id: subscr.source_id,
                                    email: subscr.email,
                                    reason: err.message
                                });
                            }
                        }

                        for (const unsubscr of unsubscrs) {
                            try {
                                await subscriptions.unsubscribeByEmailAndGetTx(tx, contextHelpers.getAdminContext(), impt.list, unsubscr.email);
                            } catch (err) {
                                failures.push({
                                    run: imptRun.id,
                                    source_id: unsubscr.source_id,
                                    email: unsubscr.email,
                                    reason: err.message
                                });
                            }
                        }

                        countProcessed += rows.length;
                        countNew += newRows;
                        countFailed += failures.length;

                        if (failures.length > 0) {
                            await tx('import_failed').insert(failures);
                        }

                        await tx('import_runs').where('id', imptRun.id).update({
                            last_id: lastId,
                            new: countNew,
                            failed: countFailed,
                            processed: countProcessed
                        });
                    });

                    const imptRunStatus = await knex('import_runs').where('id', imptRun.id).select(['status']).first();
                    if (imptRunStatus.status === RunStatus.STOPPING) {
                        throw new Error('Aborted');
                    }
                }

                await knex('import_runs').where('id', imptRun.id).update({
                    status: RunStatus.FINISHED,
                    error: null,
                    finished: new Date()
                });

                log.info('Importer', `BASIC_SUBSCRIBE run ${impt.id}.${imptRun.id} finished`);

            } catch (err) {
                await knex('import_runs').where('id', imptRun.id).update({
                    status: RunStatus.FAILED,
                    error: err.message,
                    finished: new Date()
                });

                throw new Error('Last run failed');
            }
        }

        await knex('imports').where('id', impt.id).update({
            last_run: new Date(),
            error: null,
            status: ImportStatus.RUN_FINISHED
        });

        await activityLog.logEntityActivity('list', ListActivityType.IMPORT_STATUS_CHANGE, impt.list, {importId: impt.id, importStatus: ImportStatus.RUN_FINISHED});

    } catch (err) {
        await knex('imports').where('id', impt.id).update({
            last_run: new Date(),
            error: err.message,
            status: ImportStatus.RUN_FAILED
        });

        await activityLog.logEntityActivity('list', ListActivityType.IMPORT_STATUS_CHANGE, impt.list, {importId: impt.id, importStatus: ImportStatus.RUN_FAILED});
    }
}

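// Import run that subscribes (or updates) the addresses found in the staging table,
// mapping staging columns to list fields according to the run's mapping.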
async function basicSubscribe(impt) {
    const handlers = {
        processSourceRow: async (impt, imptRun, flds, row, subscriptions, unsubscriptions, failures) => {
            const mappingFields = imptRun.mapping.fields || {};
            const mappingSettings = imptRun.mapping.settings || {};

            const convRow = {};
            for (const col in mappingFields) {
                const fldMapping = mappingFields[col];
                if (fldMapping && fldMapping.column) {
                    convRow[col] = row[fldMapping.column];
                }
            }

            const subscription = fields.fromImport(impt.list, flds, convRow);
            const email = cleanupFromPost(convRow.email);

            let errorMsg;

            if (!email) {
                errorMsg = tLog('missingEmail');
            }

            if (mappingSettings.checkEmails) {
                const emailErr = await tools.validateEmail(email);
                if (emailErr) {
                    errorMsg = tools.validateEmailGetMessage(emailErr, email);
                }
            }

            if (!errorMsg) {
                subscription.email = email;
                subscription.source_id = row.id;
                subscriptions.push(subscription);
            } else {
                failures.push({
                    run: imptRun.id,
                    source_id: row.id,
                    email: email,
                    reason: errorMsg
                });
            }
        }
    };

    return await _execImportRun(impt, handlers);
}

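// Import run that unsubscribes the addresses found in the mapped email column of the staging table.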
async function basicUnsubscribe(impt) {
    const handlers = {
        processSourceRow: async (impt, imptRun, flds, row, subscriptions, unsubscriptions, failures) => {
            const emailCol = imptRun.mapping.fields.email.column;
            const email = cleanupFromPost(row[emailCol]);

            let errorMsg;

            if (!email) {
                errorMsg = tLog('missingEmail');
            }

            if (!errorMsg) {
                unsubscriptions.push({
                    source_id: row.id,
                    email
                });
            } else {
                failures.push({
                    run: imptRun.id,
                    source_id: row.id,
                    email: email,
                    reason: errorMsg
                });
            }
        }
    };

    return await _execImportRun(impt, handlers);
}

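// Atomically picks the next scheduled import, marks it as running and returns a closure
// that performs the corresponding work (CSV preparation, subscribe run, or unsubscribe run).
// Returns a nullish value when there is nothing to do.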
async function getTask() {
    return await knex.transaction(async tx => {
        const impt = await tx('imports').whereIn('status', [ImportStatus.PREP_SCHEDULED, ImportStatus.RUN_SCHEDULED]).orderBy('created', 'asc').first();

        if (impt) {
            impt.settings = JSON.parse(impt.settings) || {};

            if (impt.source === ImportSource.CSV_FILE && impt.status === ImportStatus.PREP_SCHEDULED) {
                await tx('imports').where('id', impt.id).update('status', ImportStatus.PREP_RUNNING);
                await activityLog.logEntityActivity('list', ListActivityType.IMPORT_STATUS_CHANGE, impt.list, {importId: impt.id, importStatus: ImportStatus.PREP_RUNNING});
                return () => prepareCsv(impt);

            } else if (impt.status === ImportStatus.RUN_SCHEDULED && impt.mapping_type === MappingType.BASIC_SUBSCRIBE) {
                await tx('imports').where('id', impt.id).update('status', ImportStatus.RUN_RUNNING);
                await activityLog.logEntityActivity('list', ListActivityType.IMPORT_STATUS_CHANGE, impt.list, {importId: impt.id, importStatus: ImportStatus.RUN_RUNNING});
                return () => basicSubscribe(impt);

            } else if (impt.status === ImportStatus.RUN_SCHEDULED && impt.mapping_type === MappingType.BASIC_UNSUBSCRIBE) {
                await tx('imports').where('id', impt.id).update('status', ImportStatus.RUN_RUNNING);
                await activityLog.logEntityActivity('list', ListActivityType.IMPORT_STATUS_CHANGE, impt.list, {importId: impt.id, importStatus: ImportStatus.RUN_RUNNING});
                return () => basicUnsubscribe(impt);
            }

        } else {
            return null;
        }
    });
}

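// Work loop: keeps fetching and executing tasks until nothing is scheduled. The `running`
// flag ensures that only one loop is active at a time.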
async function run() {
    if (running) {
        return;
    }

    running = true;

    let task;
    while ((task = await getTask()) != null) {
        task();
    }

    running = false;
}

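// The importer runs as a forked child process; the parent signals with 'scheduleCheck'
// messages that new work may be available.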
process.on('message', msg => {
    if (msg) {
        const type = msg.type;

        if (type === 'scheduleCheck') {
            run();
        }
    }
});

if (config.title) {
    process.title = config.title + ': importer';
}

process.send({
    type: 'importer-started'
});

run();