More work in progress on Switch / PacketDecoder refactor.

This commit is contained in:
Adam Ierymenko 2013-07-11 22:06:25 -04:00
parent fd2b383c3e
commit 339b2314ea
9 changed files with 344 additions and 292 deletions

View file

@ -105,7 +105,7 @@ void Switch::onLocalEthernet(const SharedPtr<Network> &network,const MAC &from,c
Multicaster::MulticastBloomFilter newbf;
SharedPtr<Peer> propPeers[ZT_MULTICAST_PROPAGATION_BREADTH];
unsigned int np = _multicaster.pickNextPropagationPeers(
unsigned int np = _r->multicaster->pickNextPropagationPeers(
*(_r->topology),
network->id(),
mg,
@ -324,7 +324,7 @@ unsigned long Switch::doTimerTasks()
{
Mutex::Lock _l(_rxQueue_m);
for(std::multimap< Address,SharedPtr<PacketDecoder> >::iterator i(_rxQueue.begin());i!=_rxQueue.end();) {
for(std::list< SharedPtr<PacketDecoder> >::iterator i(_rxQueue.begin());i!=_rxQueue.end();) {
if ((now - i->second->receiveTime()) > ZT_RECEIVE_QUEUE_TIMEOUT) {
TRACE("RX %s -> %s timed out",i->second->source().toString().c_str(),i->second->destination().toString().c_str());
_rxQueue.erase(i++);
@ -392,65 +392,7 @@ void Switch::requestWhois(const Address &addr)
_sendWhoisRequest(addr,(const Address *)0,0);
}
void Switch::_CBaddPeerFromHello(void *arg,const SharedPtr<Peer> &p,Topology::PeerVerifyResult result)
{
_CBaddPeerFromHello_Data *req = (_CBaddPeerFromHello_Data *)arg;
const RuntimeEnvironment *_r = req->parent->_r;
switch(result) {
case Topology::PEER_VERIFY_ACCEPTED_NEW:
case Topology::PEER_VERIFY_ACCEPTED_ALREADY_HAVE:
case Topology::PEER_VERIFY_ACCEPTED_DISPLACED_INVALID_ADDRESS: {
req->parent->_finishWhoisRequest(p);
Packet outp(req->source,_r->identity.address(),Packet::VERB_OK);
outp.append((unsigned char)Packet::VERB_HELLO);
outp.append(req->helloPacketId);
outp.append(req->helloTimestamp);
outp.encrypt(p->cryptKey());
outp.hmacSet(p->macKey());
req->parent->_r->demarc->send(req->localPort,req->fromAddr,outp.data(),outp.size(),-1);
} break;
case Topology::PEER_VERIFY_REJECTED_INVALID_IDENTITY: {
Packet outp(req->source,_r->identity.address(),Packet::VERB_ERROR);
outp.append((unsigned char)Packet::VERB_HELLO);
outp.append(req->helloPacketId);
outp.append((unsigned char)Packet::ERROR_IDENTITY_INVALID);
outp.encrypt(p->cryptKey());
outp.hmacSet(p->macKey());
req->parent->_r->demarc->send(req->localPort,req->fromAddr,outp.data(),outp.size(),-1);
} break;
case Topology::PEER_VERIFY_REJECTED_DUPLICATE:
case Topology::PEER_VERIFY_REJECTED_DUPLICATE_TRIAGED: {
Packet outp(req->source,_r->identity.address(),Packet::VERB_ERROR);
outp.append((unsigned char)Packet::VERB_HELLO);
outp.append(req->helloPacketId);
outp.append((unsigned char)Packet::ERROR_IDENTITY_COLLISION);
outp.encrypt(p->cryptKey());
outp.hmacSet(p->macKey());
req->parent->_r->demarc->send(req->localPort,req->fromAddr,outp.data(),outp.size(),-1);
} break;
}
delete req;
}
void Switch::_CBaddPeerFromWhois(void *arg,const SharedPtr<Peer> &p,Topology::PeerVerifyResult result)
{
switch(result) {
case Topology::PEER_VERIFY_ACCEPTED_NEW:
case Topology::PEER_VERIFY_ACCEPTED_ALREADY_HAVE:
case Topology::PEER_VERIFY_ACCEPTED_DISPLACED_INVALID_ADDRESS:
((Switch *)arg)->_finishWhoisRequest(p);
break;
default:
break;
}
}
void Switch::_finishWhoisRequest(const SharedPtr<Peer> &peer)
void Switch::doAnythingWaitingForPeer(const SharedPtr<Peer> &peer)
{
{
Mutex::Lock _l(_outstandingWhoisRequests_m);
@ -459,8 +401,7 @@ void Switch::_finishWhoisRequest(const SharedPtr<Peer> &peer)
{
Mutex::Lock _l(_rxQueue_m);
std::pair< std::multimap< Address,SharedPtr<PacketDecoder> >::iterator,std::multimap< Address,SharedPtr<PacketDecoder> >::iterator > waitingRxQueueItems(_rxQueue.equal_range(peer->address()));
for(std::multimap< Address,SharedPtr<PacketDecoder> >::iterator rxi(waitingRxQueueItems.first);rxi!=waitingRxQueueItems.second;) {
for(std::list< SharedPtr<PacketDecoder> >::iterator rxi(_rxQueue.begin());rxi!=rxQueue.end();) {
if (rxi->second->tryDecode(_r))
_rxQueue.erase(rxi++);
else ++rxi;
@ -539,7 +480,7 @@ void Switch::_handleRemotePacketFragment(Demarc::Port localPort,const InetAddres
if (!packet->tryDecode(_r)) {
Mutex::Lock _l(_rxQueue_m);
_rxQueue.insert(std::pair< Address,SharedPtr<PacketDecoder> >(destination,packet));
_rxQueue.push_back(packet);
}
}
} // else this is a duplicate fragment, ignore
@ -596,7 +537,7 @@ void Switch::_handleRemotePacketHead(Demarc::Port localPort,const InetAddress &f
if (!packet->tryDecode(_r)) {
Mutex::Lock _l(_rxQueue_m);
_rxQueue.insert(std::pair< Address,SharedPtr<PacketDecoder> >(destination,packet));
_rxQueue.push_back(packet);
}
} else {
// Still waiting on more fragments, so queue the head
@ -607,7 +548,7 @@ void Switch::_handleRemotePacketHead(Demarc::Port localPort,const InetAddress &f
// Packet is unfragmented, so just process it
if (!packet->tryDecode(_r)) {
Mutex::Lock _l(_rxQueue_m);
_rxQueue.insert(std::pair< Address,SharedPtr<PacketDecoder> >(destination,packet));
_rxQueue.push_back(packet);
}
}
}