Merge branch 'dev' into cmake

This commit is contained in:
Grant Limberg 2019-08-12 12:23:36 -07:00
commit cc9fd9f8ce
49 changed files with 3129 additions and 1860 deletions

View file

@ -52,6 +52,7 @@ void DB::initNetwork(nlohmann::json &network)
if (!network.count("mtu")) network["mtu"] = ZT_DEFAULT_MTU;
if (!network.count("remoteTraceTarget")) network["remoteTraceTarget"] = nlohmann::json();
if (!network.count("removeTraceLevel")) network["remoteTraceLevel"] = 0;
if (!network.count("rulesSource")) network["rulesSource"] = "";
if (!network.count("rules")) {
// If unspecified, rules are set to allow anything and behave like a flat L2 segment
network["rules"] = {{
@ -104,16 +105,7 @@ void DB::cleanMember(nlohmann::json &member)
member.erase("lastRequestMetaData");
}
DB::DB(const Identity &myId,const char *path) :
_myId(myId),
_myAddress(myId.address()),
_path((path) ? path : "")
{
char tmp[32];
_myAddress.toString(tmp);
_myAddressStr = tmp;
}
DB::DB() {}
DB::~DB() {}
bool DB::get(const uint64_t networkId,nlohmann::json &network)
@ -199,34 +191,15 @@ bool DB::get(const uint64_t networkId,nlohmann::json &network,std::vector<nlohma
return true;
}
bool DB::summary(const uint64_t networkId,NetworkSummaryInfo &info)
{
waitForReady();
std::shared_ptr<_Network> nw;
{
std::lock_guard<std::mutex> l(_networks_l);
auto nwi = _networks.find(networkId);
if (nwi == _networks.end())
return false;
nw = nwi->second;
}
{
std::lock_guard<std::mutex> l2(nw->lock);
_fillSummaryInfo(nw,info);
}
return true;
}
void DB::networks(std::vector<uint64_t> &networks)
void DB::networks(std::set<uint64_t> &networks)
{
waitForReady();
std::lock_guard<std::mutex> l(_networks_l);
networks.reserve(_networks.size() + 1);
for(auto n=_networks.begin();n!=_networks.end();++n)
networks.push_back(n->first);
networks.insert(n->first);
}
void DB::_memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool initialized)
void DB::_memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool notifyListeners)
{
uint64_t memberId = 0;
uint64_t networkId = 0;
@ -310,10 +283,10 @@ void DB::_memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool in
}
}
if (initialized) {
if (notifyListeners) {
std::lock_guard<std::mutex> ll(_changeListeners_l);
for(auto i=_changeListeners.begin();i!=_changeListeners.end();++i) {
(*i)->onNetworkMemberUpdate(networkId,memberId,memberConfig);
(*i)->onNetworkMemberUpdate(this,networkId,memberId,memberConfig);
}
}
} else if (memberId) {
@ -333,15 +306,15 @@ void DB::_memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool in
}
}
if ((initialized)&&((wasAuth)&&(!isAuth)&&(networkId)&&(memberId))) {
if ((notifyListeners)&&((wasAuth)&&(!isAuth)&&(networkId)&&(memberId))) {
std::lock_guard<std::mutex> ll(_changeListeners_l);
for(auto i=_changeListeners.begin();i!=_changeListeners.end();++i) {
(*i)->onNetworkMemberDeauthorize(networkId,memberId);
(*i)->onNetworkMemberDeauthorize(this,networkId,memberId);
}
}
}
void DB::_networkChanged(nlohmann::json &old,nlohmann::json &networkConfig,bool initialized)
void DB::_networkChanged(nlohmann::json &old,nlohmann::json &networkConfig,bool notifyListeners)
{
if (networkConfig.is_object()) {
const std::string ids = networkConfig["id"];
@ -359,10 +332,10 @@ void DB::_networkChanged(nlohmann::json &old,nlohmann::json &networkConfig,bool
std::lock_guard<std::mutex> l2(nw->lock);
nw->config = networkConfig;
}
if (initialized) {
if (notifyListeners) {
std::lock_guard<std::mutex> ll(_changeListeners_l);
for(auto i=_changeListeners.begin();i!=_changeListeners.end();++i) {
(*i)->onNetworkUpdate(networkId,networkConfig);
(*i)->onNetworkUpdate(this,networkId,networkConfig);
}
}
}

View file

@ -27,6 +27,8 @@
#ifndef ZT_CONTROLLER_DB_HPP
#define ZT_CONTROLLER_DB_HPP
//#define ZT_CONTROLLER_USE_LIBPQ
#include "../node/Constants.hpp"
#include "../node/Identity.hpp"
#include "../node/InetAddress.hpp"
@ -41,6 +43,7 @@
#include <vector>
#include <atomic>
#include <mutex>
#include <set>
#include "../ext/json/json.hpp"
@ -58,9 +61,9 @@ public:
public:
ChangeListener() {}
virtual ~ChangeListener() {}
virtual void onNetworkUpdate(uint64_t networkId,const nlohmann::json &network) {}
virtual void onNetworkMemberUpdate(uint64_t networkId,uint64_t memberId,const nlohmann::json &member) {}
virtual void onNetworkMemberDeauthorize(uint64_t networkId,uint64_t memberId) {}
virtual void onNetworkUpdate(const void *db,uint64_t networkId,const nlohmann::json &network) {}
virtual void onNetworkMemberUpdate(const void *db,uint64_t networkId,uint64_t memberId,const nlohmann::json &member) {}
virtual void onNetworkMemberDeauthorize(const void *db,uint64_t networkId,uint64_t memberId) {}
};
struct NetworkSummaryInfo
@ -78,7 +81,7 @@ public:
static void cleanNetwork(nlohmann::json &network);
static void cleanMember(nlohmann::json &member);
DB(const Identity &myId,const char *path);
DB();
virtual ~DB();
virtual bool waitForReady() = 0;
@ -94,12 +97,27 @@ public:
bool get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member);
bool get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member,NetworkSummaryInfo &info);
bool get(const uint64_t networkId,nlohmann::json &network,std::vector<nlohmann::json> &members);
bool summary(const uint64_t networkId,NetworkSummaryInfo &info);
void networks(std::vector<uint64_t> &networks);
virtual void save(nlohmann::json *orig,nlohmann::json &record) = 0;
void networks(std::set<uint64_t> &networks);
template<typename F>
inline void each(F f)
{
nlohmann::json nullJson;
std::lock_guard<std::mutex> lck(_networks_l);
for(auto nw=_networks.begin();nw!=_networks.end();++nw) {
f(nw->first,nw->second->config,0,nullJson); // first provide network with 0 for member ID
for(auto m=nw->second->members.begin();m!=nw->second->members.end();++m) {
f(nw->first,nw->second->config,m->first,m->second);
}
}
}
virtual bool save(nlohmann::json &record,bool notifyListeners) = 0;
virtual void eraseNetwork(const uint64_t networkId) = 0;
virtual void eraseMember(const uint64_t networkId,const uint64_t memberId) = 0;
virtual void nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const InetAddress &physicalAddress) = 0;
inline void addListener(DB::ChangeListener *const listener)
@ -109,6 +127,28 @@ public:
}
protected:
static inline bool _compareRecords(const nlohmann::json &a,const nlohmann::json &b)
{
if (a.is_object() == b.is_object()) {
if (a.is_object()) {
if (a.size() != b.size())
return false;
auto amap = a.get<nlohmann::json::object_t>();
auto bmap = b.get<nlohmann::json::object_t>();
for(auto ai=amap.begin();ai!=amap.end();++ai) {
if (ai->first != "revision") { // ignore revision, compare only non-revision-counter fields
auto bi = bmap.find(ai->first);
if ((bi == bmap.end())||(bi->second != ai->second))
return false;
}
}
return true;
}
return (a == b);
}
return false;
}
struct _Network
{
_Network() : mostRecentDeauthTime(0) {}
@ -121,15 +161,10 @@ protected:
std::mutex lock;
};
void _memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool initialized);
void _networkChanged(nlohmann::json &old,nlohmann::json &networkConfig,bool initialized);
void _memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool notifyListeners);
void _networkChanged(nlohmann::json &old,nlohmann::json &networkConfig,bool notifyListeners);
void _fillSummaryInfo(const std::shared_ptr<_Network> &nw,NetworkSummaryInfo &info);
const Identity _myId;
const Address _myAddress;
const std::string _path;
std::string _myAddressStr;
std::vector<DB::ChangeListener *> _changeListeners;
std::unordered_map< uint64_t,std::shared_ptr<_Network> > _networks;
std::unordered_multimap< uint64_t,uint64_t > _networkByMember;

244
controller/DBMirrorSet.cpp Normal file
View file

@ -0,0 +1,244 @@
/*
* ZeroTier One - Network Virtualization Everywhere
* Copyright (C) 2011-2019 ZeroTier, Inc. https://www.zerotier.com/
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* --
*
* You can be released from the requirements of the license by purchasing
* a commercial license. Buying such a license is mandatory as soon as you
* develop commercial closed-source software that incorporates or links
* directly against ZeroTier software without disclosing the source code
* of your own application.
*/
#include "DBMirrorSet.hpp"
namespace ZeroTier {
DBMirrorSet::DBMirrorSet(DB::ChangeListener *listener) :
_listener(listener),
_running(true)
{
_syncCheckerThread = std::thread([this]() {
for(;;) {
for(int i=0;i<120;++i) { // 1 minute delay between checks
if (!_running)
return;
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
std::vector< std::shared_ptr<DB> > dbs;
{
std::lock_guard<std::mutex> l(_dbs_l);
if (_dbs.size() <= 1)
continue; // no need to do this if there's only one DB, so skip the iteration
dbs = _dbs;
}
for(auto db=dbs.begin();db!=dbs.end();++db) {
(*db)->each([this,&dbs,&db](uint64_t networkId,const nlohmann::json &network,uint64_t memberId,const nlohmann::json &member) {
try {
if (network.is_object()) {
if (memberId == 0) {
for(auto db2=dbs.begin();db2!=dbs.end();++db2) {
if (db->get() != db2->get()) {
nlohmann::json nw2;
if ((!(*db2)->get(networkId,nw2))||((nw2.is_object())&&(OSUtils::jsonInt(nw2["revision"],0) < OSUtils::jsonInt(network["revision"],0)))) {
nw2 = network;
(*db2)->save(nw2,false);
}
}
}
} else if (member.is_object()) {
for(auto db2=dbs.begin();db2!=dbs.end();++db2) {
if (db->get() != db2->get()) {
nlohmann::json nw2,m2;
if ((!(*db2)->get(networkId,nw2,memberId,m2))||((m2.is_object())&&(OSUtils::jsonInt(m2["revision"],0) < OSUtils::jsonInt(member["revision"],0)))) {
m2 = member;
(*db2)->save(m2,false);
}
}
}
}
}
} catch ( ... ) {} // skip entries that generate JSON errors
});
}
}
});
}
DBMirrorSet::~DBMirrorSet()
{
_running = false;
_syncCheckerThread.join();
}
bool DBMirrorSet::hasNetwork(const uint64_t networkId) const
{
std::lock_guard<std::mutex> l(_dbs_l);
for(auto d=_dbs.begin();d!=_dbs.end();++d) {
if ((*d)->hasNetwork(networkId))
return true;
}
return false;
}
bool DBMirrorSet::get(const uint64_t networkId,nlohmann::json &network)
{
std::lock_guard<std::mutex> l(_dbs_l);
for(auto d=_dbs.begin();d!=_dbs.end();++d) {
if ((*d)->get(networkId,network)) {
return true;
}
}
return false;
}
bool DBMirrorSet::get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member)
{
std::lock_guard<std::mutex> l(_dbs_l);
for(auto d=_dbs.begin();d!=_dbs.end();++d) {
if ((*d)->get(networkId,network,memberId,member))
return true;
}
return false;
}
bool DBMirrorSet::get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member,DB::NetworkSummaryInfo &info)
{
std::lock_guard<std::mutex> l(_dbs_l);
for(auto d=_dbs.begin();d!=_dbs.end();++d) {
if ((*d)->get(networkId,network,memberId,member,info))
return true;
}
return false;
}
bool DBMirrorSet::get(const uint64_t networkId,nlohmann::json &network,std::vector<nlohmann::json> &members)
{
std::lock_guard<std::mutex> l(_dbs_l);
for(auto d=_dbs.begin();d!=_dbs.end();++d) {
if ((*d)->get(networkId,network,members))
return true;
}
return false;
}
void DBMirrorSet::networks(std::set<uint64_t> &networks)
{
std::lock_guard<std::mutex> l(_dbs_l);
for(auto d=_dbs.begin();d!=_dbs.end();++d) {
(*d)->networks(networks);
}
}
bool DBMirrorSet::waitForReady()
{
bool r = false;
std::lock_guard<std::mutex> l(_dbs_l);
for(auto d=_dbs.begin();d!=_dbs.end();++d) {
r |= (*d)->waitForReady();
}
return r;
}
bool DBMirrorSet::isReady()
{
std::lock_guard<std::mutex> l(_dbs_l);
for(auto d=_dbs.begin();d!=_dbs.end();++d) {
if (!(*d)->isReady())
return false;
}
return true;
}
bool DBMirrorSet::save(nlohmann::json &record,bool notifyListeners)
{
std::vector< std::shared_ptr<DB> > dbs;
{
std::lock_guard<std::mutex> l(_dbs_l);
dbs = _dbs;
}
if (notifyListeners) {
for(auto d=dbs.begin();d!=dbs.end();++d) {
if ((*d)->save(record,true))
return true;
}
return false;
} else {
bool modified = false;
for(auto d=dbs.begin();d!=dbs.end();++d) {
modified |= (*d)->save(record,false);
}
return modified;
}
}
void DBMirrorSet::eraseNetwork(const uint64_t networkId)
{
std::lock_guard<std::mutex> l(_dbs_l);
for(auto d=_dbs.begin();d!=_dbs.end();++d) {
(*d)->eraseNetwork(networkId);
}
}
void DBMirrorSet::eraseMember(const uint64_t networkId,const uint64_t memberId)
{
std::lock_guard<std::mutex> l(_dbs_l);
for(auto d=_dbs.begin();d!=_dbs.end();++d) {
(*d)->eraseMember(networkId,memberId);
}
}
void DBMirrorSet::nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const InetAddress &physicalAddress)
{
std::lock_guard<std::mutex> l(_dbs_l);
for(auto d=_dbs.begin();d!=_dbs.end();++d) {
(*d)->nodeIsOnline(networkId,memberId,physicalAddress);
}
}
void DBMirrorSet::onNetworkUpdate(const void *db,uint64_t networkId,const nlohmann::json &network)
{
nlohmann::json record(network);
std::lock_guard<std::mutex> l(_dbs_l);
for(auto d=_dbs.begin();d!=_dbs.end();++d) {
if (d->get() != db) {
(*d)->save(record,false);
}
}
_listener->onNetworkUpdate(this,networkId,network);
}
void DBMirrorSet::onNetworkMemberUpdate(const void *db,uint64_t networkId,uint64_t memberId,const nlohmann::json &member)
{
nlohmann::json record(member);
std::lock_guard<std::mutex> l(_dbs_l);
for(auto d=_dbs.begin();d!=_dbs.end();++d) {
if (d->get() != db) {
(*d)->save(record,false);
}
}
_listener->onNetworkMemberUpdate(this,networkId,memberId,member);
}
void DBMirrorSet::onNetworkMemberDeauthorize(const void *db,uint64_t networkId,uint64_t memberId)
{
_listener->onNetworkMemberDeauthorize(this,networkId,memberId);
}
} // namespace ZeroTier

View file

@ -0,0 +1,84 @@
/*
* ZeroTier One - Network Virtualization Everywhere
* Copyright (C) 2011-2019 ZeroTier, Inc. https://www.zerotier.com/
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* --
*
* You can be released from the requirements of the license by purchasing
* a commercial license. Buying such a license is mandatory as soon as you
* develop commercial closed-source software that incorporates or links
* directly against ZeroTier software without disclosing the source code
* of your own application.
*/
#ifndef ZT_DBMIRRORSET_HPP
#define ZT_DBMIRRORSET_HPP
#include "DB.hpp"
#include <vector>
#include <memory>
#include <mutex>
#include <set>
#include <thread>
namespace ZeroTier {
class DBMirrorSet : public DB::ChangeListener
{
public:
DBMirrorSet(DB::ChangeListener *listener);
virtual ~DBMirrorSet();
bool hasNetwork(const uint64_t networkId) const;
bool get(const uint64_t networkId,nlohmann::json &network);
bool get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member);
bool get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member,DB::NetworkSummaryInfo &info);
bool get(const uint64_t networkId,nlohmann::json &network,std::vector<nlohmann::json> &members);
void networks(std::set<uint64_t> &networks);
bool waitForReady();
bool isReady();
bool save(nlohmann::json &record,bool notifyListeners);
void eraseNetwork(const uint64_t networkId);
void eraseMember(const uint64_t networkId,const uint64_t memberId);
void nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const InetAddress &physicalAddress);
// These are called by various DB instances when changes occur.
virtual void onNetworkUpdate(const void *db,uint64_t networkId,const nlohmann::json &network);
virtual void onNetworkMemberUpdate(const void *db,uint64_t networkId,uint64_t memberId,const nlohmann::json &member);
virtual void onNetworkMemberDeauthorize(const void *db,uint64_t networkId,uint64_t memberId);
inline void addDB(const std::shared_ptr<DB> &db)
{
db->addListener(this);
std::lock_guard<std::mutex> l(_dbs_l);
_dbs.push_back(db);
}
private:
DB::ChangeListener *const _listener;
std::atomic_bool _running;
std::thread _syncCheckerThread;
std::vector< std::shared_ptr< DB > > _dbs;
mutable std::mutex _dbs_l;
};
} // namespace ZeroTier
#endif

View file

@ -469,12 +469,14 @@ static bool _parseRule(json &r,ZT_VirtualNetworkRule &rule)
} // anonymous namespace
EmbeddedNetworkController::EmbeddedNetworkController(Node *node,const char *dbPath, int listenPort, MQConfig *mqc) :
EmbeddedNetworkController::EmbeddedNetworkController(Node *node,const char *ztPath,const char *dbPath, int listenPort, MQConfig *mqc) :
_startTime(OSUtils::now()),
_listenPort(listenPort),
_node(node),
_ztPath(ztPath),
_path(dbPath),
_sender((NetworkController::Sender *)0),
_db(this),
_mqc(mqc)
{
}
@ -496,12 +498,16 @@ void EmbeddedNetworkController::init(const Identity &signingId,Sender *sender)
#ifdef ZT_CONTROLLER_USE_LIBPQ
if ((_path.length() > 9)&&(_path.substr(0,9) == "postgres:")) {
_db.reset(new PostgreSQL(_signingId,_path.substr(9).c_str(), _listenPort, _mqc));
_db.addDB(std::shared_ptr<DB>(new PostgreSQL(_signingId,_path.substr(9).c_str(), _listenPort, _mqc)));
} else {
#endif
_db.addDB(std::shared_ptr<DB>(new FileDB(_path.c_str())));
#ifdef ZT_CONTROLLER_USE_LIBPQ
}
#endif
std::string lfJSON;
OSUtils::readFile((_path + ZT_PATH_SEPARATOR_S ".." ZT_PATH_SEPARATOR_S "local.conf").c_str(),lfJSON);
OSUtils::readFile((_ztPath + ZT_PATH_SEPARATOR_S "local.conf").c_str(),lfJSON);
if (lfJSON.length() > 0) {
nlohmann::json lfConfig(OSUtils::jsonParse(lfJSON));
nlohmann::json &settings = lfConfig["settings"];
@ -521,7 +527,7 @@ void EmbeddedNetworkController::init(const Identity &signingId,Sender *sender)
std::size_t pubHdrEnd = lfOwnerPublic.find_first_of("\n\r\t ");
if (pubHdrEnd != std::string::npos) {
lfOwnerPublic = lfOwnerPublic.substr(0,pubHdrEnd);
_db.reset(new LFDB(_signingId,_path.c_str(),lfOwner.c_str(),lfOwnerPublic.c_str(),lfHost.c_str(),lfPort,storeOnlineState));
_db.addDB(std::shared_ptr<DB>(new LFDB(_signingId,_path.c_str(),lfOwner.c_str(),lfOwnerPublic.c_str(),lfHost.c_str(),lfPort,storeOnlineState)));
}
}
}
@ -529,16 +535,8 @@ void EmbeddedNetworkController::init(const Identity &signingId,Sender *sender)
}
}
}
if (!_db)
_db.reset(new FileDB(_signingId,_path.c_str()));
_db->addListener(this);
#ifdef ZT_CONTROLLER_USE_LIBPQ
}
#endif
_db->waitForReady();
_db.waitForReady();
}
void EmbeddedNetworkController::request(
@ -569,15 +567,12 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpGET(
std::string &responseBody,
std::string &responseContentType)
{
if (!_db)
return 500;
if ((path.size() > 0)&&(path[0] == "network")) {
if ((path.size() >= 2)&&(path[1].length() == 16)) {
const uint64_t nwid = Utils::hexStrToU64(path[1].c_str());
json network;
if (!_db->get(nwid,network))
if (!_db.get(nwid,network))
return 404;
if (path.size() >= 3) {
@ -589,7 +584,7 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpGET(
const uint64_t address = Utils::hexStrToU64(path[3].c_str());
json member;
if (!_db->get(nwid,network,address,member))
if (!_db.get(nwid,network,address,member))
return 404;
responseBody = OSUtils::jsonDump(member);
responseContentType = "application/json";
@ -599,7 +594,7 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpGET(
responseBody = "{";
std::vector<json> members;
if (_db->get(nwid,network,members)) {
if (_db.get(nwid,network,members)) {
responseBody.reserve((members.size() + 2) * 32);
std::string mid;
for(auto member=members.begin();member!=members.end();++member) {
@ -628,12 +623,12 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpGET(
} else if (path.size() == 1) {
// List networks
std::vector<uint64_t> networkIds;
_db->networks(networkIds);
std::set<uint64_t> networkIds;
_db.networks(networkIds);
char tmp[64];
responseBody = "[";
responseBody.reserve((networkIds.size() + 1) * 24);
for(std::vector<uint64_t>::const_iterator i(networkIds.begin());i!=networkIds.end();++i) {
for(std::set<uint64_t>::const_iterator i(networkIds.begin());i!=networkIds.end();++i) {
if (responseBody.length() > 1)
responseBody.push_back(',');
OSUtils::ztsnprintf(tmp,sizeof(tmp),"\"%.16llx\"",(unsigned long long)*i);
@ -650,7 +645,7 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpGET(
// Controller status
char tmp[4096];
const bool dbOk = _db->isReady();
const bool dbOk = _db.isReady();
OSUtils::ztsnprintf(tmp,sizeof(tmp),"{\n\t\"controller\": true,\n\t\"apiVersion\": %d,\n\t\"clock\": %llu,\n\t\"databaseReady\": %s\n}\n",ZT_NETCONF_CONTROLLER_API_VERSION,(unsigned long long)OSUtils::now(),dbOk ? "true" : "false");
responseBody = tmp;
responseContentType = "application/json";
@ -669,8 +664,6 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpPOST(
std::string &responseBody,
std::string &responseContentType)
{
if (!_db)
return 500;
if (path.empty())
return 404;
@ -704,8 +697,7 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpPOST(
OSUtils::ztsnprintf(addrs,sizeof(addrs),"%.10llx",(unsigned long long)address);
json member,network;
_db->get(nwid,network,address,member);
json origMember(member); // for detecting changes
_db.get(nwid,network,address,member);
DB::initMember(member);
try {
@ -799,7 +791,7 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpPOST(
member["nwid"] = nwids;
DB::cleanMember(member);
_db->save(&origMember,member);
_db.save(member,true);
responseBody = OSUtils::jsonDump(member);
responseContentType = "application/json";
@ -818,7 +810,7 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpPOST(
Utils::getSecureRandom(&nwidPostfix,sizeof(nwidPostfix));
uint64_t tryNwid = nwidPrefix | (nwidPostfix & 0xffffffULL);
if ((tryNwid & 0xffffffULL) == 0ULL) tryNwid |= 1ULL;
if (!_db->hasNetwork(tryNwid)) {
if (!_db.hasNetwork(tryNwid)) {
nwid = tryNwid;
break;
}
@ -829,8 +821,7 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpPOST(
OSUtils::ztsnprintf(nwids,sizeof(nwids),"%.16llx",(unsigned long long)nwid);
json network;
_db->get(nwid,network);
json origNetwork(network); // for detecting changes
_db.get(nwid,network);
DB::initNetwork(network);
try {
@ -1061,7 +1052,7 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpPOST(
network["nwid"] = nwids; // legacy
DB::cleanNetwork(network);
_db->save(&origNetwork,network);
_db.save(network,true);
responseBody = OSUtils::jsonDump(network);
responseContentType = "application/json";
@ -1083,8 +1074,6 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpDELETE(
std::string &responseBody,
std::string &responseContentType)
{
if (!_db)
return 500;
if (path.empty())
return 404;
@ -1096,8 +1085,8 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpDELETE(
const uint64_t address = Utils::hexStrToU64(path[3].c_str());
json network,member;
_db->get(nwid,network,address,member);
_db->eraseMember(nwid, address);
_db.get(nwid,network,address,member);
_db.eraseMember(nwid, address);
{
std::lock_guard<std::mutex> l(_memberStatus_l);
@ -1112,8 +1101,8 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpDELETE(
}
} else {
json network;
_db->get(nwid,network);
_db->eraseNetwork(nwid);
_db.get(nwid,network);
_db.eraseNetwork(nwid);
{
std::lock_guard<std::mutex> l(_memberStatus_l);
@ -1143,9 +1132,6 @@ void EmbeddedNetworkController::handleRemoteTrace(const ZT_RemoteTrace &rt)
char id[128],tmp[128];
std::string k,v;
if (!_db)
return;
try {
// Convert Dictionary into JSON object
json d;
@ -1184,13 +1170,13 @@ void EmbeddedNetworkController::handleRemoteTrace(const ZT_RemoteTrace &rt)
d["objtype"] = "trace";
d["ts"] = now;
d["nodeId"] = Utils::hex10(rt.origin,tmp);
_db->save((nlohmann::json *)0,d);
_db.save(d,true);
} catch ( ... ) {
// drop invalid trace messages if an error occurs
}
}
void EmbeddedNetworkController::onNetworkUpdate(const uint64_t networkId,const nlohmann::json &network)
void EmbeddedNetworkController::onNetworkUpdate(const void *db,uint64_t networkId,const nlohmann::json &network)
{
// Send an update to all members of the network that are online
const int64_t now = OSUtils::now();
@ -1201,7 +1187,7 @@ void EmbeddedNetworkController::onNetworkUpdate(const uint64_t networkId,const n
}
}
void EmbeddedNetworkController::onNetworkMemberUpdate(const uint64_t networkId,const uint64_t memberId,const nlohmann::json &member)
void EmbeddedNetworkController::onNetworkMemberUpdate(const void *db,uint64_t networkId,uint64_t memberId,const nlohmann::json &member)
{
// Push update to member if online
try {
@ -1212,7 +1198,7 @@ void EmbeddedNetworkController::onNetworkMemberUpdate(const uint64_t networkId,c
} catch ( ... ) {}
}
void EmbeddedNetworkController::onNetworkMemberDeauthorize(const uint64_t networkId,const uint64_t memberId)
void EmbeddedNetworkController::onNetworkMemberDeauthorize(const void *db,uint64_t networkId,uint64_t memberId)
{
const int64_t now = OSUtils::now();
Revocation rev((uint32_t)_node->prng(),networkId,0,now,ZT_REVOCATION_FLAG_FAST_PROPAGATE,Address(memberId),Revocation::CREDENTIAL_TYPE_COM);
@ -1235,10 +1221,7 @@ void EmbeddedNetworkController::_request(
{
char nwids[24];
DB::NetworkSummaryInfo ns;
json network,member,origMember;
if (!_db)
return;
json network,member;
if (((!_signingId)||(!_signingId.hasPrivate()))||(_signingId.address().toInt() != (nwid >> 24))||(!_sender))
return;
@ -1253,15 +1236,14 @@ void EmbeddedNetworkController::_request(
ms.lastRequestTime = now;
}
_db->nodeIsOnline(nwid,identity.address().toInt(),fromAddr);
_db.nodeIsOnline(nwid,identity.address().toInt(),fromAddr);
Utils::hex(nwid,nwids);
_db->get(nwid,network,identity.address().toInt(),member,ns);
_db.get(nwid,network,identity.address().toInt(),member,ns);
if ((!network.is_object())||(network.size() == 0)) {
_sender->ncSendError(nwid,requestPacketId,identity.address(),NetworkController::NC_ERROR_OBJECT_NOT_FOUND);
return;
}
origMember = member;
const bool newMember = ((!member.is_object())||(member.size() == 0));
DB::initMember(member);
@ -1362,7 +1344,7 @@ void EmbeddedNetworkController::_request(
} else {
// If they are not authorized, STOP!
DB::cleanMember(member);
_db->save(&origMember,member);
_db.save(member,true);
_sender->ncSendError(nwid,requestPacketId,identity.address(),NetworkController::NC_ERROR_ACCESS_DENIED);
return;
}
@ -1734,7 +1716,7 @@ void EmbeddedNetworkController::_request(
}
DB::cleanMember(member);
_db->save(&origMember,member);
_db.save(member,true);
_sender->ncSendConfig(nwid,requestPacketId,identity.address(),*(nc.get()),metaData.getUI(ZT_NETWORKCONFIG_REQUEST_METADATA_KEY_VERSION,0) < 6);
}

View file

@ -51,6 +51,7 @@
#include "../ext/json/json.hpp"
#include "DB.hpp"
#include "DBMirrorSet.hpp"
namespace ZeroTier {
@ -65,7 +66,7 @@ public:
* @param node Parent node
* @param dbPath Database path (file path or database credentials)
*/
EmbeddedNetworkController(Node *node,const char *dbPath, int listenPort, MQConfig *mqc = NULL);
EmbeddedNetworkController(Node *node,const char *ztPath,const char *dbPath, int listenPort, MQConfig *mqc = NULL);
virtual ~EmbeddedNetworkController();
virtual void init(const Identity &signingId,Sender *sender);
@ -101,9 +102,9 @@ public:
void handleRemoteTrace(const ZT_RemoteTrace &rt);
virtual void onNetworkUpdate(const uint64_t networkId,const nlohmann::json &network);
virtual void onNetworkMemberUpdate(const uint64_t networkId,const uint64_t memberId,const nlohmann::json &member);
virtual void onNetworkMemberDeauthorize(const uint64_t networkId,const uint64_t memberId);
virtual void onNetworkUpdate(const void *db,uint64_t networkId,const nlohmann::json &network);
virtual void onNetworkMemberUpdate(const void *db,uint64_t networkId,uint64_t memberId,const nlohmann::json &member);
virtual void onNetworkMemberDeauthorize(const void *db,uint64_t networkId,uint64_t memberId);
private:
void _request(uint64_t nwid,const InetAddress &fromAddr,uint64_t requestPacketId,const Identity &identity,const Dictionary<ZT_NETWORKCONFIG_METADATA_DICT_CAPACITY> &metaData);
@ -148,12 +149,13 @@ private:
const int64_t _startTime;
int _listenPort;
Node *const _node;
std::string _ztPath;
std::string _path;
Identity _signingId;
std::string _signingIdAddressString;
NetworkController::Sender *_sender;
std::unique_ptr<DB> _db;
DBMirrorSet _db;
BlockingQueue< _RQEntry * > _queue;
std::vector<std::thread> _threads;

View file

@ -29,11 +29,11 @@
namespace ZeroTier
{
FileDB::FileDB(const Identity &myId,const char *path) :
DB(myId,path),
FileDB::FileDB(const char *path) :
DB(),
_path(path),
_networksPath(_path + ZT_PATH_SEPARATOR_S + "network"),
_tracePath(_path + ZT_PATH_SEPARATOR_S + "trace"),
_onlineChanged(false),
_running(true)
{
OSUtils::mkdir(_path.c_str());
@ -86,38 +86,37 @@ FileDB::~FileDB()
bool FileDB::waitForReady() { return true; }
bool FileDB::isReady() { return true; }
void FileDB::save(nlohmann::json *orig,nlohmann::json &record)
bool FileDB::save(nlohmann::json &record,bool notifyListeners)
{
char p1[4096],p2[4096],pb[4096];
bool modified = false;
try {
if (orig) {
if (*orig != record) {
record["revision"] = OSUtils::jsonInt(record["revision"],0ULL) + 1;
}
} else {
record["revision"] = 1;
}
const std::string objtype = record["objtype"];
if (objtype == "network") {
const uint64_t nwid = OSUtils::jsonIntHex(record["id"],0ULL);
if (nwid) {
nlohmann::json old;
get(nwid,old);
if ((!old.is_object())||(old != record)) {
if ((!old.is_object())||(!_compareRecords(old,record))) {
record["revision"] = OSUtils::jsonInt(record["revision"],0ULL) + 1ULL;
OSUtils::ztsnprintf(p1,sizeof(p1),"%s" ZT_PATH_SEPARATOR_S "%.16llx.json",_networksPath.c_str(),nwid);
if (!OSUtils::writeFile(p1,OSUtils::jsonDump(record,-1)))
fprintf(stderr,"WARNING: controller unable to write to path: %s" ZT_EOL_S,p1);
_networkChanged(old,record,true);
_networkChanged(old,record,notifyListeners);
modified = true;
}
}
} else if (objtype == "member") {
const uint64_t id = OSUtils::jsonIntHex(record["id"],0ULL);
const uint64_t nwid = OSUtils::jsonIntHex(record["nwid"],0ULL);
if ((id)&&(nwid)) {
nlohmann::json network,old;
get(nwid,network,id,old);
if ((!old.is_object())||(old != record)) {
if ((!old.is_object())||(!_compareRecords(old,record))) {
record["revision"] = OSUtils::jsonInt(record["revision"],0ULL) + 1ULL;
OSUtils::ztsnprintf(pb,sizeof(pb),"%s" ZT_PATH_SEPARATOR_S "%.16llx" ZT_PATH_SEPARATOR_S "member",_networksPath.c_str(),(unsigned long long)nwid);
OSUtils::ztsnprintf(p1,sizeof(p1),"%s" ZT_PATH_SEPARATOR_S "%.10llx.json",pb,(unsigned long long)id);
if (!OSUtils::writeFile(p1,OSUtils::jsonDump(record,-1))) {
@ -127,17 +126,14 @@ void FileDB::save(nlohmann::json *orig,nlohmann::json &record)
if (!OSUtils::writeFile(p1,OSUtils::jsonDump(record,-1)))
fprintf(stderr,"WARNING: controller unable to write to path: %s" ZT_EOL_S,p1);
}
_memberChanged(old,record,true);
_memberChanged(old,record,notifyListeners);
modified = true;
}
}
} else if (objtype == "trace") {
const std::string id = record["id"];
if (id.length() > 0) {
OSUtils::ztsnprintf(p1,sizeof(p1),"%s" ZT_PATH_SEPARATOR_S "%s.json",_tracePath.c_str(),id.c_str());
OSUtils::writeFile(p1,OSUtils::jsonDump(record,-1));
}
}
} catch ( ... ) {} // drop invalid records missing fields
return modified;
}
void FileDB::eraseNetwork(const uint64_t networkId)
@ -152,7 +148,6 @@ void FileDB::eraseNetwork(const uint64_t networkId)
_networkChanged(network,nullJson,true);
std::lock_guard<std::mutex> l(this->_online_l);
this->_online.erase(networkId);
this->_onlineChanged = true;
}
void FileDB::eraseMember(const uint64_t networkId,const uint64_t memberId)
@ -166,7 +161,6 @@ void FileDB::eraseMember(const uint64_t networkId,const uint64_t memberId)
_memberChanged(member,nullJson,true);
std::lock_guard<std::mutex> l(this->_online_l);
this->_online[networkId].erase(memberId);
this->_onlineChanged = true;
}
void FileDB::nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const InetAddress &physicalAddress)
@ -176,7 +170,6 @@ void FileDB::nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const
physicalAddress.toString(atmp);
std::lock_guard<std::mutex> l(this->_online_l);
this->_online[networkId][memberId][OSUtils::now()] = physicalAddress;
this->_onlineChanged = true;
}
} // namespace ZeroTier

View file

@ -35,23 +35,23 @@ namespace ZeroTier
class FileDB : public DB
{
public:
FileDB(const Identity &myId,const char *path);
FileDB(const char *path);
virtual ~FileDB();
virtual bool waitForReady();
virtual bool isReady();
virtual void save(nlohmann::json *orig,nlohmann::json &record);
virtual bool save(nlohmann::json &record,bool notifyListeners);
virtual void eraseNetwork(const uint64_t networkId);
virtual void eraseMember(const uint64_t networkId,const uint64_t memberId);
virtual void nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const InetAddress &physicalAddress);
protected:
std::string _path;
std::string _networksPath;
std::string _tracePath;
std::thread _onlineUpdateThread;
std::map< uint64_t,std::map<uint64_t,std::map<int64_t,InetAddress> > > _online;
std::mutex _online_l;
bool _onlineChanged;
bool _running;
};

View file

@ -38,7 +38,7 @@ namespace ZeroTier
{
LFDB::LFDB(const Identity &myId,const char *path,const char *lfOwnerPrivate,const char *lfOwnerPublic,const char *lfNodeHost,int lfNodePort,bool storeOnlineState) :
DB(myId,path),
DB(),
_myId(myId),
_lfOwnerPrivate((lfOwnerPrivate) ? lfOwnerPrivate : ""),
_lfOwnerPublic((lfOwnerPublic) ? lfOwnerPublic : ""),
@ -53,7 +53,6 @@ LFDB::LFDB(const Identity &myId,const char *path,const char *lfOwnerPrivate,cons
const uint64_t controllerAddressInt = _myId.address().toInt();
_myId.address().toString(controllerAddress);
std::string networksSelectorName("com.zerotier.controller.lfdb:"); networksSelectorName.append(controllerAddress); networksSelectorName.append("/network");
std::string membersSelectorName("com.zerotier.controller.lfdb:"); membersSelectorName.append(controllerAddress); membersSelectorName.append("/member");
// LF record masking key is the first 32 bytes of SHA512(controller private key) in hex,
// hiding record values from anything but the controller or someone who has its key.
@ -64,7 +63,7 @@ LFDB::LFDB(const Identity &myId,const char *path,const char *lfOwnerPrivate,cons
httplib::Client htcli(_lfNodeHost.c_str(),_lfNodePort,600);
int64_t timeRangeStart = 0;
while (_running) {
while (_running.load()) {
{
std::lock_guard<std::mutex> sl(_state_l);
for(auto ns=_state.begin();ns!=_state.end();++ns) {
@ -79,16 +78,22 @@ LFDB::LFDB(const Identity &myId,const char *path,const char *lfOwnerPrivate,cons
newrec["OwnerPrivate"] = _lfOwnerPrivate;
newrec["MaskingKey"] = maskingKey;
newrec["PulseIfUnchanged"] = true;
auto resp = htcli.Post("/makerecord",newrec.dump(),"application/json");
if (resp) {
if (resp->status == 200) {
ns->second.dirty = false;
printf("SET network %.16llx %s\n",ns->first,resp->body.c_str());
try {
auto resp = htcli.Post("/makerecord",newrec.dump(),"application/json");
if (resp) {
if (resp->status == 200) {
ns->second.dirty = false;
//printf("SET network %.16llx %s\n",ns->first,resp->body.c_str());
} else {
fprintf(stderr,"ERROR: LFDB: %d from node (create/update network): %s" ZT_EOL_S,resp->status,resp->body.c_str());
}
} else {
fprintf(stderr,"ERROR: LFDB: %d from node (create/update network): %s" ZT_EOL_S,resp->status,resp->body.c_str());
fprintf(stderr,"ERROR: LFDB: node is offline" ZT_EOL_S);
}
} else {
fprintf(stderr,"ERROR: LFDB: node is offline" ZT_EOL_S);
} catch (std::exception &e) {
fprintf(stderr,"ERROR: LFDB: unexpected exception querying node (create/update network): %s" ZT_EOL_S,e.what());
} catch ( ... ) {
fprintf(stderr,"ERROR: LFDB: unexpected exception querying node (create/update network): unknown exception" ZT_EOL_S);
}
}
}
@ -125,16 +130,22 @@ LFDB::LFDB(const Identity &myId,const char *path,const char *lfOwnerPrivate,cons
newrec["MaskingKey"] = maskingKey;
newrec["Timestamp"] = ms->second.lastOnlineTime;
newrec["PulseIfUnchanged"] = true;
auto resp = htcli.Post("/makerecord",newrec.dump(),"application/json");
if (resp) {
if (resp->status == 200) {
ms->second.lastOnlineDirty = false;
printf("SET member online %.16llx %.10llx %s\n",ns->first,ms->first,resp->body.c_str());
try {
auto resp = htcli.Post("/makerecord",newrec.dump(),"application/json");
if (resp) {
if (resp->status == 200) {
ms->second.lastOnlineDirty = false;
//printf("SET member online %.16llx %.10llx %s\n",ns->first,ms->first,resp->body.c_str());
} else {
fprintf(stderr,"ERROR: LFDB: %d from node (create/update member online status): %s" ZT_EOL_S,resp->status,resp->body.c_str());
}
} else {
fprintf(stderr,"ERROR: LFDB: %d from node (create/update member online status): %s" ZT_EOL_S,resp->status,resp->body.c_str());
fprintf(stderr,"ERROR: LFDB: node is offline" ZT_EOL_S);
}
} else {
fprintf(stderr,"ERROR: LFDB: node is offline" ZT_EOL_S);
} catch (std::exception &e) {
fprintf(stderr,"ERROR: LFDB: unexpected exception querying node (create/update member online status): %s" ZT_EOL_S,e.what());
} catch ( ... ) {
fprintf(stderr,"ERROR: LFDB: unexpected exception querying node (create/update member online status): unknown exception" ZT_EOL_S);
}
}
@ -144,7 +155,7 @@ LFDB::LFDB(const Identity &myId,const char *path,const char *lfOwnerPrivate,cons
nlohmann::json newrec,selector0,selector1,selectors;
selector0["Name"] = networksSelectorName;
selector0["Ordinal"] = ns->first;
selector1["Name"] = membersSelectorName;
selector1["Name"] = "member";
selector1["Ordinal"] = ms->first;
selectors.push_back(selector0);
selectors.push_back(selector1);
@ -153,16 +164,22 @@ LFDB::LFDB(const Identity &myId,const char *path,const char *lfOwnerPrivate,cons
newrec["OwnerPrivate"] = _lfOwnerPrivate;
newrec["MaskingKey"] = maskingKey;
newrec["PulseIfUnchanged"] = true;
auto resp = htcli.Post("/makerecord",newrec.dump(),"application/json");
if (resp) {
if (resp->status == 200) {
ms->second.dirty = false;
printf("SET member %.16llx %.10llx %s\n",ns->first,ms->first,resp->body.c_str());
try {
auto resp = htcli.Post("/makerecord",newrec.dump(),"application/json");
if (resp) {
if (resp->status == 200) {
ms->second.dirty = false;
//printf("SET member %.16llx %.10llx %s\n",ns->first,ms->first,resp->body.c_str());
} else {
fprintf(stderr,"ERROR: LFDB: %d from node (create/update member): %s" ZT_EOL_S,resp->status,resp->body.c_str());
}
} else {
fprintf(stderr,"ERROR: LFDB: %d from node (create/update member): %s" ZT_EOL_S,resp->status,resp->body.c_str());
fprintf(stderr,"ERROR: LFDB: node is offline" ZT_EOL_S);
}
} else {
fprintf(stderr,"ERROR: LFDB: node is offline" ZT_EOL_S);
} catch (std::exception &e) {
fprintf(stderr,"ERROR: LFDB: unexpected exception querying node (create/update member): %s" ZT_EOL_S,e.what());
} catch ( ... ) {
fprintf(stderr,"ERROR: LFDB: unexpected exception querying node (create/update member): unknown exception" ZT_EOL_S);
}
}
}
@ -170,18 +187,18 @@ LFDB::LFDB(const Identity &myId,const char *path,const char *lfOwnerPrivate,cons
}
}
{
try {
std::ostringstream query;
query
<< '{'
<< "\"Ranges\":[{"
<< "\"Name\":\"" << networksSelectorName << "\","
<< "\"Range\":[0,18446744073709551615]"
<< "}],"
<< "\"TimeRange\":[" << timeRangeStart << ",18446744073709551615],"
<< "\"MaskingKey\":\"" << maskingKey << "\","
<< "\"Owners\":[\"" << _lfOwnerPublic << "\"]"
<< '}';
query <<
"{"
"\"Ranges\":[{"
"\"Name\":\"" << networksSelectorName << "\","
"\"Range\":[0,18446744073709551615]"
"}],"
"\"TimeRange\":[" << timeRangeStart << ",9223372036854775807],"
"\"MaskingKey\":\"" << maskingKey << "\","
"\"Owners\":[\"" << _lfOwnerPublic << "\"]"
"}";
auto resp = htcli.Post("/query",query.str(),"application/json");
if (resp) {
if (resp->status == 200) {
@ -190,64 +207,66 @@ LFDB::LFDB(const Identity &myId,const char *path,const char *lfOwnerPrivate,cons
for(std::size_t ri=0;ri<results.size();++ri) {
nlohmann::json &rset = results[ri];
if ((rset.is_array())&&(rset.size() > 0)) {
nlohmann::json &result = rset[0];
if (result.is_object()) {
nlohmann::json &record = result["Record"];
if (record.is_object()) {
const std::string recordValue = result["Value"];
printf("GET network %s\n",recordValue.c_str());
//printf("GET network %s\n",recordValue.c_str());
nlohmann::json network(OSUtils::jsonParse(recordValue));
if (network.is_object()) {
const std::string idstr = network["id"];
const uint64_t id = Utils::hexStrToU64(idstr.c_str());
if ((id >> 24) == controllerAddressInt) { // sanity check
std::lock_guard<std::mutex> sl(_state_l);
_NetworkState &ns = _state[id];
if (!ns.dirty) {
nlohmann::json oldNetwork;
if (get(id,oldNetwork)) {
const uint64_t revision = network["revision"];
const uint64_t prevRevision = oldNetwork["revision"];
if (prevRevision < revision) {
_networkChanged(oldNetwork,network,timeRangeStart > 0);
}
} else {
nlohmann::json nullJson;
_networkChanged(nullJson,network,timeRangeStart > 0);
nlohmann::json oldNetwork;
if ((timeRangeStart > 0)&&(get(id,oldNetwork))) {
const uint64_t revision = network["revision"];
const uint64_t prevRevision = oldNetwork["revision"];
if (prevRevision < revision) {
_networkChanged(oldNetwork,network,timeRangeStart > 0);
}
} else {
nlohmann::json nullJson;
_networkChanged(nullJson,network,timeRangeStart > 0);
}
}
}
}
}
}
}
}
} else {
fprintf(stderr,"ERROR: LFDB: %d from node: %s" ZT_EOL_S,resp->status,resp->body.c_str());
fprintf(stderr,"ERROR: LFDB: %d from node (check for network updates): %s" ZT_EOL_S,resp->status,resp->body.c_str());
}
} else {
fprintf(stderr,"ERROR: LFDB: node is offline" ZT_EOL_S);
}
} catch (std::exception &e) {
fprintf(stderr,"ERROR: LFDB: unexpected exception querying node (check for network updates): %s" ZT_EOL_S,e.what());
} catch ( ... ) {
fprintf(stderr,"ERROR: LFDB: unexpected exception querying node (check for network updates): unknown exception" ZT_EOL_S);
}
{
try {
std::ostringstream query;
query
<< '{'
<< "\"Ranges\":[{"
<< "\"Name\":\"" << networksSelectorName << "\","
<< "\"Range\":[0,18446744073709551615]"
<< "},{"
<< "\"Name\":\"" << membersSelectorName << "\","
<< "\"Range\":[0,18446744073709551615]"
<< "}],"
<< "\"TimeRange\":[" << timeRangeStart << ",18446744073709551615],"
<< "\"MaskingKey\":\"" << maskingKey << "\","
<< "\"Owners\":[\"" << _lfOwnerPublic << "\"]"
<< '}';
query <<
"{"
"\"Ranges\":[{"
"\"Name\":\"" << networksSelectorName << "\","
"\"Range\":[0,18446744073709551615]"
"},{"
"\"Name\":\"member\","
"\"Range\":[0,18446744073709551615]"
"}],"
"\"TimeRange\":[" << timeRangeStart << ",9223372036854775807],"
"\"MaskingKey\":\"" << maskingKey << "\","
"\"Owners\":[\"" << _lfOwnerPublic << "\"]"
"}";
auto resp = htcli.Post("/query",query.str(),"application/json");
if (resp) {
if (resp->status == 200) {
@ -256,12 +275,13 @@ LFDB::LFDB(const Identity &myId,const char *path,const char *lfOwnerPrivate,cons
for(std::size_t ri=0;ri<results.size();++ri) {
nlohmann::json &rset = results[ri];
if ((rset.is_array())&&(rset.size() > 0)) {
nlohmann::json &result = rset[0];
if (result.is_object()) {
nlohmann::json &record = result["Record"];
if (record.is_object()) {
const std::string recordValue = result["Value"];
printf("GET member %s\n",recordValue.c_str());
//printf("GET member %s\n",recordValue.c_str());
nlohmann::json member(OSUtils::jsonParse(recordValue));
if (member.is_object()) {
const std::string nwidstr = member["nwid"];
@ -270,17 +290,13 @@ LFDB::LFDB(const Identity &myId,const char *path,const char *lfOwnerPrivate,cons
const uint64_t id = Utils::hexStrToU64(idstr.c_str());
if ((id)&&((nwid >> 24) == controllerAddressInt)) { // sanity check
std::lock_guard<std::mutex> sl(_state_l);
auto ns = _state.find(nwid);
if ((ns == _state.end())||(!ns->second.members[id].dirty)) {
nlohmann::json network,oldMember;
if (get(nwid,network,id,oldMember)) {
const uint64_t revision = member["revision"];
const uint64_t prevRevision = oldMember["revision"];
if (prevRevision < revision)
_memberChanged(oldMember,member,timeRangeStart > 0);
}
} else {
nlohmann::json network,oldMember;
if ((timeRangeStart > 0)&&(get(nwid,network,id,oldMember))) {
const uint64_t revision = member["revision"];
const uint64_t prevRevision = oldMember["revision"];
if (prevRevision < revision)
_memberChanged(oldMember,member,timeRangeStart > 0);
} else if (hasNetwork(nwid)) {
nlohmann::json nullJson;
_memberChanged(nullJson,member,timeRangeStart > 0);
}
@ -289,24 +305,29 @@ LFDB::LFDB(const Identity &myId,const char *path,const char *lfOwnerPrivate,cons
}
}
}
}
}
}
} else {
fprintf(stderr,"ERROR: LFDB: %d from node: %s" ZT_EOL_S,resp->status,resp->body.c_str());
fprintf(stderr,"ERROR: LFDB: %d from node (check for member updates): %s" ZT_EOL_S,resp->status,resp->body.c_str());
}
} else {
fprintf(stderr,"ERROR: LFDB: node is offline" ZT_EOL_S);
}
} catch (std::exception &e) {
fprintf(stderr,"ERROR: LFDB: unexpected exception querying node (check for member updates): %s" ZT_EOL_S,e.what());
} catch ( ... ) {
fprintf(stderr,"ERROR: LFDB: unexpected exception querying node (check for member updates): unknown exception" ZT_EOL_S);
}
timeRangeStart = time(nullptr) - 120; // start next query 2m before now to avoid losing updates
_ready = true;
_ready.store(true);
for(int k=0;k<20;++k) { // 2s delay between queries for remotely modified networks or members
if (!_running)
for(int k=0;k<4;++k) { // 2s delay between queries for remotely modified networks or members
if (!_running.load())
return;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
}
});
@ -314,45 +335,40 @@ LFDB::LFDB(const Identity &myId,const char *path,const char *lfOwnerPrivate,cons
LFDB::~LFDB()
{
_running = false;
_running.store(false);
_syncThread.join();
}
bool LFDB::waitForReady()
{
while (!_ready) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
while (!_ready.load()) {
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
return true;
}
bool LFDB::isReady()
{
return (_ready);
return (_ready.load());
}
void LFDB::save(nlohmann::json *orig,nlohmann::json &record)
bool LFDB::save(nlohmann::json &record,bool notifyListeners)
{
if (orig) {
if (*orig != record) {
record["revision"] = OSUtils::jsonInt(record["revision"],0ULL) + 1;
}
} else {
record["revision"] = 1;
}
bool modified = false;
const std::string objtype = record["objtype"];
if (objtype == "network") {
const uint64_t nwid = OSUtils::jsonIntHex(record["id"],0ULL);
if (nwid) {
nlohmann::json old;
get(nwid,old);
if ((!old.is_object())||(old != record)) {
_networkChanged(old,record,true);
if ((!old.is_object())||(!_compareRecords(old,record))) {
record["revision"] = OSUtils::jsonInt(record["revision"],0ULL) + 1ULL;
_networkChanged(old,record,notifyListeners);
{
std::lock_guard<std::mutex> l(_state_l);
_state[nwid].dirty = true;
}
modified = true;
}
}
} else if (objtype == "member") {
@ -361,15 +377,18 @@ void LFDB::save(nlohmann::json *orig,nlohmann::json &record)
if ((id)&&(nwid)) {
nlohmann::json network,old;
get(nwid,network,id,old);
if ((!old.is_object())||(old != record)) {
_memberChanged(old,record,true);
if ((!old.is_object())||(!_compareRecords(old,record))) {
record["revision"] = OSUtils::jsonInt(record["revision"],0ULL) + 1ULL;
_memberChanged(old,record,notifyListeners);
{
std::lock_guard<std::mutex> l(_state_l);
_state[nwid].members[id].dirty = true;
}
modified = true;
}
}
}
return modified;
}
void LFDB::eraseNetwork(const uint64_t networkId)

View file

@ -43,7 +43,7 @@ class LFDB : public DB
{
public:
/**
* @param myId Identity of controller node (with secret)
* @param myId This controller's identity
* @param path Base path for ZeroTier node itself
* @param lfOwnerPrivate LF owner private in PEM format
* @param lfOwnerPublic LF owner public in @base62 format
@ -56,7 +56,7 @@ public:
virtual bool waitForReady();
virtual bool isReady();
virtual void save(nlohmann::json *orig,nlohmann::json &record);
virtual bool save(nlohmann::json &record,bool notifyListeners);
virtual void eraseNetwork(const uint64_t networkId);
virtual void eraseMember(const uint64_t networkId,const uint64_t memberId);
virtual void nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const InetAddress &physicalAddress);

View file

@ -24,9 +24,11 @@
* of your own application.
*/
#include "PostgreSQL.hpp"
#ifdef ZT_CONTROLLER_USE_LIBPQ
#include "PostgreSQL.hpp"
#include "../node/Constants.hpp"
#include "EmbeddedNetworkController.hpp"
#include "RabbitMQ.hpp"
#include "../version.h"
@ -37,6 +39,7 @@
#include <amqp_tcp_socket.h>
using json = nlohmann::json;
namespace {
static const int DB_MINIMUM_VERSION = 5;
@ -58,6 +61,7 @@ static const char *_timestr()
return ts;
}
/*
std::string join(const std::vector<std::string> &elements, const char * const separator)
{
switch(elements.size()) {
@ -72,21 +76,26 @@ std::string join(const std::vector<std::string> &elements, const char * const se
return os.str();
}
}
*/
}
} // anonymous namespace
using namespace ZeroTier;
PostgreSQL::PostgreSQL(const Identity &myId, const char *path, int listenPort, MQConfig *mqc)
: DB(myId, path)
, _ready(0)
: DB()
, _myId(myId)
, _myAddress(myId.address())
, _ready(0)
, _connected(1)
, _run(1)
, _waitNoticePrinted(false)
, _run(1)
, _waitNoticePrinted(false)
, _listenPort(listenPort)
, _mqc(mqc)
{
_connString = std::string(path) + " application_name=controller_" +_myAddressStr;
char myAddress[64];
_myAddressStr = myId.address().toString(myAddress);
_connString = std::string(path) + " application_name=controller_" + _myAddressStr;
// Database Schema Version Check
PGconn *conn = getPgConn();
@ -162,27 +171,43 @@ bool PostgreSQL::isReady()
return ((_ready == 2)&&(_connected));
}
void PostgreSQL::save(nlohmann::json *orig, nlohmann::json &record)
bool PostgreSQL::save(nlohmann::json &record,bool notifyListeners)
{
bool modified = false;
try {
if (!record.is_object()) {
return;
}
waitForReady();
if (orig) {
if (*orig != record) {
record["revision"] = OSUtils::jsonInt(record["revision"],0ULL) + 1;
_commitQueue.post(new nlohmann::json(record));
if (!record.is_object())
return false;
const std::string objtype = record["objtype"];
if (objtype == "network") {
const uint64_t nwid = OSUtils::jsonIntHex(record["id"],0ULL);
if (nwid) {
nlohmann::json old;
get(nwid,old);
if ((!old.is_object())||(!_compareRecords(old,record))) {
record["revision"] = OSUtils::jsonInt(record["revision"],0ULL) + 1ULL;
_commitQueue.post(std::pair<nlohmann::json,bool>(record,notifyListeners));
modified = true;
}
}
} else if (objtype == "member") {
const uint64_t nwid = OSUtils::jsonIntHex(record["nwid"],0ULL);
const uint64_t id = OSUtils::jsonIntHex(record["id"],0ULL);
if ((id)&&(nwid)) {
nlohmann::json network,old;
get(nwid,network,id,old);
if ((!old.is_object())||(!_compareRecords(old,record))) {
record["revision"] = OSUtils::jsonInt(record["revision"],0ULL) + 1ULL;
_commitQueue.post(std::pair<nlohmann::json,bool>(record,notifyListeners));
modified = true;
}
}
} else {
record["revision"] = 1;
_commitQueue.post(new nlohmann::json(record));
}
} catch (std::exception &e) {
fprintf(stderr, "Error on PostgreSQL::save: %s\n", e.what());
} catch (...) {
fprintf(stderr, "Unknown error on PostgreSQL::save\n");
}
return modified;
}
void PostgreSQL::eraseNetwork(const uint64_t networkId)
@ -190,21 +215,23 @@ void PostgreSQL::eraseNetwork(const uint64_t networkId)
char tmp2[24];
waitForReady();
Utils::hex(networkId, tmp2);
json *tmp = new json();
(*tmp)["id"] = tmp2;
(*tmp)["objtype"] = "_delete_network";
std::pair<nlohmann::json,bool> tmp;
tmp.first["id"] = tmp2;
tmp.first["objtype"] = "_delete_network";
tmp.second = true;
_commitQueue.post(tmp);
}
void PostgreSQL::eraseMember(const uint64_t networkId, const uint64_t memberId)
{
char tmp2[24];
json *tmp = new json();
std::pair<nlohmann::json,bool> tmp;
Utils::hex(networkId, tmp2);
(*tmp)["nwid"] = tmp2;
tmp.first["nwid"] = tmp2;
Utils::hex(memberId, tmp2);
(*tmp)["id"] = tmp2;
(*tmp)["objtype"] = "_delete_member";
tmp.first["id"] = tmp2;
tmp.first["objtype"] = "_delete_member";
tmp.second = true;
_commitQueue.post(tmp);
}
@ -544,7 +571,7 @@ void PostgreSQL::heartbeat()
if (gethostname(hostnameTmp, sizeof(hostnameTmp))!= 0) {
hostnameTmp[0] = (char)0;
} else {
for (int i = 0; i < sizeof(hostnameTmp); ++i) {
for (int i = 0; i < (int)sizeof(hostnameTmp); ++i) {
if ((hostnameTmp[i] == '.')||(hostnameTmp[i] == 0)) {
hostnameTmp[i] = (char)0;
break;
@ -595,8 +622,8 @@ void PostgreSQL::heartbeat()
"public_identity = EXCLUDED.public_identity, v_major = EXCLUDED.v_major, v_minor = EXCLUDED.v_minor, "
"v_rev = EXCLUDED.v_rev, v_build = EXCLUDED.v_rev, host_port = EXCLUDED.host_port, "
"use_rabbitmq = EXCLUDED.use_rabbitmq",
10, // number of parameters
NULL, // oid field. ignore
10, // number of parameters
NULL, // oid field. ignore
values, // values for substitution
NULL, // lengths in bytes of each value
NULL, // binary?
@ -717,7 +744,7 @@ void PostgreSQL::_membersWatcher_RabbitMQ() {
fprintf(stderr, "RABBITMQ ERROR member change: %s\n", e.what());
} catch(...) {
fprintf(stderr, "RABBITMQ ERROR member change: unknown error\n");
}
}
}
}
@ -834,18 +861,18 @@ void PostgreSQL::commitThread()
exit(1);
}
json *config = nullptr;
while(_commitQueue.get(config)&(_run == 1)) {
if (!config) {
std::pair<nlohmann::json,bool> qitem;
while(_commitQueue.get(qitem)&(_run == 1)) {
if (!qitem.first.is_object()) {
continue;
}
if (PQstatus(conn) == CONNECTION_BAD) {
fprintf(stderr, "ERROR: Connection to database failed: %s\n", PQerrorMessage(conn));
PQfinish(conn);
delete config;
exit(1);
}
try {
try {
nlohmann::json *config = &(qitem.first);
const std::string objtype = (*config)["objtype"];
if (objtype == "member") {
try {
@ -1000,12 +1027,12 @@ void PostgreSQL::commitThread()
nlohmann::json memOrig;
nlohmann::json memNew(*config);
get(nwidInt, nwOrig, memberidInt, memOrig);
_memberChanged(memOrig, memNew, (this->_ready>=2));
_memberChanged(memOrig, memNew, qitem.second);
} else {
fprintf(stderr, "Can't notify of change. Error parsing nwid or memberid: %lu-%lu\n", nwidInt, memberidInt);
fprintf(stderr, "Can't notify of change. Error parsing nwid or memberid: %llu-%llu\n", (unsigned long long)nwidInt, (unsigned long long)memberidInt);
}
} catch (std::exception &e) {
@ -1020,7 +1047,10 @@ void PostgreSQL::commitThread()
if (!(*config)["remoteTraceTarget"].is_null()) {
remoteTraceTarget = (*config)["remoteTraceTarget"];
}
std::string rulesSource = (*config)["rulesSource"];
std::string rulesSource;
if ((*config)["rulesSource"].is_string()) {
rulesSource = (*config)["rulesSource"];
}
std::string caps = OSUtils::jsonDump((*config)["capabilitles"], -1);
std::string now = std::to_string(OSUtils::now());
std::string mtu = std::to_string((int)(*config)["mtu"]);
@ -1052,12 +1082,29 @@ void PostgreSQL::commitThread()
v6mode.c_str(),
};
// This ugly query exists because when we want to mirror networks to/from
// another data store (e.g. FileDB or LFDB) it is possible to get a network
// that doesn't exist in Central's database. This does an upsert and sets
// the owner_id to the "first" global admin in the user DB if the record
// did not previously exist. If the record already exists owner_id is left
// unchanged, so owner_id should be left out of the update clause.
PGresult *res = PQexecParams(conn,
"UPDATE ztc_network SET controller_id = $2, capabilities = $3, enable_broadcast = $4, "
"last_updated = $5, mtu = $6, multicast_limit = $7, name = $8, private = $9, "
"remote_trace_level = $10, remote_trace_target = $11, rules = $12, rules_source = $13, "
"tags = $14, v4_assign_mode = $15, v6_assign_mode = $16 "
"WHERE id = $1",
"INSERT INTO ztc_network (id, creation_time, owner_id, controller_id, capabilities, enable_broadcast, "
"last_modified, mtu, multicast_limit, name, private, "
"remote_trace_level, remote_trace_target, rules, rules_source, "
"tags, v4_assign_mode, v6_assign_mode) VALUES ("
"$1, TO_TIMESTAMP($5::double precision/1000), "
"(SELECT user_id AS owner_id FROM ztc_global_permissions WHERE authorize = true AND del = true AND modify = true AND read = true LIMIT 1),"
"$2, $3, $4, TO_TIMESTAMP($5::double precision/1000), "
"$6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) "
"ON CONFLICT (id) DO UPDATE set controller_id = EXCLUDED.controller_id, "
"capabilities = EXCLUDED.capabilities, enable_broadcast = EXCLUDED.enable_broadcast, "
"last_modified = EXCLUDED.last_modified, mtu = EXCLUDED.mtu, "
"multicast_limit = EXCLUDED.multicast_limit, name = EXCLUDED.name, "
"private = EXCLUDED.private, remote_trace_level = EXCLUDED.remote_trace_level, "
"remote_trace_target = EXCLUDED.remote_trace_target, rules = EXCLUDED.rules, "
"rules_source = EXCLUDED.rules_source, tags = EXCLUDED.tags, "
"v4_assign_mode = EXCLUDED.v4_assign_mode, v6_assign_mode = EXCLUDED.v6_assign_mode",
16,
NULL,
values,
@ -1226,16 +1273,14 @@ void PostgreSQL::commitThread()
get(nwidInt, nwOrig);
_networkChanged(nwOrig, nwNew, true);
_networkChanged(nwOrig, nwNew, qitem.second);
} else {
fprintf(stderr, "Can't notify network changed: %lu\n", nwidInt);
fprintf(stderr, "Can't notify network changed: %llu\n", (unsigned long long)nwidInt);
}
} catch (std::exception &e) {
fprintf(stderr, "ERROR: Error updating member: %s\n", e.what());
}
} else if (objtype == "trace") {
fprintf(stderr, "ERROR: Trace not yet implemented");
} else if (objtype == "_delete_network") {
try {
std::string networkId = (*config)["nwid"];
@ -1292,8 +1337,6 @@ void PostgreSQL::commitThread()
} catch (std::exception &e) {
fprintf(stderr, "ERROR: Error getting objtype: %s\n", e.what());
}
delete config;
config = nullptr;
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
@ -1315,9 +1358,9 @@ void PostgreSQL::onlineNotificationThread()
}
_connected = 1;
int64_t lastUpdatedNetworkStatus = 0;
//int64_t lastUpdatedNetworkStatus = 0;
std::unordered_map< std::pair<uint64_t,uint64_t>,int64_t,_PairHasher > lastOnlineCumulative;
while (_run == 1) {
if (PQstatus(conn) != CONNECTION_OK) {
fprintf(stderr, "ERROR: Online Notification thread lost connection to Postgres.");
@ -1421,129 +1464,6 @@ void PostgreSQL::onlineNotificationThread()
PQclear(res);
}
const int64_t now = OSUtils::now();
if ((now - lastUpdatedNetworkStatus) > 10000) {
lastUpdatedNetworkStatus = now;
std::vector<std::pair<uint64_t, std::shared_ptr<_Network>>> networks;
{
std::lock_guard<std::mutex> l(_networks_l);
for (auto i = _networks.begin(); i != _networks.end(); ++i) {
networks.push_back(*i);
}
}
std::stringstream networkUpdate;
networkUpdate << "INSERT INTO ztc_network_status (network_id, bridge_count, authorized_member_count, online_member_count, total_member_count, last_modified) VALUES ";
bool nwFirstRun = true;
bool networkAdded = false;
for (auto i = networks.begin(); i != networks.end(); ++i) {
char tmp[64];
Utils::hex(i->first, tmp);
std::string networkId(tmp);
std::vector<std::string> &_notUsed = updateMap[networkId];
(void)_notUsed;
uint64_t authMemberCount = 0;
uint64_t totalMemberCount = 0;
uint64_t onlineMemberCount = 0;
uint64_t bridgeCount = 0;
uint64_t ts = now;
{
std::lock_guard<std::mutex> l2(i->second->lock);
authMemberCount = i->second->authorizedMembers.size();
totalMemberCount = i->second->members.size();
bridgeCount = i->second->activeBridgeMembers.size();
for (auto m=i->second->members.begin(); m != i->second->members.end(); ++m) {
auto lo = lastOnlineCumulative.find(std::pair<uint64_t,uint64_t>(i->first, m->first));
if (lo != lastOnlineCumulative.end()) {
if ((now - lo->second) <= (ZT_NETWORK_AUTOCONF_DELAY * 2)) {
++onlineMemberCount;
} else {
lastOnlineCumulative.erase(lo);
}
}
}
}
const char *nvals[1] = {
networkId.c_str()
};
res = PQexecParams(conn,
"SELECT id FROM ztc_network WHERE id = $1",
1,
NULL,
nvals,
NULL,
NULL,
0);
if (PQresultStatus(res) != PGRES_TUPLES_OK) {
fprintf(stderr, "Network lookup failed: %s", PQerrorMessage(conn));
PQclear(res);
continue;
}
int nrows = PQntuples(res);
PQclear(res);
if (nrows == 1) {
std::string bc = std::to_string(bridgeCount);
std::string amc = std::to_string(authMemberCount);
std::string omc = std::to_string(onlineMemberCount);
std::string tmc = std::to_string(totalMemberCount);
std::string timestamp = std::to_string(ts);
if (nwFirstRun) {
nwFirstRun = false;
} else {
networkUpdate << ", ";
}
networkUpdate << "('" << networkId << "', " << bc << ", " << amc << ", " << omc << ", " << tmc << ", "
<< "TO_TIMESTAMP(" << timestamp << "::double precision/1000))";
networkAdded = true;
} else if (nrows > 1) {
fprintf(stderr, "Number of networks > 1?!?!?");
continue;
} else {
continue;
}
}
networkUpdate << " ON CONFLICT (network_id) DO UPDATE SET bridge_count = EXCLUDED.bridge_count, "
<< "authorized_member_count = EXCLUDED.authorized_member_count, online_member_count = EXCLUDED.online_member_count, "
<< "total_member_count = EXCLUDED.total_member_count, last_modified = EXCLUDED.last_modified";
if (networkAdded) {
res = PQexec(conn, networkUpdate.str().c_str());
if (PQresultStatus(res) != PGRES_COMMAND_OK) {
fprintf(stderr, "Error during multiple network upsert: %s", PQresultErrorMessage(res));
}
PQclear(res);
}
}
// for (auto it = updateMap.begin(); it != updateMap.end(); ++it) {
// std::string networkId = it->first;
// std::vector<std::string> members = it->second;
// std::stringstream queryBuilder;
// std::string membersStr = ::join(members, ",");
// queryBuilder << "NOTIFY controller, '" << networkId << ":" << membersStr << "'";
// std::string query = queryBuilder.str();
// PGresult *res = PQexec(conn,query.c_str());
// if (PQresultStatus(res) != PGRES_COMMAND_OK) {
// fprintf(stderr, "ERROR: Error sending NOTIFY: %s\n", PQresultErrorMessage(res));
// }
// PQclear(res);
// }
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
fprintf(stderr, "%s: Fell out of run loop in onlineNotificationThread\n", _myAddressStr.c_str());
@ -1554,7 +1474,8 @@ void PostgreSQL::onlineNotificationThread()
}
}
PGconn *PostgreSQL::getPgConn(OverrideMode m) {
PGconn *PostgreSQL::getPgConn(OverrideMode m)
{
if (m == ALLOW_PGBOUNCER_OVERRIDE) {
char *connStr = getenv("PGBOUNCER_CONNSTR");
if (connStr != NULL) {
@ -1568,4 +1489,5 @@ PGconn *PostgreSQL::getPgConn(OverrideMode m) {
return PQconnectdb(_connString.c_str());
}
#endif //ZT_CONTROLLER_USE_LIBPQ

View file

@ -23,22 +23,21 @@
* directly against ZeroTier software without disclosing the source code
* of your own application.
*/
#include "DB.hpp"
#ifdef ZT_CONTROLLER_USE_LIBPQ
#ifndef ZT_CONTROLLER_LIBPQ_HPP
#define ZT_CONTROLLER_LIBPQ_HPP
#include "DB.hpp"
#define ZT_CENTRAL_CONTROLLER_COMMIT_THREADS 4
extern "C" {
typedef struct pg_conn PGconn;
typedef struct pg_conn PGconn;
}
namespace ZeroTier
{
namespace ZeroTier {
struct MQConfig;
@ -51,66 +50,69 @@ struct MQConfig;
class PostgreSQL : public DB
{
public:
PostgreSQL(const Identity &myId, const char *path, int listenPort, MQConfig *mqc = NULL);
virtual ~PostgreSQL();
PostgreSQL(const Identity &myId, const char *path, int listenPort, MQConfig *mqc = NULL);
virtual ~PostgreSQL();
virtual bool waitForReady();
virtual bool isReady();
virtual void save(nlohmann::json *orig, nlohmann::json &record);
virtual void eraseNetwork(const uint64_t networkId);
virtual void eraseMember(const uint64_t networkId, const uint64_t memberId);
virtual void nodeIsOnline(const uint64_t networkId, const uint64_t memberId, const InetAddress &physicalAddress);
virtual bool waitForReady();
virtual bool isReady();
virtual bool save(nlohmann::json &record,bool notifyListeners);
virtual void eraseNetwork(const uint64_t networkId);
virtual void eraseMember(const uint64_t networkId, const uint64_t memberId);
virtual void nodeIsOnline(const uint64_t networkId, const uint64_t memberId, const InetAddress &physicalAddress);
protected:
struct _PairHasher
struct _PairHasher
{
inline std::size_t operator()(const std::pair<uint64_t,uint64_t> &p) const { return (std::size_t)(p.first ^ p.second); }
};
private:
void initializeNetworks(PGconn *conn);
void initializeMembers(PGconn *conn);
void heartbeat();
void membersDbWatcher();
void _membersWatcher_Postgres(PGconn *conn);
void _membersWatcher_RabbitMQ();
void networksDbWatcher();
void _networksWatcher_Postgres(PGconn *conn);
void _networksWatcher_RabbitMQ();
void initializeNetworks(PGconn *conn);
void initializeMembers(PGconn *conn);
void heartbeat();
void membersDbWatcher();
void _membersWatcher_Postgres(PGconn *conn);
void _membersWatcher_RabbitMQ();
void networksDbWatcher();
void _networksWatcher_Postgres(PGconn *conn);
void _networksWatcher_RabbitMQ();
void commitThread();
void onlineNotificationThread();
void commitThread();
void onlineNotificationThread();
enum OverrideMode {
ALLOW_PGBOUNCER_OVERRIDE = 0,
NO_OVERRIDE = 1
};
enum OverrideMode {
ALLOW_PGBOUNCER_OVERRIDE = 0,
NO_OVERRIDE = 1
};
PGconn * getPgConn( OverrideMode m = ALLOW_PGBOUNCER_OVERRIDE );
PGconn * getPgConn( OverrideMode m = ALLOW_PGBOUNCER_OVERRIDE );
std::string _connString;
const Identity _myId;
const Address _myAddress;
std::string _myAddressStr;
std::string _connString;
BlockingQueue<nlohmann::json *> _commitQueue;
BlockingQueue< std::pair<nlohmann::json,bool> > _commitQueue;
std::thread _heartbeatThread;
std::thread _membersDbWatcher;
std::thread _networksDbWatcher;
std::thread _commitThread[ZT_CENTRAL_CONTROLLER_COMMIT_THREADS];
std::thread _onlineNotificationThread;
std::thread _heartbeatThread;
std::thread _membersDbWatcher;
std::thread _networksDbWatcher;
std::thread _commitThread[ZT_CENTRAL_CONTROLLER_COMMIT_THREADS];
std::thread _onlineNotificationThread;
std::unordered_map< std::pair<uint64_t,uint64_t>,std::pair<int64_t,InetAddress>,_PairHasher > _lastOnline;
mutable std::mutex _lastOnline_l;
mutable std::mutex _readyLock;
std::atomic<int> _ready, _connected, _run;
mutable volatile bool _waitNoticePrinted;
mutable std::mutex _lastOnline_l;
mutable std::mutex _readyLock;
std::atomic<int> _ready, _connected, _run;
mutable volatile bool _waitNoticePrinted;
int _listenPort;
int _listenPort;
MQConfig *_mqc;
MQConfig *_mqc;
};
}
} // namespace ZeroTier
#endif // ZT_CONTROLLER_LIBPQ_HPP

View file

@ -1,3 +1,30 @@
/*
* ZeroTier One - Network Virtualization Everywhere
* Copyright (C) 2011-2019 ZeroTier, Inc. https://www.zerotier.com/
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* --
*
* You can be released from the requirements of the license by purchasing
* a commercial license. Buying such a license is mandatory as soon as you
* develop commercial closed-source software that incorporates or links
* directly against ZeroTier software without disclosing the source code
* of your own application.
*/
#include "RabbitMQ.hpp"
#ifdef ZT_CONTROLLER_USE_LIBPQ
@ -11,95 +38,95 @@ namespace ZeroTier
{
RabbitMQ::RabbitMQ(MQConfig *cfg, const char *queueName)
: _mqc(cfg)
, _qName(queueName)
, _socket(NULL)
, _status(0)
: _mqc(cfg)
, _qName(queueName)
, _socket(NULL)
, _status(0)
{
}
RabbitMQ::~RabbitMQ()
{
amqp_channel_close(_conn, _channel, AMQP_REPLY_SUCCESS);
amqp_connection_close(_conn, AMQP_REPLY_SUCCESS);
amqp_destroy_connection(_conn);
amqp_channel_close(_conn, _channel, AMQP_REPLY_SUCCESS);
amqp_connection_close(_conn, AMQP_REPLY_SUCCESS);
amqp_destroy_connection(_conn);
}
void RabbitMQ::init()
{
struct timeval tval;
memset(&tval, 0, sizeof(struct timeval));
tval.tv_sec = 5;
struct timeval tval;
memset(&tval, 0, sizeof(struct timeval));
tval.tv_sec = 5;
fprintf(stderr, "Initializing RabbitMQ %s\n", _qName);
_conn = amqp_new_connection();
_socket = amqp_tcp_socket_new(_conn);
if (!_socket) {
throw std::runtime_error("Can't create socket for RabbitMQ");
}
_status = amqp_socket_open_noblock(_socket, _mqc->host, _mqc->port, &tval);
if (_status) {
throw std::runtime_error("Can't connect to RabbitMQ");
}
amqp_rpc_reply_t r = amqp_login(_conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN,
_mqc->username, _mqc->password);
if (r.reply_type != AMQP_RESPONSE_NORMAL) {
throw std::runtime_error("RabbitMQ Login Error");
}
fprintf(stderr, "Initializing RabbitMQ %s\n", _qName);
_conn = amqp_new_connection();
_socket = amqp_tcp_socket_new(_conn);
if (!_socket) {
throw std::runtime_error("Can't create socket for RabbitMQ");
}
_status = amqp_socket_open_noblock(_socket, _mqc->host, _mqc->port, &tval);
if (_status) {
throw std::runtime_error("Can't connect to RabbitMQ");
}
amqp_rpc_reply_t r = amqp_login(_conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN,
_mqc->username, _mqc->password);
if (r.reply_type != AMQP_RESPONSE_NORMAL) {
throw std::runtime_error("RabbitMQ Login Error");
}
static int chan = 0;
static int chan = 0;
{
Mutex::Lock l(_chan_m);
_channel = ++chan;
_channel = ++chan;
}
amqp_channel_open(_conn, _channel);
r = amqp_get_rpc_reply(_conn);
if(r.reply_type != AMQP_RESPONSE_NORMAL) {
throw std::runtime_error("Error opening communication channel");
}
_q = amqp_queue_declare(_conn, _channel, amqp_cstring_bytes(_qName), 0, 0, 0, 0, amqp_empty_table);
r = amqp_get_rpc_reply(_conn);
if (r.reply_type != AMQP_RESPONSE_NORMAL) {
throw std::runtime_error("Error declaring queue " + std::string(_qName));
}
amqp_channel_open(_conn, _channel);
r = amqp_get_rpc_reply(_conn);
if(r.reply_type != AMQP_RESPONSE_NORMAL) {
throw std::runtime_error("Error opening communication channel");
}
_q = amqp_queue_declare(_conn, _channel, amqp_cstring_bytes(_qName), 0, 0, 0, 0, amqp_empty_table);
r = amqp_get_rpc_reply(_conn);
if (r.reply_type != AMQP_RESPONSE_NORMAL) {
throw std::runtime_error("Error declaring queue " + std::string(_qName));
}
amqp_basic_consume(_conn, _channel, amqp_cstring_bytes(_qName), amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
r = amqp_get_rpc_reply(_conn);
if (r.reply_type != AMQP_RESPONSE_NORMAL) {
throw std::runtime_error("Error consuming queue " + std::string(_qName));
}
fprintf(stderr, "RabbitMQ Init OK %s\n", _qName);
amqp_basic_consume(_conn, _channel, amqp_cstring_bytes(_qName), amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
r = amqp_get_rpc_reply(_conn);
if (r.reply_type != AMQP_RESPONSE_NORMAL) {
throw std::runtime_error("Error consuming queue " + std::string(_qName));
}
fprintf(stderr, "RabbitMQ Init OK %s\n", _qName);
}
std::string RabbitMQ::consume()
{
amqp_rpc_reply_t res;
amqp_envelope_t envelope;
amqp_maybe_release_buffers(_conn);
amqp_rpc_reply_t res;
amqp_envelope_t envelope;
amqp_maybe_release_buffers(_conn);
struct timeval timeout;
timeout.tv_sec = 1;
timeout.tv_usec = 0;
struct timeval timeout;
timeout.tv_sec = 1;
timeout.tv_usec = 0;
res = amqp_consume_message(_conn, &envelope, &timeout, 0);
if (res.reply_type != AMQP_RESPONSE_NORMAL) {
if (res.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION && res.library_error == AMQP_STATUS_TIMEOUT) {
// timeout waiting for message. Return empty string
return "";
} else {
throw std::runtime_error("Error getting message");
}
}
res = amqp_consume_message(_conn, &envelope, &timeout, 0);
if (res.reply_type != AMQP_RESPONSE_NORMAL) {
if (res.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION && res.library_error == AMQP_STATUS_TIMEOUT) {
// timeout waiting for message. Return empty string
return "";
} else {
throw std::runtime_error("Error getting message");
}
}
std::string msg(
(const char*)envelope.message.body.bytes,
envelope.message.body.len
);
amqp_destroy_envelope(&envelope);
return msg;
std::string msg(
(const char*)envelope.message.body.bytes,
envelope.message.body.len
);
amqp_destroy_envelope(&envelope);
return msg;
}
}

View file

@ -23,16 +23,19 @@
* directly against ZeroTier software without disclosing the source code
* of your own application.
*/
#ifndef ZT_CONTROLLER_RABBITMQ_HPP
#define ZT_CONTROLLER_RABBITMQ_HPP
#include "DB.hpp"
namespace ZeroTier
{
struct MQConfig {
const char *host;
int port;
const char *username;
const char *password;
const char *host;
int port;
const char *username;
const char *password;
};
}
@ -49,26 +52,25 @@ namespace ZeroTier
class RabbitMQ {
public:
RabbitMQ(MQConfig *cfg, const char *queueName);
~RabbitMQ();
RabbitMQ(MQConfig *cfg, const char *queueName);
~RabbitMQ();
void init();
void init();
std::string consume();
std::string consume();
private:
MQConfig *_mqc;
const char *_qName;
MQConfig *_mqc;
const char *_qName;
amqp_socket_t *_socket;
amqp_connection_state_t _conn;
amqp_queue_declare_ok_t *_q;
int _status;
amqp_socket_t *_socket;
amqp_connection_state_t _conn;
amqp_queue_declare_ok_t *_q;
int _status;
int _channel;
int _channel;
Mutex _chan_m;
};
}