Add Bonds, Slaves, and Flows
This commit is contained in:
parent
de9cfbe9b0
commit
a50e8e9878
31 changed files with 4898 additions and 1966 deletions
337
node/Switch.cpp
337
node/Switch.cpp
|
@ -1,10 +1,10 @@
|
|||
/*
|
||||
* Copyright (c)2019 ZeroTier, Inc.
|
||||
* Copyright (c)2013-2020 ZeroTier, Inc.
|
||||
*
|
||||
* Use of this software is governed by the Business Source License included
|
||||
* in the LICENSE.TXT file in the project's root directory.
|
||||
*
|
||||
* Change Date: 2023-01-01
|
||||
* Change Date: 2024-01-01
|
||||
*
|
||||
* On the date above, in accordance with the Business Source License, use
|
||||
* of this software will be governed by version 2.0 of the Apache License.
|
||||
|
@ -42,8 +42,38 @@ Switch::Switch(const RuntimeEnvironment *renv) :
|
|||
{
|
||||
}
|
||||
|
||||
// Returns true if packet appears valid; pos and proto will be set
|
||||
static bool _ipv6GetPayload(const uint8_t *frameData,unsigned int frameLen,unsigned int &pos,unsigned int &proto)
|
||||
{
|
||||
if (frameLen < 40)
|
||||
return false;
|
||||
pos = 40;
|
||||
proto = frameData[6];
|
||||
while (pos <= frameLen) {
|
||||
switch(proto) {
|
||||
case 0: // hop-by-hop options
|
||||
case 43: // routing
|
||||
case 60: // destination options
|
||||
case 135: // mobility options
|
||||
if ((pos + 8) > frameLen)
|
||||
return false; // invalid!
|
||||
proto = frameData[pos];
|
||||
pos += ((unsigned int)frameData[pos + 1] * 8) + 8;
|
||||
break;
|
||||
|
||||
//case 44: // fragment -- we currently can't parse these and they are deprecated in IPv6 anyway
|
||||
//case 50:
|
||||
//case 51: // IPSec ESP and AH -- we have to stop here since this is encrypted stuff
|
||||
default:
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false; // overflow == invalid
|
||||
}
|
||||
|
||||
void Switch::onRemotePacket(void *tPtr,const int64_t localSocket,const InetAddress &fromAddr,const void *data,unsigned int len)
|
||||
{
|
||||
int32_t flowId = ZT_QOS_NO_FLOW;
|
||||
try {
|
||||
const int64_t now = RR->node->now();
|
||||
|
||||
|
@ -112,6 +142,7 @@ void Switch::onRemotePacket(void *tPtr,const int64_t localSocket,const InetAddre
|
|||
if (rq->packetId != fragmentPacketId) {
|
||||
// No packet found, so we received a fragment without its head.
|
||||
|
||||
rq->flowId = flowId;
|
||||
rq->timestamp = now;
|
||||
rq->packetId = fragmentPacketId;
|
||||
rq->frags[fragmentNumber - 1] = fragment;
|
||||
|
@ -130,7 +161,7 @@ void Switch::onRemotePacket(void *tPtr,const int64_t localSocket,const InetAddre
|
|||
for(unsigned int f=1;f<totalFragments;++f)
|
||||
rq->frag0.append(rq->frags[f - 1].payload(),rq->frags[f - 1].payloadLength());
|
||||
|
||||
if (rq->frag0.tryDecode(RR,tPtr)) {
|
||||
if (rq->frag0.tryDecode(RR,tPtr,flowId)) {
|
||||
rq->timestamp = 0; // packet decoded, free entry
|
||||
} else {
|
||||
rq->complete = true; // set complete flag but leave entry since it probably needs WHOIS or something
|
||||
|
@ -195,6 +226,7 @@ void Switch::onRemotePacket(void *tPtr,const int64_t localSocket,const InetAddre
|
|||
if (rq->packetId != packetId) {
|
||||
// If we have no other fragments yet, create an entry and save the head
|
||||
|
||||
rq->flowId = flowId;
|
||||
rq->timestamp = now;
|
||||
rq->packetId = packetId;
|
||||
rq->frag0.init(data,len,path,now);
|
||||
|
@ -211,7 +243,7 @@ void Switch::onRemotePacket(void *tPtr,const int64_t localSocket,const InetAddre
|
|||
for(unsigned int f=1;f<rq->totalFragments;++f)
|
||||
rq->frag0.append(rq->frags[f - 1].payload(),rq->frags[f - 1].payloadLength());
|
||||
|
||||
if (rq->frag0.tryDecode(RR,tPtr)) {
|
||||
if (rq->frag0.tryDecode(RR,tPtr,flowId)) {
|
||||
rq->timestamp = 0; // packet decoded, free entry
|
||||
} else {
|
||||
rq->complete = true; // set complete flag but leave entry since it probably needs WHOIS or something
|
||||
|
@ -224,9 +256,10 @@ void Switch::onRemotePacket(void *tPtr,const int64_t localSocket,const InetAddre
|
|||
} else {
|
||||
// Packet is unfragmented, so just process it
|
||||
IncomingPacket packet(data,len,path,now);
|
||||
if (!packet.tryDecode(RR,tPtr)) {
|
||||
if (!packet.tryDecode(RR,tPtr,flowId)) {
|
||||
RXQueueEntry *const rq = _nextRXQueueEntry();
|
||||
Mutex::Lock rql(rq->lock);
|
||||
rq->flowId = flowId;
|
||||
rq->timestamp = now;
|
||||
rq->packetId = packet.packetId();
|
||||
rq->frag0 = packet;
|
||||
|
@ -242,43 +275,6 @@ void Switch::onRemotePacket(void *tPtr,const int64_t localSocket,const InetAddre
|
|||
} catch ( ... ) {} // sanity check, should be caught elsewhere
|
||||
}
|
||||
|
||||
// Returns true if packet appears valid; pos and proto will be set
|
||||
static bool _ipv6GetPayload(const uint8_t *frameData,unsigned int frameLen,unsigned int &pos,unsigned int &proto)
|
||||
{
|
||||
if (frameLen < 40)
|
||||
return false;
|
||||
pos = 40;
|
||||
proto = frameData[6];
|
||||
while (pos <= frameLen) {
|
||||
switch(proto) {
|
||||
case 0: // hop-by-hop options
|
||||
case 43: // routing
|
||||
case 60: // destination options
|
||||
case 135: // mobility options
|
||||
if ((pos + 8) > frameLen)
|
||||
return false; // invalid!
|
||||
proto = frameData[pos];
|
||||
pos += ((unsigned int)frameData[pos + 1] * 8) + 8;
|
||||
break;
|
||||
|
||||
//case 44: // fragment -- we currently can't parse these and they are deprecated in IPv6 anyway
|
||||
//case 50:
|
||||
//case 51: // IPSec ESP and AH -- we have to stop here since this is encrypted stuff
|
||||
default:
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false; // overflow == invalid
|
||||
}
|
||||
|
||||
bool Switch::isFlowAware()
|
||||
{
|
||||
int mode = RR->node->getMultipathMode();
|
||||
return (( mode == ZT_MULTIPATH_BALANCE_RR_FLOW)
|
||||
|| (mode == ZT_MULTIPATH_BALANCE_XOR_FLOW)
|
||||
|| (mode == ZT_MULTIPATH_BALANCE_DYNAMIC_FLOW));
|
||||
}
|
||||
|
||||
void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const MAC &from,const MAC &to,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len)
|
||||
{
|
||||
if (!network->hasConfig())
|
||||
|
@ -293,75 +289,73 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const
|
|||
}
|
||||
}
|
||||
|
||||
uint8_t qosBucket = ZT_QOS_DEFAULT_BUCKET;
|
||||
uint8_t qosBucket = ZT_AQM_DEFAULT_BUCKET;
|
||||
|
||||
/* A pseudo-unique identifier used by the balancing and bonding policies to associate properties
|
||||
* of a specific protocol flow over time and to determine which virtual path this packet
|
||||
* shall be sent out on. This identifier consists of the source port and destination port
|
||||
* of the encapsulated frame.
|
||||
/**
|
||||
* A pseudo-unique identifier used by balancing and bonding policies to
|
||||
* categorize individual flows/conversations for assignment to a specific
|
||||
* physical path. This identifier consists of the source port and
|
||||
* destination port of the encapsulated frame.
|
||||
*
|
||||
* A flowId of -1 will indicate that whatever packet we are about transmit has no
|
||||
* preferred virtual path and will be sent out according to what the multipath logic
|
||||
* deems appropriate. An example of this would be an ICMP packet.
|
||||
* A flowId of -1 will indicate that there is no preference for how this
|
||||
* packet shall be sent. An example of this would be an ICMP packet.
|
||||
*/
|
||||
|
||||
int64_t flowId = -1;
|
||||
int32_t flowId = ZT_QOS_NO_FLOW;
|
||||
|
||||
if (isFlowAware()) {
|
||||
if (etherType == ZT_ETHERTYPE_IPV4 && (len >= 20)) {
|
||||
uint16_t srcPort = 0;
|
||||
uint16_t dstPort = 0;
|
||||
int8_t proto = (reinterpret_cast<const uint8_t *>(data)[9]);
|
||||
const unsigned int headerLen = 4 * (reinterpret_cast<const uint8_t *>(data)[0] & 0xf);
|
||||
switch(proto) {
|
||||
case 0x01: // ICMP
|
||||
flowId = 0x01;
|
||||
break;
|
||||
// All these start with 16-bit source and destination port in that order
|
||||
case 0x06: // TCP
|
||||
case 0x11: // UDP
|
||||
case 0x84: // SCTP
|
||||
case 0x88: // UDPLite
|
||||
if (len > (headerLen + 4)) {
|
||||
unsigned int pos = headerLen + 0;
|
||||
srcPort = (reinterpret_cast<const uint8_t *>(data)[pos++]) << 8;
|
||||
srcPort |= (reinterpret_cast<const uint8_t *>(data)[pos]);
|
||||
pos++;
|
||||
dstPort = (reinterpret_cast<const uint8_t *>(data)[pos++]) << 8;
|
||||
dstPort |= (reinterpret_cast<const uint8_t *>(data)[pos]);
|
||||
flowId = ((int64_t)srcPort << 48) | ((int64_t)dstPort << 32) | proto;
|
||||
}
|
||||
break;
|
||||
}
|
||||
if (etherType == ZT_ETHERTYPE_IPV4 && (len >= 20)) {
|
||||
uint16_t srcPort = 0;
|
||||
uint16_t dstPort = 0;
|
||||
uint8_t proto = (reinterpret_cast<const uint8_t *>(data)[9]);
|
||||
const unsigned int headerLen = 4 * (reinterpret_cast<const uint8_t *>(data)[0] & 0xf);
|
||||
switch(proto) {
|
||||
case 0x01: // ICMP
|
||||
//flowId = 0x01;
|
||||
break;
|
||||
// All these start with 16-bit source and destination port in that order
|
||||
case 0x06: // TCP
|
||||
case 0x11: // UDP
|
||||
case 0x84: // SCTP
|
||||
case 0x88: // UDPLite
|
||||
if (len > (headerLen + 4)) {
|
||||
unsigned int pos = headerLen + 0;
|
||||
srcPort = (reinterpret_cast<const uint8_t *>(data)[pos++]) << 8;
|
||||
srcPort |= (reinterpret_cast<const uint8_t *>(data)[pos]);
|
||||
pos++;
|
||||
dstPort = (reinterpret_cast<const uint8_t *>(data)[pos++]) << 8;
|
||||
dstPort |= (reinterpret_cast<const uint8_t *>(data)[pos]);
|
||||
flowId = dstPort ^ srcPort ^ proto;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (etherType == ZT_ETHERTYPE_IPV6 && (len >= 40)) {
|
||||
uint16_t srcPort = 0;
|
||||
uint16_t dstPort = 0;
|
||||
unsigned int pos;
|
||||
unsigned int proto;
|
||||
_ipv6GetPayload((const uint8_t *)data, len, pos, proto);
|
||||
switch(proto) {
|
||||
case 0x3A: // ICMPv6
|
||||
flowId = 0x3A;
|
||||
break;
|
||||
// All these start with 16-bit source and destination port in that order
|
||||
case 0x06: // TCP
|
||||
case 0x11: // UDP
|
||||
case 0x84: // SCTP
|
||||
case 0x88: // UDPLite
|
||||
if (len > (pos + 4)) {
|
||||
srcPort = (reinterpret_cast<const uint8_t *>(data)[pos++]) << 8;
|
||||
srcPort |= (reinterpret_cast<const uint8_t *>(data)[pos]);
|
||||
pos++;
|
||||
dstPort = (reinterpret_cast<const uint8_t *>(data)[pos++]) << 8;
|
||||
dstPort |= (reinterpret_cast<const uint8_t *>(data)[pos]);
|
||||
flowId = ((int64_t)srcPort << 48) | ((int64_t)dstPort << 32) | proto;
|
||||
}
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
if (etherType == ZT_ETHERTYPE_IPV6 && (len >= 40)) {
|
||||
uint16_t srcPort = 0;
|
||||
uint16_t dstPort = 0;
|
||||
unsigned int pos;
|
||||
unsigned int proto;
|
||||
_ipv6GetPayload((const uint8_t *)data, len, pos, proto);
|
||||
switch(proto) {
|
||||
case 0x3A: // ICMPv6
|
||||
//flowId = 0x3A;
|
||||
break;
|
||||
// All these start with 16-bit source and destination port in that order
|
||||
case 0x06: // TCP
|
||||
case 0x11: // UDP
|
||||
case 0x84: // SCTP
|
||||
case 0x88: // UDPLite
|
||||
if (len > (pos + 4)) {
|
||||
srcPort = (reinterpret_cast<const uint8_t *>(data)[pos++]) << 8;
|
||||
srcPort |= (reinterpret_cast<const uint8_t *>(data)[pos]);
|
||||
pos++;
|
||||
dstPort = (reinterpret_cast<const uint8_t *>(data)[pos++]) << 8;
|
||||
dstPort |= (reinterpret_cast<const uint8_t *>(data)[pos]);
|
||||
flowId = dstPort ^ srcPort ^ proto;
|
||||
}
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -595,7 +589,7 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const
|
|||
}
|
||||
}
|
||||
|
||||
void Switch::aqm_enqueue(void *tPtr, const SharedPtr<Network> &network, Packet &packet,bool encrypt,int qosBucket,int64_t flowId)
|
||||
void Switch::aqm_enqueue(void *tPtr, const SharedPtr<Network> &network, Packet &packet,bool encrypt,int qosBucket,int32_t flowId)
|
||||
{
|
||||
if(!network->qosEnabled()) {
|
||||
send(tPtr, packet, encrypt, flowId);
|
||||
|
@ -603,18 +597,16 @@ void Switch::aqm_enqueue(void *tPtr, const SharedPtr<Network> &network, Packet &
|
|||
}
|
||||
NetworkQoSControlBlock *nqcb = _netQueueControlBlock[network->id()];
|
||||
if (!nqcb) {
|
||||
// DEBUG_INFO("creating network QoS control block (NQCB) for network %llx", network->id());
|
||||
nqcb = new NetworkQoSControlBlock();
|
||||
_netQueueControlBlock[network->id()] = nqcb;
|
||||
// Initialize ZT_QOS_NUM_BUCKETS queues and place them in the INACTIVE list
|
||||
// These queues will be shuffled between the new/old/inactive lists by the enqueue/dequeue algorithm
|
||||
for (int i=0; i<ZT_QOS_NUM_BUCKETS; i++) {
|
||||
for (int i=0; i<ZT_AQM_NUM_BUCKETS; i++) {
|
||||
nqcb->inactiveQueues.push_back(new ManagedQueue(i));
|
||||
}
|
||||
}
|
||||
// Don't apply QoS scheduling to ZT protocol traffic
|
||||
if (packet.verb() != Packet::VERB_FRAME && packet.verb() != Packet::VERB_EXT_FRAME) {
|
||||
// just send packet normally, no QoS for ZT protocol traffic
|
||||
send(tPtr, packet, encrypt, flowId);
|
||||
}
|
||||
|
||||
|
@ -624,8 +616,9 @@ void Switch::aqm_enqueue(void *tPtr, const SharedPtr<Network> &network, Packet &
|
|||
|
||||
const Address dest(packet.destination());
|
||||
TXQueueEntry *txEntry = new TXQueueEntry(dest,RR->node->now(),packet,encrypt,flowId);
|
||||
|
||||
ManagedQueue *selectedQueue = nullptr;
|
||||
for (size_t i=0; i<ZT_QOS_NUM_BUCKETS; i++) {
|
||||
for (size_t i=0; i<ZT_AQM_NUM_BUCKETS; i++) {
|
||||
if (i < nqcb->oldQueues.size()) { // search old queues first (I think this is best since old would imply most recent usage of the queue)
|
||||
if (nqcb->oldQueues[i]->id == qosBucket) {
|
||||
selectedQueue = nqcb->oldQueues[i];
|
||||
|
@ -638,7 +631,7 @@ void Switch::aqm_enqueue(void *tPtr, const SharedPtr<Network> &network, Packet &
|
|||
if (nqcb->inactiveQueues[i]->id == qosBucket) {
|
||||
selectedQueue = nqcb->inactiveQueues[i];
|
||||
// move queue to end of NEW queue list
|
||||
selectedQueue->byteCredit = ZT_QOS_QUANTUM;
|
||||
selectedQueue->byteCredit = ZT_AQM_QUANTUM;
|
||||
// DEBUG_INFO("moving q=%p from INACTIVE to NEW list", selectedQueue);
|
||||
nqcb->newQueues.push_back(selectedQueue);
|
||||
nqcb->inactiveQueues.erase(nqcb->inactiveQueues.begin() + i);
|
||||
|
@ -657,11 +650,11 @@ void Switch::aqm_enqueue(void *tPtr, const SharedPtr<Network> &network, Packet &
|
|||
|
||||
// Drop a packet if necessary
|
||||
ManagedQueue *selectedQueueToDropFrom = nullptr;
|
||||
if (nqcb->_currEnqueuedPackets > ZT_QOS_MAX_ENQUEUED_PACKETS)
|
||||
if (nqcb->_currEnqueuedPackets > ZT_AQM_MAX_ENQUEUED_PACKETS)
|
||||
{
|
||||
// DEBUG_INFO("too many enqueued packets (%d), finding packet to drop", nqcb->_currEnqueuedPackets);
|
||||
int maxQueueLength = 0;
|
||||
for (size_t i=0; i<ZT_QOS_NUM_BUCKETS; i++) {
|
||||
for (size_t i=0; i<ZT_AQM_NUM_BUCKETS; i++) {
|
||||
if (i < nqcb->oldQueues.size()) {
|
||||
if (nqcb->oldQueues[i]->byteLength > maxQueueLength) {
|
||||
maxQueueLength = nqcb->oldQueues[i]->byteLength;
|
||||
|
@ -694,7 +687,7 @@ void Switch::aqm_enqueue(void *tPtr, const SharedPtr<Network> &network, Packet &
|
|||
|
||||
uint64_t Switch::control_law(uint64_t t, int count)
|
||||
{
|
||||
return (uint64_t)(t + ZT_QOS_INTERVAL / sqrt(count));
|
||||
return (uint64_t)(t + ZT_AQM_INTERVAL / sqrt(count));
|
||||
}
|
||||
|
||||
Switch::dqr Switch::dodequeue(ManagedQueue *q, uint64_t now)
|
||||
|
@ -708,14 +701,14 @@ Switch::dqr Switch::dodequeue(ManagedQueue *q, uint64_t now)
|
|||
return r;
|
||||
}
|
||||
uint64_t sojourn_time = now - r.p->creationTime;
|
||||
if (sojourn_time < ZT_QOS_TARGET || q->byteLength <= ZT_DEFAULT_MTU) {
|
||||
if (sojourn_time < ZT_AQM_TARGET || q->byteLength <= ZT_DEFAULT_MTU) {
|
||||
// went below - stay below for at least interval
|
||||
q->first_above_time = 0;
|
||||
} else {
|
||||
if (q->first_above_time == 0) {
|
||||
// just went above from below. if still above at
|
||||
// first_above_time, will say it's ok to drop.
|
||||
q->first_above_time = now + ZT_QOS_INTERVAL;
|
||||
q->first_above_time = now + ZT_AQM_INTERVAL;
|
||||
} else if (now >= q->first_above_time) {
|
||||
r.ok_to_drop = true;
|
||||
}
|
||||
|
@ -747,7 +740,7 @@ Switch::TXQueueEntry * Switch::CoDelDequeue(ManagedQueue *q, bool isNew, uint64_
|
|||
q->q.pop_front(); // drop
|
||||
r = dodequeue(q, now);
|
||||
q->dropping = true;
|
||||
q->count = (q->count > 2 && now - q->drop_next < 8*ZT_QOS_INTERVAL)?
|
||||
q->count = (q->count > 2 && now - q->drop_next < 8*ZT_AQM_INTERVAL)?
|
||||
q->count - 2 : 1;
|
||||
q->drop_next = control_law(now, q->count);
|
||||
}
|
||||
|
@ -775,7 +768,7 @@ void Switch::aqm_dequeue(void *tPtr)
|
|||
while (currQueues->size()) {
|
||||
ManagedQueue *queueAtFrontOfList = currQueues->front();
|
||||
if (queueAtFrontOfList->byteCredit < 0) {
|
||||
queueAtFrontOfList->byteCredit += ZT_QOS_QUANTUM;
|
||||
queueAtFrontOfList->byteCredit += ZT_AQM_QUANTUM;
|
||||
// Move to list of OLD queues
|
||||
// DEBUG_INFO("moving q=%p from NEW to OLD list", queueAtFrontOfList);
|
||||
oldQueues->push_back(queueAtFrontOfList);
|
||||
|
@ -810,7 +803,7 @@ void Switch::aqm_dequeue(void *tPtr)
|
|||
while (currQueues->size()) {
|
||||
ManagedQueue *queueAtFrontOfList = currQueues->front();
|
||||
if (queueAtFrontOfList->byteCredit < 0) {
|
||||
queueAtFrontOfList->byteCredit += ZT_QOS_QUANTUM;
|
||||
queueAtFrontOfList->byteCredit += ZT_AQM_QUANTUM;
|
||||
oldQueues->push_back(queueAtFrontOfList);
|
||||
currQueues->erase(currQueues->begin());
|
||||
} else {
|
||||
|
@ -850,7 +843,7 @@ void Switch::removeNetworkQoSControlBlock(uint64_t nwid)
|
|||
}
|
||||
}
|
||||
|
||||
void Switch::send(void *tPtr,Packet &packet,bool encrypt,int64_t flowId)
|
||||
void Switch::send(void *tPtr,Packet &packet,bool encrypt,int32_t flowId)
|
||||
{
|
||||
const Address dest(packet.destination());
|
||||
if (dest == RR->identity.address())
|
||||
|
@ -883,7 +876,7 @@ void Switch::requestWhois(void *tPtr,const int64_t now,const Address &addr)
|
|||
|
||||
const SharedPtr<Peer> upstream(RR->topology->getUpstreamPeer());
|
||||
if (upstream) {
|
||||
int64_t flowId = -1;
|
||||
int32_t flowId = ZT_QOS_NO_FLOW;
|
||||
Packet outp(upstream->address(),RR->identity.address(),Packet::VERB_WHOIS);
|
||||
addr.appendTo(outp);
|
||||
RR->node->expectReplyTo(outp.packetId());
|
||||
|
@ -903,7 +896,7 @@ void Switch::doAnythingWaitingForPeer(void *tPtr,const SharedPtr<Peer> &peer)
|
|||
RXQueueEntry *const rq = &(_rxQueue[ptr]);
|
||||
Mutex::Lock rql(rq->lock);
|
||||
if ((rq->timestamp)&&(rq->complete)) {
|
||||
if ((rq->frag0.tryDecode(RR,tPtr))||((now - rq->timestamp) > ZT_RECEIVE_QUEUE_TIMEOUT))
|
||||
if ((rq->frag0.tryDecode(RR,tPtr,rq->flowId))||((now - rq->timestamp) > ZT_RECEIVE_QUEUE_TIMEOUT))
|
||||
rq->timestamp = 0;
|
||||
}
|
||||
}
|
||||
|
@ -954,7 +947,7 @@ unsigned long Switch::doTimerTasks(void *tPtr,int64_t now)
|
|||
RXQueueEntry *const rq = &(_rxQueue[ptr]);
|
||||
Mutex::Lock rql(rq->lock);
|
||||
if ((rq->timestamp)&&(rq->complete)) {
|
||||
if ((rq->frag0.tryDecode(RR,tPtr))||((now - rq->timestamp) > ZT_RECEIVE_QUEUE_TIMEOUT)) {
|
||||
if ((rq->frag0.tryDecode(RR,tPtr,rq->flowId))||((now - rq->timestamp) > ZT_RECEIVE_QUEUE_TIMEOUT)) {
|
||||
rq->timestamp = 0;
|
||||
} else {
|
||||
const Address src(rq->frag0.source());
|
||||
|
@ -1000,7 +993,7 @@ bool Switch::_shouldUnite(const int64_t now,const Address &source,const Address
|
|||
return false;
|
||||
}
|
||||
|
||||
bool Switch::_trySend(void *tPtr,Packet &packet,bool encrypt,int64_t flowId)
|
||||
bool Switch::_trySend(void *tPtr,Packet &packet,bool encrypt,int32_t flowId)
|
||||
{
|
||||
SharedPtr<Path> viaPath;
|
||||
const int64_t now = RR->node->now();
|
||||
|
@ -1008,8 +1001,18 @@ bool Switch::_trySend(void *tPtr,Packet &packet,bool encrypt,int64_t flowId)
|
|||
|
||||
const SharedPtr<Peer> peer(RR->topology->getPeer(tPtr,destination));
|
||||
if (peer) {
|
||||
if (RR->node->getMultipathMode() == ZT_MULTIPATH_BROADCAST) {
|
||||
// Nothing here, we'll grab an entire set of paths to send out on below
|
||||
if ((peer->bondingPolicy() == ZT_BONDING_POLICY_BROADCAST)
|
||||
&& (packet.verb() == Packet::VERB_FRAME || packet.verb() == Packet::VERB_EXT_FRAME)) {
|
||||
const SharedPtr<Peer> relay(RR->topology->getUpstreamPeer());
|
||||
Mutex::Lock _l(peer->_paths_m);
|
||||
for(int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
|
||||
if (peer->_paths[i].p && peer->_paths[i].p->alive(now)) {
|
||||
char pathStr[128];
|
||||
peer->_paths[i].p->address().toString(pathStr);
|
||||
_sendViaSpecificPath(tPtr,peer,peer->_paths[i].p,now,packet,encrypt,flowId);
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
else {
|
||||
viaPath = peer->getAppropriatePath(now,false,flowId);
|
||||
|
@ -1021,61 +1024,51 @@ bool Switch::_trySend(void *tPtr,Packet &packet,bool encrypt,int64_t flowId)
|
|||
return false;
|
||||
}
|
||||
}
|
||||
if (viaPath) {
|
||||
_sendViaSpecificPath(tPtr,peer,viaPath,now,packet,encrypt,flowId);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
void Switch::_sendViaSpecificPath(void *tPtr,SharedPtr<Peer> peer,SharedPtr<Path> viaPath,int64_t now,Packet &packet,bool encrypt,int32_t flowId)
|
||||
{
|
||||
unsigned int mtu = ZT_DEFAULT_PHYSMTU;
|
||||
uint64_t trustedPathId = 0;
|
||||
RR->topology->getOutboundPathInfo(viaPath->address(),mtu,trustedPathId);
|
||||
|
||||
unsigned int chunkSize = std::min(packet.size(),mtu);
|
||||
packet.setFragmented(chunkSize < packet.size());
|
||||
|
||||
peer->recordOutgoingPacket(viaPath, packet.packetId(), packet.payloadLength(), packet.verb(), flowId, now);
|
||||
|
||||
if (trustedPathId) {
|
||||
packet.setTrusted(trustedPathId);
|
||||
} else {
|
||||
return false;
|
||||
packet.armor(peer->key(),encrypt);
|
||||
}
|
||||
|
||||
// If sending on all paths, set viaPath to first path
|
||||
int nextPathIdx = 0;
|
||||
std::vector<SharedPtr<Path>> paths = peer->getAllPaths(now);
|
||||
if (RR->node->getMultipathMode() == ZT_MULTIPATH_BROADCAST) {
|
||||
if (paths.size()) {
|
||||
viaPath = paths[nextPathIdx++];
|
||||
}
|
||||
}
|
||||
if (viaPath->send(RR,tPtr,packet.data(),chunkSize,now)) {
|
||||
if (chunkSize < packet.size()) {
|
||||
// Too big for one packet, fragment the rest
|
||||
unsigned int fragStart = chunkSize;
|
||||
unsigned int remaining = packet.size() - chunkSize;
|
||||
unsigned int fragsRemaining = (remaining / (mtu - ZT_PROTO_MIN_FRAGMENT_LENGTH));
|
||||
if ((fragsRemaining * (mtu - ZT_PROTO_MIN_FRAGMENT_LENGTH)) < remaining)
|
||||
++fragsRemaining;
|
||||
const unsigned int totalFragments = fragsRemaining + 1;
|
||||
|
||||
while (viaPath) {
|
||||
unsigned int mtu = ZT_DEFAULT_PHYSMTU;
|
||||
uint64_t trustedPathId = 0;
|
||||
RR->topology->getOutboundPathInfo(viaPath->address(),mtu,trustedPathId);
|
||||
unsigned int chunkSize = std::min(packet.size(),mtu);
|
||||
packet.setFragmented(chunkSize < packet.size());
|
||||
peer->recordOutgoingPacket(viaPath, packet.packetId(), packet.payloadLength(), packet.verb(), now);
|
||||
|
||||
if (trustedPathId) {
|
||||
packet.setTrusted(trustedPathId);
|
||||
} else {
|
||||
packet.armor(peer->key(),encrypt);
|
||||
}
|
||||
|
||||
if (viaPath->send(RR,tPtr,packet.data(),chunkSize,now)) {
|
||||
if (chunkSize < packet.size()) {
|
||||
// Too big for one packet, fragment the rest
|
||||
unsigned int fragStart = chunkSize;
|
||||
unsigned int remaining = packet.size() - chunkSize;
|
||||
unsigned int fragsRemaining = (remaining / (mtu - ZT_PROTO_MIN_FRAGMENT_LENGTH));
|
||||
if ((fragsRemaining * (mtu - ZT_PROTO_MIN_FRAGMENT_LENGTH)) < remaining)
|
||||
++fragsRemaining;
|
||||
const unsigned int totalFragments = fragsRemaining + 1;
|
||||
|
||||
for(unsigned int fno=1;fno<totalFragments;++fno) {
|
||||
chunkSize = std::min(remaining,(unsigned int)(mtu - ZT_PROTO_MIN_FRAGMENT_LENGTH));
|
||||
Packet::Fragment frag(packet,fragStart,chunkSize,fno,totalFragments);
|
||||
viaPath->send(RR,tPtr,frag.data(),frag.size(),now);
|
||||
fragStart += chunkSize;
|
||||
remaining -= chunkSize;
|
||||
}
|
||||
}
|
||||
}
|
||||
viaPath.zero();
|
||||
if (RR->node->getMultipathMode() == ZT_MULTIPATH_BROADCAST) {
|
||||
if (paths.size() > nextPathIdx) {
|
||||
viaPath = paths[nextPathIdx++];
|
||||
for(unsigned int fno=1;fno<totalFragments;++fno) {
|
||||
chunkSize = std::min(remaining,(unsigned int)(mtu - ZT_PROTO_MIN_FRAGMENT_LENGTH));
|
||||
Packet::Fragment frag(packet,fragStart,chunkSize,fno,totalFragments);
|
||||
viaPath->send(RR,tPtr,frag.data(),frag.size(),now);
|
||||
fragStart += chunkSize;
|
||||
remaining -= chunkSize;
|
||||
}
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
} // namespace ZeroTier
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue