Further improvements in caching. The state of the cache is now persisted in the DB, which preserves the cache across server restarts.

Tomas Bures 2019-04-22 11:41:37 +02:00
parent 7bcd6c60e9
commit ef0464bbc9
8 changed files with 155 additions and 75 deletions
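
The commit stores cache metadata in a file_cache table. The table itself is created elsewhere; the following is only a sketch of a knex migration matching the columns used by the queries in the diff (id, type, url, mimetype, size). The column types and the index are assumptions, not part of this commit:

exports.up = async knex => {
    await knex.schema.createTable('file_cache', table => {
        table.increments('id').primary();    // also used as the on-disk file name in the cache dir
        table.string('type').notNullable();  // cache type id passed to _fileCache
        table.string('url').notNullable();   // request path the cached file was generated for
        table.string('mimetype');            // served back as Content-Type
        table.integer('size');               // summed during pruning against cacheConfig.maxSize
        table.index(['type', 'url']);
    });
};

exports.down = async knex => {
    await knex.schema.dropTable('file_cache');
};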


@@ -2,12 +2,16 @@
const { filesDir } = require('../models/files');
const path = require('path');
const log = require('./log');
const knex = require('./knex');
const fs = require('fs-extra-promise');
const stream = require('stream');
const privilegeHelpers = require('./privilege-helpers');
const synchronized = require('./synchronized');
const { tmpName } = require('tmp-promise');
const pruneBatchSize = 1000;
const fileCacheFilesDir = path.join(filesDir, 'cache');
const fileCaches = new Map();
@@ -18,71 +22,85 @@ async function _fileCache(typeId, cacheConfig, fileNameGen) {
}
const localFilesDir = path.join(fileCacheFilesDir, typeId);
await fs.emptyDirAsync(localFilesDir);
await privilegeHelpers.ensureMailtrainDir(localFilesDir);
const cachedFiles = new Map();
let nextFilesOrder = 1;
let mayNeedPruning = true;
const getLocalFileName = id => path.join(localFilesDir, id.toString());
const pruneCache = async() => {
const entries = [];
for (const entry of cachedFiles.values()) {
if (entry.isReady) {
entries.push(entry);
}
}
if (mayNeedPruning) {
try {
const maxSize = cacheConfig.maxSize * 1048576;
entries.sort((x, y) => y.order - x.order);
let lastId = null;
let cumulativeSize = 0;
let cumulativeSize = 0;
const maxSize = cacheConfig.maxSize * 1048576;
for (const entry of entries) {
cumulativeSize += entry.size;
if (cumulativeSize > maxSize) {
entry.isReady = false;
await fs.unlinkAsync(path.join(localFilesDir, entry.fileName));
cachedFiles.delete(entry.fileName);
while (true) {
let entriesQry = knex('file_cache').where('type', typeId).orderBy('id', 'desc').limit(pruneBatchSize);
if (lastId) {
entriesQry = entriesQry.where('id', '<', lastId);
}
const entries = await entriesQry;
if (entries.length > 0) {
for (const entry of entries) {
cumulativeSize += entry.size;
if (cumulativeSize > maxSize) {
await fs.unlinkAsync(getLocalFileName(entry.id));
await knex('file_cache').where('id', entry.id).del();
}
lastId = entry.id;
}
} else {
break;
}
}
} catch (err) {
log.error('FileCache', err);
}
mayNeedPruning = false;
}
};
const thisFileCache = (req, res, next) => {
const fileName = fileNameGen ? fileNameGen(req) : req.url.substring(1);
const localFilePath = path.join(localFilesDir, fileName);
await pruneCache();
setInterval(pruneCache, cacheConfig.pruneInterval * 1000);
const fileInfo = cachedFiles.get(fileName);
if (fileInfo && fileInfo.isReady) {
const handleCache = async (fileName, res, next) => {
const fileEntry = await knex('file_cache').where('type', typeId).where('url', fileName).first();
if (fileEntry) {
res.sendFile(
localFilePath,
getLocalFileName(fileEntry.id),
{
headers: fileInfo.headers
headers: {'Content-Type': fileEntry.mimetype}
},
err => {
if (err) next(err);
if (err && err.code === 'ENOENT') {
// If the entry is missing and yet we got here, it means that we hit the window between file creation/unlink and the DB update.
// In this case, we just generate the file and don't cache it.
res.fileCacheResponse = res;
next();
} else if (err) next(err);
}
);
} else {
// This means that the file is not present. We thus generate it and cache it.
// This means that the file is not present or it is just being created. We thus generate it and cache it.
let fileStream = null;
let tmpFilePath = null;
// If the file does not exist yet, we store it. If we receive a simultaneous request while the file is being generated and stored,
// we only generate it (but do not store it) in the second parallel request.
const isStoring = !fileInfo;
if (isStoring) {
cachedFiles.set(fileName, {
fileName,
isReady: false
});
}
const ensureFileStream = callback => {
if (!fileStream) {
tmpName().then(tmp => {
tmpFilePath = tmp;
fileStream = fs.createWriteStream(tmpFilePath);
callback();
setTimeout(callback, 5000);
})
} else {
callback();
@@ -95,46 +113,37 @@ async function _fileCache(typeId, cacheConfig, fileNameGen) {
write(chunk, encoding, callback) {
res.write(chunk, encoding);
if (isStoring) {
fileSize += chunk.length;
ensureFileStream(() => {
fileStream.write(chunk, encoding);
callback();
});
} else {
fileSize += chunk.length;
ensureFileStream(() => {
fileStream.write(chunk, encoding);
callback();
}
});
},
final(callback) {
res.end();
if (isStoring) {
ensureFileStream(() => {
fileStream.end(null, null, () => {
fs.moveAsync(tmpFilePath, localFilePath, {})
.then(() => {
cachedFiles.set(fileName, {
fileName,
size: fileSize,
order: nextFilesOrder,
headers: res.getHeaders(),
isReady: true
});
ensureFileStream(() => {
fileStream.end(null, null, async () => {
try {
await knex.transaction(async tx => {
const existingFileEntry = await knex('file_cache').where('type', typeId).where('url', fileName).first();
nextFilesOrder += 1;
if (!existingFileEntry) {
const ids = await tx('file_cache').insert({type: typeId, url: fileName, mimetype: res.getHeader('Content-Type'), size: fileSize});
await fs.moveAsync(tmpFilePath, getLocalFileName(ids[0]), {});
mayNeedPruning = true;
} else {
await fs.unlinkAsync(tmpFilePath);
}
});
} catch (err) {
await fs.unlinkAsync(tmpFilePath);
}
callback();
// noinspection JSIgnoredPromiseFromCall
pruneCache();
})
.catch(err => next(err));
});
callback();
});
} else {
callback();
}
});
},
destroy(err, callback) {
@@ -156,6 +165,18 @@ async function _fileCache(typeId, cacheConfig, fileNameGen) {
}
};
const thisFileCache = (req, res, next) => {
const fileName = fileNameGen ? fileNameGen(req) : req.url.substring(1);
if (fileName === null) { // null fileName means we don't attempt to cache
res.fileCacheResponse = res;
next();
} else {
handleCache(fileName, res, next).catch(err => next(err));
}
};
fileCaches.set(typeId, thisFileCache);
return thisFileCache;
}
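
For context, the middleware returned by _fileCache is meant to sit in front of a route that generates the file. The sketch below shows one way it might be wired up; the cache type id, the route, and the renderCss helper are illustrative examples, not code from this commit:

async function setupRoutes(router) {
    const cssCache = await _fileCache('rendered-css', {maxSize: 100, pruneInterval: 60}, req => req.params.name);

    router.get('/files/:name', cssCache, (req, res) => {
        const out = res.fileCacheResponse;   // the raw res when caching is bypassed, a capturing stream on a cache miss
        res.set('Content-Type', 'text/css'); // the storing path reads the Content-Type header back from res
        out.end(renderCss(req.params.name)); // hypothetical generator; the output goes to the client and, when storing, to the temp file
    });
}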


@@ -50,13 +50,18 @@ function getConfigROUidGid() {
function ensureMailtrainOwner(file, callback) {
const ids = getConfigUidGid();
fs.chown(file, ids.uid, ids.gid, callback);
if (callback) {
fs.chown(file, ids.uid, ids.gid, callback);
} else {
return fs.chownAsync(file, ids.uid, ids.gid);
}
}
async function ensureMailtrainDir(dir, recursive) {
async function ensureMailtrainDir(dir) {
const ids = getConfigUidGid();
await fs.ensureDir(dir);
await fs.chownAsync(dir, ids.uid, ids.gid);
await ensureMailtrainOwner(dir);
}
function dropRootPrivileges() {
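
With the change to ensureMailtrainOwner above, the same function now works in both callback style and promise style (it returns the chownAsync promise when no callback is given). A short usage sketch with an illustrative path:

// Promise style, as ensureMailtrainDir now uses it:
async function prepareCacheDir() {
    await ensureMailtrainOwner('/app/files/cache');
}

// Callback style keeps working when a callback is passed:
ensureMailtrainOwner('/app/files/cache', err => {
    if (err) console.error(err);
});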