More threading improvements in root, more DNS TXT and multicast work (in progress)

This commit is contained in:
Adam Ierymenko 2019-09-16 10:33:59 -07:00
parent 9f9032ae36
commit e08fc81397
No known key found for this signature in database
GPG key ID: C8877CF2D7A5D7F3
10 changed files with 317 additions and 95 deletions

View file

@ -107,8 +107,10 @@ using json = nlohmann::json;
#ifdef MSG_DONTWAIT
#define SENDTO_FLAGS MSG_DONTWAIT
#define RECVFROM_FLAGS 0
#else
#define SENDTO_FLAGS 0
#define RECVFROM_FLAGS 0
#endif
//////////////////////////////////////////////////////////////////////////////
@ -911,7 +913,7 @@ int main(int argc,char **argv)
memset(&in6,0,sizeof(in6));
for(;;) {
socklen_t sl = sizeof(in6);
const int pl = (int)recvfrom(s6,pkt.unsafeData(),pkt.capacity(),0,(struct sockaddr *)&in6,&sl);
const int pl = (int)recvfrom(s6,pkt.unsafeData(),pkt.capacity(),RECVFROM_FLAGS,(struct sockaddr *)&in6,&sl);
if (pl > 0) {
if (pl >= ZT_PROTO_MIN_FRAGMENT_LENGTH) {
try {
@ -940,7 +942,7 @@ int main(int argc,char **argv)
memset(&in4,0,sizeof(in4));
for(;;) {
socklen_t sl = sizeof(in4);
const int pl = (int)recvfrom(s4,pkt.unsafeData(),pkt.capacity(),0,(struct sockaddr *)&in4,&sl);
const int pl = (int)recvfrom(s4,pkt.unsafeData(),pkt.capacity(),RECVFROM_FLAGS,(struct sockaddr *)&in4,&sl);
if (pl > 0) {
if (pl >= ZT_PROTO_MIN_FRAGMENT_LENGTH) {
try {
@ -986,43 +988,43 @@ int main(int argc,char **argv)
o << '[';
try {
bool first = true;
std::lock_guard<std::mutex> l(s_peersByIdentity_l);
for(auto p=s_peersByIdentity.begin();p!=s_peersByIdentity.end();++p) {
std::lock_guard<std::mutex> l(s_peers_l);
for(auto p=s_peers.begin();p!=s_peers.end();++p) {
if (first)
first = false;
else o << ',';
o <<
"{\"address\":\"" << p->first.address().toString(tmp) << "\""
"{\"address\":\"" << (*p)->id.address().toString(tmp) << "\""
",\"latency\":-1"
",\"paths\":[";
if (p->second->ip4) {
if ((*p)->ip4) {
o <<
"{\"active\":true"
",\"address\":\"" << p->second->ip4.toIpString(tmp) << "\\/" << p->second->ip4.port() << "\""
",\"address\":\"" << (*p)->ip4.toIpString(tmp) << "\\/" << (*p)->ip4.port() << "\""
",\"expired\":false"
",\"lastReceive\":" << p->second->lastReceive <<
",\"lastSend\":" << p->second->lastSend <<
",\"lastReceive\":" << (*p)->lastReceive <<
",\"lastSend\":" << (*p)->lastSend <<
",\"preferred\":true"
",\"trustedPathId\":0}";
}
if (p->second->ip6) {
if (p->second->ip4)
if ((*p)->ip6) {
if ((*p)->ip4)
o << ',';
o <<
"{\"active\":true"
",\"address\":\"" << p->second->ip6.toIpString(tmp) << "\\/" << p->second->ip6.port() << "\""
",\"address\":\"" << (*p)->ip6.toIpString(tmp) << "\\/" << (*p)->ip6.port() << "\""
",\"expired\":false"
",\"lastReceive\":" << p->second->lastReceive <<
",\"lastSend\":" << p->second->lastSend <<
",\"preferred\":" << ((p->second->ip4) ? "false" : "true") <<
",\"lastReceive\":" << (*p)->lastReceive <<
",\"lastSend\":" << (*p)->lastSend <<
",\"preferred\":" << (((*p)->ip4) ? "false" : "true") <<
",\"trustedPathId\":0}";
}
o << "]"
",\"role\":\"LEAF\""
",\"version\":\"" << p->second->vMajor << '.' << p->second->vMinor << '.' << p->second->vRev << "\""
",\"versionMajor\":" << p->second->vMajor <<
",\"versionMinor\":" << p->second->vMinor <<
",\"versionRev\":" << p->second->vRev << "}";
",\"version\":\"" << (*p)->vMajor << '.' << (*p)->vMinor << '.' << (*p)->vRev << "\""
",\"versionMajor\":" << (*p)->vMajor <<
",\"versionMinor\":" << (*p)->vMinor <<
",\"versionRev\":" << (*p)->vRev << "}";
}
} catch ( ... ) {}
o << ']';
@ -1146,27 +1148,34 @@ int main(int argc,char **argv)
}
// Remove expired peers
{
std::lock_guard<std::mutex> pbi_l(s_peers_l);
for(auto p=s_peers.begin();p!=s_peers.end();) {
if ((now - (*p)->lastReceive) > ZT_PEER_ACTIVITY_TIMEOUT) {
{
std::lock_guard<std::mutex> pbi_l(s_peersByIdentity_l);
s_peersByIdentity.erase((*p)->id);
}
{
std::lock_guard<std::mutex> pbv_l(s_peersByVirtAddr_l);
auto pbv = s_peersByVirtAddr.find((*p)->id.address());
if (pbv != s_peersByVirtAddr.end()) {
pbv->second.erase(*p);
if (pbv->second.empty())
s_peersByVirtAddr.erase(pbv);
}
}
s_peers.erase(p++);
} else ++p;
try {
std::vector< SharedPtr<RootPeer> > toRemove;
toRemove.reserve(1024);
{
std::lock_guard<std::mutex> pbi_l(s_peers_l);
for(auto p=s_peers.begin();p!=s_peers.end();) {
if ((now - (*p)->lastReceive) > ZT_PEER_ACTIVITY_TIMEOUT) {
toRemove.emplace_back(*p);
s_peers.erase(p++);
} else ++p;
}
}
}
for(auto p=toRemove.begin();p!=toRemove.end();++p) {
{
std::lock_guard<std::mutex> pbi_l(s_peersByIdentity_l);
s_peersByIdentity.erase((*p)->id);
}
{
std::lock_guard<std::mutex> pbv_l(s_peersByVirtAddr_l);
auto pbv = s_peersByVirtAddr.find((*p)->id.address());
if (pbv != s_peersByVirtAddr.end()) {
pbv->second.erase(*p);
if (pbv->second.empty())
s_peersByVirtAddr.erase(pbv);
}
}
}
} catch ( ... ) {}
// Remove old rendezvous entries
{
@ -1212,17 +1221,16 @@ int main(int argc,char **argv)
if (pf) {
std::vector< SharedPtr<RootPeer> > sp;
{
std::lock_guard<std::mutex> pbi_l(s_peersByIdentity_l);
sp.reserve(s_peersByIdentity.size());
for(auto p=s_peersByIdentity.begin();p!=s_peersByIdentity.end();++p) {
sp.push_back(p->second);
std::lock_guard<std::mutex> pbi_l(s_peers_l);
sp.reserve(s_peers.size());
for(auto p=s_peers.begin();p!=s_peers.end();++p) {
sp.emplace_back(*p);
}
}
std::sort(sp.begin(),sp.end(),[](const SharedPtr<RootPeer> &a,const SharedPtr<RootPeer> &b) { return (a->id < b->id); });
fprintf(pf,"Address %21s %45s %10s %6s %10s" ZT_EOL_S,"IPv4","IPv6","Age(sec)","Vers","Fwd(KiB/s)");
{
std::lock_guard<std::mutex> lf_l(s_lastForwardedTo_l);
char ip4[128],ip6[128],ver[128];
for(auto p=sp.begin();p!=sp.end();++p) {
if ((*p)->ip4) {
@ -1239,9 +1247,11 @@ int main(int argc,char **argv)
}
OSUtils::ztsnprintf(ver,sizeof(ver),"%d.%d.%d",(*p)->vMajor,(*p)->vMinor,(*p)->vRev);
double forwardingSpeed = 0.0;
s_lastForwardedTo_l.lock();
auto lft = s_lastForwardedTo.find((*p)->id.address());
if (lft != s_lastForwardedTo.end())
forwardingSpeed = lft->second.bps.perSecond(now) / 1024.0;
s_lastForwardedTo_l.unlock();
fprintf(pf,"%.10llx %21s %45s %10.4f %6s %10.4f" ZT_EOL_S,
(unsigned long long)(*p)->id.address().toInt(),
ip4,
@ -1265,11 +1275,12 @@ int main(int argc,char **argv)
if (sf) {
fprintf(sf,"Uptime (seconds) : %ld" ZT_EOL_S,(long)((now - s_startTime) / 1000));
s_peersByIdentity_l.lock();
fprintf(sf,"Peers : %llu" ZT_EOL_S,(unsigned long long)s_peersByIdentity.size());
s_peersByVirtAddr_l.lock();
fprintf(sf,"Virtual Address Collisions : %llu" ZT_EOL_S,(unsigned long long)(s_peersByIdentity.size() - s_peersByVirtAddr.size()));
s_peersByVirtAddr_l.unlock();
auto peersByIdentitySize = s_peersByIdentity.size();
s_peersByIdentity_l.unlock();
fprintf(sf,"Peers : %llu" ZT_EOL_S,(unsigned long long)peersByIdentitySize);
s_peersByVirtAddr_l.lock();
fprintf(sf,"Virtual Address Collisions : %llu" ZT_EOL_S,(unsigned long long)(peersByIdentitySize - s_peersByVirtAddr.size()));
s_peersByVirtAddr_l.unlock();
s_rendezvousTracking_l.lock();
uint64_t unsuccessfulp2p = 0;
for(auto lr=s_rendezvousTracking.begin();lr!=s_rendezvousTracking.end();++lr) {