diff --git a/include/ZeroTierOne.h b/include/ZeroTierOne.h
index b0be0105..5355cd3e 100644
--- a/include/ZeroTierOne.h
+++ b/include/ZeroTierOne.h
@@ -434,27 +434,49 @@ enum ZT_ResultCode
 enum ZT_MultipathMode
 {
 	/**
-	 * No active multipath.
-	 *
-	 * Traffic is merely sent over the strongest path. That being
-	 * said, this mode will automatically failover in the event that a link goes down.
+	 * No fault tolerance or load balancing.
 	 */
 	ZT_MULTIPATH_NONE = 0,
 
 	/**
-	 * Traffic is randomly distributed among all active paths.
-	 *
-	 * Will cease sending traffic over links that appear to be stale.
+	 * Sends a duplicate of each packet out on all paths.
 	 */
-	ZT_MULTIPATH_RANDOM = 1,
+	ZT_MULTIPATH_BROADCAST = 1,
 
 	/**
-	 * Traffic is allocated across all active paths in proportion to their strength and
-	 * reliability.
-	 *
-	 * Will cease sending traffic over links that appear to be stale.
+	 * Sends traffic out on only one path at a time. Fail-over is immediate.
 	 */
-	ZT_MULTIPATH_PROPORTIONALLY_BALANCED = 2,
+	ZT_MULTIPATH_ACTIVE_BACKUP = 2,
+
+	/**
+	 * Sends each packet out on a path chosen according to a uniform random distribution.
+	 */
+	ZT_MULTIPATH_BALANCE_RANDOM = 3,
+
+	/**
+	 * Stripes packets across all paths irrespective of flow.
+	 */
+	ZT_MULTIPATH_BALANCE_RR_OPAQUE = 4,
+
+	/**
+	 * Assigns each new flow to the next path in round-robin order.
+	 */
+	ZT_MULTIPATH_BALANCE_RR_FLOW = 5,
+
+	/**
+	 * Hashes each flow onto a fixed path.
+	 */
+	ZT_MULTIPATH_BALANCE_XOR_FLOW = 6,
+
+	/**
+	 * Balances traffic across all paths in proportion to observed performance.
+	 */
+	ZT_MULTIPATH_BALANCE_DYNAMIC_OPAQUE = 7,
+
+	/**
+	 * Balances flows across all paths in proportion to observed performance.
+	 */
+	ZT_MULTIPATH_BALANCE_DYNAMIC_FLOW = 8,
 };
 
 /**
diff --git a/node/Constants.hpp b/node/Constants.hpp
index 3f95ac29..7f962851 100644
--- a/node/Constants.hpp
+++ b/node/Constants.hpp
@@ -266,6 +266,12 @@
  */
 #define ZT_LOCAL_CONF_FILE_CHECK_INTERVAL 10000
 
+/**
+ * How long (in milliseconds) before we consider a flow to be dead and remove it
+ * from the balancing policy's list.
+ */
+#define ZT_MULTIPATH_FLOW_EXPIRATION 60000
+
 /**
  * How frequently to check for changes to the system's network interfaces. When
  * the service decides to use this constant it's because we want to react more
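Taken together, the flow-aware modes reduce to two mechanics: hash a flow onto one of N paths, and forget flows that have gone idle for longer than ZT_MULTIPATH_FLOW_EXPIRATION. A minimal standalone sketch of both (all names here are hypothetical; the real logic lives in Peer::getAppropriatePath() and Peer::processBackgroundPeerTasks()):

```cpp
#include <cstdint>
#include <map>

static const int64_t FLOW_EXPIRATION_MS = 60000; // mirrors ZT_MULTIPATH_FLOW_EXPIRATION

struct ToyFlow {
	int64_t flowId;
	int64_t lastSend;
	int assignedPathIdx; // -1 == unassigned
};

// Hash a flow onto one of nPaths (the XOR/modulo family of policies).
// nPaths must be greater than zero.
static int assignPath(int64_t flowId, size_t nPaths)
{
	return (int)((uint64_t)flowId % nPaths);
}

// Drop flows that have been idle longer than the expiration window,
// as the patch does in Peer::processBackgroundPeerTasks().
static void expireFlows(std::map<int64_t, ToyFlow> &flows, int64_t now)
{
	for (std::map<int64_t, ToyFlow>::iterator it(flows.begin()); it != flows.end();) {
		if ((now - it->second.lastSend) > FLOW_EXPIRATION_MS)
			flows.erase(it++);
		else
			++it;
	}
}
```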
diff --git a/node/Path.hpp b/node/Path.hpp
index bc8d7dc5..bc28c734 100644
--- a/node/Path.hpp
+++ b/node/Path.hpp
@@ -308,7 +308,6 @@ public:
 	 */
 	inline void recordOutgoingPacket(int64_t now, int64_t packetId, uint16_t payloadLength, Packet::Verb verb)
 	{
-		DEBUG_INFO("");
 		Mutex::Lock _l(_statistics_m);
 		if (verb != Packet::VERB_ACK && verb != Packet::VERB_QOS_MEASUREMENT) {
 			if ((packetId & (ZT_PATH_QOS_ACK_PROTOCOL_DIVISOR - 1)) == 0) {
@@ -332,7 +331,6 @@ public:
 	 */
 	inline void recordIncomingPacket(int64_t now, int64_t packetId, uint16_t payloadLength, Packet::Verb verb)
 	{
-		DEBUG_INFO("");
 		Mutex::Lock _l(_statistics_m);
 		if (verb != Packet::VERB_ACK && verb != Packet::VERB_QOS_MEASUREMENT) {
 			if ((packetId & (ZT_PATH_QOS_ACK_PROTOCOL_DIVISOR - 1)) == 0) {
@@ -353,7 +351,6 @@ public:
 	 */
 	inline void receivedAck(int64_t now, int32_t ackedBytes)
 	{
-		DEBUG_INFO("");
 		_expectingAckAsOf = 0;
 		_unackedBytes = (ackedBytes > _unackedBytes) ? 0 : _unackedBytes - ackedBytes;
 		int64_t timeSinceThroughputEstimate = (now - _lastThroughputEstimation);
@@ -398,7 +395,6 @@ public:
 	 */
 	inline void sentAck(int64_t now)
 	{
-		DEBUG_INFO("");
 		Mutex::Lock _l(_statistics_m);
 		_inACKRecords.clear();
 		_packetsReceivedSinceLastAck = 0;
@@ -416,7 +412,6 @@ public:
 	 */
 	inline void receivedQoS(int64_t now, int count, uint64_t *rx_id, uint16_t *rx_ts)
 	{
-		DEBUG_INFO("");
 		Mutex::Lock _l(_statistics_m);
 		// Look up egress times and compute latency values for each record
 		std::map<uint64_t,uint64_t>::iterator it;
@@ -441,7 +436,6 @@ public:
 	 */
 	inline int32_t generateQoSPacket(int64_t now, char *qosBuffer)
 	{
-		DEBUG_INFO("");
 		Mutex::Lock _l(_statistics_m);
 		int32_t len = 0;
 		std::map<uint64_t,uint64_t>::iterator it = _inQoSRecords.begin();
@@ -466,7 +460,6 @@ public:
 	 * @param now Current time
 	 */
 	inline void sentQoS(int64_t now) {
-		DEBUG_INFO("");
 		_packetsReceivedSinceLastQoS = 0;
 		_lastQoSMeasurement = now;
 	}
@@ -586,7 +579,6 @@ public:
 	inline void processBackgroundPathMeasurements(const int64_t now)
 	{
 		if (now - _lastPathQualityComputeTime > ZT_PATH_QUALITY_COMPUTE_INTERVAL) {
-			DEBUG_INFO("");
 			Mutex::Lock _l(_statistics_m);
 			_lastPathQualityComputeTime = now;
 			address().toString(_addrString);
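One detail worth calling out in Path.hpp: QoS/ACK bookkeeping is only performed for packet IDs whose low bits are zero. Assuming roughly uniformly distributed packet IDs, `(packetId & (ZT_PATH_QOS_ACK_PROTOCOL_DIVISOR - 1)) == 0` cheaply samples about one packet in every ZT_PATH_QOS_ACK_PROTOCOL_DIVISOR, provided the divisor is a power of two. A tiny self-contained illustration (hypothetical divisor value):

```cpp
#include <cstdint>
#include <cstdio>

// Hypothetical divisor; must be a power of two for the mask trick to work.
static const uint64_t DIVISOR = 16;

static bool shouldSample(uint64_t packetId)
{
	// True for ~1/DIVISOR of uniformly distributed packet IDs.
	return (packetId & (DIVISOR - 1)) == 0;
}

int main()
{
	unsigned int sampled = 0;
	for (uint64_t id = 0; id < 100000; ++id)
		sampled += shouldSample(id) ? 1 : 0;
	printf("sampled %u of 100000 (~1/%llu)\n", sampled, (unsigned long long)DIVISOR);
	return 0;
}
```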
diff --git a/node/Peer.cpp b/node/Peer.cpp
index ce3083cc..d1ef9ecf 100644
--- a/node/Peer.cpp
+++ b/node/Peer.cpp
@@ -75,7 +75,9 @@ Peer::Peer(const RuntimeEnvironment *renv,const Identity &myIdentity,const Identity &peerIdentity) :
 	_linkIsRedundant(false),
 	_remotePeerMultipathEnabled(false),
 	_lastAggregateStatsReport(0),
-	_lastAggregateAllocation(0)
+	_lastAggregateAllocation(0),
+	_virtualPathCount(0),
+	_roundRobinPathAssignmentIdx(0)
 {
 	if (!myIdentity.agree(peerIdentity,_key,ZT_PEER_SECRET_KEY_LENGTH))
 		throw ZT_EXCEPTION_INVALID_ARGUMENT;
@@ -195,6 +197,9 @@ void Peer::received(
 			} else {
 				attemptToContact = true;
 			}
+
+			// Every time we learn of a new path, rebuild the set of virtual paths
+			constructSetOfVirtualPaths();
 		}
 	}
 
@@ -256,6 +261,39 @@ void Peer::received(
 	}
 }
 
+void Peer::constructSetOfVirtualPaths()
+{
+	if (!_remoteMultipathSupported) {
+		return;
+	}
+	Mutex::Lock _l(_virtual_paths_m);
+
+	int64_t now = RR->node->now();
+	_virtualPathCount = 0;
+	for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+		if (_paths[i].p && _paths[i].p->alive(now)) {
+			for(unsigned int j=0;j<ZT_MAX_PEER_NETWORK_PATHS;++j) {
+				if (_paths[j].p && _paths[j].p->alive(now)) {
+					int64_t localSocket = _paths[j].p->localSocket();
+					bool foundVirtualPath = false;
+					for (unsigned int k=0; k<_virtualPaths.size(); k++) {
+						if (_virtualPaths[k]->localSocket == localSocket && _virtualPaths[k]->p == _paths[i].p) {
+							foundVirtualPath = true;
+						}
+					}
+					if (!foundVirtualPath)
+					{
+						VirtualPath *np = new VirtualPath;
+						np->p = _paths[i].p;
+						np->localSocket = localSocket;
+						_virtualPaths.push_back(np);
+					}
+				}
+			}
+		}
+	}
+}
+
 void Peer::recordOutgoingPacket(const SharedPtr<Path> &path, const uint64_t packetId,
 	uint16_t payloadLength, const Packet::Verb verb, int64_t now)
 {
@@ -320,10 +358,10 @@ void Peer::computeAggregateAllocation(int64_t now)
 	for(uint16_t i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
 		if (_paths[i].p) {
-			if (RR->node->getMultipathMode() == ZT_MULTIPATH_RANDOM) {
+			if (RR->node->getMultipathMode() == ZT_MULTIPATH_BALANCE_RANDOM) {
 				_paths[i].p->updateComponentAllocationOfAggregateLink(((float)_pathChoiceHist.countValue(i) / (float)_pathChoiceHist.count()) * 255);
 			}
-			if (RR->node->getMultipathMode() == ZT_MULTIPATH_PROPORTIONALLY_BALANCED) {
+			if (RR->node->getMultipathMode() == ZT_MULTIPATH_BALANCE_DYNAMIC_OPAQUE) {
 				_paths[i].p->updateComponentAllocationOfAggregateLink((unsigned char)((_paths[i].p->relativeQuality() / totalRelativeQuality) * 255));
 			}
 		}
 	}
@@ -382,9 +420,22 @@ int Peer::aggregateLinkLogicalPathCount()
 	return pathCount;
 }
 
-SharedPtr<Path> Peer::getAppropriatePath(int64_t now, bool includeExpired)
+std::vector<SharedPtr<Path> > Peer::getAllPaths(int64_t now)
+{
+	Mutex::Lock _l(_virtual_paths_m); // FIXME: TX can now lock RX
+	std::vector<SharedPtr<Path> > paths;
+	for (unsigned int i=0; i<_virtualPaths.size(); i++) {
+		if (_virtualPaths[i]->p) {
+			paths.push_back(_virtualPaths[i]->p);
+		}
+	}
+	return paths;
+}
+
+SharedPtr<Path> Peer::getAppropriatePath(int64_t now, bool includeExpired, int64_t flowId)
 {
 	Mutex::Lock _l(_paths_m);
+	SharedPtr<Path> selectedPath;
 	unsigned int bestPath = ZT_MAX_PEER_NETWORK_PATHS;
 
 	/**
@@ -410,52 +461,129 @@ SharedPtr<Path> Peer::getAppropriatePath(int64_t now, bool includeExpired)
 		return SharedPtr<Path>();
 	}
 
+	// Update path measurements
 	for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
 		if (_paths[i].p) {
 			_paths[i].p->processBackgroundPathMeasurements(now);
 		}
 	}
+
+	// Detect new flows and update existing records
+	if (_flows.count(flowId)) {
+		_flows[flowId]->lastSend = now;
+	}
+	else {
+		fprintf(stderr, "new flow %llx detected between this node and %llx (%lu active flow(s))\n",
+			flowId, this->_id.address().toInt(), (_flows.size()+1));
+		struct Flow *newFlow = new Flow(flowId, now);
+		_flows[flowId] = newFlow;
+		newFlow->assignedPath = nullptr;
+	}
+
+	// Construct set of virtual paths if needed
+	if (!_virtualPaths.size()) {
+		constructSetOfVirtualPaths();
+	}
+	if (!_virtualPaths.size()) {
+		fprintf(stderr, "no paths to send packet out on\n");
+		return SharedPtr<Path>();
+	}
 
 	/**
-	 * Randomly distribute traffic across all paths
+	 * Traffic is randomly distributed among all active paths.
 	 */
 	int numAlivePaths = 0;
 	int numStalePaths = 0;
-	if (RR->node->getMultipathMode() == ZT_MULTIPATH_RANDOM) {
-		computeAggregateAllocation(now); /* This call is algorithmically inert but gives us a value to show in the status output */
-		int alivePaths[ZT_MAX_PEER_NETWORK_PATHS];
-		int stalePaths[ZT_MAX_PEER_NETWORK_PATHS];
-		memset(&alivePaths, -1, sizeof(alivePaths));
-		memset(&stalePaths, -1, sizeof(stalePaths));
-		for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
-			if (_paths[i].p) {
-				if (_paths[i].p->alive(now)) {
-					alivePaths[numAlivePaths] = i;
-					numAlivePaths++;
-				}
-				else {
-					stalePaths[numStalePaths] = i;
-					numStalePaths++;
-				}
-			}
-		}
-		unsigned int r = _freeRandomByte;
-		if (numAlivePaths > 0) {
-			int rf = r % numAlivePaths;
-			_pathChoiceHist.push(alivePaths[rf]); // Record which path we chose
-			return _paths[alivePaths[rf]].p;
-		}
-		else if(numStalePaths > 0) {
-			// Resort to trying any non-expired path
-			int rf = r % numStalePaths;
-			return _paths[stalePaths[rf]].p;
-		}
-	}
+	if (RR->node->getMultipathMode() == ZT_MULTIPATH_BALANCE_RANDOM) {
+		// This call is algorithmically inert but gives us a value to show in the status output
+		computeAggregateAllocation(now);
+		int sz = _virtualPaths.size();
+		if (sz) {
+			int idx = _freeRandomByte % sz;
+			_pathChoiceHist.push(idx);
+			char pathStr[128];
+			_virtualPaths[idx]->p->address().toString(pathStr);
+			fprintf(stderr, "sending out: (%llx), idx=%d: path=%s, localSocket=%lld\n",
+				this->_id.address().toInt(), idx, pathStr, _virtualPaths[idx]->localSocket);
+			return _virtualPaths[idx]->p;
+		}
+	}
+
+	/**
+	 * All traffic is sent on all paths.
+	 */
+	if (RR->node->getMultipathMode() == ZT_MULTIPATH_BROADCAST) {
+		// Not handled here. Handled in Switch::_trySend()
+	}
+
+	/**
+	 * Only one link is active at a time. Fail-over is immediate.
+	 */
+	if (RR->node->getMultipathMode() == ZT_MULTIPATH_ACTIVE_BACKUP) {
+		// Not yet implemented
+	}
+
+	/**
+	 * Packets are striped across all available paths.
+	 */
+	if (RR->node->getMultipathMode() == ZT_MULTIPATH_BALANCE_RR_OPAQUE) {
+		int16_t previousIdx = _roundRobinPathAssignmentIdx;
+		if (_roundRobinPathAssignmentIdx < (int16_t)(_virtualPaths.size()-1)) {
+			_roundRobinPathAssignmentIdx++;
+		}
+		else {
+			_roundRobinPathAssignmentIdx = 0;
+		}
+		selectedPath = _virtualPaths[previousIdx]->p;
+		char pathStr[128];
+		selectedPath->address().toString(pathStr);
+		fprintf(stderr, "sending packet out on path %s at index %d\n",
			pathStr, previousIdx);
+		return selectedPath;
+	}
+
+	/**
+	 * Flows are striped across all available paths.
+	 */
+	if (RR->node->getMultipathMode() == ZT_MULTIPATH_BALANCE_RR_FLOW) {
+		// Not yet implemented
+	}
+
+	/**
+	 * Flows are hashed across all available paths.
+	 */
+	if (RR->node->getMultipathMode() == ZT_MULTIPATH_BALANCE_XOR_FLOW) {
+		char pathStr[128];
+		struct Flow *currFlow = NULL;
+		if (_flows.count(flowId)) {
+			currFlow = _flows[flowId];
+			if (!currFlow->assignedPath) {
+				int idx = (int)((uint64_t)currFlow->flowId % _virtualPaths.size());
+				currFlow->assignedPath = _virtualPaths[idx];
+				_virtualPaths[idx]->p->address().toString(pathStr);
+				fprintf(stderr, "assigning flow %llx between this node and peer %llx to path %s at index %d\n",
+					currFlow->flowId, this->_id.address().toInt(), pathStr, idx);
+			}
+			else {
+				if (!currFlow->assignedPath->p->alive(now)) {
+					char newPathStr[128];
+					currFlow->assignedPath->p->address().toString(pathStr);
+					// Re-assign
+					int idx = (int)((uint64_t)currFlow->flowId % _virtualPaths.size());
+					currFlow->assignedPath = _virtualPaths[idx];
+					_virtualPaths[idx]->p->address().toString(newPathStr);
+					fprintf(stderr, "path %s assigned to flow %llx between this node and %llx appears to be dead, reassigning to path %s\n",
+						pathStr, currFlow->flowId, this->_id.address().toInt(), newPathStr);
+				}
+			}
+			return currFlow->assignedPath->p;
+		}
+	}
 
 	/**
-	 * Proportionally allocate traffic according to dynamic path quality measurements
+	 * Proportionally allocate traffic according to dynamic path quality measurements.
 	 */
-	if (RR->node->getMultipathMode() == ZT_MULTIPATH_PROPORTIONALLY_BALANCED) {
+	if (RR->node->getMultipathMode() == ZT_MULTIPATH_BALANCE_DYNAMIC_OPAQUE) {
 		if ((now - _lastAggregateAllocation) >= ZT_PATH_QUALITY_COMPUTE_INTERVAL) {
 			_lastAggregateAllocation = now;
 			computeAggregateAllocation(now);
@@ -476,6 +604,13 @@ SharedPtr<Path> Peer::getAppropriatePath(int64_t now, bool includeExpired)
 			return _paths[bestPath].p;
 		}
 	}
+
+	/**
+	 * Flows are dynamically allocated across paths in proportion to link strength and load.
+	 */
+	if (RR->node->getMultipathMode() == ZT_MULTIPATH_BALANCE_DYNAMIC_FLOW) {
+		// Not yet implemented
+	}
+
 	return SharedPtr<Path>();
 }
 
@@ -676,10 +811,20 @@ inline void Peer::processBackgroundPeerTasks(const int64_t now)
 		_localMultipathSupported = ((RR->node->getMultipathMode() != ZT_MULTIPATH_NONE) && (ZT_PROTO_VERSION > 9));
 		_remoteMultipathSupported = _vProto > 9;
 		// If both peers support multipath and more than one path exist, we can use multipath logic
-		DEBUG_INFO("from=%llx, _localMultipathSupported=%d, _remoteMultipathSupported=%d, (_uniqueAlivePathCount > 1)=%d",
-			this->_id.address().toInt(), _localMultipathSupported, _remoteMultipathSupported, (_uniqueAlivePathCount > 1));
 		_canUseMultipath = _localMultipathSupported && _remoteMultipathSupported && (_uniqueAlivePathCount > 1);
 	}
+
+	// Remove old flows
+	std::map<int64_t, struct Flow *>::iterator it = _flows.begin();
+	while (it != _flows.end()) {
+		if ((now - it->second->lastSend) > ZT_MULTIPATH_FLOW_EXPIRATION) {
+			fprintf(stderr, "forgetting flow %llx between this node and %llx (%lu active flow(s))\n",
+				it->first, this->_id.address().toInt(), _flows.size());
+			delete it->second;
+			it = _flows.erase(it);
+		} else {
+			it++;
+		}
+	}
 }
 
 void Peer::sendACK(void *tPtr,const SharedPtr<Path> &path,const int64_t localSocket,const InetAddress &atAddress,int64_t now)
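A "virtual path" in this patch is a (local socket, remote physical path) pair, so the number of virtual paths is the cross product of local sending sockets and alive remote endpoints. A minimal sketch of the enumeration that constructSetOfVirtualPaths() performs, using toy simplified types rather than the real Path/SharedPtr machinery:

```cpp
#include <cstdint>
#include <cstdio>
#include <vector>

struct ToyVirtualPath {
	int64_t localSocket; // which local interface/socket to send from
	int remotePathIdx;   // which remote endpoint to send to
};

int main()
{
	// Two local sockets and two alive remote paths yield 2 x 2 = 4 virtual paths.
	std::vector<int64_t> localSockets;
	localSockets.push_back(100);
	localSockets.push_back(101);
	const int remotePaths = 2;

	std::vector<ToyVirtualPath> virtualPaths;
	for (int i = 0; i < remotePaths; ++i) {
		for (size_t j = 0; j < localSockets.size(); ++j) {
			ToyVirtualPath vp;
			vp.localSocket = localSockets[j];
			vp.remotePathIdx = i;
			virtualPaths.push_back(vp);
		}
	}
	printf("%lu virtual paths\n", (unsigned long)virtualPaths.size()); // prints 4
	return 0;
}
```

This is why the balancing policies index into _virtualPaths rather than _paths: a single remote endpoint reachable from two local interfaces counts as two distinct ways to send.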
diff --git a/node/Peer.hpp b/node/Peer.hpp
index 6d3ce553..7633ad7d 100644
--- a/node/Peer.hpp
+++ b/node/Peer.hpp
@@ -28,6 +28,8 @@
 #define ZT_PEER_HPP
 
 #include <vector>
+#include <map>
+#include <queue>
 
 #include "../include/ZeroTierOne.h"
 
@@ -147,6 +149,8 @@ public:
 		return false;
 	}
 
+	void constructSetOfVirtualPaths();
+
 	/**
 	 * Record statistics on outgoing packets
 	 *
@@ -216,14 +220,17 @@ public:
 	 */
 	int aggregateLinkLogicalPathCount();
 
+	std::vector<SharedPtr<Path> > getAllPaths(int64_t now);
+
 	/**
 	 * Get the most appropriate direct path based on current multipath and QoS configuration
 	 *
 	 * @param now Current time
 	 * @param includeExpired If true, include even expired paths
+	 * @param flowId Session-specific protocol flow identifier used for path allocation
 	 * @return Best current path or NULL if none
 	 */
-	SharedPtr<Path> getAppropriatePath(int64_t now, bool includeExpired);
+	SharedPtr<Path> getAppropriatePath(int64_t now, bool includeExpired, int64_t flowId = -1);
 
 	/**
 	 * Generate a human-readable string of interface names making up the aggregate link, also include
@@ -680,6 +687,44 @@ private:
 	int64_t _lastAggregateAllocation;
 
 	char _interfaceListStr[256]; // 16 characters * 16 paths in a link
+
+	// Link performance measurements
+	struct LinkPerformanceEntry
+	{
+		int64_t packetId;
+		struct VirtualPath *egressVirtualPath;
+		struct VirtualPath *ingressVirtualPath;
+	};
+
+	// Virtual paths
+	int _virtualPathCount;
+	Mutex _virtual_paths_m;
+	struct VirtualPath
+	{
+		SharedPtr<Path> p;
+		int64_t localSocket;
+		std::queue<struct LinkPerformanceEntry *> performanceEntries;
+	};
+	std::vector<struct VirtualPath *> _virtualPaths;
+
+	// Flows
+	struct Flow
+	{
+		Flow(int64_t fid, int64_t ls) :
+			flowId(fid),
+			lastSend(ls),
+			assignedPath(NULL)
+		{}
+
+		int64_t flowId;
+		int64_t bytesPerSecond;
+		int64_t lastSend;
+		struct VirtualPath *assignedPath;
+	};
+
+	std::map<int64_t, struct Flow *> _flows;
+
+	int16_t _roundRobinPathAssignmentIdx;
 };
 
 } // namespace ZeroTier
diff --git a/node/Switch.cpp b/node/Switch.cpp
index a6852d9f..c2251f23 100644
--- a/node/Switch.cpp
+++ b/node/Switch.cpp
@@ -255,6 +255,35 @@ void Switch::onRemotePacket(void *tPtr,const int64_t localSocket,const InetAddress &fromAddr,const void *data,unsigned int len)
 	} catch ( ... ) {} // sanity check, should be caught elsewhere
 }
 
+// Returns true if packet appears valid; pos and proto will be set
+static bool _ipv6GetPayload(const uint8_t *frameData,unsigned int frameLen,unsigned int &pos,unsigned int &proto)
+{
+	if (frameLen < 40)
+		return false;
+	pos = 40;
+	proto = frameData[6];
+	while (pos <= frameLen) {
+		switch(proto) {
+			case 0: // hop-by-hop options
+			case 43: // routing
+			case 60: // destination options
+			case 135: // mobility options
+				if ((pos + 8) > frameLen)
+					return false; // invalid!
+				proto = frameData[pos];
+				pos += ((unsigned int)frameData[pos + 1] * 8) + 8;
+				break;
+
+			//case 44: // fragment -- we currently can't parse these and they are deprecated in IPv6 anyway
+			//case 50:
+			//case 51: // IPSec ESP and AH -- we have to stop here since this is encrypted stuff
+			default:
+				return true;
+		}
+	}
+	return false; // overflow == invalid
+}
+
 void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const MAC &from,const MAC &to,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len)
 {
 	if (!network->hasConfig())
@@ -271,6 +300,73 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const
 
 	uint8_t qosBucket = ZT_QOS_DEFAULT_BUCKET;
 
+	/* A pseudo-unique identifier used by the balancing and bonding policies to associate properties
+	 * of a specific protocol flow over time and to determine which virtual path this packet
+	 * shall be sent out on. This identifier consists of the source and destination ports and
+	 * the protocol number of the encapsulated frame.
+	 *
+	 * A flowId of -1 indicates that the packet we are about to transmit has no
+	 * preferred virtual path and will be sent out according to what the multipath logic
+	 * deems appropriate. An example of this would be an ARP frame, which carries no
+	 * transport-layer flow information.
+	 */
+	int64_t flowId = -1;
+
+	if (etherType == ZT_ETHERTYPE_IPV4 && (len >= 20)) {
+		uint16_t srcPort = 0;
+		uint16_t dstPort = 0;
+		uint8_t proto = (reinterpret_cast<const uint8_t *>(data)[9]);
+		const unsigned int headerLen = 4 * (reinterpret_cast<const uint8_t *>(data)[0] & 0xf);
+		switch(proto) {
+			case 0x01: // ICMP
+				flowId = 0x01;
+				break;
+			// All these start with 16-bit source and destination port in that order
+			case 0x06: // TCP
+			case 0x11: // UDP
+			case 0x84: // SCTP
+			case 0x88: // UDPLite
+				if (len > (headerLen + 4)) {
+					unsigned int pos = headerLen + 0;
+					srcPort = (reinterpret_cast<const uint8_t *>(data)[pos++]) << 8;
+					srcPort |= (reinterpret_cast<const uint8_t *>(data)[pos]);
+					pos++;
+					dstPort = (reinterpret_cast<const uint8_t *>(data)[pos++]) << 8;
+					dstPort |= (reinterpret_cast<const uint8_t *>(data)[pos]);
+					flowId = ((int64_t)srcPort << 48) | ((int64_t)dstPort << 32) | proto;
+				}
+				break;
+		}
+	}
+
+	if (etherType == ZT_ETHERTYPE_IPV6 && (len >= 40)) {
+		uint16_t srcPort = 0;
+		uint16_t dstPort = 0;
+		unsigned int pos;
+		unsigned int proto;
+		if (_ipv6GetPayload((const uint8_t *)data, len, pos, proto)) {
+			switch(proto) {
+				case 0x3A: // ICMPv6
+					flowId = 0x3A;
+					break;
+				// All these start with 16-bit source and destination port in that order
+				case 0x06: // TCP
+				case 0x11: // UDP
+				case 0x84: // SCTP
+				case 0x88: // UDPLite
+					if (len > (pos + 4)) {
+						srcPort = (reinterpret_cast<const uint8_t *>(data)[pos++]) << 8;
+						srcPort |= (reinterpret_cast<const uint8_t *>(data)[pos]);
+						pos++;
+						dstPort = (reinterpret_cast<const uint8_t *>(data)[pos++]) << 8;
+						dstPort |= (reinterpret_cast<const uint8_t *>(data)[pos]);
+						flowId = ((int64_t)srcPort << 48) | ((int64_t)dstPort << 32) | proto;
+					}
+					break;
+				default:
+					break;
+			}
+		}
+	}
+
 	if (to.isMulticast()) {
 		MulticastGroup multicastGroup(to,0);
 
@@ -280,7 +376,7 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const
 		 * otherwise a straightforward Ethernet switch emulation. Vanilla ARP
 		 * is dumb old broadcast and simply doesn't scale. ZeroTier multicast
 		 * groups have an additional field called ADI (additional distinguishing
-		 * information) which was added specifically for ARP though it could
+		 * information) which was added specifically for ARP though it could
 		 * be used for other things too. We then take ARP broadcasts and turn
 		 * them into multicasts by stuffing the IP address being queried into
 		 * the 32-bit ADI field. In practice this uses our multicast pub/sub
@@ -429,7 +525,7 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const
 			outp.append(data,len);
 			if (!network->config().disableCompression())
 				outp.compress();
-			aqm_enqueue(tPtr,network,outp,true,qosBucket);
+			aqm_enqueue(tPtr,network,outp,true,qosBucket,flowId);
 		} else {
 			Packet outp(toZT,RR->identity.address(),Packet::VERB_FRAME);
 			outp.append(network->id());
@@ -437,7 +533,7 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const
 			outp.append(data,len);
 			if (!network->config().disableCompression())
 				outp.compress();
-			aqm_enqueue(tPtr,network,outp,true,qosBucket);
+			aqm_enqueue(tPtr,network,outp,true,qosBucket,flowId);
 		}
 	} else {
 		// Destination is bridged behind a remote peer
@@ -493,7 +589,7 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const
 				outp.append(data,len);
 				if (!network->config().disableCompression())
 					outp.compress();
-				aqm_enqueue(tPtr,network,outp,true,qosBucket);
+				aqm_enqueue(tPtr,network,outp,true,qosBucket,flowId);
 			} else {
 				RR->t->outgoingNetworkFrameDropped(tPtr,network,from,to,etherType,vlanId,len,"filter blocked (bridge replication)");
 			}
@@ -501,10 +597,10 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const
 	}
 }
 
-void Switch::aqm_enqueue(void *tPtr, const SharedPtr<Network> &network, Packet &packet,bool encrypt,int qosBucket)
+void Switch::aqm_enqueue(void *tPtr, const SharedPtr<Network> &network, Packet &packet,bool encrypt,int qosBucket,int64_t flowId)
 {
 	if(!network->qosEnabled()) {
-		send(tPtr, packet, encrypt);
+		send(tPtr, packet, encrypt, flowId);
 		return;
 	}
 	NetworkQoSControlBlock *nqcb = _netQueueControlBlock[network->id()];
 	if (!nqcb) {
 		nqcb = new NetworkQoSControlBlock();
 		_netQueueControlBlock[network->id()] = nqcb;
 		// Initialize queues and place them in the INACTIVE list
 		for (int i=0; i<ZT_QOS_NUM_BUCKETS; i++) {
 			nqcb->inactiveQueues.push_back(new ManagedQueue(i));
 		}
 	}
-
+	// Don't apply QoS scheduling to ZT protocol traffic
 	if (packet.verb() != Packet::VERB_FRAME && packet.verb() != Packet::VERB_EXT_FRAME) {
-		// DEBUG_INFO("skipping, no QoS for this packet, verb=%x", packet.verb());
-		// just send packet normally, no QoS for ZT protocol traffic
-		send(tPtr, packet, encrypt);
+		send(tPtr, packet, encrypt, flowId);
 	}
 
 	_aqm_m.lock();
@@ -530,7 +624,7 @@ void Switch::aqm_enqueue(void *tPtr, const SharedPtr<Network> &network, Packet &
 
 	// Enqueue packet and move queue to appropriate list
 	const Address dest(packet.destination());
-	TXQueueEntry *txEntry = new TXQueueEntry(dest,RR->node->now(),packet,encrypt);
+	TXQueueEntry *txEntry = new TXQueueEntry(dest,RR->node->now(),packet,encrypt,flowId);
 	ManagedQueue *selectedQueue = nullptr;
 	for (size_t i=0; i<ZT_QOS_NUM_BUCKETS; i++) {
@@ ... @@ void Switch::aqm_dequeue(void *tPtr)
 			queueAtFrontOfList->byteCredit -= len;
 			// Send the packet!
 			queueAtFrontOfList->q.pop_front();
-			send(tPtr, entryToEmit->packet, entryToEmit->encrypt);
+			send(tPtr, entryToEmit->packet, entryToEmit->encrypt, entryToEmit->flowId);
 			(*nqcb).second->_currEnqueuedPackets--;
 		}
 		if (queueAtFrontOfList) {
@@ -734,7 +828,7 @@ void Switch::aqm_dequeue(void *tPtr)
 			queueAtFrontOfList->byteLength -= len;
 			queueAtFrontOfList->byteCredit -= len;
 			queueAtFrontOfList->q.pop_front();
-			send(tPtr, entryToEmit->packet, entryToEmit->encrypt);
+			send(tPtr, entryToEmit->packet, entryToEmit->encrypt, entryToEmit->flowId);
 			(*nqcb).second->_currEnqueuedPackets--;
 		}
 		if (queueAtFrontOfList) {
@@ -758,18 +852,18 @@ void Switch::removeNetworkQoSControlBlock(uint64_t nwid)
 	}
 }
 
-void Switch::send(void *tPtr,Packet &packet,bool encrypt)
+void Switch::send(void *tPtr,Packet &packet,bool encrypt,int64_t flowId)
 {
 	const Address dest(packet.destination());
 	if (dest == RR->identity.address())
 		return;
-	if (!_trySend(tPtr,packet,encrypt)) {
+	if (!_trySend(tPtr,packet,encrypt,flowId)) {
 		{
 			Mutex::Lock _l(_txQueue_m);
 			if (_txQueue.size() >= ZT_TX_QUEUE_SIZE) {
 				_txQueue.pop_front();
 			}
-			_txQueue.push_back(TXQueueEntry(dest,RR->node->now(),packet,encrypt));
+			_txQueue.push_back(TXQueueEntry(dest,RR->node->now(),packet,encrypt,flowId));
 		}
 		if (!RR->topology->getPeer(tPtr,dest))
 			requestWhois(tPtr,RR->node->now(),dest);
@@ -791,10 +885,11 @@ void Switch::requestWhois(void *tPtr,const int64_t now,const Address &addr)
 
 	const SharedPtr<Peer> upstream(RR->topology->getUpstreamPeer());
 	if (upstream) {
+		int64_t flowId = -1;
 		Packet outp(upstream->address(),RR->identity.address(),Packet::VERB_WHOIS);
 		addr.appendTo(outp);
 		RR->node->expectReplyTo(outp.packetId());
-		send(tPtr,outp,true);
+		send(tPtr,outp,true,flowId);
 	}
 }
 
@@ -819,7 +914,7 @@ void Switch::doAnythingWaitingForPeer(void *tPtr,const SharedPtr<Peer> &peer)
 	Mutex::Lock _l(_txQueue_m);
 	for(std::list< TXQueueEntry >::iterator txi(_txQueue.begin());txi!=_txQueue.end();) {
 		if (txi->dest == peer->address()) {
-			if (_trySend(tPtr,txi->packet,txi->encrypt)) {
+			if (_trySend(tPtr,txi->packet,txi->encrypt,txi->flowId)) {
 				_txQueue.erase(txi++);
 			} else {
 				++txi;
@@ -843,7 +938,7 @@ unsigned long Switch::doTimerTasks(void *tPtr,int64_t now)
 		Mutex::Lock _l(_txQueue_m);
 		for(std::list< TXQueueEntry >::iterator txi(_txQueue.begin());txi!=_txQueue.end();) {
-			if (_trySend(tPtr,txi->packet,txi->encrypt)) {
+			if (_trySend(tPtr,txi->packet,txi->encrypt,txi->flowId)) {
 				_txQueue.erase(txi++);
 			} else if ((now - txi->creationTime) > ZT_TRANSMIT_QUEUE_TIMEOUT) {
 				_txQueue.erase(txi++);
@@ -907,7 +1002,7 @@ bool Switch::_shouldUnite(const int64_t now,const Address &source,const Address &destination)
 	return false;
 }
 
-bool Switch::_trySend(void *tPtr,Packet &packet,bool encrypt)
+bool Switch::_trySend(void *tPtr,Packet &packet,bool encrypt,int64_t flowId)
 {
 	SharedPtr<Path> viaPath;
 	const int64_t now = RR->node->now();
 	const Address destination(packet.destination());
 
 	const SharedPtr<Peer> peer(RR->topology->getPeer(tPtr,destination));
 	if (peer) {
-		viaPath = peer->getAppropriatePath(now,false);
-		if (!viaPath) {
-			peer->tryMemorizedPath(tPtr,now); // periodically attempt memorized or statically defined paths, if any are known
-			const SharedPtr<Peer> relay(RR->topology->getUpstreamPeer());
-			if ( (!relay) || (!(viaPath = relay->getAppropriatePath(now,false))) ) {
-				if (!(viaPath = peer->getAppropriatePath(now,true)))
-					return false;
+		if (RR->node->getMultipathMode() == ZT_MULTIPATH_BROADCAST) {
+			// Nothing here, we'll grab an entire set of paths to send out on below
+		}
+		else {
+			viaPath = peer->getAppropriatePath(now,false,flowId);
+			if (!viaPath) {
+				peer->tryMemorizedPath(tPtr,now); // periodically attempt memorized or statically defined paths, if any are known
+				const SharedPtr<Peer> relay(RR->topology->getUpstreamPeer());
+				if ( (!relay) || (!(viaPath = relay->getAppropriatePath(now,false,flowId))) ) {
+					if (!(viaPath = peer->getAppropriatePath(now,true,flowId)))
+						return false;
+				}
 			}
 		}
 	} else {
 		return false;
 	}
 
-	unsigned int mtu = ZT_DEFAULT_PHYSMTU;
-	uint64_t trustedPathId = 0;
-	RR->topology->getOutboundPathInfo(viaPath->address(),mtu,trustedPathId);
-
-	unsigned int chunkSize = std::min(packet.size(),mtu);
-	packet.setFragmented(chunkSize < packet.size());
-
-	peer->recordOutgoingPacket(viaPath, packet.packetId(), packet.payloadLength(), packet.verb(), now);
-
-	if (trustedPathId) {
-		packet.setTrusted(trustedPathId);
-	} else {
-		packet.armor(peer->key(),encrypt);
-	}
-
-	if (viaPath->send(RR,tPtr,packet.data(),chunkSize,now)) {
-		if (chunkSize < packet.size()) {
-			// Too big for one packet, fragment the rest
-			unsigned int fragStart = chunkSize;
-			unsigned int remaining = packet.size() - chunkSize;
-			unsigned int fragsRemaining = (remaining / (mtu - ZT_PROTO_MIN_FRAGMENT_LENGTH));
-			if ((fragsRemaining * (mtu - ZT_PROTO_MIN_FRAGMENT_LENGTH)) < remaining)
-				++fragsRemaining;
-			const unsigned int totalFragments = fragsRemaining + 1;
-
-			for(unsigned int fno=1;fno<totalFragments;++fno) {
-				chunkSize = std::min(remaining,(unsigned int)(mtu - ZT_PROTO_MIN_FRAGMENT_LENGTH));
-				Packet::Fragment frag(packet,fragStart,chunkSize,fno,totalFragments);
-				viaPath->send(RR,tPtr,frag.data(),frag.size(),now);
-				fragStart += chunkSize;
-				remaining -= chunkSize;
-			}
-		}
-	}
+	// If sending on all paths, set viaPath to first path
+	int nextPathIdx = 0;
+	std::vector<SharedPtr<Path> > paths = peer->getAllPaths(now);
+	if (RR->node->getMultipathMode() == ZT_MULTIPATH_BROADCAST) {
+		if (paths.size()) {
+			viaPath = paths[nextPathIdx++];
+		}
+	}
+
+	while (viaPath) {
+		unsigned int mtu = ZT_DEFAULT_PHYSMTU;
+		uint64_t trustedPathId = 0;
+		RR->topology->getOutboundPathInfo(viaPath->address(),mtu,trustedPathId);
+		unsigned int chunkSize = std::min(packet.size(),mtu);
+		packet.setFragmented(chunkSize < packet.size());
+		peer->recordOutgoingPacket(viaPath, packet.packetId(), packet.payloadLength(), packet.verb(), now);
+
+		if (trustedPathId) {
+			packet.setTrusted(trustedPathId);
+		} else {
+			packet.armor(peer->key(),encrypt);
+		}
+
+		if (viaPath->send(RR,tPtr,packet.data(),chunkSize,now)) {
+			if (chunkSize < packet.size()) {
+				// Too big for one packet, fragment the rest
+				unsigned int fragStart = chunkSize;
+				unsigned int remaining = packet.size() - chunkSize;
+				unsigned int fragsRemaining = (remaining / (mtu - ZT_PROTO_MIN_FRAGMENT_LENGTH));
+				if ((fragsRemaining * (mtu - ZT_PROTO_MIN_FRAGMENT_LENGTH)) < remaining)
+					++fragsRemaining;
+				const unsigned int totalFragments = fragsRemaining + 1;
+
+				for(unsigned int fno=1;fno<totalFragments;++fno) {
+					chunkSize = std::min(remaining,(unsigned int)(mtu - ZT_PROTO_MIN_FRAGMENT_LENGTH));
+					Packet::Fragment frag(packet,fragStart,chunkSize,fno,totalFragments);
+					viaPath->send(RR,tPtr,frag.data(),frag.size(),now);
+					fragStart += chunkSize;
+					remaining -= chunkSize;
+				}
+			}
+		}
+		viaPath.zero();
+		if (RR->node->getMultipathMode() == ZT_MULTIPATH_BROADCAST) {
+			if (paths.size() > (unsigned long)nextPathIdx) {
+				viaPath = paths[nextPathIdx++];
+			}
+		}
+	}
 	return true;
 }
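To make the flow identifier concrete: for a TCP segment from port 443 to port 51820, the 64-bit flowId packs the two ports into the top 32 bits and the IP protocol number into the low bits. A standalone sketch of the same packing performed in onLocalEthernet() (the helper name is hypothetical):

```cpp
#include <cstdint>
#include <cstdio>

// Pack (srcPort, dstPort, proto) the way onLocalEthernet() does.
static int64_t makeFlowId(uint16_t srcPort, uint16_t dstPort, uint8_t proto)
{
	return ((int64_t)srcPort << 48) | ((int64_t)dstPort << 32) | proto;
}

int main()
{
	// TCP (protocol 0x06) from port 443 to port 51820
	int64_t flowId = makeFlowId(443, 51820, 0x06);
	printf("flowId = %016llx\n", (unsigned long long)flowId); // 01bbca6c00000006
	return 0;
}
```

Note that the two directions of a connection produce different flowIds, since the ports are not sorted before packing; each direction is treated as its own flow by the balancing policies.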
diff --git a/node/Switch.hpp b/node/Switch.hpp
index a531b268..388e1ccf 100644
--- a/node/Switch.hpp
+++ b/node/Switch.hpp
@@ -131,7 +131,7 @@ public:
 	 * @param encrypt Encrypt packet payload? (always true except for HELLO)
 	 * @param qosBucket Which bucket the rule-system determined this packet should fall into
+	 * @param flowId Flow identifier used by the balancing policies (-1 if none)
 	 */
-	void aqm_enqueue(void *tPtr, const SharedPtr<Network> &network, Packet &packet,bool encrypt,int qosBucket);
+	void aqm_enqueue(void *tPtr, const SharedPtr<Network> &network, Packet &packet,bool encrypt,int qosBucket,int64_t flowId = -1);
 
 	/**
 	 * Performs a single AQM cycle and dequeues and transmits all eligible packets on all networks
@@ -177,7 +177,7 @@ public:
 	 * @param packet Packet to send (buffer may be modified)
 	 * @param encrypt Encrypt packet payload? (always true except for HELLO)
+	 * @param flowId Flow identifier used by the balancing policies (-1 if none)
 	 */
-	void send(void *tPtr,Packet &packet,bool encrypt);
+	void send(void *tPtr,Packet &packet,bool encrypt,int64_t flowId = -1);
 
 	/**
 	 * Request WHOIS on a given address
@@ -212,7 +212,7 @@ public:
 private:
 	bool _shouldUnite(const int64_t now,const Address &source,const Address &destination);
-	bool _trySend(void *tPtr,Packet &packet,bool encrypt); // packet is modified if return is true
+	bool _trySend(void *tPtr,Packet &packet,bool encrypt,int64_t flowId = -1); // packet is modified if return is true
 
 	const RuntimeEnvironment *const RR;
 	int64_t _lastBeaconResponse;
@@ -261,16 +261,18 @@ private:
 	struct TXQueueEntry
 	{
 		TXQueueEntry() {}
-		TXQueueEntry(Address d,uint64_t ct,const Packet &p,bool enc) :
+		TXQueueEntry(Address d,uint64_t ct,const Packet &p,bool enc,int64_t fid) :
 			dest(d),
 			creationTime(ct),
 			packet(p),
-			encrypt(enc) {}
+			encrypt(enc),
+			flowId(fid) {}
 
 		Address dest;
 		uint64_t creationTime;
 		Packet packet; // unencrypted/unMAC'd packet -- this is done at send time
 		bool encrypt;
+		int64_t flowId;
 	};
 	std::list< TXQueueEntry > _txQueue;
 	Mutex _txQueue_m;
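One subtlety this Switch.hpp change enables: frames that fail _trySend() are parked in _txQueue and retried from doTimerTasks(), and since TXQueueEntry now stores the flowId, the retry calls _trySend(..., txi->flowId) and a flow-pinned policy resolves the retry to the same path as the original attempt. A minimal sketch of that retry-loop shape, using toy types rather than the real Switch API:

```cpp
#include <cstdint>
#include <list>

struct ToyTxEntry {
	int64_t creationTime;
	int64_t flowId;
};

static const int64_t TX_TIMEOUT_MS = 5000; // stand-in for ZT_TRANSMIT_QUEUE_TIMEOUT

// Retry deferred sends, dropping entries that have waited too long.
static void retryDeferred(std::list<ToyTxEntry> &txQueue, int64_t now,
                          bool (*trySend)(int64_t flowId))
{
	for (std::list<ToyTxEntry>::iterator i(txQueue.begin()); i != txQueue.end();) {
		if (trySend(i->flowId))                 // same flowId as the first attempt
			txQueue.erase(i++);
		else if ((now - i->creationTime) > TX_TIMEOUT_MS)
			txQueue.erase(i++);                 // give up on stale entries
		else
			++i;
	}
}
```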
diff --git a/node/Trace.cpp b/node/Trace.cpp
index b7c00271..e38aaa41 100644
--- a/node/Trace.cpp
+++ b/node/Trace.cpp
@@ -109,10 +109,10 @@ void Trace::peerConfirmingUnknownPath(void *const tPtr,const uint64_t networkId,
 
 void Trace::peerLinkNowAggregate(void *const tPtr,Peer &peer)
 {
-	if ((RR->node->getMultipathMode() == ZT_MULTIPATH_RANDOM)) {
+	if ((RR->node->getMultipathMode() == ZT_MULTIPATH_BALANCE_RANDOM)) {
 		ZT_LOCAL_TRACE(tPtr,RR,"link to peer %.10llx is now a randomly-distributed aggregate link",peer.address().toInt());
 	}
-	if ((RR->node->getMultipathMode() == ZT_MULTIPATH_PROPORTIONALLY_BALANCED)) {
+	if ((RR->node->getMultipathMode() == ZT_MULTIPATH_BALANCE_DYNAMIC_OPAQUE)) {
 		ZT_LOCAL_TRACE(tPtr,RR,"link to peer %.10llx is now a proportionally-balanced aggregate link",peer.address().toInt());
 	}
 }