mailtrain/server/lib/file-cache.js

187 lines
6.3 KiB
JavaScript
Raw Normal View History

'use strict';
const { filesDir } = require('../models/files');
const path = require('path');
const log = require('./log');
const knex = require('./knex');
const fs = require('fs-extra-promise');
const stream = require('stream');
const privilegeHelpers = require('./privilege-helpers');
const synchronized = require('./synchronized');
const { tmpName } = require('tmp-promise');
const pruneBatchSize = 1000;
const fileCacheFilesDir = path.join(filesDir, 'cache');
const fileCaches = new Map();
async function _fileCache(typeId, cacheConfig, keyGen) {
if (fileCaches.has(typeId)) {
return fileCaches.get(typeId);
}
const localFilesDir = path.join(fileCacheFilesDir, typeId);
await privilegeHelpers.ensureMailtrainDir(localFilesDir);
let mayNeedPruning = true;
const getLocalFileName = id => path.join(localFilesDir, id.toString());
const pruneCache = async() => {
if (mayNeedPruning) {
try {
const maxSize = cacheConfig.maxSize * 1048576;
let lastId = null;
let cumulativeSize = 0;
while (true) {
let entriesQry = knex('file_cache').where('type', typeId).orderBy('id', 'desc').limit(pruneBatchSize);
if (lastId) {
entriesQry = entriesQry.where('id', '<', lastId);
}
const entries = await entriesQry;
if (entries.length > 0) {
for (const entry of entries) {
cumulativeSize += entry.size;
if (cumulativeSize > maxSize) {
await fs.unlinkAsync(getLocalFileName(entry.id));
await knex('file_cache').where('id', entry.id).del();
}
lastId = entry.id;
}
} else {
break;
}
}
} catch (err) {
log.error('FileCache', err);
}
mayNeedPruning = false;
}
};
await pruneCache();
setInterval(pruneCache, cacheConfig.pruneInterval * 1000);
const handleCache = async (key, res, next) => {
const fileEntry = await knex('file_cache').where('type', typeId).where('key', key).first();
if (fileEntry) {
res.sendFile(
getLocalFileName(fileEntry.id),
{
headers: {'Content-Type': fileEntry.mimetype}
},
err => {
if (err && err.code === 'ENOENT') {
// If entry is missing and yet we got here, it means that we hit the interval file creation/unlink and DB update.
// In this case, we just generate the file and don't cache it.
res.fileCacheResponse = res;
next();
} else if (err) next(err);
}
);
} else {
// This means that the file is not present or it is just being created. We thus generate it and cache it.
let fileStream = null;
let tmpFilePath = null;
const ensureFileStream = callback => {
if (!fileStream) {
tmpName().then(tmp => {
tmpFilePath = tmp;
fileStream = fs.createWriteStream(tmpFilePath);
setTimeout(callback, 5000);
})
} else {
callback();
}
};
let fileSize = 0;
res.fileCacheResponse = new stream.Writable({
write(chunk, encoding, callback) {
res.write(chunk, encoding);
fileSize += chunk.length;
ensureFileStream(() => {
fileStream.write(chunk, encoding);
callback();
});
},
final(callback) {
res.end();
ensureFileStream(() => {
fileStream.end(null, null, async () => {
try {
await knex.transaction(async tx => {
const existingFileEntry = await knex('file_cache').where('type', typeId).where('key', key).first();
if (!existingFileEntry) {
const ids = await tx('file_cache').insert({type: typeId, key, mimetype: res.getHeader('Content-Type'), size: fileSize});
await fs.moveAsync(tmpFilePath, getLocalFileName(ids[0]), {});
mayNeedPruning = true;
} else {
await fs.unlinkAsync(tmpFilePath);
}
});
} catch (err) {
await fs.unlinkAsync(tmpFilePath);
}
callback();
});
});
},
destroy(err, callback) {
res.destroy(err);
if (fileStream) {
fileStream.destroy(err);
fs.unlink(tmpFilePath, () => {
cachedFiles.delete(key);
callback();
});
} else {
callback();
}
}
});
next();
}
};
const thisFileCache = (req, res, next) => {
const key = keyGen ? keyGen(req) : req.url.substring(1);
if (key === null) { // null key means we don't attempt to cache
res.fileCacheResponse = res;
next();
} else {
handleCache(key, res, next).catch(err => next(err));
}
};
fileCaches.set(typeId, thisFileCache);
return thisFileCache;
}
const fileCache = synchronized(_fileCache);
module.exports.fileCache = fileCache;
module.exports.fileCacheFilesDir = fileCacheFilesDir;