Get ephemeral status fields out of the configs. They do not belong there and it just complicates things.

This commit is contained in:
Adam Ierymenko 2017-11-09 17:01:16 -05:00
parent 750e36993f
commit 0fb22df633
4 changed files with 83 additions and 72 deletions

View file

@ -21,6 +21,8 @@
#include "RethinkDB.hpp"
#include "EmbeddedNetworkController.hpp"
#include "../version.h"
#include <chrono>
#include <algorithm>
#include <stdexcept>
@ -216,6 +218,8 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddres
}
_onlineNotificationThread = std::thread([this]() {
int64_t lastUpdatedNetworkStatus = 0;
std::unordered_map< std::pair<uint64_t,uint64_t>,int64_t,_PairHasher > lastOnlineCumulative;
try {
std::unique_ptr<R::Connection> rdb;
while (_run == 1) {
@ -223,24 +227,77 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddres
if (!rdb)
rdb = R::connect(this->_host,this->_port,this->_auth);
if (rdb) {
std::lock_guard<std::mutex> l(_lastOnline_l);
R::Array batch;
R::Object tmpobj;
for(auto i=_lastOnline.begin();i!=_lastOnline.end();++i) {
std::unordered_map< std::pair<uint64_t,uint64_t>,std::pair<int64_t,InetAddress>,_PairHasher > lastOnline;
{
std::lock_guard<std::mutex> l(_lastOnline_l);
lastOnline.swap(_lastOnline);
}
for(auto i=lastOnline.begin();i!=lastOnline.end();++i) {
lastOnlineCumulative[i->first] = i->second.first;
char tmp[64],tmp2[64];
OSUtils::ztsnprintf(tmp,sizeof(tmp),"%.16llx-%.10llx",i->first.first,i->first.second);
tmpobj["id"] = tmp;
tmpobj["ts"] = i->second.first;
tmpobj["phy"] = i->second.second.toIpString(tmp2);
batch.emplace_back(tmpobj);
if (batch.size() >= 256) {
R::db(this->_db).table("MemberLastRequest",R::optargs("read_mode","outdated")).insert(batch,R::optargs("conflict","update")).run(*rdb);
if (batch.size() >= 1024) {
R::db(this->_db).table("MemberStatus",R::optargs("read_mode","outdated")).insert(batch,R::optargs("conflict","update")).run(*rdb);
batch.clear();
}
}
if (batch.size() > 0) {
R::db(this->_db).table("MemberStatus",R::optargs("read_mode","outdated")).insert(batch,R::optargs("conflict","update")).run(*rdb);
batch.clear();
}
tmpobj.clear();
const int64_t now = OSUtils::now();
if ((now - lastUpdatedNetworkStatus) > 10000) {
lastUpdatedNetworkStatus = now;
std::vector< std::pair< uint64_t,std::shared_ptr<_Network> > > networks;
{
std::lock_guard<std::mutex> l(_networks_l);
networks.reserve(_networks.size() + 1);
for(auto i=_networks.begin();i!=_networks.end();++i)
networks.push_back(*i);
}
for(auto i=networks.begin();i!=networks.end();++i) {
char tmp[64];
Utils::hex(i->first,tmp);
tmpobj["id"] = tmp;
{
std::lock_guard<std::mutex> l2(i->second->lock);
tmpobj["authorizedMemberCount"] = i->second->authorizedMembers.size();
tmpobj["totalMemberCount"] = i->second->members.size();
unsigned long activeMemberCount = 0;
for(auto m=i->second->members.begin();m!=i->second->members.end();++m) {
auto lo = lastOnlineCumulative.find(std::pair<uint64_t,uint64_t>(i->first,m->first));
if (lo != lastOnlineCumulative.end()) {
if ((now - lo->second) <= (ZT_NETWORK_AUTOCONF_DELAY * 2))
++activeMemberCount;
else lastOnlineCumulative.erase(lo);
}
}
tmpobj["activeMemberCount"] = activeMemberCount;
tmpobj["bridgeCount"] = i->second->activeBridgeMembers.size();
}
batch.emplace_back(tmpobj);
if (batch.size() >= 1024) {
R::db(this->_db).table("NetworkStatus",R::optargs("read_mode","outdated")).insert(batch,R::optargs("conflict","update")).run(*rdb);
batch.clear();
}
}
if (batch.size() > 0) {
R::db(this->_db).table("NetworkStatus",R::optargs("read_mode","outdated")).insert(batch,R::optargs("conflict","update")).run(*rdb);
batch.clear();
}
}
if (batch.size() > 0)
R::db(this->_db).table("MemberLastRequest",R::optargs("read_mode","outdated")).insert(batch,R::optargs("conflict","update")).run(*rdb);
_lastOnline.clear();
}
} catch (std::exception &e) {
fprintf(stderr,"ERROR: controller RethinkDB (node status update): %s" ZT_EOL_S,e.what());
@ -266,7 +323,7 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddres
if (!rdb)
rdb = R::connect(this->_host,this->_port,this->_auth);
if (rdb) {
OSUtils::ztsnprintf(tmp,sizeof(tmp),"{\"id\":\"%s\",\"lastAlive\":%lld}",this->_myAddressStr.c_str(),(long long)OSUtils::now());
OSUtils::ztsnprintf(tmp,sizeof(tmp),"{\"id\":\"%s\",\"lastAlive\":%lld,\"version\":\"%d.%d.%d\"}",this->_myAddressStr.c_str(),(long long)OSUtils::now(),ZEROTIER_ONE_VERSION_MAJOR,ZEROTIER_ONE_VERSION_MINOR,ZEROTIER_ONE_VERSION_REVISION);
//printf("HEARTBEAT: %s" ZT_EOL_S,tmp);
R::db(this->_db).table("Controller").update(R::Datum::from_json(tmp)).run(*rdb);
}