1
0
Fork 0
mirror of https://github.com/Ylianst/MeshCentral.git synced 2025-03-09 15:40:18 +00:00

Made MQTT optional.

This commit is contained in:
Ylian Saint-Hilaire 2019-10-04 12:18:56 -07:00
parent 8abb6ef910
commit 7bba856984
7 changed files with 88 additions and 86 deletions

View file

@ -155,11 +155,11 @@ module.exports.CreateMpsServer = function (parent, db, args, certificates) {
disconnectCommandCount: disconnectCommandCount,
socketClosedCount: socketClosedCount,
socketErrorCount: socketErrorCount,
maxDomainDevicesReached : maxDomainDevicesReached
maxDomainDevicesReached: maxDomainDevicesReached
};
}
// required for TLS piping to MQTT broker
// Required for TLS piping to MQTT broker
function SerialTunnel(options) {
var obj = new require('stream').Duplex(options);
obj.forwardwrite = null;
@ -167,12 +167,13 @@ module.exports.CreateMpsServer = function (parent, db, args, certificates) {
obj._write = function (chunk, encoding, callback) { if (obj.forwardwrite != null) { obj.forwardwrite(chunk); } else { console.err("Failed to fwd _write."); } if (callback) callback(); }; // Pass data written to forward
obj._read = function (size) { }; // Push nothing, anything to read should be pushed from updateBuffer()
return obj;
}
}
// Return's the length of an MQTT packet
function getMQTTPacketLength(chunk) {
var packet_len = 0;
if (chunk.readUInt8(0)==16) {
if (chunk.readUInt8(1) < 128 ) {
if (chunk.readUInt8(0) == 16) {
if (chunk.readUInt8(1) < 128) {
packet_len += chunk.readUInt8(1) + 2;
} else {
// continuation bit, get real value and do next
@ -188,15 +189,16 @@ module.exports.CreateMpsServer = function (parent, db, args, certificates) {
if (chunk.readUInt8(4) < 128) {
packet_len += 1 + chunk.readUInt8(4) * 128 * 128 * 128;
} else {
packet_len += 1 + (chunk.readUInt8(4) & 0x7F) * 128* 128 * 128;
packet_len += 1 + (chunk.readUInt8(4) & 0x7F) * 128 * 128 * 128;
}
}
}
}
}
}
return packet_len;
}
// Called when a new TLS/TCP connection is accepted
function onConnection(socket) {
connectionCount++;
if (obj.args.mpstlsoffload) {
@ -220,31 +222,33 @@ module.exports.CreateMpsServer = function (parent, db, args, certificates) {
if (socket.tag.accumulator.length < 3) return;
//if (!socket.tag.clientCert.subject) { console.log("MPS Connection, no client cert: " + socket.remoteAddress); socket.write('HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nConnection: close\r\n\r\nMeshCentral2 MPS server.\r\nNo client certificate given.'); socket.end(); return; }
if (socket.tag.accumulator.substring(0, 3) == "GET") { if (args.mpsdebug) { console.log("MPS Connection, HTTP GET detected: " + socket.remoteAddress); } socket.write("HTTP/1.1 200 OK\r\nContent-Type: text/html\r\nConnection: close\r\n\r\n<!DOCTYPE html><html><head><meta charset=\"UTF-8\"></head><body>MeshCentral2 MPS server.<br />Intel&reg; AMT computers should connect here.</body></html>"); socket.end(); return; }
var chunk = Buffer.from(socket.tag.accumulator,"binary");
var packet_len = 0;
if (chunk.readUInt8(0)==16) {
packet_len = getMQTTPacketLength(chunk);
}
if (chunk.readUInt8(0)==16 && (socket.tag.accumulator.length < packet_len )) return;// minimum MQTT detection
// check if it is MQTT, need more initial packet to probe
if (chunk.readUInt8(0) == 16 && ((chunk.slice(4, 8).toString() === "MQTT") || (chunk.slice(5, 9).toString() === "MQTT")
|| (chunk.slice(6, 10).toString() === "MQTT") || (chunk.slice(7, 11).toString() === "MQTT"))) {
parent.debug("mps", "MQTT connection detected.");
socket.removeAllListeners("data");
socket.removeAllListeners("close");
socket.setNoDelay(true);
socket.serialtunnel = SerialTunnel();
socket.on('data', function(b) { socket.serialtunnel.updateBuffer(Buffer.from(b,'binary'))});
socket.serialtunnel.forwardwrite = function(b) { socket.write(b,"binary")}
socket.on("close", function() { socket.serialtunnel.emit('end');});
//pass socket wrapper to mqtt broker
parent.mqttbroker.handle(socket.serialtunnel);
socket.unshift(socket.tag.accumulator);
return;
// If the MQTT broker is active, look for inbound MQTT connections
if (parent.mqttbroker != null) {
var chunk = Buffer.from(socket.tag.accumulator, "binary");
var packet_len = 0;
if (chunk.readUInt8(0) == 16) { packet_len = getMQTTPacketLength(chunk); }
if (chunk.readUInt8(0) == 16 && (socket.tag.accumulator.length < packet_len)) return; // Minimum MQTT detection
// check if it is MQTT, need more initial packet to probe
if (chunk.readUInt8(0) == 16 && ((chunk.slice(4, 8).toString() === "MQTT") || (chunk.slice(5, 9).toString() === "MQTT")
|| (chunk.slice(6, 10).toString() === "MQTT") || (chunk.slice(7, 11).toString() === "MQTT"))) {
parent.debug("mps", "MQTT connection detected.");
socket.removeAllListeners("data");
socket.removeAllListeners("close");
socket.setNoDelay(true);
socket.serialtunnel = SerialTunnel();
socket.on('data', function (b) { socket.serialtunnel.updateBuffer(Buffer.from(b, 'binary')) });
socket.serialtunnel.forwardwrite = function (b) { socket.write(b, "binary") }
socket.on("close", function () { socket.serialtunnel.emit('end'); });
// Pass socket wrapper to the MQTT broker
parent.mqttbroker.handle(socket.serialtunnel);
socket.unshift(socket.tag.accumulator);
return;
}
}
socket.tag.first = false;
// Setup this node with certificate authentication