Factored the multicast propagation algorithm out of Switch and Topology; also cleaned it up and clarified it a bit.

Adam Ierymenko 2013-07-10 17:24:27 -04:00
parent 47f611e7b8
commit 9e28bbfbb2
10 changed files with 365 additions and 209 deletions
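
For context, a minimal sketch of how a caller would invoke the picker shown in the removed Topology code below. The signature comes straight from the diff; every other name here (topology, nwid, upstreamAddress, bloomBits, bloomSizeInBytes, mg, chosen) is hypothetical:

SharedPtr<Peer> chosen[16];
unsigned int got = topology.pickMulticastPropagationPeers(
    nwid,             // network ID of the multicast frame
    upstreamAddress,  // peer to exclude, e.g. where the frame came from
    bloomBits,        // propagation bloom filter carried with the frame
    bloomSizeInBytes, // size of that filter in bytes
    16,               // how many propagation peers we want
    mg,               // the multicast group
    chosen);          // output array, filled with up to 16 peers
// The first 'got' entries of 'chosen' are the next hops for this frame.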

@@ -207,104 +207,6 @@ void Topology::clean()
    _peerDeepVerifyJobs_c.signal();
}
void Topology::likesMulticastGroup(uint64_t nwid,const MulticastGroup &mg,const Address &addr,uint64_t now)
{
    Mutex::Lock _l(_multicastGroupMembers_m);
    _multicastGroupMembers[nwid][mg][addr] = now;
}
struct _PickMulticastPropagationPeersPeerPrioritySortOrder
{
    inline bool operator()(const SharedPtr<Peer> &p1,const SharedPtr<Peer> &p2) const
    {
        // Strict '>' rather than '>=': std::sort requires a strict weak
        // ordering, and a comparator that returns true for equal keys is
        // undefined behavior.
        return (p1->lastUnicastFrame() > p2->lastUnicastFrame());
    }
};

#define _MAX_PEERS_TO_CONSIDER 256
unsigned int Topology::pickMulticastPropagationPeers(uint64_t nwid,const Address &exclude,const void *propagationBloom,unsigned int propagationBloomSize,unsigned int count,const MulticastGroup &mg,SharedPtr<Peer> *peers)
{
    SharedPtr<Peer> possiblePeers[_MAX_PEERS_TO_CONSIDER];
    unsigned int numPossiblePeers = 0;

    if (count > _MAX_PEERS_TO_CONSIDER)
        count = _MAX_PEERS_TO_CONSIDER;

    Mutex::Lock _l1(_activePeers_m);
    Mutex::Lock _l2(_supernodes_m);

    // Grab known non-supernode peers in multicast group, excluding 'exclude'.
    // Also lazily clean up the _multicastGroupMembers structure.
    {
        Mutex::Lock _l3(_multicastGroupMembers_m);
        std::map< uint64_t,std::map< MulticastGroup,std::map< Address,uint64_t > > >::iterator mgm(_multicastGroupMembers.find(nwid));
        if (mgm != _multicastGroupMembers.end()) {
            std::map< MulticastGroup,std::map< Address,uint64_t > >::iterator g(mgm->second.find(mg));
            if (g != mgm->second.end()) {
                uint64_t now = Utils::now();
                for(std::map< Address,uint64_t >::iterator m(g->second.begin());m!=g->second.end();) {
                    if (((now - m->second) < ZT_MULTICAST_LIKE_EXPIRE)&&(m->first != exclude)) {
                        std::map< Address,SharedPtr<Peer> >::const_iterator p(_activePeers.find(m->first));
                        if (p != _activePeers.end()) {
                            possiblePeers[numPossiblePeers++] = p->second;
                            // Must be >= here: with > the counter could reach
                            // _MAX_PEERS_TO_CONSIDER and the next iteration
                            // would write past the end of possiblePeers[].
                            if (numPossiblePeers >= _MAX_PEERS_TO_CONSIDER)
                                break;
                        }
                        ++m;
                    } else g->second.erase(m++);
                }
                if (g->second.empty())
                    mgm->second.erase(g);
            }
        }
    }
    // Sort non-supernode peers in descending order of most recent data
    // exchange timestamp. This sorts by implicit social relationships -- who
    // you are talking to are the people who get multicasts first.
    std::sort(&(possiblePeers[0]),&(possiblePeers[numPossiblePeers]),_PickMulticastPropagationPeersPeerPrioritySortOrder());

    // Tack on a supernode peer to the end if we don't have enough regular
    // peers, using supernodes to bridge gaps in sparse multicast groups.
    if (numPossiblePeers < count) {
        SharedPtr<Peer> bestSupernode;
        unsigned int bestSupernodeLatency = 0xffff;
        for(std::vector< SharedPtr<Peer> >::const_iterator sn(_supernodePeers.begin());sn!=_supernodePeers.end();++sn) {
            if (((*sn)->latency())&&((*sn)->latency() < bestSupernodeLatency)) {
                bestSupernodeLatency = (*sn)->latency();
                bestSupernode = *sn;
            }
        }
        if (bestSupernode)
            possiblePeers[numPossiblePeers++] = bestSupernode;
    }
    unsigned int num = 0;

    // First, try to pick peers not in the propagation bloom filter
    for(unsigned int i=0;i<numPossiblePeers;++i) {
        if (!Utils::bloomContains(propagationBloom,propagationBloomSize,possiblePeers[i]->address().sum())) {
            peers[num++] = possiblePeers[i];
            if (num >= count)
                return num;
        }
    }

    // Next, pick other peers until full (without duplicates)
    for(unsigned int i=0;i<numPossiblePeers;++i) {
        for(unsigned int j=0;j<num;++j) {
            if (peers[j] == possiblePeers[i])
                goto check_next_peer;
        }
        peers[num++] = possiblePeers[i];
        if (num >= count)
            return num;
    check_next_peer:
        continue;
    }

    return num;
}
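
The two-pass pick above leans on Utils::bloomContains to prefer peers that have not yet seen this frame. Utils itself is not part of this diff, so as an illustration only, here is a sketch of what a single-hash membership test over a byte-array bloom filter looks like (the real hashing and field layout are assumptions):

// Sketch only; NOT the actual Utils::bloomContains.
static inline bool sketchBloomContains(const void *field,unsigned int fieldSizeBytes,uint64_t key)
{
    const unsigned char *f = (const unsigned char *)field;
    unsigned int bit = (unsigned int)(key % ((uint64_t)fieldSizeBytes * 8)); // map key to a bit index
    return ((f[bit >> 3] & (1 << (bit & 7))) != 0); // test that bit
}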
void Topology::main()
    throw()
{
@@ -404,14 +306,6 @@ void Topology::main()
}
}
}
{
    Mutex::Lock _l(_multicastGroupMembers_m);
    for(std::map< uint64_t,std::map< MulticastGroup,std::map< Address,uint64_t > > >::iterator mgm(_multicastGroupMembers.begin());mgm!=_multicastGroupMembers.end();) {
        if (_r->nc->hasNetwork(mgm->first))
            ++mgm;
        else _multicastGroupMembers.erase(mgm++);
    }
}
break;
case _PeerDeepVerifyJob::EXIT_THREAD:
TRACE("thread terminating...");
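
As an aside, the _multicastGroupMembers cleanup above uses the standard pre-C++11 idiom for erasing from a std::map while iterating: post-increment inside erase() so the iterator is advanced before the element it points to is destroyed. A self-contained sketch of the same idiom (illustrative data, not ZeroTier code):

#include <cstdio>
#include <map>

int main()
{
    std::map<int,int> m;
    for(int i=0;i<10;++i)
        m[i] = i * i;

    // Erase all odd keys; m.erase(it++) passes the current iterator to
    // erase() while stepping 'it' to the next element first.
    for(std::map<int,int>::iterator it(m.begin());it!=m.end();) {
        if (it->first & 1)
            m.erase(it++);
        else ++it;
    }

    for(std::map<int,int>::iterator it(m.begin());it!=m.end();++it)
        std::printf("%d -> %d\n",it->first,it->second);
    return 0;
}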