Multicast group join/leave and group membership announcement.

This commit is contained in:
Adam Ierymenko 2015-04-06 18:27:24 -07:00
parent 8001b2c0cb
commit 51f46a009a
6 changed files with 192 additions and 67 deletions

View file

@ -100,20 +100,25 @@ Network::Network(const RuntimeEnvironment *renv,uint64_t nwid) :
ZT1_VirtualNetworkConfig ctmp;
_externalConfig(&ctmp);
_portError = RR->node->configureVirtualNetworkPort(_id,&ctmp);
_portError = RR->node->configureVirtualNetworkPort(_id,ZT1_VIRTUAL_NETWORK_CONFIG_OPERATION_UP,&ctmp);
}
Network::~Network()
{
RR->node->configureVirtualNetworkPort(_id,(const ZT1_VirtualNetworkConfig *)0);
ZT1_VirtualNetworkConfig ctmp;
_externalConfig(&ctmp);
char n[128];
if (_destroyed) {
RR->node->configureVirtualNetworkPort(_id,ZT1_VIRTUAL_NETWORK_CONFIG_OPERATION_DESTROY,&ctmp);
Utils::snprintf(n,sizeof(n),"networks.d/%.16llx.conf",_id);
RR->node->dataStoreDelete(n);
Utils::snprintf(n,sizeof(n),"networks.d/%.16llx.mcerts",_id);
RR->node->dataStoreDelete(n);
} else {
RR->node->configureVirtualNetworkPort(_id,ZT1_VIRTUAL_NETWORK_CONFIG_OPERATION_DOWN,&ctmp);
clean();
std::string buf("ZTMCD0");
@ -132,49 +137,35 @@ Network::~Network()
}
}
// Function object used by rescanMulticastGroups()
class AnnounceMulticastGroupsToPeersWithActiveDirectPaths
void Network::multicastSubscribe(const MulticastGroup &mg)
{
public:
AnnounceMulticastGroupsToPeersWithActiveDirectPaths(const RuntimeEnvironment *renv,Network *nw) :
RR(renv),
_now(Utils::now()),
_network(nw),
_supernodeAddresses(renv->topology->supernodeAddresses())
{}
Mutex::Lock _l(_lock);
if (std::find(_myMulticastGroups.begin(),_myMulticastGroups.end(),mg) != _myMulticastGroups.end())
return;
_myMulticastGroups.push_back(mg);
std::sort(_myMulticastGroups.begin(),_myMulticastGroups.end());
}
inline void operator()(Topology &t,const SharedPtr<Peer> &p)
void Network::multicastUnsubscribe(const MulticastGroup &mg)
{
bool needAnnounce = false;
{
if ( ( (p->hasActiveDirectPath(_now)) && (_network->isAllowed(p->address())) ) || (std::find(_supernodeAddresses.begin(),_supernodeAddresses.end(),p->address()) != _supernodeAddresses.end()) ) {
Packet outp(p->address(),RR->identity.address(),Packet::VERB_MULTICAST_LIKE);
Mutex::Lock _l(_lock);
std::vector<MulticastGroup> mgs(_network->multicastGroups());
for(std::vector<MulticastGroup>::iterator mg(mgs.begin());mg!=mgs.end();++mg) {
if ((outp.size() + 18) > ZT_UDP_DEFAULT_PAYLOAD_MTU) {
outp.armor(p->key(),true);
p->send(RR,outp.data(),outp.size(),_now);
outp.reset(p->address(),RR->identity.address(),Packet::VERB_MULTICAST_LIKE);
}
std::vector<MulticastGroup> nmg;
for(std::vector<MulticastGroup>::const_iterator i(_myMulticastGroups.begin());i!=_myMulticastGroups.end();++i) {
if (*i != mg)
nmg.push_back(*i);
}
// network ID, MAC, ADI
outp.append((uint64_t)_network->id());
mg->mac().appendTo(outp);
outp.append((uint32_t)mg->adi());
}
if (outp.size() > ZT_PROTO_MIN_PACKET_LENGTH) {
outp.armor(p->key(),true);
p->send(RR,outp.data(),outp.size(),_now);
}
if (nmg.size() != _myMulticastGroups.size()) {
_myMulticastGroups.swap(nmg);
needAnnounce = true;
}
}
private:
const RuntimeEnvironment *RR;
uint64_t _now;
Network *_network;
std::vector<Address> _supernodeAddresses;
};
if (needAnnounce)
_announceMulticastGroups();
}
bool Network::applyConfiguration(const SharedPtr<NetworkConfig> &conf)
{
@ -189,7 +180,7 @@ bool Network::applyConfiguration(const SharedPtr<NetworkConfig> &conf)
ZT1_VirtualNetworkConfig ctmp;
_externalConfig(&ctmp);
_portError = RR->node->configureVirtualNetworkPort(_id,&ctmp);
_portError = RR->node->configureVirtualNetworkPort(_id,ZT1_VIRTUAL_NETWORK_CONFIG_OPERATION_CONFIG_UPDATE,&ctmp);
return true;
} else {
@ -405,6 +396,15 @@ void Network::learnBridgeRoute(const MAC &mac,const Address &addr)
}
}
void Network::learnBridgedMulticastGroup(const MulticastGroup &mg,uint64_t now)
{
Mutex::Lock _l(_lock);
unsigned long tmp = _multicastGroupsBehindMe.size();
_multicastGroupsBehindMe[mg] = now;
if (tmp != _multicastGroupsBehindMe.size())
_announceMulticastGroups();
}
void Network::setEnabled(bool enabled)
{
Mutex::Lock _l(_lock);
@ -467,4 +467,54 @@ void Network::_externalConfig(ZT1_VirtualNetworkConfig *ec) const
} else ec->assignedAddressCount = 0;
}
// Used in Network::_announceMulticastGroups()
class _AnnounceMulticastGroupsToPeersWithActiveDirectPaths
{
public:
_AnnounceMulticastGroupsToPeersWithActiveDirectPaths(const RuntimeEnvironment *renv,Network *nw) :
RR(renv),
_now(Utils::now()),
_network(nw),
_supernodeAddresses(renv->topology->supernodeAddresses())
{}
inline void operator()(Topology &t,const SharedPtr<Peer> &p)
{
if ( ( (p->hasActiveDirectPath(_now)) && (_network->isAllowed(p->address())) ) || (std::find(_supernodeAddresses.begin(),_supernodeAddresses.end(),p->address()) != _supernodeAddresses.end()) ) {
Packet outp(p->address(),RR->identity.address(),Packet::VERB_MULTICAST_LIKE);
std::vector<MulticastGroup> mgs(_network->allMulticastGroups());
for(std::vector<MulticastGroup>::iterator mg(mgs.begin());mg!=mgs.end();++mg) {
if ((outp.size() + 18) > ZT_UDP_DEFAULT_PAYLOAD_MTU) {
outp.armor(p->key(),true);
p->send(RR,outp.data(),outp.size(),_now);
outp.reset(p->address(),RR->identity.address(),Packet::VERB_MULTICAST_LIKE);
}
// network ID, MAC, ADI
outp.append((uint64_t)_network->id());
mg->mac().appendTo(outp);
outp.append((uint32_t)mg->adi());
}
if (outp.size() > ZT_PROTO_MIN_PACKET_LENGTH) {
outp.armor(p->key(),true);
p->send(RR,outp.data(),outp.size(),_now);
}
}
}
private:
const RuntimeEnvironment *RR;
uint64_t _now;
Network *_network;
std::vector<Address> _supernodeAddresses;
};
void Network::_announceMulticastGroups()
{
_AnnounceMulticastGroupsToPeersWithActiveDirectPaths afunc(RR,this);
RR->topology->eachPeer<_AnnounceMulticastGroupsToPeersWithActiveDirectPaths &>(afunc);
}
} // namespace ZeroTier