'use strict';

const { filesDir } = require('../models/files');
const path = require('path');
const log = require('./log');
const knex = require('./knex');
const fs = require('fs-extra-promise');
const stream = require('stream');
const privilegeHelpers = require('./privilege-helpers');
const synchronized = require('./synchronized');
const { tmpName } = require('tmp-promise');

const pruneBatchSize = 1000;

const fileCacheFilesDir = path.join(filesDir, 'cache');

const fileCaches = new Map();

async function _fileCache(typeId, cacheConfig, keyGen) {
    if (fileCaches.has(typeId)) {
        return fileCaches.get(typeId);
    }

    const localFilesDir = path.join(fileCacheFilesDir, typeId);
    await privilegeHelpers.ensureMailtrainDir(localFilesDir);

    let mayNeedPruning = true;

    const getLocalFileName = id => path.join(localFilesDir, id.toString());

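    // Size-based eviction: keep the total cache size under cacheConfig.maxSize (in MiB).
    // Entries are scanned newest-first (descending id) in batches of pruneBatchSize; once
    // the running total exceeds the quota, every older entry is evicted, i.e. its file is
    // unlinked and its DB row deleted.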
    const pruneCache = async () => {
        if (mayNeedPruning) {
            try {
                const maxSize = cacheConfig.maxSize * 1048576; // MiB -> bytes

                let lastId = null;
                let cumulativeSize = 0;

                while (true) {
                    let entriesQry = knex('file_cache').where('type', typeId).orderBy('id', 'desc').limit(pruneBatchSize);
                    if (lastId) {
                        entriesQry = entriesQry.where('id', '<', lastId);
                    }

                    const entries = await entriesQry;

                    if (entries.length > 0) {
                        for (const entry of entries) {
                            cumulativeSize += entry.size;
                            if (cumulativeSize > maxSize) {
                                await fs.unlinkAsync(getLocalFileName(entry.id));
                                await knex('file_cache').where('id', entry.id).del();
                            }

                            lastId = entry.id;
                        }
                    } else {
                        break;
                    }
                }
            } catch (err) {
                log.error('FileCache', err);
            }

            mayNeedPruning = false;
        }
    };

    await pruneCache();
    setInterval(pruneCache, cacheConfig.pruneInterval * 1000);


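    // Core of the middleware: serve from the cache on a hit, otherwise fall through to
    // the generating route while teeing its output into the cache. Downstream handlers
    // are expected to write to res.fileCacheResponse rather than res: on a hit (or when
    // caching is skipped) it is res itself; on a miss it is a Writable that forwards
    // every chunk to the client and to a temp file, which is registered in the cache
    // once the response completes.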
    const handleCache = async (key, res, next) => {
        const fileEntry = await knex('file_cache').where('type', typeId).where('key', key).first();

        if (fileEntry) {
            res.sendFile(
                getLocalFileName(fileEntry.id),
                {
                    headers: {'Content-Type': fileEntry.mimetype}
                },
                err => {
                    if (err && err.code === 'ENOENT') {
                        // The DB entry exists but the file is gone, which means we hit the window
                        // between the file creation/unlink and the corresponding DB update.
                        // In this case, we just generate the file and don't cache it.
                        res.fileCacheResponse = res;
                        next();

                    } else if (err) next(err);
                }
            );

        } else {
            // This means that the file is not present or is just being created. We thus generate it and cache it.
            let fileStream = null;
            let tmpFilePath = null;

            // Create the temp file lazily, on the first write
            const ensureFileStream = callback => {
                if (!fileStream) {
                    tmpName().then(tmp => {
                        tmpFilePath = tmp;
                        fileStream = fs.createWriteStream(tmpFilePath);
                        callback();
                    });
                } else {
                    callback();
                }
            };

            let fileSize = 0;

            // Tee the response: forward each chunk to the client and append it to the temp file
            res.fileCacheResponse = new stream.Writable({
                write(chunk, encoding, callback) {
                    res.write(chunk, encoding);

                    fileSize += chunk.length;
                    ensureFileStream(() => {
                        fileStream.write(chunk, encoding);
                        callback();
                    });
                },

                final(callback) {
                    res.end();

                    ensureFileStream(() => {
                        fileStream.end(null, null, async () => {
                            try {
                                await knex.transaction(async tx => {
                                    // Use the transaction handle for the lookup so that the
                                    // existence check and the insert happen atomically
                                    const existingFileEntry = await tx('file_cache').where('type', typeId).where('key', key).first();

                                    if (!existingFileEntry) {
                                        const ids = await tx('file_cache').insert({type: typeId, key, mimetype: res.getHeader('Content-Type'), size: fileSize});
                                        await fs.moveAsync(tmpFilePath, getLocalFileName(ids[0]), {});
                                        mayNeedPruning = true;
                                    } else {
                                        // Someone else cached this key in the meantime; discard our copy
                                        await fs.unlinkAsync(tmpFilePath);
                                    }
                                });
                            } catch (err) {
                                await fs.unlinkAsync(tmpFilePath);
                            }

                            callback();
                        });
                    });
                },

                destroy(err, callback) {
                    res.destroy(err);

                    if (fileStream) {
                        fileStream.destroy(err);
                        // Clean up the partially written temp file
                        fs.unlink(tmpFilePath, () => {
                            callback();
                        });
                    } else {
                        callback();
                    }
                }
            });


            next();
        }
    };

    const thisFileCache = (req, res, next) => {
        const key = keyGen ? keyGen(req) : req.url.substring(1);

        if (key === null) { // null key means we don't attempt to cache
            res.fileCacheResponse = res;
            next();

        } else {
            handleCache(key, res, next).catch(err => next(err));
        }
    };

    fileCaches.set(typeId, thisFileCache);
    return thisFileCache;
}
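
// _fileCache is wrapped with synchronized() so that concurrent requests for the same
// typeId don't race on directory creation and the fileCaches registry; callers get a
// promise and must await it.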
const fileCache = synchronized(_fileCache);

module.exports.fileCache = fileCache;
module.exports.fileCacheFilesDir = fileCacheFilesDir;
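
// Usage sketch (illustrative, not part of the module): wiring a cache in front of an
// Express route that renders files. The route, type id, keyGen, and renderFile below
// are hypothetical; cacheConfig comes from server configuration (maxSize in MiB,
// pruneInterval in seconds). fileCache is async (it is wrapped by synchronized), so it
// has to be awaited in async setup code.
//
//     const { fileCache } = require('../lib/file-cache');
//
//     const cacheMiddleware = await fileCache('rendered', { maxSize: 100, pruneInterval: 60 },
//         req => req.params.fileId || null);  // cache key; null bypasses the cache
//
//     router.get('/rendered/:fileId', cacheMiddleware, (req, res) => {
//         res.setHeader('Content-Type', 'text/html');
//         // Write through the tee; on a cache miss this also populates the cache
//         res.fileCacheResponse.end(renderFile(req.params.fileId));
//     });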