Add thread PTR that gets passed through the entire ZT core call stack and then passed to handler functions resulting from a call.

This commit is contained in:
Adam Ierymenko 2017-03-27 17:03:17 -07:00
parent 592cac5815
commit e4896b257f
44 changed files with 672 additions and 582 deletions

View file

@ -64,7 +64,7 @@ Switch::Switch(const RuntimeEnvironment *renv) :
{
}
void Switch::onRemotePacket(const InetAddress &localAddr,const InetAddress &fromAddr,const void *data,unsigned int len)
void Switch::onRemotePacket(void *tPtr,const InetAddress &localAddr,const InetAddress &fromAddr,const void *data,unsigned int len)
{
try {
const uint64_t now = RR->node->now();
@ -81,15 +81,15 @@ void Switch::onRemotePacket(const InetAddress &localAddr,const InetAddress &from
const Address beaconAddr(reinterpret_cast<const char *>(data) + 8,5);
if (beaconAddr == RR->identity.address())
return;
if (!RR->node->shouldUsePathForZeroTierTraffic(beaconAddr,localAddr,fromAddr))
if (!RR->node->shouldUsePathForZeroTierTraffic(tPtr,beaconAddr,localAddr,fromAddr))
return;
const SharedPtr<Peer> peer(RR->topology->getPeer(beaconAddr));
const SharedPtr<Peer> peer(RR->topology->getPeer(tPtr,beaconAddr));
if (peer) { // we'll only respond to beacons from known peers
if ((now - _lastBeaconResponse) >= 2500) { // limit rate of responses
_lastBeaconResponse = now;
Packet outp(peer->address(),RR->identity.address(),Packet::VERB_NOP);
outp.armor(peer->key(),true,path->nextOutgoingCounter());
path->send(RR,outp.data(),outp.size(),now);
path->send(RR,tPtr,outp.data(),outp.size(),now);
}
}
@ -115,8 +115,8 @@ void Switch::onRemotePacket(const InetAddress &localAddr,const InetAddress &from
// Note: we don't bother initiating NAT-t for fragments, since heads will set that off.
// It wouldn't hurt anything, just redundant and unnecessary.
SharedPtr<Peer> relayTo = RR->topology->getPeer(destination);
if ((!relayTo)||(!relayTo->sendDirect(fragment.data(),fragment.size(),now,false))) {
SharedPtr<Peer> relayTo = RR->topology->getPeer(tPtr,destination);
if ((!relayTo)||(!relayTo->sendDirect(tPtr,fragment.data(),fragment.size(),now,false))) {
#ifdef ZT_ENABLE_CLUSTER
if ((RR->cluster)&&(!isClusterFrontplane)) {
RR->cluster->relayViaCluster(Address(),destination,fragment.data(),fragment.size(),false);
@ -127,7 +127,7 @@ void Switch::onRemotePacket(const InetAddress &localAddr,const InetAddress &from
// Don't know peer or no direct path -- so relay via someone upstream
relayTo = RR->topology->getUpstreamPeer();
if (relayTo)
relayTo->sendDirect(fragment.data(),fragment.size(),now,true);
relayTo->sendDirect(tPtr,fragment.data(),fragment.size(),now,true);
}
} else {
TRACE("dropped relay [fragment](%s) -> %s, max hops exceeded",fromAddr.toString().c_str(),destination.toString().c_str());
@ -171,7 +171,7 @@ void Switch::onRemotePacket(const InetAddress &localAddr,const InetAddress &from
for(unsigned int f=1;f<totalFragments;++f)
rq->frag0.append(rq->frags[f - 1].payload(),rq->frags[f - 1].payloadLength());
if (rq->frag0.tryDecode(RR)) {
if (rq->frag0.tryDecode(RR,tPtr)) {
rq->timestamp = 0; // packet decoded, free entry
} else {
rq->complete = true; // set complete flag but leave entry since it probably needs WHOIS or something
@ -212,8 +212,8 @@ void Switch::onRemotePacket(const InetAddress &localAddr,const InetAddress &from
packet.incrementHops();
#endif
SharedPtr<Peer> relayTo = RR->topology->getPeer(destination);
if ((relayTo)&&(relayTo->sendDirect(packet.data(),packet.size(),now,false))) {
SharedPtr<Peer> relayTo = RR->topology->getPeer(tPtr,destination);
if ((relayTo)&&(relayTo->sendDirect(tPtr,packet.data(),packet.size(),now,false))) {
if ((source != RR->identity.address())&&(_shouldUnite(now,source,destination))) { // don't send RENDEZVOUS for cluster frontplane relays
const InetAddress *hintToSource = (InetAddress *)0;
const InetAddress *hintToDest = (InetAddress *)0;
@ -222,7 +222,7 @@ void Switch::onRemotePacket(const InetAddress &localAddr,const InetAddress &from
InetAddress sourceV4,sourceV6;
relayTo->getRendezvousAddresses(now,destV4,destV6);
const SharedPtr<Peer> sourcePeer(RR->topology->getPeer(source));
const SharedPtr<Peer> sourcePeer(RR->topology->getPeer(tPtr,source));
if (sourcePeer) {
sourcePeer->getRendezvousAddresses(now,sourceV4,sourceV6);
if ((destV6)&&(sourceV6)) {
@ -249,7 +249,7 @@ void Switch::onRemotePacket(const InetAddress &localAddr,const InetAddress &from
outp.append((uint8_t)4);
outp.append(hintToSource->rawIpData(),4);
}
send(outp,true);
send(tPtr,outp,true);
} else {
Packet outp(destination,RR->identity.address(),Packet::VERB_RENDEZVOUS);
outp.append((uint8_t)0);
@ -262,7 +262,7 @@ void Switch::onRemotePacket(const InetAddress &localAddr,const InetAddress &from
outp.append((uint8_t)4);
outp.append(hintToDest->rawIpData(),4);
}
send(outp,true);
send(tPtr,outp,true);
}
++alt;
}
@ -278,7 +278,7 @@ void Switch::onRemotePacket(const InetAddress &localAddr,const InetAddress &from
#endif
relayTo = RR->topology->getUpstreamPeer(&source,1,true);
if (relayTo)
relayTo->sendDirect(packet.data(),packet.size(),now,true);
relayTo->sendDirect(tPtr,packet.data(),packet.size(),now,true);
}
} else {
TRACE("dropped relay %s(%s) -> %s, max hops exceeded",packet.source().toString().c_str(),fromAddr.toString().c_str(),destination.toString().c_str());
@ -321,7 +321,7 @@ void Switch::onRemotePacket(const InetAddress &localAddr,const InetAddress &from
for(unsigned int f=1;f<rq->totalFragments;++f)
rq->frag0.append(rq->frags[f - 1].payload(),rq->frags[f - 1].payloadLength());
if (rq->frag0.tryDecode(RR)) {
if (rq->frag0.tryDecode(RR,tPtr)) {
rq->timestamp = 0; // packet decoded, free entry
} else {
rq->complete = true; // set complete flag but leave entry since it probably needs WHOIS or something
@ -334,7 +334,7 @@ void Switch::onRemotePacket(const InetAddress &localAddr,const InetAddress &from
} else {
// Packet is unfragmented, so just process it
IncomingPacket packet(data,len,path,now);
if (!packet.tryDecode(RR)) {
if (!packet.tryDecode(RR,tPtr)) {
Mutex::Lock _l(_rxQueue_m);
RXQueueEntry *rq = &(_rxQueue[ZT_RX_QUEUE_SIZE - 1]);
unsigned long i = ZT_RX_QUEUE_SIZE - 1;
@ -362,7 +362,7 @@ void Switch::onRemotePacket(const InetAddress &localAddr,const InetAddress &from
}
}
void Switch::onLocalEthernet(const SharedPtr<Network> &network,const MAC &from,const MAC &to,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len)
void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const MAC &from,const MAC &to,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len)
{
if (!network->hasConfig())
return;
@ -474,7 +474,7 @@ void Switch::onLocalEthernet(const SharedPtr<Network> &network,const MAC &from,c
adv[42] = (checksum >> 8) & 0xff;
adv[43] = checksum & 0xff;
RR->node->putFrame(network->id(),network->userPtr(),peerMac,from,ZT_ETHERTYPE_IPV6,0,adv,72);
RR->node->putFrame(tPtr,network->id(),network->userPtr(),peerMac,from,ZT_ETHERTYPE_IPV6,0,adv,72);
return; // NDP emulation done. We have forged a "fake" reply, so no need to send actual NDP query.
} // else no NDP emulation
} // else no NDP emulation
@ -491,17 +491,18 @@ void Switch::onLocalEthernet(const SharedPtr<Network> &network,const MAC &from,c
* multicast addresses on bridge interfaces and subscribing each slave.
* But in that case this does no harm, as the sets are just merged. */
if (fromBridged)
network->learnBridgedMulticastGroup(multicastGroup,RR->node->now());
network->learnBridgedMulticastGroup(tPtr,multicastGroup,RR->node->now());
//TRACE("%.16llx: MULTICAST %s -> %s %s %u",network->id(),from.toString().c_str(),multicastGroup.toString().c_str(),etherTypeName(etherType),len);
// First pass sets noTee to false, but noTee is set to true in OutboundMulticast to prevent duplicates.
if (!network->filterOutgoingPacket(false,RR->identity.address(),Address(),from,to,(const uint8_t *)data,len,etherType,vlanId)) {
if (!network->filterOutgoingPacket(tPtr,false,RR->identity.address(),Address(),from,to,(const uint8_t *)data,len,etherType,vlanId)) {
TRACE("%.16llx: %s -> %s %s packet not sent: filterOutgoingPacket() returned false",network->id(),from.toString().c_str(),to.toString().c_str(),etherTypeName(etherType));
return;
}
RR->mc->send(
tPtr,
network->config().multicastLimit,
RR->node->now(),
network->id(),
@ -514,14 +515,14 @@ void Switch::onLocalEthernet(const SharedPtr<Network> &network,const MAC &from,c
len);
} else if (to == network->mac()) {
// Destination is this node, so just reinject it
RR->node->putFrame(network->id(),network->userPtr(),from,to,etherType,vlanId,data,len);
RR->node->putFrame(tPtr,network->id(),network->userPtr(),from,to,etherType,vlanId,data,len);
} else if (to[0] == MAC::firstOctetForNetwork(network->id())) {
// Destination is another ZeroTier peer on the same network
Address toZT(to.toAddress(network->id())); // since in-network MACs are derived from addresses and network IDs, we can reverse this
SharedPtr<Peer> toPeer(RR->topology->getPeer(toZT));
SharedPtr<Peer> toPeer(RR->topology->getPeer(tPtr,toZT));
if (!network->filterOutgoingPacket(false,RR->identity.address(),toZT,from,to,(const uint8_t *)data,len,etherType,vlanId)) {
if (!network->filterOutgoingPacket(tPtr,false,RR->identity.address(),toZT,from,to,(const uint8_t *)data,len,etherType,vlanId)) {
TRACE("%.16llx: %s -> %s %s packet not sent: filterOutgoingPacket() returned false",network->id(),from.toString().c_str(),to.toString().c_str(),etherTypeName(etherType));
return;
}
@ -536,7 +537,7 @@ void Switch::onLocalEthernet(const SharedPtr<Network> &network,const MAC &from,c
outp.append(data,len);
if (!network->config().disableCompression())
outp.compress();
send(outp,true);
send(tPtr,outp,true);
} else {
Packet outp(toZT,RR->identity.address(),Packet::VERB_FRAME);
outp.append(network->id());
@ -544,7 +545,7 @@ void Switch::onLocalEthernet(const SharedPtr<Network> &network,const MAC &from,c
outp.append(data,len);
if (!network->config().disableCompression())
outp.compress();
send(outp,true);
send(tPtr,outp,true);
}
//TRACE("%.16llx: UNICAST: %s -> %s etherType==%s(%.4x) vlanId==%u len==%u fromBridged==%d includeCom==%d",network->id(),from.toString().c_str(),to.toString().c_str(),etherTypeName(etherType),etherType,vlanId,len,(int)fromBridged,(int)includeCom);
@ -554,7 +555,7 @@ void Switch::onLocalEthernet(const SharedPtr<Network> &network,const MAC &from,c
// We filter with a NULL destination ZeroTier address first. Filtrations
// for each ZT destination are also done below. This is the same rationale
// and design as for multicast.
if (!network->filterOutgoingPacket(false,RR->identity.address(),Address(),from,to,(const uint8_t *)data,len,etherType,vlanId)) {
if (!network->filterOutgoingPacket(tPtr,false,RR->identity.address(),Address(),from,to,(const uint8_t *)data,len,etherType,vlanId)) {
TRACE("%.16llx: %s -> %s %s packet not sent: filterOutgoingPacket() returned false",network->id(),from.toString().c_str(),to.toString().c_str(),etherTypeName(etherType));
return;
}
@ -592,7 +593,7 @@ void Switch::onLocalEthernet(const SharedPtr<Network> &network,const MAC &from,c
}
for(unsigned int b=0;b<numBridges;++b) {
if (network->filterOutgoingPacket(true,RR->identity.address(),bridges[b],from,to,(const uint8_t *)data,len,etherType,vlanId)) {
if (network->filterOutgoingPacket(tPtr,true,RR->identity.address(),bridges[b],from,to,(const uint8_t *)data,len,etherType,vlanId)) {
Packet outp(bridges[b],RR->identity.address(),Packet::VERB_EXT_FRAME);
outp.append(network->id());
outp.append((uint8_t)0x00);
@ -602,7 +603,7 @@ void Switch::onLocalEthernet(const SharedPtr<Network> &network,const MAC &from,c
outp.append(data,len);
if (!network->config().disableCompression())
outp.compress();
send(outp,true);
send(tPtr,outp,true);
} else {
TRACE("%.16llx: %s -> %s %s packet not sent: filterOutgoingPacket() returned false",network->id(),from.toString().c_str(),to.toString().c_str(),etherTypeName(etherType));
}
@ -610,20 +611,20 @@ void Switch::onLocalEthernet(const SharedPtr<Network> &network,const MAC &from,c
}
}
void Switch::send(Packet &packet,bool encrypt)
void Switch::send(void *tPtr,Packet &packet,bool encrypt)
{
if (packet.destination() == RR->identity.address()) {
TRACE("BUG: caught attempt to send() to self, ignored");
return;
}
if (!_trySend(packet,encrypt)) {
if (!_trySend(tPtr,packet,encrypt)) {
Mutex::Lock _l(_txQueue_m);
_txQueue.push_back(TXQueueEntry(packet.destination(),RR->node->now(),packet,encrypt));
}
}
void Switch::requestWhois(const Address &addr)
void Switch::requestWhois(void *tPtr,const Address &addr)
{
#ifdef ZT_TRACE
if (addr == RR->identity.address()) {
@ -644,10 +645,10 @@ void Switch::requestWhois(const Address &addr)
}
}
if (inserted)
_sendWhoisRequest(addr,(const Address *)0,0);
_sendWhoisRequest(tPtr,addr,(const Address *)0,0);
}
void Switch::doAnythingWaitingForPeer(const SharedPtr<Peer> &peer)
void Switch::doAnythingWaitingForPeer(void *tPtr,const SharedPtr<Peer> &peer)
{
{ // cancel pending WHOIS since we now know this peer
Mutex::Lock _l(_outstandingWhoisRequests_m);
@ -660,7 +661,7 @@ void Switch::doAnythingWaitingForPeer(const SharedPtr<Peer> &peer)
while (i) {
RXQueueEntry *rq = &(_rxQueue[--i]);
if ((rq->timestamp)&&(rq->complete)) {
if (rq->frag0.tryDecode(RR))
if (rq->frag0.tryDecode(RR,tPtr))
rq->timestamp = 0;
}
}
@ -670,7 +671,7 @@ void Switch::doAnythingWaitingForPeer(const SharedPtr<Peer> &peer)
Mutex::Lock _l(_txQueue_m);
for(std::list< TXQueueEntry >::iterator txi(_txQueue.begin());txi!=_txQueue.end();) {
if (txi->dest == peer->address()) {
if (_trySend(txi->packet,txi->encrypt))
if (_trySend(tPtr,txi->packet,txi->encrypt))
_txQueue.erase(txi++);
else ++txi;
} else ++txi;
@ -678,7 +679,7 @@ void Switch::doAnythingWaitingForPeer(const SharedPtr<Peer> &peer)
}
}
unsigned long Switch::doTimerTasks(uint64_t now)
unsigned long Switch::doTimerTasks(void *tPtr,uint64_t now)
{
unsigned long nextDelay = 0xffffffff; // ceiling delay, caller will cap to minimum
@ -695,7 +696,7 @@ unsigned long Switch::doTimerTasks(uint64_t now)
_outstandingWhoisRequests.erase(*a);
} else {
r->lastSent = now;
r->peersConsulted[r->retries] = _sendWhoisRequest(*a,r->peersConsulted,(r->retries > 1) ? r->retries : 0);
r->peersConsulted[r->retries] = _sendWhoisRequest(tPtr,*a,r->peersConsulted,(r->retries > 1) ? r->retries : 0);
TRACE("WHOIS %s (retry %u)",a->toString().c_str(),r->retries);
++r->retries;
nextDelay = std::min(nextDelay,(unsigned long)ZT_WHOIS_RETRY_DELAY);
@ -709,7 +710,7 @@ unsigned long Switch::doTimerTasks(uint64_t now)
{ // Time out TX queue packets that never got WHOIS lookups or other info.
Mutex::Lock _l(_txQueue_m);
for(std::list< TXQueueEntry >::iterator txi(_txQueue.begin());txi!=_txQueue.end();) {
if (_trySend(txi->packet,txi->encrypt))
if (_trySend(tPtr,txi->packet,txi->encrypt))
_txQueue.erase(txi++);
else if ((now - txi->creationTime) > ZT_TRANSMIT_QUEUE_TIMEOUT) {
TRACE("TX %s -> %s timed out",txi->packet.source().toString().c_str(),txi->packet.destination().toString().c_str());
@ -743,19 +744,19 @@ bool Switch::_shouldUnite(const uint64_t now,const Address &source,const Address
return false;
}
Address Switch::_sendWhoisRequest(const Address &addr,const Address *peersAlreadyConsulted,unsigned int numPeersAlreadyConsulted)
Address Switch::_sendWhoisRequest(void *tPtr,const Address &addr,const Address *peersAlreadyConsulted,unsigned int numPeersAlreadyConsulted)
{
SharedPtr<Peer> upstream(RR->topology->getUpstreamPeer(peersAlreadyConsulted,numPeersAlreadyConsulted,false));
if (upstream) {
Packet outp(upstream->address(),RR->identity.address(),Packet::VERB_WHOIS);
addr.appendTo(outp);
RR->node->expectReplyTo(outp.packetId());
send(outp,true);
send(tPtr,outp,true);
}
return Address();
}
bool Switch::_trySend(Packet &packet,bool encrypt)
bool Switch::_trySend(void *tPtr,Packet &packet,bool encrypt)
{
SharedPtr<Path> viaPath;
const uint64_t now = RR->node->now();
@ -769,7 +770,7 @@ bool Switch::_trySend(Packet &packet,bool encrypt)
clusterMostRecentMemberId = RR->cluster->checkSendViaCluster(destination,clusterMostRecentTs,clusterPeerSecret);
#endif
const SharedPtr<Peer> peer(RR->topology->getPeer(destination));
const SharedPtr<Peer> peer(RR->topology->getPeer(tPtr,destination));
if (peer) {
/* First get the best path, and if it's dead (and this is not a root)
* we attempt to re-activate that path but this packet will flow
@ -784,7 +785,7 @@ bool Switch::_trySend(Packet &packet,bool encrypt)
if ((clusterMostRecentMemberId < 0)||(viaPath->lastIn() > clusterMostRecentTs)) {
#endif
if ((now - viaPath->lastOut()) > std::max((now - viaPath->lastIn()) * 4,(uint64_t)ZT_PATH_MIN_REACTIVATE_INTERVAL)) {
peer->attemptToContactAt(viaPath->localAddress(),viaPath->address(),now,false,viaPath->nextOutgoingCounter());
peer->attemptToContactAt(tPtr,viaPath->localAddress(),viaPath->address(),now,false,viaPath->nextOutgoingCounter());
viaPath->sent(now);
}
#ifdef ZT_ENABLE_CLUSTER
@ -801,7 +802,7 @@ bool Switch::_trySend(Packet &packet,bool encrypt)
#else
if (!viaPath) {
#endif
peer->tryMemorizedPath(now); // periodically attempt memorized or statically defined paths, if any are known
peer->tryMemorizedPath(tPtr,now); // periodically attempt memorized or statically defined paths, if any are known
const SharedPtr<Peer> relay(RR->topology->getUpstreamPeer());
if ( (!relay) || (!(viaPath = relay->getBestPath(now,false))) ) {
if (!(viaPath = peer->getBestPath(now,true)))
@ -816,7 +817,7 @@ bool Switch::_trySend(Packet &packet,bool encrypt)
#ifdef ZT_ENABLE_CLUSTER
if (clusterMostRecentMemberId < 0) {
#else
requestWhois(destination);
requestWhois(tPtr,destination);
return false; // if we are not in cluster mode, there is no way we can send without knowing the peer directly
#endif
#ifdef ZT_ENABLE_CLUSTER
@ -844,9 +845,9 @@ bool Switch::_trySend(Packet &packet,bool encrypt)
#endif
#ifdef ZT_ENABLE_CLUSTER
if ( ((viaPath)&&(viaPath->send(RR,packet.data(),chunkSize,now))) || ((clusterMostRecentMemberId >= 0)&&(RR->cluster->sendViaCluster(clusterMostRecentMemberId,destination,packet.data(),chunkSize))) ) {
if ( ((viaPath)&&(viaPath->send(RR,tPtr,packet.data(),chunkSize,now))) || ((clusterMostRecentMemberId >= 0)&&(RR->cluster->sendViaCluster(clusterMostRecentMemberId,destination,packet.data(),chunkSize))) ) {
#else
if (viaPath->send(RR,packet.data(),chunkSize,now)) {
if (viaPath->send(RR,tPtr,packet.data(),chunkSize,now)) {
#endif
if (chunkSize < packet.size()) {
// Too big for one packet, fragment the rest
@ -866,7 +867,7 @@ bool Switch::_trySend(Packet &packet,bool encrypt)
else if (clusterMostRecentMemberId >= 0)
RR->cluster->sendViaCluster(clusterMostRecentMemberId,destination,frag.data(),frag.size());
#else
viaPath->send(RR,frag.data(),frag.size(),now);
viaPath->send(RR,tPtr,frag.data(),frag.size(),now);
#endif
fragStart += chunkSize;
remaining -= chunkSize;