Clustering cleanup, still a work in progress.

This commit is contained in:
Adam Ierymenko 2015-11-06 16:12:41 -08:00
parent 5f39d5b7ea
commit 6bc8c9d8ef
5 changed files with 83 additions and 119 deletions

View file

@ -184,6 +184,7 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
m.z = dmsg.at<int32_t>(ptr); ptr += 4;
ptr += 8; // skip local clock, not used
m.load = dmsg.at<uint64_t>(ptr); ptr += 8;
m.peers = dmsg.at<uint64_t>(ptr); ptr += 8;
ptr += 8; // skip flags, unused
#ifdef ZT_TRACE
std::string addrs;
@ -215,12 +216,22 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
case STATE_MESSAGE_HAVE_PEER: {
const Address zeroTierAddress(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH;
Mutex::Lock _l2(_peerAffinities_m);
_PA &pa = _peerAffinities[zeroTierAddress];
pa.ts = RR->node->now();
pa.mid = fromMemberId;
_peerAffinities.set(zeroTierAddress,fromMemberId);
TRACE("[%u] has %s @ %s",(unsigned int)fromMemberId,id.address().toString().c_str(),physicalAddress.toString().c_str());
} break;
case STATE_MESSAGE_WANT_PEER: {
const Address zeroTierAddress(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH;
SharedPtr<Peer> peer(RR->topology->getPeerNoCache(zeroTierAddress));
if ((peer)&&(peer->hasActiveDirectPath(RR->node->now()))) {
char buf[ZT_ADDRESS_LENGTH];
peer->address().copyTo(buf,ZT_ADDRESS_LENGTH);
Mutex::Lock _l2(_members[fromMemberId].lock);
_send(fromMemberId,STATE_MESSAGE_HAVE_PEER,buf,ZT_ADDRESS_LENGTH);
_flush(fromMemberId);
}
} break;
case STATE_MESSAGE_MULTICAST_LIKE: {
const uint64_t nwid = dmsg.at<uint64_t>(ptr); ptr += 8;
const Address address(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH;
@ -311,7 +322,7 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
if (haveMatch) {
_send(fromMemberId,STATE_MESSAGE_PROXY_SEND,rendezvousForRemote.data(),rendezvousForRemote.size());
_flush(fromMemberId); // we want this to go ASAP, since with port restricted cone NATs success can be timing-sensitive
_flush(fromMemberId);
RR->sw->send(rendezvousForLocal,true,0);
}
}
@ -349,12 +360,22 @@ bool Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee
const uint64_t now = RR->node->now();
unsigned int canHasPeer = 0;
{ // Anyone got this peer?
{
Mutex::Lock _l2(_peerAffinities_m);
_PA *pa = _peerAffinities.get(toPeerAddress);
if ((pa)&&(pa->mid != _id)&&((now - pa->ts) < ZT_PEER_ACTIVITY_TIMEOUT))
canHasPeer = pa->mid;
else return false;
const unsigned int *pa = _peerAffinities.get(toPeerAddress);
if (!pa) {
char buf[ZT_ADDRESS_LENGTH];
peerId.address().copyTo(buf,ZT_ADDRESS_LENGTH);
{
Mutex::Lock _l(_memberIds_m);
for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
Mutex::Lock _l2(_members[*mid].lock);
_send(*mid,STATE_MESSAGE_WANT_PEER,buf,ZT_ADDRESS_LENGTH);
}
}
return false;
}
canHasPeer = *pa;
}
Buffer<1024> buf;
@ -395,22 +416,6 @@ bool Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee
void Cluster::replicateHavePeer(const Identity &peerId)
{
const uint64_t now = RR->node->now();
{
Mutex::Lock _l2(_peerAffinities_m);
_PA &pa = _peerAffinities[peerId.address()];
if (pa.mid != _id) {
pa.ts = now;
pa.mid = _id;
// fall through to send code below
} else if ((now - pa.ts) < ZT_CLUSTER_HAVE_PEER_ANNOUNCE_PERIOD) {
return;
} else {
pa.ts = now;
// fall through to send code below
}
}
char buf[ZT_ADDRESS_LENGTH];
peerId.address().copyTo(buf,ZT_ADDRESS_LENGTH);
{
@ -455,44 +460,9 @@ void Cluster::replicateCertificateOfNetworkMembership(const CertificateOfMembers
*/
}
/**
 * Function object applied to each peer known to the topology (it is handed
 * to eachPeer() from Cluster::doPeriodicTasks()).
 *
 * For every peer that reports an active direct path at the captured time,
 * the owning Cluster is asked to replicate a "have peer" announcement for
 * that peer's identity. Peers without an active direct path are skipped.
 */
struct _ClusterAnnouncePeers
{
	_ClusterAnnouncePeers(const uint64_t currentTime,Cluster *owner) :
		now(currentTime),
		parent(owner)
	{
	}

	const uint64_t now; // timestamp passed to Peer::hasActiveDirectPath()
	Cluster *const parent; // cluster whose replicateHavePeer() gets invoked

	inline void operator()(const Topology &t,const SharedPtr<Peer> &peer) const
	{
		// Nothing to announce unless the peer has an active direct path right now.
		if (!peer->hasActiveDirectPath(now))
			return;
		parent->replicateHavePeer(peer->identity());
	}
};
void Cluster::doPeriodicTasks()
{
const uint64_t now = RR->node->now();
// Erase old peer affinity entries just to control table size
if ((now - _lastCleanedPeerAffinities) >= (ZT_PEER_ACTIVITY_TIMEOUT * 5)) {
_lastCleanedPeerAffinities = now;
Address *k = (Address *)0;
_PA *v = (_PA *)0;
Mutex::Lock _l(_peerAffinities_m);
Hashtable< Address,_PA >::Iterator i(_peerAffinities);
while (i.next(k,v)) {
if ((now - v->ts) >= (ZT_PEER_ACTIVITY_TIMEOUT * 5))
_peerAffinities.erase(*k);
}
}
// Announce peers that we have active direct paths to -- note that we forget paths
// that other cluster members claim they have, which prevents us from fighting
// with other cluster members (route flapping) over specific paths.
if ((now - _lastCheckedPeersForAnnounce) >= (ZT_CLUSTER_HAVE_PEER_ANNOUNCE_PERIOD / 4)) {
_lastCheckedPeersForAnnounce = now;
_ClusterAnnouncePeers func(now,this);
RR->topology->eachPeer<_ClusterAnnouncePeers &>(func);
}
// Flush outgoing packet send queue every doPeriodicTasks()
if ((now - _lastFlushed) >= ZT_CLUSTER_FLUSH_PERIOD) {
_lastFlushed = now;
Mutex::Lock _l(_memberIds_m);
@ -516,6 +486,7 @@ void Cluster::doPeriodicTasks()
}
alive.append((uint64_t)now);
alive.append((uint64_t)0); // TODO: compute and send load average
alive.append((uint64_t)RR->topology->countActive());
alive.append((uint64_t)0); // unused/reserved flags
alive.append((uint8_t)_zeroTierPhysicalEndpoints.size());
for(std::vector<InetAddress>::const_iterator pe(_zeroTierPhysicalEndpoints.begin());pe!=_zeroTierPhysicalEndpoints.end();++pe)
@ -630,8 +601,6 @@ void Cluster::status(ZT_ClusterStatus &status) const
{
const uint64_t now = RR->node->now();
memset(&status,0,sizeof(ZT_ClusterStatus));
ZT_ClusterMemberStatus *ms[ZT_CLUSTER_MAX_MEMBERS];
memset(ms,0,sizeof(ms));
status.myId = _id;
@ -641,6 +610,7 @@ void Cluster::status(ZT_ClusterStatus &status) const
ms[_id]->x = _x;
ms[_id]->y = _y;
ms[_id]->z = _z;
ms[_id]->load = 0; // TODO
ms[_id]->peers = RR->topology->countActive();
for(std::vector<InetAddress>::const_iterator ep(_zeroTierPhysicalEndpoints.begin());ep!=_zeroTierPhysicalEndpoints.end();++ep) {
if (ms[_id]->numZeroTierPhysicalEndpoints >= ZT_CLUSTER_MAX_ZT_PHYSICAL_ADDRESSES) // sanity check
@ -653,10 +623,11 @@ void Cluster::status(ZT_ClusterStatus &status) const
for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
if (status.clusterSize >= ZT_CLUSTER_MAX_MEMBERS) // sanity check
break;
ZT_ClusterMemberStatus *s = ms[*mid] = &(status.members[status.clusterSize++]);
_Member &m = _members[*mid];
Mutex::Lock ml(m.lock);
ZT_ClusterMemberStatus *const s = &(status.members[status.clusterSize++]);
s->id = *mid;
s->msSinceLastHeartbeat = (unsigned int)std::min((uint64_t)(~((unsigned int)0)),(now - m.lastReceivedAliveAnnouncement));
s->alive = (s->msSinceLastHeartbeat < ZT_CLUSTER_TIMEOUT) ? 1 : 0;
@ -664,6 +635,7 @@ void Cluster::status(ZT_ClusterStatus &status) const
s->y = m.y;
s->z = m.z;
s->load = m.load;
s->peers = m.peers;
for(std::vector<InetAddress>::const_iterator ep(m.zeroTierPhysicalEndpoints.begin());ep!=m.zeroTierPhysicalEndpoints.end();++ep) {
if (s->numZeroTierPhysicalEndpoints >= ZT_CLUSTER_MAX_ZT_PHYSICAL_ADDRESSES) // sanity check
break;
@ -671,17 +643,6 @@ void Cluster::status(ZT_ClusterStatus &status) const
}
}
}
{
Mutex::Lock _l2(_peerAffinities_m);
Address *k = (Address *)0;
_PA *v = (_PA *)0;
Hashtable< Address,_PA >::Iterator i(const_cast<Cluster *>(this)->_peerAffinities);
while (i.next(k,v)) {
if ( (ms[v->mid]) && (v->mid != _id) && ((now - v->ts) < ZT_PEER_ACTIVITY_TIMEOUT) )
++ms[v->mid]->peers;
}
}
}
void Cluster::_send(uint16_t memberId,StateMessageType type,const void *msg,unsigned int len)