mirror of
https://github.com/Ylianst/MeshCentral.git
synced 2025-03-09 15:40:18 +00:00
Added MQTT support over WSS and multiplexed with MPS
This commit is contained in:
parent
4094daf58f
commit
8f0517b80c
5 changed files with 134 additions and 0 deletions
63
mpsserver.js
63
mpsserver.js
|
@ -159,6 +159,44 @@ module.exports.CreateMpsServer = function (parent, db, args, certificates) {
|
|||
};
|
||||
}
|
||||
|
||||
// required for TLS piping to MQTT broker
|
||||
function SerialTunnel(options) {
|
||||
var obj = new require('stream').Duplex(options);
|
||||
obj.forwardwrite = null;
|
||||
obj.updateBuffer = function (chunk) { this.push(chunk); };
|
||||
obj._write = function (chunk, encoding, callback) { if (obj.forwardwrite != null) { obj.forwardwrite(chunk); } else { console.err("Failed to fwd _write."); } if (callback) callback(); }; // Pass data written to forward
|
||||
obj._read = function (size) { }; // Push nothing, anything to read should be pushed from updateBuffer()
|
||||
return obj;
|
||||
}
|
||||
|
||||
function getMQTTPacketLength(chunk) {
|
||||
var packet_len = 0;
|
||||
if (chunk.readUInt8(0)==16) {
|
||||
if (chunk.readUInt8(1) < 128 ) {
|
||||
packet_len += chunk.readUInt8(1) + 2;
|
||||
} else {
|
||||
// continuation bit, get real value and do next
|
||||
packet_len += (chunk.readUInt8(1) & 0x7F) + 2;
|
||||
if (chunk.readUInt8(2) < 128) {
|
||||
packet_len += 1 + chunk.readUInt8(2) * 128;
|
||||
} else {
|
||||
packet_len += 1 + (chunk.readUInt8(2) & 0x7F) * 128;
|
||||
if (chunk.readUInt8(3) < 128) {
|
||||
packet_len += 1 + chunk.readUInt8(3) * 128 * 128;
|
||||
} else {
|
||||
packet_len += 1 + (chunk.readUInt8(3) & 0x7F) * 128 * 128;
|
||||
if (chunk.readUInt8(4) < 128) {
|
||||
packet_len += 1 + chunk.readUInt8(4) * 128 * 128 * 128;
|
||||
} else {
|
||||
packet_len += 1 + (chunk.readUInt8(4) & 0x7F) * 128* 128 * 128;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return packet_len;
|
||||
}
|
||||
|
||||
function onConnection(socket) {
|
||||
connectionCount++;
|
||||
if (obj.args.mpstlsoffload) {
|
||||
|
@ -182,6 +220,31 @@ module.exports.CreateMpsServer = function (parent, db, args, certificates) {
|
|||
if (socket.tag.accumulator.length < 3) return;
|
||||
//if (!socket.tag.clientCert.subject) { console.log("MPS Connection, no client cert: " + socket.remoteAddress); socket.write('HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nConnection: close\r\n\r\nMeshCentral2 MPS server.\r\nNo client certificate given.'); socket.end(); return; }
|
||||
if (socket.tag.accumulator.substring(0, 3) == "GET") { if (args.mpsdebug) { console.log("MPS Connection, HTTP GET detected: " + socket.remoteAddress); } socket.write("HTTP/1.1 200 OK\r\nContent-Type: text/html\r\nConnection: close\r\n\r\n<!DOCTYPE html><html><head><meta charset=\"UTF-8\"></head><body>MeshCentral2 MPS server.<br />Intel® AMT computers should connect here.</body></html>"); socket.end(); return; }
|
||||
|
||||
var chunk = Buffer.from(socket.tag.accumulator,"binary");
|
||||
var packet_len = 0;
|
||||
if (chunk.readUInt8(0)==16) {
|
||||
packet_len = getMQTTPacketLength(chunk);
|
||||
}
|
||||
|
||||
if (chunk.readUInt8(0)==16 && (socket.tag.accumulator.length < packet_len )) return;// minimum MQTT detection
|
||||
|
||||
// check if it is MQTT, need more initial packet to probe
|
||||
if (chunk.readUInt8(0) == 16 && ((chunk.slice(4, 8).toString() === "MQTT") || (chunk.slice(5, 9).toString() === "MQTT")
|
||||
|| (chunk.slice(6, 10).toString() === "MQTT") || (chunk.slice(7, 11).toString() === "MQTT"))) {
|
||||
parent.debug("mps", "MQTT connection detected.");
|
||||
socket.removeAllListeners("data");
|
||||
socket.removeAllListeners("close");
|
||||
socket.setNoDelay(true);
|
||||
socket.serialtunnel = SerialTunnel();
|
||||
socket.on('data', function(b) { socket.serialtunnel.updateBuffer(Buffer.from(b,'binary'))});
|
||||
socket.serialtunnel.forwardwrite = function(b) { socket.write(b,"binary")}
|
||||
socket.on("close", function() { socket.serialtunnel.emit('end');});
|
||||
//pass socket wrapper to mqtt broker
|
||||
parent.mqttbroker.handle(socket.serialtunnel);
|
||||
socket.unshift(socket.tag.accumulator);
|
||||
return;
|
||||
}
|
||||
socket.tag.first = false;
|
||||
|
||||
// Setup this node with certificate authentication
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue