Yet more WIP on mulitcast algo...

This commit is contained in:
Adam Ierymenko 2014-09-22 13:18:24 -07:00
parent d9abd4d9be
commit 954f9cbc13
11 changed files with 276 additions and 83 deletions

View file

@ -41,19 +41,64 @@ MulticastTopology::~MulticastTopology()
{
}
void MulticastTopology::clean(const Topology &topology)
void MulticastTopology::add(const MulticastGroup &mg,const Address &member,const Address &learnedFrom)
{
uint64_t now = Utils::now();
std::vector<MulticastGroupMember> &mv = _groups[mg].members;
for(std::vector<MulticastGroupMember>::iterator m(mv.begin());m!=mv.end();++m) {
if (m->address == member) {
if (m->learnedFrom) // once a member has been seen directly, we keep its status as direct
m->learnedFrom = learnedFrom;
m->timestamp = Utils::now();
return;
}
}
mv.push_back(MulticastGroupMember(member,learnedFrom,Utils::now()));
}
for(std::map< MulticastGroup,std::vector<MulticastGroupMember> >::iterator mm(_members.begin());mm!=_members.end();) {
std::vector<MulticastGroupMember>::iterator reader(mm->second.begin());
std::vector<MulticastGroupMember>::iterator writer(mm->second.begin());
unsigned long count = 0;
while (reader != mm->second.end()) {
void MulticastTopology::erase(const MulticastGroup &mg,const Address &member)
{
std::map< MulticastGroup,MulticastGroupStatus >::iterator r(_groups.find(mg));
if (r != _groups.end()) {
for(std::vector<MulticastGroupMember>::iterator m(r->second.members.begin());m!=r->second.members.end();++m) {
if (m->address == member) {
r->second.members.erase(m);
if (r->second.members.empty())
_groups.erase(r);
return;
}
}
}
}
unsigned int MulticastTopology::want(const MulticastGroup &mg,uint64_t now,unsigned int limit,bool updateLastGatheredTimeOnNonzeroReturn)
{
MulticastGroupStatus &gs = _groups[mg];
if ((unsigned int)gs.members.size() >= limit) {
// We already caught our limit, don't need to go fishing any more.
return 0;
} else {
// Compute the delay between fishing expeditions from the fraction of the limit that we already have.
const uint64_t rateDelay = (uint64_t)ZT_MULTICAST_TOPOLOGY_GATHER_DELAY_MIN + (uint64_t)(((double)gs.members.size() / (double)limit) * (double)(ZT_MULTICAST_TOPOLOGY_GATHER_DELAY_MAX - ZT_MULTICAST_TOPOLOGY_GATHER_DELAY_MIN));
if ((now - gs.lastGatheredMembers) >= rateDelay) {
if (updateLastGatheredTimeOnNonzeroReturn)
gs.lastGatheredMembers = now;
return (limit - (unsigned int)gs.members.size());
} else return 0;
}
}
void MulticastTopology::clean(uint64_t now,const Topology &topology)
{
for(std::map< MulticastGroup,MulticastGroupStatus >::iterator mm(_groups.begin());mm!=_groups.end();) {
std::vector<MulticastGroupMember>::iterator reader(mm->second.members.begin());
std::vector<MulticastGroupMember>::iterator writer(mm->second.members.begin());
unsigned int count = 0;
while (reader != mm->second.members.end()) {
if ((now - reader->timestamp) < ZT_MULTICAST_LIKE_EXPIRE) {
*writer = *reader;
/* We sort in ascending order of most recent relevant activity. For peers we've learned
/* We rank in ascending order of most recent relevant activity. For peers we've learned
* about by direct LIKEs, we do this in order of their own activity. For indirectly
* acquired peers we do this minus a constant to place these categorically below directly
* learned peers. For peers with no active Peer record, we use the time we last learned
@ -78,10 +123,10 @@ void MulticastTopology::clean(const Topology &topology)
}
if (count) {
mm->second.resize(count);
std::sort(mm->second.begin(),mm->second.end()); // sorts in ascending order of rank
std::sort(mm->second.members.begin(),writer); // sorts in ascending order of rank
mm->second.members.resize(count); // trim off the ones we cut, after writer
++mm;
} else _members.erase(mm++);
} else _groups.erase(mm++);
}
}