Multicaster needs to be global, not per-network, and a bunch of other stuff.

This commit is contained in:
Adam Ierymenko 2014-09-30 16:28:25 -07:00
parent 8607aa7c3c
commit 2659427864
10 changed files with 365 additions and 142 deletions

View file

@ -34,6 +34,7 @@
#include "Switch.hpp"
#include "Packet.hpp"
#include "Peer.hpp"
#include "CMWC4096.hpp"
#include "CertificateOfMembership.hpp"
#include "RuntimeEnvironment.hpp"
@ -47,10 +48,75 @@ Multicaster::~Multicaster()
{
}
void Multicaster::send(const RuntimeEnvironment *RR,uint64_t nwid,const CertificateOfMembership *com,unsigned int limit,uint64_t now,const MulticastGroup &mg,const MAC &src,unsigned int etherType,const void *data,unsigned int len)
unsigned int Multicaster::gather(const RuntimeEnvironment *RR,uint64_t nwid,MulticastGroup &mg,Packet &appendTo,unsigned int limit) const
{
unsigned char *p;
unsigned int n = 0,i,rptr;
uint64_t a,done[(ZT_PROTO_MAX_PACKET_LENGTH / 5) + 1];
Mutex::Lock _l(_groups_m);
std::map< std::pair<uint64_t,MulticastGroup>,MulticastGroupStatus >::const_iterator gs(_groups.find(std::pair<uint64_t,MulticastGroup>(nwid,mg)));
if ((gs == _groups.end())||(gs->second.members.empty())) {
appendTo.append((uint32_t)0);
appendTo.append((uint16_t)0);
return 0;
}
if (limit > gs->second.members.size())
limit = (unsigned int)gs->second.members.size();
if (limit > 0xffff) // sanity check -- this won't fit in a packet anyway
limit = 0xffff;
appendTo.append((uint32_t)gs->second.members.size());
unsigned int nAt = appendTo.size();
appendTo.append((uint16_t)0); // set to n later
while ((n < limit)&&((appendTo.size() + ZT_ADDRESS_LENGTH) <= ZT_PROTO_MAX_PACKET_LENGTH)) {
// Pick a member at random -- if we've already picked it,
// keep circling the buffer until we find one we haven't.
// This won't loop forever since limit <= members.size().
rptr = (unsigned int)RR->prng->next32();
restart_member_scan:
a = gs->second.members[rptr % (unsigned int)gs->second.members.size()].address.toInt();
for(i=0;i<n;++i) {
if (done[i] == a) {
++rptr;
goto restart_member_scan;
}
}
// Log that we've picked this one
done[n++] = a;
// Append to packet
p = (unsigned char *)appendTo.appendField(ZT_ADDRESS_LENGTH);
*(p++) = (unsigned char)((a >> 32) & 0xff);
*(p++) = (unsigned char)((a >> 24) & 0xff);
*(p++) = (unsigned char)((a >> 16) & 0xff);
*(p++) = (unsigned char)((a >> 8) & 0xff);
*p = (unsigned char)(a & 0xff);
}
appendTo.setAt(nAt,(uint16_t)n);
return n;
}
void Multicaster::send(
const RuntimeEnvironment *RR,
const CertificateOfMembership *com,
unsigned int limit,
uint64_t now,
uint64_t nwid,
const MulticastGroup &mg,
const MAC &src,
unsigned int etherType,
const void *data,
unsigned int len)
{
Mutex::Lock _l(_groups_m);
MulticastGroupStatus &gs = _groups[mg];
MulticastGroupStatus &gs = _groups[std::pair<uint64_t,MulticastGroup>(nwid,mg)];
if (gs.members.size() >= limit) {
// If we already have enough members, just send and we're done -- no need for TX queue
@ -95,13 +161,13 @@ void Multicaster::send(const RuntimeEnvironment *RR,uint64_t nwid,const Certific
}
}
void Multicaster::clean(const RuntimeEnvironment *RR,uint64_t now,unsigned int limit)
void Multicaster::clean(const RuntimeEnvironment *RR,uint64_t now)
{
Mutex::Lock _l(_groups_m);
for(std::map< MulticastGroup,MulticastGroupStatus >::iterator mm(_groups.begin());mm!=_groups.end();) {
for(std::map< std::pair<uint64_t,MulticastGroup>,MulticastGroupStatus >::iterator mm(_groups.begin());mm!=_groups.end();) {
// Remove expired outgoing multicasts from multicast TX queue
for(std::list<OutboundMulticast>::iterator tx(mm->second.txQueue.begin());tx!=mm->second.txQueue.end();) {
if ((tx->expired(now))||(tx->sentToCount() >= limit))
if (tx->expired(now))
mm->second.txQueue.erase(tx++);
else ++tx;
}
@ -152,7 +218,7 @@ void Multicaster::clean(const RuntimeEnvironment *RR,uint64_t now,unsigned int l
}
}
void Multicaster::_add(const RuntimeEnvironment *RR,uint64_t now,MulticastGroupStatus &gs,const Address &learnedFrom,const Address &member)
void Multicaster::_add(uint64_t now,MulticastGroupStatus &gs,const Address &learnedFrom,const Address &member)
{
// assumes _groups_m is locked