187 lines
		
	
	
		
			No EOL
		
	
	
		
			6.3 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
	
	
			
		
		
	
	
			187 lines
		
	
	
		
			No EOL
		
	
	
		
			6.3 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
	
	
'use strict';
 | 
						|
 | 
						|
const { filesDir } = require('../models/files');
 | 
						|
const path = require('path');
 | 
						|
const log = require('./log');
 | 
						|
const knex = require('./knex');
 | 
						|
const fs = require('fs-extra-promise');
 | 
						|
const stream = require('stream');
 | 
						|
const privilegeHelpers = require('./privilege-helpers');
 | 
						|
const synchronized = require('./synchronized');
 | 
						|
const { tmpName } = require('tmp-promise');
 | 
						|
 | 
						|
const pruneBatchSize = 1000;
 | 
						|
 | 
						|
const fileCacheFilesDir = path.join(filesDir, 'cache');
 | 
						|
 | 
						|
const fileCaches = new Map();
 | 
						|
 | 
						|
async function _fileCache(typeId, cacheConfig, keyGen) {
 | 
						|
    if (fileCaches.has(typeId)) {
 | 
						|
        return fileCaches.get(typeId);
 | 
						|
    }
 | 
						|
 | 
						|
    const localFilesDir = path.join(fileCacheFilesDir, typeId);
 | 
						|
    await privilegeHelpers.ensureMailtrainDir(localFilesDir);
 | 
						|
 | 
						|
    let mayNeedPruning = true;
 | 
						|
 | 
						|
    const getLocalFileName = id => path.join(localFilesDir, id.toString());
 | 
						|
 | 
						|
    const pruneCache = async() => {
 | 
						|
        if (mayNeedPruning) {
 | 
						|
            try {
 | 
						|
                const maxSize = cacheConfig.maxSize * 1048576;
 | 
						|
 | 
						|
                let lastId = null;
 | 
						|
                let cumulativeSize = 0;
 | 
						|
 | 
						|
                while (true) {
 | 
						|
                    let entriesQry = knex('file_cache').where('type', typeId).orderBy('id', 'desc').limit(pruneBatchSize);
 | 
						|
                    if (lastId) {
 | 
						|
                        entriesQry = entriesQry.where('id', '<', lastId);
 | 
						|
                    }
 | 
						|
 | 
						|
                    const entries = await entriesQry;
 | 
						|
 | 
						|
                    if (entries.length > 0) {
 | 
						|
                        for (const entry of entries) {
 | 
						|
                            cumulativeSize += entry.size;
 | 
						|
                            if (cumulativeSize > maxSize) {
 | 
						|
                                await fs.unlinkAsync(getLocalFileName(entry.id));
 | 
						|
                                await knex('file_cache').where('id', entry.id).del();
 | 
						|
                            }
 | 
						|
 | 
						|
                            lastId = entry.id;
 | 
						|
                        }
 | 
						|
                    } else {
 | 
						|
                        break;
 | 
						|
                    }
 | 
						|
                }
 | 
						|
            } catch (err) {
 | 
						|
                log.error('FileCache', err);
 | 
						|
            }
 | 
						|
 | 
						|
            mayNeedPruning = false;
 | 
						|
        }
 | 
						|
    };
 | 
						|
 | 
						|
    await pruneCache();
 | 
						|
    setInterval(pruneCache, cacheConfig.pruneInterval * 1000);
 | 
						|
 | 
						|
 | 
						|
    const handleCache = async (key, res, next) => {
 | 
						|
        const fileEntry = await knex('file_cache').where('type', typeId).where('key', key).first();
 | 
						|
 | 
						|
        if (fileEntry) {
 | 
						|
            res.sendFile(
 | 
						|
                getLocalFileName(fileEntry.id),
 | 
						|
                {
 | 
						|
                    headers: {'Content-Type': fileEntry.mimetype}
 | 
						|
                },
 | 
						|
                err => {
 | 
						|
                    if (err && err.code === 'ENOENT') {
 | 
						|
                        // If entry is missing and yet we got here, it means that we hit the interval file creation/unlink and DB update.
 | 
						|
                        // In this case, we just generate the file and don't cache it.
 | 
						|
                        res.fileCacheResponse = res;
 | 
						|
                        next();
 | 
						|
 | 
						|
                    } else if (err) next(err);
 | 
						|
                }
 | 
						|
            );
 | 
						|
 | 
						|
        } else {
 | 
						|
            // This means that the file is not present or it is just being created. We thus generate it and cache it.
 | 
						|
            let fileStream = null;
 | 
						|
            let tmpFilePath = null;
 | 
						|
 | 
						|
            const ensureFileStream = callback => {
 | 
						|
                if (!fileStream) {
 | 
						|
                    tmpName().then(tmp => {
 | 
						|
                        tmpFilePath = tmp;
 | 
						|
                        fileStream = fs.createWriteStream(tmpFilePath);
 | 
						|
                        setTimeout(callback, 5000);
 | 
						|
                    })
 | 
						|
                } else {
 | 
						|
                    callback();
 | 
						|
                }
 | 
						|
            };
 | 
						|
 | 
						|
            let fileSize = 0;
 | 
						|
 | 
						|
            res.fileCacheResponse = new stream.Writable({
 | 
						|
                write(chunk, encoding, callback) {
 | 
						|
                    res.write(chunk, encoding);
 | 
						|
 | 
						|
                    fileSize += chunk.length;
 | 
						|
                    ensureFileStream(() => {
 | 
						|
                        fileStream.write(chunk, encoding);
 | 
						|
                        callback();
 | 
						|
                    });
 | 
						|
                },
 | 
						|
 | 
						|
                final(callback) {
 | 
						|
                    res.end();
 | 
						|
 | 
						|
                    ensureFileStream(() => {
 | 
						|
                        fileStream.end(null, null, async () => {
 | 
						|
                            try {
 | 
						|
                                await knex.transaction(async tx => {
 | 
						|
                                    const existingFileEntry = await knex('file_cache').where('type', typeId).where('key', key).first();
 | 
						|
 | 
						|
                                    if (!existingFileEntry) {
 | 
						|
                                        const ids = await tx('file_cache').insert({type: typeId, key, mimetype: res.getHeader('Content-Type'), size: fileSize});
 | 
						|
                                        await fs.moveAsync(tmpFilePath, getLocalFileName(ids[0]), {});
 | 
						|
                                        mayNeedPruning = true;
 | 
						|
                                    } else {
 | 
						|
                                        await fs.unlinkAsync(tmpFilePath);
 | 
						|
                                    }
 | 
						|
                                });
 | 
						|
                            } catch (err) {
 | 
						|
                                await fs.unlinkAsync(tmpFilePath);
 | 
						|
                            }
 | 
						|
 | 
						|
                            callback();
 | 
						|
                        });
 | 
						|
                    });
 | 
						|
                },
 | 
						|
 | 
						|
                destroy(err, callback) {
 | 
						|
                    res.destroy(err);
 | 
						|
 | 
						|
                    if (fileStream) {
 | 
						|
                        fileStream.destroy(err);
 | 
						|
                        fs.unlink(tmpFilePath, () => {
 | 
						|
                            cachedFiles.delete(key);
 | 
						|
                            callback();
 | 
						|
                        });
 | 
						|
                    } else {
 | 
						|
                        callback();
 | 
						|
                    }
 | 
						|
                }
 | 
						|
            });
 | 
						|
 | 
						|
            next();
 | 
						|
        }
 | 
						|
    };
 | 
						|
 | 
						|
    const thisFileCache = (req, res, next) => {
 | 
						|
        const key = keyGen ? keyGen(req) : req.url.substring(1);
 | 
						|
 | 
						|
        if (key === null) { // null key means we don't attempt to cache
 | 
						|
            res.fileCacheResponse = res;
 | 
						|
            next();
 | 
						|
 | 
						|
        } else {
 | 
						|
            handleCache(key, res, next).catch(err => next(err));
 | 
						|
        }
 | 
						|
    };
 | 
						|
 | 
						|
    fileCaches.set(typeId, thisFileCache);
 | 
						|
    return thisFileCache;
 | 
						|
}
 | 
						|
 | 
						|
const fileCache = synchronized(_fileCache);
 | 
						|
 | 
						|
module.exports.fileCache = fileCache;
 | 
						|
module.exports.fileCacheFilesDir = fileCacheFilesDir; |