Minor send path refactor to make packet I/O work on clusters when they are themselves members of networks. Also fix a crash when compiled in cluster mode but no cluster is enabled.
parent 6d5a3cd2e2
commit 5dbebc513a
5 changed files with 278 additions and 115 deletions
node/Cluster.cpp (205 lines changed)
@@ -342,17 +342,20 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
 				Identity id;
 				ptr += id.deserialize(dmsg,ptr);
 				if (id) {
-					RR->topology->saveIdentity(id);
-
 					{
 						Mutex::Lock _l(_remotePeers_m);
-						_remotePeers[std::pair<Address,unsigned int>(id.address(),(unsigned int)fromMemberId)] = RR->node->now();
+						_RemotePeer &rp = _remotePeers[std::pair<Address,unsigned int>(id.address(),(unsigned int)fromMemberId)];
+						if (!rp.lastHavePeerReceived) {
+							RR->topology->saveIdentity(id);
+							id.agree(RR->identity,rp.key,ZT_PEER_SECRET_KEY_LENGTH);
+						}
+						rp.lastHavePeerReceived = RR->node->now();
 					}
 
 					_ClusterSendQueueEntry *q[16384]; // 16384 is "tons"
 					unsigned int qc = _sendQueue->getByDest(id.address(),q,16384);
 					for(unsigned int i=0;i<qc;++i)
-						this->sendViaCluster(q[i]->fromPeerAddress,q[i]->toPeerAddress,q[i]->data,q[i]->len,q[i]->unite);
+						this->relayViaCluster(q[i]->fromPeerAddress,q[i]->toPeerAddress,q[i]->data,q[i]->len,q[i]->unite);
 					_sendQueue->returnToPool(q,qc);
 
 					TRACE("[%u] has %s (retried %u queued sends)",(unsigned int)fromMemberId,id.address().toString().c_str(),qc);
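The hunk above introduces a _RemotePeer value type for the _remotePeers map (rp.lastHavePeerReceived, rp.key, and rp.lastSentWantPeer further down). A sketch of what this diff implies, assuming the authoritative definition lives in Cluster.hpp and may differ in detail:

// Sketch of the per-(address, member ID) record implied by this diff.
// Field names are taken from the usages in the hunks; the real
// definition in Cluster.hpp is authoritative.
struct _RemotePeer
{
	_RemotePeer() : lastHavePeerReceived(0),lastSentWantPeer(0) {}
	uint64_t lastHavePeerReceived; // time of last HAVE_PEER from this member for this address
	uint64_t lastSentWantPeer; // time we last broadcast WANT_PEER for this address
	uint8_t key[ZT_PEER_SECRET_KEY_LENGTH]; // shared secret agreed with this peer's identity
};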
@@ -513,7 +516,79 @@ void Cluster::broadcastNetworkConfigChunk(const void *chunk,unsigned int len)
 	}
 }
 
-void Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPeerAddress,const void *data,unsigned int len,bool unite)
+int Cluster::prepSendViaCluster(const Address &toPeerAddress,Packet &outp,bool encrypt)
+{
+	const uint64_t now = RR->node->now();
+	uint64_t mostRecentTs = 0;
+	int mostRecentMemberId = -1;
+	uint8_t mostRecentSecretKey[ZT_PEER_SECRET_KEY_LENGTH];
+	{
+		Mutex::Lock _l2(_remotePeers_m);
+		std::map< std::pair<Address,unsigned int>,_RemotePeer >::const_iterator rpe(_remotePeers.lower_bound(std::pair<Address,unsigned int>(toPeerAddress,0)));
+		for(;;) {
+			if ((rpe == _remotePeers.end())||(rpe->first.first != toPeerAddress))
+				break;
+			else if (rpe->second.lastHavePeerReceived > mostRecentTs) {
+				mostRecentTs = rpe->second.lastHavePeerReceived;
+				memcpy(mostRecentSecretKey,rpe->second.key,ZT_PEER_SECRET_KEY_LENGTH);
+				mostRecentMemberId = (int)rpe->first.second;
+			}
+			++rpe;
+		}
+	}
+
+	if (mostRecentMemberId >= 0) {
+		const uint64_t ageOfMostRecentHavePeerAnnouncement = now - mostRecentTs;
+		if (ageOfMostRecentHavePeerAnnouncement >= (ZT_PEER_ACTIVITY_TIMEOUT / 3)) {
+			if (ageOfMostRecentHavePeerAnnouncement >= ZT_PEER_ACTIVITY_TIMEOUT)
+				return -1;
+
+			bool sendWantPeer = true;
+			{
+				Mutex::Lock _l(_remotePeers_m);
+				_RemotePeer &rp = _remotePeers[std::pair<Address,unsigned int>(toPeerAddress,(unsigned int)_id)];
+				if ((now - rp.lastSentWantPeer) >= ZT_CLUSTER_WANT_PEER_EVERY) {
+					rp.lastSentWantPeer = now;
+				} else {
+					sendWantPeer = false; // don't flood WANT_PEER
+				}
+			}
+			if (sendWantPeer) {
+				char tmp[ZT_ADDRESS_LENGTH];
+				toPeerAddress.copyTo(tmp,ZT_ADDRESS_LENGTH);
+				{
+					Mutex::Lock _l(_memberIds_m);
+					for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
+						Mutex::Lock _l2(_members[*mid].lock);
+						_send(*mid,CLUSTER_MESSAGE_WANT_PEER,tmp,ZT_ADDRESS_LENGTH);
+					}
+				}
+			}
+		}
+
+		outp.armor(mostRecentSecretKey,encrypt);
+		return mostRecentMemberId;
+	} else return -1;
+}
+
+bool Cluster::sendViaCluster(int mostRecentMemberId,const Address &toPeerAddress,const void *data,unsigned int len)
+{
+	if ((mostRecentMemberId < 0)||(mostRecentMemberId >= ZT_CLUSTER_MAX_MEMBERS)) // sanity check
+		return false;
+	Mutex::Lock _l2(_members[mostRecentMemberId].lock);
+	for(std::vector<InetAddress>::const_iterator i1(_zeroTierPhysicalEndpoints.begin());i1!=_zeroTierPhysicalEndpoints.end();++i1) {
+		for(std::vector<InetAddress>::const_iterator i2(_members[mostRecentMemberId].zeroTierPhysicalEndpoints.begin());i2!=_members[mostRecentMemberId].zeroTierPhysicalEndpoints.end();++i2) {
+			if (i1->ss_family == i2->ss_family) {
+				TRACE("sendViaCluster sending %u bytes to %s by way of %u (%s->%s)",len,toPeerAddress.toString().c_str(),(unsigned int)mostRecentMemberId,i1->toString().c_str(),i2->toString().c_str());
+				RR->node->putPacket(*i1,*i2,data,len);
+				return true;
+			}
+		}
+	}
+	return false;
+}
+
+void Cluster::relayViaCluster(const Address &fromPeerAddress,const Address &toPeerAddress,const void *data,unsigned int len,bool unite)
 {
 	if (len > ZT_PROTO_MAX_PACKET_LENGTH) // sanity check
 		return;
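The old monolithic sendViaCluster() is split here: prepSendViaCluster() finds the member that most recently announced HAVE_PEER for the target and armors the packet with the cached shared secret, while the narrower sendViaCluster() just ships the armored bytes to that member. A minimal sketch of the intended caller pattern; the call site and Packet setup below are illustrative assumptions, not part of this commit:

// Hypothetical caller of the new two-step cluster send path.
Packet outp(toAddr,RR->identity.address(),Packet::VERB_FRAME);
// ... append the frame payload to outp ...
const int memberId = RR->cluster->prepSendViaCluster(toAddr,outp,true); // armors outp on success
if (memberId >= 0)
	RR->cluster->sendViaCluster(memberId,toAddr,outp.data(),outp.size());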
@@ -521,87 +596,101 @@ void Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPeerAddress,const void *data,unsigned int len,bool unite)
 	const uint64_t now = RR->node->now();
 
 	uint64_t mostRecentTs = 0;
-	unsigned int mostRecentMemberId = 0xffffffff;
+	int mostRecentMemberId = -1;
 	{
 		Mutex::Lock _l2(_remotePeers_m);
-		std::map< std::pair<Address,unsigned int>,uint64_t >::const_iterator rpe(_remotePeers.lower_bound(std::pair<Address,unsigned int>(toPeerAddress,0)));
+		std::map< std::pair<Address,unsigned int>,_RemotePeer >::const_iterator rpe(_remotePeers.lower_bound(std::pair<Address,unsigned int>(toPeerAddress,0)));
 		for(;;) {
 			if ((rpe == _remotePeers.end())||(rpe->first.first != toPeerAddress))
 				break;
-			else if (rpe->second > mostRecentTs) {
-				mostRecentTs = rpe->second;
-				mostRecentMemberId = rpe->first.second;
+			else if (rpe->second.lastHavePeerReceived > mostRecentTs) {
+				mostRecentTs = rpe->second.lastHavePeerReceived;
+				mostRecentMemberId = (int)rpe->first.second;
 			}
 			++rpe;
 		}
 	}
 
-	const uint64_t age = now - mostRecentTs;
-	if (age >= (ZT_PEER_ACTIVITY_TIMEOUT / 3)) {
-		const bool enqueueAndWait = ((age >= ZT_PEER_ACTIVITY_TIMEOUT)||(mostRecentMemberId > 0xffff));
+	const uint64_t ageOfMostRecentHavePeerAnnouncement = now - mostRecentTs;
+	if (ageOfMostRecentHavePeerAnnouncement >= (ZT_PEER_ACTIVITY_TIMEOUT / 3)) {
+		// Enqueue and wait if peer seems alive, but do WANT_PEER to refresh homing
+		const bool enqueueAndWait = ((ageOfMostRecentHavePeerAnnouncement >= ZT_PEER_ACTIVITY_TIMEOUT)||(mostRecentMemberId < 0));
 
 		// Poll everyone with WANT_PEER if the age of our most recent entry is
 		// approaching expiration (or has expired, or does not exist).
-		char tmp[ZT_ADDRESS_LENGTH];
-		toPeerAddress.copyTo(tmp,ZT_ADDRESS_LENGTH);
+		bool sendWantPeer = true;
 		{
-			Mutex::Lock _l(_memberIds_m);
-			for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
-				Mutex::Lock _l2(_members[*mid].lock);
-				_send(*mid,CLUSTER_MESSAGE_WANT_PEER,tmp,ZT_ADDRESS_LENGTH);
+			Mutex::Lock _l(_remotePeers_m);
+			_RemotePeer &rp = _remotePeers[std::pair<Address,unsigned int>(toPeerAddress,(unsigned int)_id)];
+			if ((now - rp.lastSentWantPeer) >= ZT_CLUSTER_WANT_PEER_EVERY) {
+				rp.lastSentWantPeer = now;
+			} else {
+				sendWantPeer = false; // don't flood WANT_PEER
+			}
+		}
+		if (sendWantPeer) {
+			char tmp[ZT_ADDRESS_LENGTH];
+			toPeerAddress.copyTo(tmp,ZT_ADDRESS_LENGTH);
+			{
+				Mutex::Lock _l(_memberIds_m);
+				for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
+					Mutex::Lock _l2(_members[*mid].lock);
+					_send(*mid,CLUSTER_MESSAGE_WANT_PEER,tmp,ZT_ADDRESS_LENGTH);
+				}
 			}
 		}
 
 		// If there isn't a good place to send via, then enqueue this for retrying
 		// later and return after having broadcasted a WANT_PEER.
 		if (enqueueAndWait) {
-			TRACE("sendViaCluster %s -> %s enqueueing to wait for HAVE_PEER",fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str());
+			TRACE("relayViaCluster %s -> %s enqueueing to wait for HAVE_PEER",fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str());
 			_sendQueue->enqueue(now,fromPeerAddress,toPeerAddress,data,len,unite);
 			return;
 		}
 	}
 
-	Buffer<1024> buf;
-	if (unite) {
-		InetAddress v4,v6;
-		if (fromPeerAddress) {
-			SharedPtr<Peer> fromPeer(RR->topology->getPeerNoCache(fromPeerAddress));
-			if (fromPeer)
-				fromPeer->getBestActiveAddresses(now,v4,v6);
-		}
-		uint8_t addrCount = 0;
-		if (v4)
-			++addrCount;
-		if (v6)
-			++addrCount;
-		if (addrCount) {
-			toPeerAddress.appendTo(buf);
-			fromPeerAddress.appendTo(buf);
-			buf.append(addrCount);
-			if (v4)
-				v4.serialize(buf);
-			if (v6)
-				v6.serialize(buf);
-		}
-	}
-
-	{
-		Mutex::Lock _l2(_members[mostRecentMemberId].lock);
-		if (buf.size() > 0)
-			_send(mostRecentMemberId,CLUSTER_MESSAGE_PROXY_UNITE,buf.data(),buf.size());
-
-		for(std::vector<InetAddress>::const_iterator i1(_zeroTierPhysicalEndpoints.begin());i1!=_zeroTierPhysicalEndpoints.end();++i1) {
-			for(std::vector<InetAddress>::const_iterator i2(_members[mostRecentMemberId].zeroTierPhysicalEndpoints.begin());i2!=_members[mostRecentMemberId].zeroTierPhysicalEndpoints.end();++i2) {
-				if (i1->ss_family == i2->ss_family) {
-					TRACE("sendViaCluster relaying %u bytes from %s to %s by way of %u (%s->%s)",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str(),(unsigned int)mostRecentMemberId,i1->toString().c_str(),i2->toString().c_str());
-					RR->node->putPacket(*i1,*i2,data,len);
-					return;
-				}
-			}
-		}
-
-		TRACE("sendViaCluster relaying %u bytes from %s to %s by way of %u failed: no common endpoints with the same address family!",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str(),(unsigned int)mostRecentMemberId);
-		return;
-	}
+	if (mostRecentMemberId >= 0) {
+		Buffer<1024> buf;
+		if (unite) {
+			InetAddress v4,v6;
+			if (fromPeerAddress) {
+				SharedPtr<Peer> fromPeer(RR->topology->getPeerNoCache(fromPeerAddress));
+				if (fromPeer)
+					fromPeer->getBestActiveAddresses(now,v4,v6);
+			}
+			uint8_t addrCount = 0;
+			if (v4)
+				++addrCount;
+			if (v6)
+				++addrCount;
+			if (addrCount) {
+				toPeerAddress.appendTo(buf);
+				fromPeerAddress.appendTo(buf);
+				buf.append(addrCount);
+				if (v4)
+					v4.serialize(buf);
+				if (v6)
+					v6.serialize(buf);
+			}
+		}
+
+		{
+			Mutex::Lock _l2(_members[mostRecentMemberId].lock);
+			if (buf.size() > 0)
+				_send(mostRecentMemberId,CLUSTER_MESSAGE_PROXY_UNITE,buf.data(),buf.size());
+
+			for(std::vector<InetAddress>::const_iterator i1(_zeroTierPhysicalEndpoints.begin());i1!=_zeroTierPhysicalEndpoints.end();++i1) {
+				for(std::vector<InetAddress>::const_iterator i2(_members[mostRecentMemberId].zeroTierPhysicalEndpoints.begin());i2!=_members[mostRecentMemberId].zeroTierPhysicalEndpoints.end();++i2) {
+					if (i1->ss_family == i2->ss_family) {
+						TRACE("relayViaCluster relaying %u bytes from %s to %s by way of %u (%s->%s)",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str(),(unsigned int)mostRecentMemberId,i1->toString().c_str(),i2->toString().c_str());
+						RR->node->putPacket(*i1,*i2,data,len);
+						return;
+					}
+				}
+			}
+
+			TRACE("relayViaCluster relaying %u bytes from %s to %s by way of %u failed: no common endpoints with the same address family!",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str(),(unsigned int)mostRecentMemberId);
+		}
+	}
 }
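For reference, the buf assembled above for CLUSTER_MESSAGE_PROXY_UNITE carries the following payload, inferred from the appendTo()/serialize() calls in this hunk (the receiving handler remains authoritative):

// Inferred CLUSTER_MESSAGE_PROXY_UNITE payload layout (sketch, not normative):
//   5 bytes          ZeroTier address of the peer to unite with (toPeerAddress)
//   5 bytes          ZeroTier address of the relayed sender (fromPeerAddress)
//   1 byte           addrCount: number of InetAddress records that follow (0-2)
//   addrCount items  serialized InetAddress records (best active v4 and/or v6)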
@@ -663,8 +752,8 @@ void Cluster::doPeriodicTasks()
 		_lastCleanedRemotePeers = now;
 
 		Mutex::Lock _l(_remotePeers_m);
-		for(std::map< std::pair<Address,unsigned int>,uint64_t >::iterator rp(_remotePeers.begin());rp!=_remotePeers.end();) {
-			if ((now - rp->second) >= ZT_PEER_ACTIVITY_TIMEOUT)
+		for(std::map< std::pair<Address,unsigned int>,_RemotePeer >::iterator rp(_remotePeers.begin());rp!=_remotePeers.end();) {
+			if ((now - rp->second.lastHavePeerReceived) >= ZT_PEER_ACTIVITY_TIMEOUT)
 				_remotePeers.erase(rp++);
 			else ++rp;
 		}
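The _remotePeers.erase(rp++) pattern above is the standard pre-C++11 idiom for erasing std::map entries while iterating; a minimal standalone illustration:

#include <map>
#include <stdint.h>

// Post-increment hands the current iterator to erase() while the loop
// iterator has already advanced, so the erased iterator is never
// dereferenced again.
static void expireOldEntries(std::map<int,uint64_t> &m,uint64_t now,uint64_t timeout)
{
	for(std::map<int,uint64_t>::iterator i(m.begin());i!=m.end();) {
		if ((now - i->second) >= timeout)
			m.erase(i++);
		else ++i;
	}
}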