Merge dev to edge
This commit is contained in:
commit
f9900cc6fb
50 changed files with 4563 additions and 624 deletions
|
@ -104,20 +104,8 @@ void DB::cleanMember(nlohmann::json &member)
|
|||
member.erase("lastRequestMetaData");
|
||||
}
|
||||
|
||||
DB::DB(EmbeddedNetworkController *const nc,const Identity &myId,const char *path) :
|
||||
_controller(nc),
|
||||
_myId(myId),
|
||||
_myAddress(myId.address()),
|
||||
_path((path) ? path : "")
|
||||
{
|
||||
char tmp[32];
|
||||
_myAddress.toString(tmp);
|
||||
_myAddressStr = tmp;
|
||||
}
|
||||
|
||||
DB::~DB()
|
||||
{
|
||||
}
|
||||
DB::DB() {}
|
||||
DB::~DB() {}
|
||||
|
||||
bool DB::get(const uint64_t networkId,nlohmann::json &network)
|
||||
{
|
||||
|
@ -202,34 +190,15 @@ bool DB::get(const uint64_t networkId,nlohmann::json &network,std::vector<nlohma
|
|||
return true;
|
||||
}
|
||||
|
||||
bool DB::summary(const uint64_t networkId,NetworkSummaryInfo &info)
|
||||
{
|
||||
waitForReady();
|
||||
std::shared_ptr<_Network> nw;
|
||||
{
|
||||
std::lock_guard<std::mutex> l(_networks_l);
|
||||
auto nwi = _networks.find(networkId);
|
||||
if (nwi == _networks.end())
|
||||
return false;
|
||||
nw = nwi->second;
|
||||
}
|
||||
{
|
||||
std::lock_guard<std::mutex> l2(nw->lock);
|
||||
_fillSummaryInfo(nw,info);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
void DB::networks(std::vector<uint64_t> &networks)
|
||||
void DB::networks(std::set<uint64_t> &networks)
|
||||
{
|
||||
waitForReady();
|
||||
std::lock_guard<std::mutex> l(_networks_l);
|
||||
networks.reserve(_networks.size() + 1);
|
||||
for(auto n=_networks.begin();n!=_networks.end();++n)
|
||||
networks.push_back(n->first);
|
||||
networks.insert(n->first);
|
||||
}
|
||||
|
||||
void DB::_memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool push)
|
||||
void DB::_memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool notifyListeners)
|
||||
{
|
||||
uint64_t memberId = 0;
|
||||
uint64_t networkId = 0;
|
||||
|
@ -313,8 +282,12 @@ void DB::_memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool pu
|
|||
}
|
||||
}
|
||||
|
||||
if (push)
|
||||
_controller->onNetworkMemberUpdate(networkId,memberId);
|
||||
if (notifyListeners) {
|
||||
std::lock_guard<std::mutex> ll(_changeListeners_l);
|
||||
for(auto i=_changeListeners.begin();i!=_changeListeners.end();++i) {
|
||||
(*i)->onNetworkMemberUpdate(this,networkId,memberId,memberConfig);
|
||||
}
|
||||
}
|
||||
} else if (memberId) {
|
||||
if (nw) {
|
||||
std::lock_guard<std::mutex> l(nw->lock);
|
||||
|
@ -332,20 +305,24 @@ void DB::_memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool pu
|
|||
}
|
||||
}
|
||||
|
||||
if ((push)&&((wasAuth)&&(!isAuth)&&(networkId)&&(memberId)))
|
||||
_controller->onNetworkMemberDeauthorize(networkId,memberId);
|
||||
if ((notifyListeners)&&((wasAuth)&&(!isAuth)&&(networkId)&&(memberId))) {
|
||||
std::lock_guard<std::mutex> ll(_changeListeners_l);
|
||||
for(auto i=_changeListeners.begin();i!=_changeListeners.end();++i) {
|
||||
(*i)->onNetworkMemberDeauthorize(this,networkId,memberId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void DB::_networkChanged(nlohmann::json &old,nlohmann::json &networkConfig,bool push)
|
||||
void DB::_networkChanged(nlohmann::json &old,nlohmann::json &networkConfig,bool notifyListeners)
|
||||
{
|
||||
if (networkConfig.is_object()) {
|
||||
const std::string ids = networkConfig["id"];
|
||||
const uint64_t id = Utils::hexStrToU64(ids.c_str());
|
||||
if (id) {
|
||||
const uint64_t networkId = Utils::hexStrToU64(ids.c_str());
|
||||
if (networkId) {
|
||||
std::shared_ptr<_Network> nw;
|
||||
{
|
||||
std::lock_guard<std::mutex> l(_networks_l);
|
||||
std::shared_ptr<_Network> &nw2 = _networks[id];
|
||||
std::shared_ptr<_Network> &nw2 = _networks[networkId];
|
||||
if (!nw2)
|
||||
nw2.reset(new _Network);
|
||||
nw = nw2;
|
||||
|
@ -354,15 +331,19 @@ void DB::_networkChanged(nlohmann::json &old,nlohmann::json &networkConfig,bool
|
|||
std::lock_guard<std::mutex> l2(nw->lock);
|
||||
nw->config = networkConfig;
|
||||
}
|
||||
if (push)
|
||||
_controller->onNetworkUpdate(id);
|
||||
if (notifyListeners) {
|
||||
std::lock_guard<std::mutex> ll(_changeListeners_l);
|
||||
for(auto i=_changeListeners.begin();i!=_changeListeners.end();++i) {
|
||||
(*i)->onNetworkUpdate(this,networkId,networkConfig);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if (old.is_object()) {
|
||||
const std::string ids = old["id"];
|
||||
const uint64_t id = Utils::hexStrToU64(ids.c_str());
|
||||
if (id) {
|
||||
const uint64_t networkId = Utils::hexStrToU64(ids.c_str());
|
||||
if (networkId) {
|
||||
std::lock_guard<std::mutex> l(_networks_l);
|
||||
_networks.erase(id);
|
||||
_networks.erase(networkId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -27,6 +27,8 @@
|
|||
#ifndef ZT_CONTROLLER_DB_HPP
|
||||
#define ZT_CONTROLLER_DB_HPP
|
||||
|
||||
//#define ZT_CONTROLLER_USE_LIBPQ
|
||||
|
||||
#include "../node/Constants.hpp"
|
||||
#include "../node/Identity.hpp"
|
||||
#include "../node/InetAddress.hpp"
|
||||
|
@ -41,20 +43,29 @@
|
|||
#include <vector>
|
||||
#include <atomic>
|
||||
#include <mutex>
|
||||
#include <set>
|
||||
|
||||
#include "../ext/json/json.hpp"
|
||||
|
||||
namespace ZeroTier
|
||||
{
|
||||
|
||||
class EmbeddedNetworkController;
|
||||
|
||||
/**
|
||||
* Base class with common infrastructure for all controller DB implementations
|
||||
*/
|
||||
class DB
|
||||
{
|
||||
public:
|
||||
class ChangeListener
|
||||
{
|
||||
public:
|
||||
ChangeListener() {}
|
||||
virtual ~ChangeListener() {}
|
||||
virtual void onNetworkUpdate(const void *db,uint64_t networkId,const nlohmann::json &network) {}
|
||||
virtual void onNetworkMemberUpdate(const void *db,uint64_t networkId,uint64_t memberId,const nlohmann::json &member) {}
|
||||
virtual void onNetworkMemberDeauthorize(const void *db,uint64_t networkId,uint64_t memberId) {}
|
||||
};
|
||||
|
||||
struct NetworkSummaryInfo
|
||||
{
|
||||
NetworkSummaryInfo() : authorizedMemberCount(0),totalMemberCount(0),mostRecentDeauthTime(0) {}
|
||||
|
@ -65,27 +76,12 @@ public:
|
|||
int64_t mostRecentDeauthTime;
|
||||
};
|
||||
|
||||
/**
|
||||
* Ensure that all network fields are present
|
||||
*/
|
||||
static void initNetwork(nlohmann::json &network);
|
||||
|
||||
/**
|
||||
* Ensure that all member fields are present
|
||||
*/
|
||||
static void initMember(nlohmann::json &member);
|
||||
|
||||
/**
|
||||
* Remove old and temporary network fields
|
||||
*/
|
||||
static void cleanNetwork(nlohmann::json &network);
|
||||
|
||||
/**
|
||||
* Remove old and temporary member fields
|
||||
*/
|
||||
static void cleanMember(nlohmann::json &member);
|
||||
|
||||
DB(EmbeddedNetworkController *const nc,const Identity &myId,const char *path);
|
||||
DB();
|
||||
virtual ~DB();
|
||||
|
||||
virtual bool waitForReady() = 0;
|
||||
|
@ -102,18 +98,21 @@ public:
|
|||
bool get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member,NetworkSummaryInfo &info);
|
||||
bool get(const uint64_t networkId,nlohmann::json &network,std::vector<nlohmann::json> &members);
|
||||
|
||||
bool summary(const uint64_t networkId,NetworkSummaryInfo &info);
|
||||
void networks(std::set<uint64_t> &networks);
|
||||
|
||||
void networks(std::vector<uint64_t> &networks);
|
||||
|
||||
virtual void save(nlohmann::json *orig,nlohmann::json &record) = 0;
|
||||
virtual bool save(nlohmann::json &record,bool notifyListeners) = 0;
|
||||
|
||||
virtual void eraseNetwork(const uint64_t networkId) = 0;
|
||||
|
||||
virtual void eraseMember(const uint64_t networkId,const uint64_t memberId) = 0;
|
||||
|
||||
virtual void nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const InetAddress &physicalAddress) = 0;
|
||||
|
||||
inline void addListener(DB::ChangeListener *const listener)
|
||||
{
|
||||
std::lock_guard<std::mutex> l(_changeListeners_l);
|
||||
_changeListeners.push_back(listener);
|
||||
}
|
||||
|
||||
protected:
|
||||
struct _Network
|
||||
{
|
||||
|
@ -127,18 +126,14 @@ protected:
|
|||
std::mutex lock;
|
||||
};
|
||||
|
||||
void _memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool push);
|
||||
void _networkChanged(nlohmann::json &old,nlohmann::json &networkConfig,bool push);
|
||||
void _memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool notifyListeners);
|
||||
void _networkChanged(nlohmann::json &old,nlohmann::json &networkConfig,bool notifyListeners);
|
||||
void _fillSummaryInfo(const std::shared_ptr<_Network> &nw,NetworkSummaryInfo &info);
|
||||
|
||||
EmbeddedNetworkController *const _controller;
|
||||
const Identity _myId;
|
||||
const Address _myAddress;
|
||||
const std::string _path;
|
||||
std::string _myAddressStr;
|
||||
|
||||
std::vector<DB::ChangeListener *> _changeListeners;
|
||||
std::unordered_map< uint64_t,std::shared_ptr<_Network> > _networks;
|
||||
std::unordered_multimap< uint64_t,uint64_t > _networkByMember;
|
||||
mutable std::mutex _changeListeners_l;
|
||||
mutable std::mutex _networks_l;
|
||||
};
|
||||
|
||||
|
|
194
controller/DBMirrorSet.cpp
Normal file
194
controller/DBMirrorSet.cpp
Normal file
|
@ -0,0 +1,194 @@
|
|||
/*
|
||||
* ZeroTier One - Network Virtualization Everywhere
|
||||
* Copyright (C) 2011-2019 ZeroTier, Inc. https://www.zerotier.com/
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*
|
||||
* --
|
||||
*
|
||||
* You can be released from the requirements of the license by purchasing
|
||||
* a commercial license. Buying such a license is mandatory as soon as you
|
||||
* develop commercial closed-source software that incorporates or links
|
||||
* directly against ZeroTier software without disclosing the source code
|
||||
* of your own application.
|
||||
*/
|
||||
|
||||
#include "DBMirrorSet.hpp"
|
||||
|
||||
namespace ZeroTier {
|
||||
|
||||
DBMirrorSet::DBMirrorSet(DB::ChangeListener *listener) :
|
||||
_listener(listener)
|
||||
{
|
||||
}
|
||||
|
||||
DBMirrorSet::~DBMirrorSet()
|
||||
{
|
||||
}
|
||||
|
||||
bool DBMirrorSet::hasNetwork(const uint64_t networkId) const
|
||||
{
|
||||
std::lock_guard<std::mutex> l(_dbs_l);
|
||||
for(auto d=_dbs.begin();d!=_dbs.end();++d) {
|
||||
if ((*d)->hasNetwork(networkId))
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
bool DBMirrorSet::get(const uint64_t networkId,nlohmann::json &network)
|
||||
{
|
||||
std::lock_guard<std::mutex> l(_dbs_l);
|
||||
for(auto d=_dbs.begin();d!=_dbs.end();++d) {
|
||||
if ((*d)->get(networkId,network)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
bool DBMirrorSet::get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member)
|
||||
{
|
||||
std::lock_guard<std::mutex> l(_dbs_l);
|
||||
for(auto d=_dbs.begin();d!=_dbs.end();++d) {
|
||||
if ((*d)->get(networkId,network,memberId,member))
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
bool DBMirrorSet::get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member,DB::NetworkSummaryInfo &info)
|
||||
{
|
||||
std::lock_guard<std::mutex> l(_dbs_l);
|
||||
for(auto d=_dbs.begin();d!=_dbs.end();++d) {
|
||||
if ((*d)->get(networkId,network,memberId,member,info))
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
bool DBMirrorSet::get(const uint64_t networkId,nlohmann::json &network,std::vector<nlohmann::json> &members)
|
||||
{
|
||||
std::lock_guard<std::mutex> l(_dbs_l);
|
||||
for(auto d=_dbs.begin();d!=_dbs.end();++d) {
|
||||
if ((*d)->get(networkId,network,members))
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
void DBMirrorSet::networks(std::set<uint64_t> &networks)
|
||||
{
|
||||
std::lock_guard<std::mutex> l(_dbs_l);
|
||||
for(auto d=_dbs.begin();d!=_dbs.end();++d) {
|
||||
(*d)->networks(networks);
|
||||
}
|
||||
}
|
||||
|
||||
bool DBMirrorSet::waitForReady()
|
||||
{
|
||||
bool r = false;
|
||||
std::lock_guard<std::mutex> l(_dbs_l);
|
||||
for(auto d=_dbs.begin();d!=_dbs.end();++d) {
|
||||
r |= (*d)->waitForReady();
|
||||
}
|
||||
return r;
|
||||
}
|
||||
|
||||
bool DBMirrorSet::isReady()
|
||||
{
|
||||
std::lock_guard<std::mutex> l(_dbs_l);
|
||||
for(auto d=_dbs.begin();d!=_dbs.end();++d) {
|
||||
if (!(*d)->isReady())
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
bool DBMirrorSet::save(nlohmann::json &record,bool notifyListeners)
|
||||
{
|
||||
std::vector< std::shared_ptr<DB> > dbs;
|
||||
{
|
||||
std::lock_guard<std::mutex> l(_dbs_l);
|
||||
dbs = _dbs;
|
||||
}
|
||||
if (notifyListeners) {
|
||||
for(auto d=dbs.begin();d!=dbs.end();++d) {
|
||||
if ((*d)->save(record,notifyListeners))
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
} else {
|
||||
bool modified = false;
|
||||
for(auto d=dbs.begin();d!=dbs.end();++d) {
|
||||
modified |= (*d)->save(record,notifyListeners);
|
||||
}
|
||||
return modified;
|
||||
}
|
||||
}
|
||||
|
||||
void DBMirrorSet::eraseNetwork(const uint64_t networkId)
|
||||
{
|
||||
std::lock_guard<std::mutex> l(_dbs_l);
|
||||
for(auto d=_dbs.begin();d!=_dbs.end();++d) {
|
||||
(*d)->eraseNetwork(networkId);
|
||||
}
|
||||
}
|
||||
|
||||
void DBMirrorSet::eraseMember(const uint64_t networkId,const uint64_t memberId)
|
||||
{
|
||||
std::lock_guard<std::mutex> l(_dbs_l);
|
||||
for(auto d=_dbs.begin();d!=_dbs.end();++d) {
|
||||
(*d)->eraseMember(networkId,memberId);
|
||||
}
|
||||
}
|
||||
|
||||
void DBMirrorSet::nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const InetAddress &physicalAddress)
|
||||
{
|
||||
std::lock_guard<std::mutex> l(_dbs_l);
|
||||
for(auto d=_dbs.begin();d!=_dbs.end();++d) {
|
||||
(*d)->nodeIsOnline(networkId,memberId,physicalAddress);
|
||||
}
|
||||
}
|
||||
|
||||
void DBMirrorSet::onNetworkUpdate(const void *db,uint64_t networkId,const nlohmann::json &network)
|
||||
{
|
||||
nlohmann::json record(network);
|
||||
std::lock_guard<std::mutex> l(_dbs_l);
|
||||
for(auto d=_dbs.begin();d!=_dbs.end();++d) {
|
||||
if (d->get() != db) {
|
||||
(*d)->save(record,false);
|
||||
}
|
||||
}
|
||||
_listener->onNetworkUpdate(this,networkId,network);
|
||||
}
|
||||
|
||||
void DBMirrorSet::onNetworkMemberUpdate(const void *db,uint64_t networkId,uint64_t memberId,const nlohmann::json &member)
|
||||
{
|
||||
nlohmann::json record(member);
|
||||
std::lock_guard<std::mutex> l(_dbs_l);
|
||||
for(auto d=_dbs.begin();d!=_dbs.end();++d) {
|
||||
if (d->get() != db) {
|
||||
(*d)->save(record,false);
|
||||
}
|
||||
}
|
||||
_listener->onNetworkMemberUpdate(this,networkId,memberId,member);
|
||||
}
|
||||
|
||||
void DBMirrorSet::onNetworkMemberDeauthorize(const void *db,uint64_t networkId,uint64_t memberId)
|
||||
{
|
||||
_listener->onNetworkMemberDeauthorize(this,networkId,memberId);
|
||||
}
|
||||
|
||||
} // namespace ZeroTier
|
81
controller/DBMirrorSet.hpp
Normal file
81
controller/DBMirrorSet.hpp
Normal file
|
@ -0,0 +1,81 @@
|
|||
/*
|
||||
* ZeroTier One - Network Virtualization Everywhere
|
||||
* Copyright (C) 2011-2019 ZeroTier, Inc. https://www.zerotier.com/
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*
|
||||
* --
|
||||
*
|
||||
* You can be released from the requirements of the license by purchasing
|
||||
* a commercial license. Buying such a license is mandatory as soon as you
|
||||
* develop commercial closed-source software that incorporates or links
|
||||
* directly against ZeroTier software without disclosing the source code
|
||||
* of your own application.
|
||||
*/
|
||||
|
||||
#ifndef ZT_DBMIRRORSET_HPP
|
||||
#define ZT_DBMIRRORSET_HPP
|
||||
|
||||
#include "DB.hpp"
|
||||
|
||||
#include <vector>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <set>
|
||||
|
||||
namespace ZeroTier {
|
||||
|
||||
class DBMirrorSet : public DB::ChangeListener
|
||||
{
|
||||
public:
|
||||
DBMirrorSet(DB::ChangeListener *listener);
|
||||
virtual ~DBMirrorSet();
|
||||
|
||||
bool hasNetwork(const uint64_t networkId) const;
|
||||
|
||||
bool get(const uint64_t networkId,nlohmann::json &network);
|
||||
bool get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member);
|
||||
bool get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member,DB::NetworkSummaryInfo &info);
|
||||
bool get(const uint64_t networkId,nlohmann::json &network,std::vector<nlohmann::json> &members);
|
||||
|
||||
void networks(std::set<uint64_t> &networks);
|
||||
|
||||
bool waitForReady();
|
||||
bool isReady();
|
||||
bool save(nlohmann::json &record,bool notifyListeners);
|
||||
void eraseNetwork(const uint64_t networkId);
|
||||
void eraseMember(const uint64_t networkId,const uint64_t memberId);
|
||||
void nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const InetAddress &physicalAddress);
|
||||
|
||||
// These are called by various DB instances when changes occur.
|
||||
virtual void onNetworkUpdate(const void *db,uint64_t networkId,const nlohmann::json &network);
|
||||
virtual void onNetworkMemberUpdate(const void *db,uint64_t networkId,uint64_t memberId,const nlohmann::json &member);
|
||||
virtual void onNetworkMemberDeauthorize(const void *db,uint64_t networkId,uint64_t memberId);
|
||||
|
||||
inline void addDB(const std::shared_ptr<DB> &db)
|
||||
{
|
||||
db->addListener(this);
|
||||
std::lock_guard<std::mutex> l(_dbs_l);
|
||||
_dbs.push_back(db);
|
||||
}
|
||||
|
||||
private:
|
||||
DB::ChangeListener *const _listener;
|
||||
std::vector< std::shared_ptr< DB > > _dbs;
|
||||
mutable std::mutex _dbs_l;
|
||||
};
|
||||
|
||||
} // namespace ZeroTier
|
||||
|
||||
#endif
|
|
@ -46,6 +46,11 @@
|
|||
#include "../version.h"
|
||||
|
||||
#include "EmbeddedNetworkController.hpp"
|
||||
#include "LFDB.hpp"
|
||||
#include "FileDB.hpp"
|
||||
#ifdef ZT_CONTROLLER_USE_LIBPQ
|
||||
#include "PostgreSQL.hpp"
|
||||
#endif
|
||||
|
||||
#include "../node/Node.hpp"
|
||||
#include "../node/CertificateOfMembership.hpp"
|
||||
|
@ -470,6 +475,7 @@ EmbeddedNetworkController::EmbeddedNetworkController(Node *node,const char *dbPa
|
|||
_node(node),
|
||||
_path(dbPath),
|
||||
_sender((NetworkController::Sender *)0),
|
||||
_db(this),
|
||||
_mqc(mqc)
|
||||
{
|
||||
}
|
||||
|
@ -488,13 +494,48 @@ void EmbeddedNetworkController::init(const Identity &signingId,Sender *sender)
|
|||
_signingId = signingId;
|
||||
_sender = sender;
|
||||
_signingIdAddressString = signingId.address().toString(tmp);
|
||||
|
||||
#ifdef ZT_CONTROLLER_USE_LIBPQ
|
||||
if ((_path.length() > 9)&&(_path.substr(0,9) == "postgres:"))
|
||||
_db.reset(new PostgreSQL(this,_signingId,_path.substr(9).c_str(), _listenPort, _mqc));
|
||||
else // else use FileDB after endif
|
||||
if ((_path.length() > 9)&&(_path.substr(0,9) == "postgres:")) {
|
||||
_db.addDB(std::shared_ptr<DB>(new PostgreSQL(_signingId,_path.substr(9).c_str(), _listenPort, _mqc)));
|
||||
} else {
|
||||
#endif
|
||||
_db.reset(new FileDB(this,_signingId,_path.c_str()));
|
||||
_db->waitForReady();
|
||||
_db.addDB(std::shared_ptr<DB>(new FileDB(_path.c_str())));
|
||||
#ifdef ZT_CONTROLLER_USE_LIBPQ
|
||||
}
|
||||
#endif
|
||||
|
||||
std::string lfJSON;
|
||||
OSUtils::readFile((_path + ZT_PATH_SEPARATOR_S ".." ZT_PATH_SEPARATOR_S "local.conf").c_str(),lfJSON);
|
||||
if (lfJSON.length() > 0) {
|
||||
nlohmann::json lfConfig(OSUtils::jsonParse(lfJSON));
|
||||
nlohmann::json &settings = lfConfig["settings"];
|
||||
if (settings.is_object()) {
|
||||
nlohmann::json &controllerDb = settings["controllerDb"];
|
||||
if (controllerDb.is_object()) {
|
||||
std::string type = controllerDb["type"];
|
||||
if (type == "lf") {
|
||||
std::string lfOwner = controllerDb["owner"];
|
||||
std::string lfHost = controllerDb["host"];
|
||||
int lfPort = controllerDb["port"];
|
||||
bool storeOnlineState = controllerDb["storeOnlineState"];
|
||||
if ((lfOwner.length())&&(lfHost.length())&&(lfPort > 0)&&(lfPort < 65536)) {
|
||||
std::size_t pubHdrLoc = lfOwner.find("Public: ");
|
||||
if ((pubHdrLoc > 0)&&((pubHdrLoc + 8) < lfOwner.length())) {
|
||||
std::string lfOwnerPublic = lfOwner.substr(pubHdrLoc + 8);
|
||||
std::size_t pubHdrEnd = lfOwnerPublic.find_first_of("\n\r\t ");
|
||||
if (pubHdrEnd != std::string::npos) {
|
||||
lfOwnerPublic = lfOwnerPublic.substr(0,pubHdrEnd);
|
||||
_db.addDB(std::shared_ptr<DB>(new LFDB(_signingId,_path.c_str(),lfOwner.c_str(),lfOwnerPublic.c_str(),lfHost.c_str(),lfPort,storeOnlineState)));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
_db.waitForReady();
|
||||
}
|
||||
|
||||
void EmbeddedNetworkController::request(
|
||||
|
@ -525,15 +566,12 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpGET(
|
|||
std::string &responseBody,
|
||||
std::string &responseContentType)
|
||||
{
|
||||
if (!_db)
|
||||
return 500;
|
||||
|
||||
if ((path.size() > 0)&&(path[0] == "network")) {
|
||||
|
||||
if ((path.size() >= 2)&&(path[1].length() == 16)) {
|
||||
const uint64_t nwid = Utils::hexStrToU64(path[1].c_str());
|
||||
json network;
|
||||
if (!_db->get(nwid,network))
|
||||
if (!_db.get(nwid,network))
|
||||
return 404;
|
||||
|
||||
if (path.size() >= 3) {
|
||||
|
@ -545,7 +583,7 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpGET(
|
|||
|
||||
const uint64_t address = Utils::hexStrToU64(path[3].c_str());
|
||||
json member;
|
||||
if (!_db->get(nwid,network,address,member))
|
||||
if (!_db.get(nwid,network,address,member))
|
||||
return 404;
|
||||
responseBody = OSUtils::jsonDump(member);
|
||||
responseContentType = "application/json";
|
||||
|
@ -555,7 +593,7 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpGET(
|
|||
|
||||
responseBody = "{";
|
||||
std::vector<json> members;
|
||||
if (_db->get(nwid,network,members)) {
|
||||
if (_db.get(nwid,network,members)) {
|
||||
responseBody.reserve((members.size() + 2) * 32);
|
||||
std::string mid;
|
||||
for(auto member=members.begin();member!=members.end();++member) {
|
||||
|
@ -584,12 +622,12 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpGET(
|
|||
} else if (path.size() == 1) {
|
||||
// List networks
|
||||
|
||||
std::vector<uint64_t> networkIds;
|
||||
_db->networks(networkIds);
|
||||
std::set<uint64_t> networkIds;
|
||||
_db.networks(networkIds);
|
||||
char tmp[64];
|
||||
responseBody = "[";
|
||||
responseBody.reserve((networkIds.size() + 1) * 24);
|
||||
for(std::vector<uint64_t>::const_iterator i(networkIds.begin());i!=networkIds.end();++i) {
|
||||
for(std::set<uint64_t>::const_iterator i(networkIds.begin());i!=networkIds.end();++i) {
|
||||
if (responseBody.length() > 1)
|
||||
responseBody.push_back(',');
|
||||
OSUtils::ztsnprintf(tmp,sizeof(tmp),"\"%.16llx\"",(unsigned long long)*i);
|
||||
|
@ -606,7 +644,7 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpGET(
|
|||
// Controller status
|
||||
|
||||
char tmp[4096];
|
||||
const bool dbOk = _db->isReady();
|
||||
const bool dbOk = _db.isReady();
|
||||
OSUtils::ztsnprintf(tmp,sizeof(tmp),"{\n\t\"controller\": true,\n\t\"apiVersion\": %d,\n\t\"clock\": %llu,\n\t\"databaseReady\": %s\n}\n",ZT_NETCONF_CONTROLLER_API_VERSION,(unsigned long long)OSUtils::now(),dbOk ? "true" : "false");
|
||||
responseBody = tmp;
|
||||
responseContentType = "application/json";
|
||||
|
@ -625,8 +663,6 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpPOST(
|
|||
std::string &responseBody,
|
||||
std::string &responseContentType)
|
||||
{
|
||||
if (!_db)
|
||||
return 500;
|
||||
if (path.empty())
|
||||
return 404;
|
||||
|
||||
|
@ -660,8 +696,7 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpPOST(
|
|||
OSUtils::ztsnprintf(addrs,sizeof(addrs),"%.10llx",(unsigned long long)address);
|
||||
|
||||
json member,network;
|
||||
_db->get(nwid,network,address,member);
|
||||
json origMember(member); // for detecting changes
|
||||
_db.get(nwid,network,address,member);
|
||||
DB::initMember(member);
|
||||
|
||||
try {
|
||||
|
@ -755,7 +790,7 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpPOST(
|
|||
member["nwid"] = nwids;
|
||||
|
||||
DB::cleanMember(member);
|
||||
_db->save(&origMember,member);
|
||||
_db.save(member,true);
|
||||
responseBody = OSUtils::jsonDump(member);
|
||||
responseContentType = "application/json";
|
||||
|
||||
|
@ -774,7 +809,7 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpPOST(
|
|||
Utils::getSecureRandom(&nwidPostfix,sizeof(nwidPostfix));
|
||||
uint64_t tryNwid = nwidPrefix | (nwidPostfix & 0xffffffULL);
|
||||
if ((tryNwid & 0xffffffULL) == 0ULL) tryNwid |= 1ULL;
|
||||
if (!_db->hasNetwork(tryNwid)) {
|
||||
if (!_db.hasNetwork(tryNwid)) {
|
||||
nwid = tryNwid;
|
||||
break;
|
||||
}
|
||||
|
@ -785,8 +820,7 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpPOST(
|
|||
OSUtils::ztsnprintf(nwids,sizeof(nwids),"%.16llx",(unsigned long long)nwid);
|
||||
|
||||
json network;
|
||||
_db->get(nwid,network);
|
||||
json origNetwork(network); // for detecting changes
|
||||
_db.get(nwid,network);
|
||||
DB::initNetwork(network);
|
||||
|
||||
try {
|
||||
|
@ -1017,7 +1051,7 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpPOST(
|
|||
network["nwid"] = nwids; // legacy
|
||||
|
||||
DB::cleanNetwork(network);
|
||||
_db->save(&origNetwork,network);
|
||||
_db.save(network,true);
|
||||
|
||||
responseBody = OSUtils::jsonDump(network);
|
||||
responseContentType = "application/json";
|
||||
|
@ -1039,8 +1073,6 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpDELETE(
|
|||
std::string &responseBody,
|
||||
std::string &responseContentType)
|
||||
{
|
||||
if (!_db)
|
||||
return 500;
|
||||
if (path.empty())
|
||||
return 404;
|
||||
|
||||
|
@ -1052,8 +1084,8 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpDELETE(
|
|||
const uint64_t address = Utils::hexStrToU64(path[3].c_str());
|
||||
|
||||
json network,member;
|
||||
_db->get(nwid,network,address,member);
|
||||
_db->eraseMember(nwid, address);
|
||||
_db.get(nwid,network,address,member);
|
||||
_db.eraseMember(nwid, address);
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> l(_memberStatus_l);
|
||||
|
@ -1068,8 +1100,8 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpDELETE(
|
|||
}
|
||||
} else {
|
||||
json network;
|
||||
_db->get(nwid,network);
|
||||
_db->eraseNetwork(nwid);
|
||||
_db.get(nwid,network);
|
||||
_db.eraseNetwork(nwid);
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> l(_memberStatus_l);
|
||||
|
@ -1093,7 +1125,57 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpDELETE(
|
|||
return 404;
|
||||
}
|
||||
|
||||
void EmbeddedNetworkController::onNetworkUpdate(const uint64_t networkId)
|
||||
void EmbeddedNetworkController::handleRemoteTrace(const ZT_RemoteTrace &rt)
|
||||
{
|
||||
static volatile unsigned long idCounter = 0;
|
||||
char id[128],tmp[128];
|
||||
std::string k,v;
|
||||
|
||||
try {
|
||||
// Convert Dictionary into JSON object
|
||||
json d;
|
||||
char *saveptr = (char *)0;
|
||||
for(char *l=Utils::stok(rt.data,"\n",&saveptr);(l);l=Utils::stok((char *)0,"\n",&saveptr)) {
|
||||
char *eq = strchr(l,'=');
|
||||
if (eq > l) {
|
||||
k.assign(l,(unsigned long)(eq - l));
|
||||
v.clear();
|
||||
++eq;
|
||||
while (*eq) {
|
||||
if (*eq == '\\') {
|
||||
++eq;
|
||||
if (*eq) {
|
||||
switch(*eq) {
|
||||
case 'r': v.push_back('\r'); break;
|
||||
case 'n': v.push_back('\n'); break;
|
||||
case '0': v.push_back((char)0); break;
|
||||
case 'e': v.push_back('='); break;
|
||||
default: v.push_back(*eq); break;
|
||||
}
|
||||
++eq;
|
||||
}
|
||||
} else {
|
||||
v.push_back(*(eq++));
|
||||
}
|
||||
}
|
||||
if ((k.length() > 0)&&(v.length() > 0))
|
||||
d[k] = v;
|
||||
}
|
||||
}
|
||||
|
||||
const int64_t now = OSUtils::now();
|
||||
OSUtils::ztsnprintf(id,sizeof(id),"%.10llx-%.16llx-%.10llx-%.4x",_signingId.address().toInt(),now,rt.origin,(unsigned int)(idCounter++ & 0xffff));
|
||||
d["id"] = id;
|
||||
d["objtype"] = "trace";
|
||||
d["ts"] = now;
|
||||
d["nodeId"] = Utils::hex10(rt.origin,tmp);
|
||||
_db.save(d,true);
|
||||
} catch ( ... ) {
|
||||
// drop invalid trace messages if an error occurs
|
||||
}
|
||||
}
|
||||
|
||||
void EmbeddedNetworkController::onNetworkUpdate(const void *db,uint64_t networkId,const nlohmann::json &network)
|
||||
{
|
||||
// Send an update to all members of the network that are online
|
||||
const int64_t now = OSUtils::now();
|
||||
|
@ -1104,7 +1186,7 @@ void EmbeddedNetworkController::onNetworkUpdate(const uint64_t networkId)
|
|||
}
|
||||
}
|
||||
|
||||
void EmbeddedNetworkController::onNetworkMemberUpdate(const uint64_t networkId,const uint64_t memberId)
|
||||
void EmbeddedNetworkController::onNetworkMemberUpdate(const void *db,uint64_t networkId,uint64_t memberId,const nlohmann::json &member)
|
||||
{
|
||||
// Push update to member if online
|
||||
try {
|
||||
|
@ -1115,7 +1197,7 @@ void EmbeddedNetworkController::onNetworkMemberUpdate(const uint64_t networkId,c
|
|||
} catch ( ... ) {}
|
||||
}
|
||||
|
||||
void EmbeddedNetworkController::onNetworkMemberDeauthorize(const uint64_t networkId,const uint64_t memberId)
|
||||
void EmbeddedNetworkController::onNetworkMemberDeauthorize(const void *db,uint64_t networkId,uint64_t memberId)
|
||||
{
|
||||
const int64_t now = OSUtils::now();
|
||||
Revocation rev((uint32_t)_node->prng(),networkId,0,now,ZT_REVOCATION_FLAG_FAST_PROPAGATE,Address(memberId),Revocation::CREDENTIAL_TYPE_COM);
|
||||
|
@ -1138,10 +1220,7 @@ void EmbeddedNetworkController::_request(
|
|||
{
|
||||
char nwids[24];
|
||||
DB::NetworkSummaryInfo ns;
|
||||
json network,member,origMember;
|
||||
|
||||
if (!_db)
|
||||
return;
|
||||
json network,member;
|
||||
|
||||
if (((!_signingId)||(!_signingId.hasPrivate()))||(_signingId.address().toInt() != (nwid >> 24))||(!_sender))
|
||||
return;
|
||||
|
@ -1156,15 +1235,14 @@ void EmbeddedNetworkController::_request(
|
|||
ms.lastRequestTime = now;
|
||||
}
|
||||
|
||||
_db->nodeIsOnline(nwid,identity.address().toInt(),fromAddr);
|
||||
_db.nodeIsOnline(nwid,identity.address().toInt(),fromAddr);
|
||||
|
||||
Utils::hex(nwid,nwids);
|
||||
_db->get(nwid,network,identity.address().toInt(),member,ns);
|
||||
_db.get(nwid,network,identity.address().toInt(),member,ns);
|
||||
if ((!network.is_object())||(network.size() == 0)) {
|
||||
_sender->ncSendError(nwid,requestPacketId,identity.address(),NetworkController::NC_ERROR_OBJECT_NOT_FOUND);
|
||||
return;
|
||||
}
|
||||
origMember = member;
|
||||
const bool newMember = ((!member.is_object())||(member.size() == 0));
|
||||
DB::initMember(member);
|
||||
|
||||
|
@ -1265,7 +1343,7 @@ void EmbeddedNetworkController::_request(
|
|||
} else {
|
||||
// If they are not authorized, STOP!
|
||||
DB::cleanMember(member);
|
||||
_db->save(&origMember,member);
|
||||
_db.save(member,true);
|
||||
_sender->ncSendError(nwid,requestPacketId,identity.address(),NetworkController::NC_ERROR_ACCESS_DENIED);
|
||||
return;
|
||||
}
|
||||
|
@ -1637,7 +1715,7 @@ void EmbeddedNetworkController::_request(
|
|||
}
|
||||
|
||||
DB::cleanMember(member);
|
||||
_db->save(&origMember,member);
|
||||
_db.save(member,true);
|
||||
_sender->ncSendConfig(nwid,requestPacketId,identity.address(),*(nc.get()),metaData.getUI(ZT_NETWORKCONFIG_REQUEST_METADATA_KEY_VERSION,0) < 6);
|
||||
}
|
||||
|
||||
|
|
|
@ -51,10 +51,7 @@
|
|||
#include "../ext/json/json.hpp"
|
||||
|
||||
#include "DB.hpp"
|
||||
#include "FileDB.hpp"
|
||||
#ifdef ZT_CONTROLLER_USE_LIBPQ
|
||||
#include "PostgreSQL.hpp"
|
||||
#endif
|
||||
#include "DBMirrorSet.hpp"
|
||||
|
||||
namespace ZeroTier {
|
||||
|
||||
|
@ -62,7 +59,7 @@ class Node;
|
|||
|
||||
struct MQConfig;
|
||||
|
||||
class EmbeddedNetworkController : public NetworkController
|
||||
class EmbeddedNetworkController : public NetworkController,public DB::ChangeListener
|
||||
{
|
||||
public:
|
||||
/**
|
||||
|
@ -103,10 +100,11 @@ public:
|
|||
std::string &responseBody,
|
||||
std::string &responseContentType);
|
||||
|
||||
// Called on update via POST or by JSONDB on external update of network or network member records
|
||||
void onNetworkUpdate(const uint64_t networkId);
|
||||
void onNetworkMemberUpdate(const uint64_t networkId,const uint64_t memberId);
|
||||
void onNetworkMemberDeauthorize(const uint64_t networkId,const uint64_t memberId);
|
||||
void handleRemoteTrace(const ZT_RemoteTrace &rt);
|
||||
|
||||
virtual void onNetworkUpdate(const void *db,uint64_t networkId,const nlohmann::json &network);
|
||||
virtual void onNetworkMemberUpdate(const void *db,uint64_t networkId,uint64_t memberId,const nlohmann::json &member);
|
||||
virtual void onNetworkMemberDeauthorize(const void *db,uint64_t networkId,uint64_t memberId);
|
||||
|
||||
private:
|
||||
void _request(uint64_t nwid,const InetAddress &fromAddr,uint64_t requestPacketId,const Identity &identity,const Dictionary<ZT_NETWORKCONFIG_METADATA_DICT_CAPACITY> &metaData);
|
||||
|
@ -156,7 +154,7 @@ private:
|
|||
std::string _signingIdAddressString;
|
||||
NetworkController::Sender *_sender;
|
||||
|
||||
std::unique_ptr<DB> _db;
|
||||
DBMirrorSet _db;
|
||||
BlockingQueue< _RQEntry * > _queue;
|
||||
|
||||
std::vector<std::thread> _threads;
|
||||
|
|
|
@ -29,10 +29,10 @@
|
|||
namespace ZeroTier
|
||||
{
|
||||
|
||||
FileDB::FileDB(EmbeddedNetworkController *const nc,const Identity &myId,const char *path) :
|
||||
DB(nc,myId,path),
|
||||
FileDB::FileDB(const char *path) :
|
||||
DB(),
|
||||
_path(path),
|
||||
_networksPath(_path + ZT_PATH_SEPARATOR_S + "network"),
|
||||
_onlineChanged(false),
|
||||
_running(true)
|
||||
{
|
||||
OSUtils::mkdir(_path.c_str());
|
||||
|
@ -69,54 +69,6 @@ FileDB::FileDB(EmbeddedNetworkController *const nc,const Identity &myId,const ch
|
|||
} catch ( ... ) {}
|
||||
}
|
||||
}
|
||||
|
||||
_onlineUpdateThread = std::thread([this]() {
|
||||
unsigned int cnt = 0;
|
||||
while (this->_running) {
|
||||
usleep(250);
|
||||
if ((++cnt % 20) == 0) { // 5 seconds
|
||||
std::lock_guard<std::mutex> l(this->_online_l);
|
||||
if (!this->_running) return;
|
||||
if (this->_onlineChanged) {
|
||||
char p[4096],atmp[64];
|
||||
for(auto nw=this->_online.begin();nw!=this->_online.end();++nw) {
|
||||
OSUtils::ztsnprintf(p,sizeof(p),"%s" ZT_PATH_SEPARATOR_S "%.16llx-online.json",_networksPath.c_str(),(unsigned long long)nw->first);
|
||||
FILE *f = fopen(p,"wb");
|
||||
if (f) {
|
||||
fprintf(f,"{");
|
||||
const char *memberPrefix = "";
|
||||
for(auto m=nw->second.begin();m!=nw->second.end();++m) {
|
||||
fprintf(f,"%s\"%.10llx\":{" ZT_EOL_S,memberPrefix,(unsigned long long)m->first);
|
||||
memberPrefix = ",";
|
||||
InetAddress lastAddr;
|
||||
const char *timestampPrefix = " ";
|
||||
int cnt = 0;
|
||||
for(auto ts=m->second.rbegin();ts!=m->second.rend();) {
|
||||
if (cnt < 25) {
|
||||
if (lastAddr != ts->second) {
|
||||
lastAddr = ts->second;
|
||||
fprintf(f,"%s\"%lld\":\"%s\"" ZT_EOL_S,timestampPrefix,(long long)ts->first,ts->second.toString(atmp));
|
||||
timestampPrefix = ",";
|
||||
++cnt;
|
||||
++ts;
|
||||
} else {
|
||||
ts = std::map<int64_t,InetAddress>::reverse_iterator(m->second.erase(std::next(ts).base()));
|
||||
}
|
||||
} else {
|
||||
ts = std::map<int64_t,InetAddress>::reverse_iterator(m->second.erase(std::next(ts).base()));
|
||||
}
|
||||
}
|
||||
fprintf(f,"}");
|
||||
}
|
||||
fprintf(f,"}" ZT_EOL_S);
|
||||
fclose(f);
|
||||
}
|
||||
}
|
||||
this->_onlineChanged = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
FileDB::~FileDB()
|
||||
|
@ -132,38 +84,37 @@ FileDB::~FileDB()
|
|||
bool FileDB::waitForReady() { return true; }
|
||||
bool FileDB::isReady() { return true; }
|
||||
|
||||
void FileDB::save(nlohmann::json *orig,nlohmann::json &record)
|
||||
bool FileDB::save(nlohmann::json &record,bool notifyListeners)
|
||||
{
|
||||
char p1[4096],p2[4096],pb[4096];
|
||||
bool modified = false;
|
||||
try {
|
||||
if (orig) {
|
||||
if (*orig != record) {
|
||||
record["revision"] = OSUtils::jsonInt(record["revision"],0ULL) + 1;
|
||||
}
|
||||
} else {
|
||||
record["revision"] = 1;
|
||||
}
|
||||
|
||||
const std::string objtype = record["objtype"];
|
||||
if (objtype == "network") {
|
||||
|
||||
const uint64_t nwid = OSUtils::jsonIntHex(record["id"],0ULL);
|
||||
if (nwid) {
|
||||
nlohmann::json old;
|
||||
get(nwid,old);
|
||||
if ((!old.is_object())||(old != record)) {
|
||||
record["revision"] = OSUtils::jsonInt(record["revision"],0ULL) + 1ULL;
|
||||
OSUtils::ztsnprintf(p1,sizeof(p1),"%s" ZT_PATH_SEPARATOR_S "%.16llx.json",_networksPath.c_str(),nwid);
|
||||
if (!OSUtils::writeFile(p1,OSUtils::jsonDump(record,-1)))
|
||||
fprintf(stderr,"WARNING: controller unable to write to path: %s" ZT_EOL_S,p1);
|
||||
_networkChanged(old,record,true);
|
||||
_networkChanged(old,record,notifyListeners);
|
||||
modified = true;
|
||||
}
|
||||
}
|
||||
|
||||
} else if (objtype == "member") {
|
||||
|
||||
const uint64_t id = OSUtils::jsonIntHex(record["id"],0ULL);
|
||||
const uint64_t nwid = OSUtils::jsonIntHex(record["nwid"],0ULL);
|
||||
if ((id)&&(nwid)) {
|
||||
nlohmann::json network,old;
|
||||
get(nwid,network,id,old);
|
||||
if ((!old.is_object())||(old != record)) {
|
||||
record["revision"] = OSUtils::jsonInt(record["revision"],0ULL) + 1ULL;
|
||||
OSUtils::ztsnprintf(pb,sizeof(pb),"%s" ZT_PATH_SEPARATOR_S "%.16llx" ZT_PATH_SEPARATOR_S "member",_networksPath.c_str(),(unsigned long long)nwid);
|
||||
OSUtils::ztsnprintf(p1,sizeof(p1),"%s" ZT_PATH_SEPARATOR_S "%.10llx.json",pb,(unsigned long long)id);
|
||||
if (!OSUtils::writeFile(p1,OSUtils::jsonDump(record,-1))) {
|
||||
|
@ -173,11 +124,14 @@ void FileDB::save(nlohmann::json *orig,nlohmann::json &record)
|
|||
if (!OSUtils::writeFile(p1,OSUtils::jsonDump(record,-1)))
|
||||
fprintf(stderr,"WARNING: controller unable to write to path: %s" ZT_EOL_S,p1);
|
||||
}
|
||||
_memberChanged(old,record,true);
|
||||
_memberChanged(old,record,notifyListeners);
|
||||
modified = true;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
} catch ( ... ) {} // drop invalid records missing fields
|
||||
return modified;
|
||||
}
|
||||
|
||||
void FileDB::eraseNetwork(const uint64_t networkId)
|
||||
|
@ -187,14 +141,11 @@ void FileDB::eraseNetwork(const uint64_t networkId)
|
|||
char p[16384];
|
||||
OSUtils::ztsnprintf(p,sizeof(p),"%s" ZT_PATH_SEPARATOR_S "%.16llx.json",_networksPath.c_str(),networkId);
|
||||
OSUtils::rm(p);
|
||||
OSUtils::ztsnprintf(p,sizeof(p),"%s" ZT_PATH_SEPARATOR_S "%.16llx-online.json",_networksPath.c_str(),networkId);
|
||||
OSUtils::rm(p);
|
||||
OSUtils::ztsnprintf(p,sizeof(p),"%s" ZT_PATH_SEPARATOR_S "%.16llx" ZT_PATH_SEPARATOR_S "member",_networksPath.c_str(),(unsigned long long)networkId);
|
||||
OSUtils::rmDashRf(p);
|
||||
_networkChanged(network,nullJson,true);
|
||||
std::lock_guard<std::mutex> l(this->_online_l);
|
||||
this->_online.erase(networkId);
|
||||
this->_onlineChanged = true;
|
||||
}
|
||||
|
||||
void FileDB::eraseMember(const uint64_t networkId,const uint64_t memberId)
|
||||
|
@ -208,7 +159,6 @@ void FileDB::eraseMember(const uint64_t networkId,const uint64_t memberId)
|
|||
_memberChanged(member,nullJson,true);
|
||||
std::lock_guard<std::mutex> l(this->_online_l);
|
||||
this->_online[networkId].erase(memberId);
|
||||
this->_onlineChanged = true;
|
||||
}
|
||||
|
||||
void FileDB::nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const InetAddress &physicalAddress)
|
||||
|
@ -218,7 +168,6 @@ void FileDB::nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const
|
|||
physicalAddress.toString(atmp);
|
||||
std::lock_guard<std::mutex> l(this->_online_l);
|
||||
this->_online[networkId][memberId][OSUtils::now()] = physicalAddress;
|
||||
this->_onlineChanged = true;
|
||||
}
|
||||
|
||||
} // namespace ZeroTier
|
||||
|
|
|
@ -35,22 +35,22 @@ namespace ZeroTier
|
|||
class FileDB : public DB
|
||||
{
|
||||
public:
|
||||
FileDB(EmbeddedNetworkController *const nc,const Identity &myId,const char *path);
|
||||
FileDB(const char *path);
|
||||
virtual ~FileDB();
|
||||
|
||||
virtual bool waitForReady();
|
||||
virtual bool isReady();
|
||||
virtual void save(nlohmann::json *orig,nlohmann::json &record);
|
||||
virtual bool save(nlohmann::json &record,bool notifyListeners);
|
||||
virtual void eraseNetwork(const uint64_t networkId);
|
||||
virtual void eraseMember(const uint64_t networkId,const uint64_t memberId);
|
||||
virtual void nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const InetAddress &physicalAddress);
|
||||
|
||||
protected:
|
||||
std::string _path;
|
||||
std::string _networksPath;
|
||||
std::thread _onlineUpdateThread;
|
||||
std::map< uint64_t,std::map<uint64_t,std::map<int64_t,InetAddress> > > _online;
|
||||
std::mutex _online_l;
|
||||
bool _onlineChanged;
|
||||
bool _running;
|
||||
};
|
||||
|
||||
|
|
402
controller/LFDB.cpp
Normal file
402
controller/LFDB.cpp
Normal file
|
@ -0,0 +1,402 @@
|
|||
/*
|
||||
* ZeroTier One - Network Virtualization Everywhere
|
||||
* Copyright (C) 2011-2019 ZeroTier, Inc. https://www.zerotier.com/
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*
|
||||
* --
|
||||
*
|
||||
* You can be released from the requirements of the license by purchasing
|
||||
* a commercial license. Buying such a license is mandatory as soon as you
|
||||
* develop commercial closed-source software that incorporates or links
|
||||
* directly against ZeroTier software without disclosing the source code
|
||||
* of your own application.
|
||||
*/
|
||||
|
||||
#include "LFDB.hpp"
|
||||
|
||||
#include <thread>
|
||||
#include <chrono>
|
||||
#include <iostream>
|
||||
#include <sstream>
|
||||
|
||||
#include "../osdep/OSUtils.hpp"
|
||||
#include "../ext/cpp-httplib/httplib.h"
|
||||
|
||||
namespace ZeroTier
|
||||
{
|
||||
|
||||
LFDB::LFDB(const Identity &myId,const char *path,const char *lfOwnerPrivate,const char *lfOwnerPublic,const char *lfNodeHost,int lfNodePort,bool storeOnlineState) :
|
||||
DB(),
|
||||
_myId(myId),
|
||||
_lfOwnerPrivate((lfOwnerPrivate) ? lfOwnerPrivate : ""),
|
||||
_lfOwnerPublic((lfOwnerPublic) ? lfOwnerPublic : ""),
|
||||
_lfNodeHost((lfNodeHost) ? lfNodeHost : "127.0.0.1"),
|
||||
_lfNodePort(((lfNodePort > 0)&&(lfNodePort < 65536)) ? lfNodePort : 9980),
|
||||
_running(true),
|
||||
_ready(false),
|
||||
_storeOnlineState(storeOnlineState)
|
||||
{
|
||||
_syncThread = std::thread([this]() {
|
||||
char controllerAddress[24];
|
||||
const uint64_t controllerAddressInt = _myId.address().toInt();
|
||||
_myId.address().toString(controllerAddress);
|
||||
std::string networksSelectorName("com.zerotier.controller.lfdb:"); networksSelectorName.append(controllerAddress); networksSelectorName.append("/network");
|
||||
std::string membersSelectorName("com.zerotier.controller.lfdb:"); membersSelectorName.append(controllerAddress); membersSelectorName.append("/member");
|
||||
|
||||
// LF record masking key is the first 32 bytes of SHA512(controller private key) in hex,
|
||||
// hiding record values from anything but the controller or someone who has its key.
|
||||
uint8_t sha512pk[64];
|
||||
_myId.sha512PrivateKey(sha512pk);
|
||||
char maskingKey [128];
|
||||
Utils::hex(sha512pk,32,maskingKey);
|
||||
|
||||
httplib::Client htcli(_lfNodeHost.c_str(),_lfNodePort,600);
|
||||
int64_t timeRangeStart = 0;
|
||||
while (_running.load()) {
|
||||
{
|
||||
std::lock_guard<std::mutex> sl(_state_l);
|
||||
for(auto ns=_state.begin();ns!=_state.end();++ns) {
|
||||
if (ns->second.dirty) {
|
||||
nlohmann::json network;
|
||||
if (get(ns->first,network)) {
|
||||
nlohmann::json newrec,selector0;
|
||||
selector0["Name"] = networksSelectorName;
|
||||
selector0["Ordinal"] = ns->first;
|
||||
newrec["Selectors"].push_back(selector0);
|
||||
newrec["Value"] = network.dump();
|
||||
newrec["OwnerPrivate"] = _lfOwnerPrivate;
|
||||
newrec["MaskingKey"] = maskingKey;
|
||||
newrec["PulseIfUnchanged"] = true;
|
||||
auto resp = htcli.Post("/makerecord",newrec.dump(),"application/json");
|
||||
if (resp) {
|
||||
if (resp->status == 200) {
|
||||
ns->second.dirty = false;
|
||||
//printf("SET network %.16llx %s\n",ns->first,resp->body.c_str());
|
||||
} else {
|
||||
fprintf(stderr,"ERROR: LFDB: %d from node (create/update network): %s" ZT_EOL_S,resp->status,resp->body.c_str());
|
||||
}
|
||||
} else {
|
||||
fprintf(stderr,"ERROR: LFDB: node is offline" ZT_EOL_S);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for(auto ms=ns->second.members.begin();ms!=ns->second.members.end();++ms) {
|
||||
if ((_storeOnlineState)&&(ms->second.lastOnlineDirty)&&(ms->second.lastOnlineAddress)) {
|
||||
nlohmann::json newrec,selector0,selector1,selectors,ip;
|
||||
char tmp[1024],tmp2[128];
|
||||
OSUtils::ztsnprintf(tmp,sizeof(tmp),"com.zerotier.controller.lfdb:%s/network/%.16llx/online",controllerAddress,(unsigned long long)ns->first);
|
||||
ms->second.lastOnlineAddress.toIpString(tmp2);
|
||||
selector0["Name"] = tmp;
|
||||
selector0["Ordinal"] = ms->first;
|
||||
selector1["Name"] = tmp2;
|
||||
selector1["Ordinal"] = 0;
|
||||
selectors.push_back(selector0);
|
||||
selectors.push_back(selector1);
|
||||
newrec["Selectors"] = selectors;
|
||||
const uint8_t *const rawip = (const uint8_t *)ms->second.lastOnlineAddress.rawIpData();
|
||||
switch(ms->second.lastOnlineAddress.ss_family) {
|
||||
case AF_INET:
|
||||
for(int j=0;j<4;++j)
|
||||
ip.push_back((unsigned int)rawip[j]);
|
||||
break;
|
||||
case AF_INET6:
|
||||
for(int j=0;j<16;++j)
|
||||
ip.push_back((unsigned int)rawip[j]);
|
||||
break;
|
||||
default:
|
||||
ip = tmp2; // should never happen since only IP transport is currently supported
|
||||
break;
|
||||
}
|
||||
newrec["Value"] = ip;
|
||||
newrec["OwnerPrivate"] = _lfOwnerPrivate;
|
||||
newrec["MaskingKey"] = maskingKey;
|
||||
newrec["Timestamp"] = ms->second.lastOnlineTime;
|
||||
newrec["PulseIfUnchanged"] = true;
|
||||
auto resp = htcli.Post("/makerecord",newrec.dump(),"application/json");
|
||||
if (resp) {
|
||||
if (resp->status == 200) {
|
||||
ms->second.lastOnlineDirty = false;
|
||||
//printf("SET member online %.16llx %.10llx %s\n",ns->first,ms->first,resp->body.c_str());
|
||||
} else {
|
||||
fprintf(stderr,"ERROR: LFDB: %d from node (create/update member online status): %s" ZT_EOL_S,resp->status,resp->body.c_str());
|
||||
}
|
||||
} else {
|
||||
fprintf(stderr,"ERROR: LFDB: node is offline" ZT_EOL_S);
|
||||
}
|
||||
}
|
||||
|
||||
if (ms->second.dirty) {
|
||||
nlohmann::json network,member;
|
||||
if (get(ns->first,network,ms->first,member)) {
|
||||
nlohmann::json newrec,selector0,selector1,selectors;
|
||||
selector0["Name"] = networksSelectorName;
|
||||
selector0["Ordinal"] = ns->first;
|
||||
selector1["Name"] = membersSelectorName;
|
||||
selector1["Ordinal"] = ms->first;
|
||||
selectors.push_back(selector0);
|
||||
selectors.push_back(selector1);
|
||||
newrec["Selectors"] = selectors;
|
||||
newrec["Value"] = member.dump();
|
||||
newrec["OwnerPrivate"] = _lfOwnerPrivate;
|
||||
newrec["MaskingKey"] = maskingKey;
|
||||
newrec["PulseIfUnchanged"] = true;
|
||||
auto resp = htcli.Post("/makerecord",newrec.dump(),"application/json");
|
||||
if (resp) {
|
||||
if (resp->status == 200) {
|
||||
ms->second.dirty = false;
|
||||
//printf("SET member %.16llx %.10llx %s\n",ns->first,ms->first,resp->body.c_str());
|
||||
} else {
|
||||
fprintf(stderr,"ERROR: LFDB: %d from node (create/update member): %s" ZT_EOL_S,resp->status,resp->body.c_str());
|
||||
}
|
||||
} else {
|
||||
fprintf(stderr,"ERROR: LFDB: node is offline" ZT_EOL_S);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
std::ostringstream query;
|
||||
query
|
||||
<< '{'
|
||||
<< "\"Ranges\":[{"
|
||||
<< "\"Name\":\"" << networksSelectorName << "\","
|
||||
<< "\"Range\":[0,18446744073709551615]"
|
||||
<< "}],"
|
||||
<< "\"TimeRange\":[" << timeRangeStart << ",9223372036854775807],"
|
||||
<< "\"MaskingKey\":\"" << maskingKey << "\","
|
||||
<< "\"Owners\":[\"" << _lfOwnerPublic << "\"]"
|
||||
<< '}';
|
||||
auto resp = htcli.Post("/query",query.str(),"application/json");
|
||||
if (resp) {
|
||||
if (resp->status == 200) {
|
||||
nlohmann::json results(OSUtils::jsonParse(resp->body));
|
||||
if ((results.is_array())&&(results.size() > 0)) {
|
||||
for(std::size_t ri=0;ri<results.size();++ri) {
|
||||
nlohmann::json &rset = results[ri];
|
||||
if ((rset.is_array())&&(rset.size() > 0)) {
|
||||
|
||||
nlohmann::json &result = rset[0];
|
||||
if (result.is_object()) {
|
||||
nlohmann::json &record = result["Record"];
|
||||
if (record.is_object()) {
|
||||
const std::string recordValue = result["Value"];
|
||||
//printf("GET network %s\n",recordValue.c_str());
|
||||
nlohmann::json network(OSUtils::jsonParse(recordValue));
|
||||
if (network.is_object()) {
|
||||
const std::string idstr = network["id"];
|
||||
const uint64_t id = Utils::hexStrToU64(idstr.c_str());
|
||||
if ((id >> 24) == controllerAddressInt) { // sanity check
|
||||
|
||||
std::lock_guard<std::mutex> sl(_state_l);
|
||||
_NetworkState &ns = _state[id];
|
||||
if (!ns.dirty) {
|
||||
nlohmann::json oldNetwork;
|
||||
if ((timeRangeStart > 0)&&(get(id,oldNetwork))) {
|
||||
const uint64_t revision = network["revision"];
|
||||
const uint64_t prevRevision = oldNetwork["revision"];
|
||||
if (prevRevision < revision) {
|
||||
_networkChanged(oldNetwork,network,timeRangeStart > 0);
|
||||
}
|
||||
} else {
|
||||
nlohmann::json nullJson;
|
||||
_networkChanged(nullJson,network,timeRangeStart > 0);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
fprintf(stderr,"ERROR: LFDB: %d from node: %s" ZT_EOL_S,resp->status,resp->body.c_str());
|
||||
}
|
||||
} else {
|
||||
fprintf(stderr,"ERROR: LFDB: node is offline" ZT_EOL_S);
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
std::ostringstream query;
|
||||
query
|
||||
<< '{'
|
||||
<< "\"Ranges\":[{"
|
||||
<< "\"Name\":\"" << networksSelectorName << "\","
|
||||
<< "\"Range\":[0,18446744073709551615]"
|
||||
<< "},{"
|
||||
<< "\"Name\":\"" << membersSelectorName << "\","
|
||||
<< "\"Range\":[0,18446744073709551615]"
|
||||
<< "}],"
|
||||
<< "\"TimeRange\":[" << timeRangeStart << ",9223372036854775807],"
|
||||
<< "\"MaskingKey\":\"" << maskingKey << "\","
|
||||
<< "\"Owners\":[\"" << _lfOwnerPublic << "\"]"
|
||||
<< '}';
|
||||
auto resp = htcli.Post("/query",query.str(),"application/json");
|
||||
if (resp) {
|
||||
if (resp->status == 200) {
|
||||
nlohmann::json results(OSUtils::jsonParse(resp->body));
|
||||
if ((results.is_array())&&(results.size() > 0)) {
|
||||
for(std::size_t ri=0;ri<results.size();++ri) {
|
||||
nlohmann::json &rset = results[ri];
|
||||
if ((rset.is_array())&&(rset.size() > 0)) {
|
||||
|
||||
nlohmann::json &result = rset[0];
|
||||
if (result.is_object()) {
|
||||
nlohmann::json &record = result["Record"];
|
||||
if (record.is_object()) {
|
||||
const std::string recordValue = result["Value"];
|
||||
//printf("GET member %s\n",recordValue.c_str());
|
||||
nlohmann::json member(OSUtils::jsonParse(recordValue));
|
||||
if (member.is_object()) {
|
||||
const std::string nwidstr = member["nwid"];
|
||||
const std::string idstr = member["id"];
|
||||
const uint64_t nwid = Utils::hexStrToU64(nwidstr.c_str());
|
||||
const uint64_t id = Utils::hexStrToU64(idstr.c_str());
|
||||
if ((id)&&((nwid >> 24) == controllerAddressInt)) { // sanity check
|
||||
|
||||
std::lock_guard<std::mutex> sl(_state_l);
|
||||
auto ns = _state.find(nwid);
|
||||
if ((ns == _state.end())||(!ns->second.members[id].dirty)) {
|
||||
nlohmann::json network,oldMember;
|
||||
if ((timeRangeStart > 0)&&(get(nwid,network,id,oldMember))) {
|
||||
const uint64_t revision = member["revision"];
|
||||
const uint64_t prevRevision = oldMember["revision"];
|
||||
if (prevRevision < revision)
|
||||
_memberChanged(oldMember,member,timeRangeStart > 0);
|
||||
}
|
||||
} else {
|
||||
nlohmann::json nullJson;
|
||||
_memberChanged(nullJson,member,timeRangeStart > 0);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
fprintf(stderr,"ERROR: LFDB: %d from node: %s" ZT_EOL_S,resp->status,resp->body.c_str());
|
||||
}
|
||||
} else {
|
||||
fprintf(stderr,"ERROR: LFDB: node is offline" ZT_EOL_S);
|
||||
}
|
||||
}
|
||||
|
||||
timeRangeStart = time(nullptr) - 120; // start next query 2m before now to avoid losing updates
|
||||
_ready.store(true);
|
||||
|
||||
for(int k=0;k<4;++k) { // 2s delay between queries for remotely modified networks or members
|
||||
if (!_running.load())
|
||||
return;
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(500));
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
LFDB::~LFDB()
|
||||
{
|
||||
_running.store(false);
|
||||
_syncThread.join();
|
||||
}
|
||||
|
||||
bool LFDB::waitForReady()
|
||||
{
|
||||
while (!_ready.load()) {
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(500));
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
bool LFDB::isReady()
|
||||
{
|
||||
return (_ready.load());
|
||||
}
|
||||
|
||||
bool LFDB::save(nlohmann::json &record,bool notifyListeners)
|
||||
{
|
||||
bool modified = false;
|
||||
const std::string objtype = record["objtype"];
|
||||
if (objtype == "network") {
|
||||
const uint64_t nwid = OSUtils::jsonIntHex(record["id"],0ULL);
|
||||
if (nwid) {
|
||||
nlohmann::json old;
|
||||
get(nwid,old);
|
||||
if ((!old.is_object())||(old != record)) {
|
||||
record["revision"] = OSUtils::jsonInt(record["revision"],0ULL) + 1ULL;
|
||||
_networkChanged(old,record,notifyListeners);
|
||||
{
|
||||
std::lock_guard<std::mutex> l(_state_l);
|
||||
_state[nwid].dirty = true;
|
||||
}
|
||||
modified = true;
|
||||
}
|
||||
}
|
||||
} else if (objtype == "member") {
|
||||
const uint64_t nwid = OSUtils::jsonIntHex(record["nwid"],0ULL);
|
||||
const uint64_t id = OSUtils::jsonIntHex(record["id"],0ULL);
|
||||
if ((id)&&(nwid)) {
|
||||
nlohmann::json network,old;
|
||||
get(nwid,network,id,old);
|
||||
if ((!old.is_object())||(old != record)) {
|
||||
record["revision"] = OSUtils::jsonInt(record["revision"],0ULL) + 1ULL;
|
||||
_memberChanged(old,record,notifyListeners);
|
||||
{
|
||||
std::lock_guard<std::mutex> l(_state_l);
|
||||
_state[nwid].members[id].dirty = true;
|
||||
}
|
||||
modified = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
return modified;
|
||||
}
|
||||
|
||||
void LFDB::eraseNetwork(const uint64_t networkId)
|
||||
{
|
||||
// TODO
|
||||
}
|
||||
|
||||
void LFDB::eraseMember(const uint64_t networkId,const uint64_t memberId)
|
||||
{
|
||||
// TODO
|
||||
}
|
||||
|
||||
void LFDB::nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const InetAddress &physicalAddress)
|
||||
{
|
||||
std::lock_guard<std::mutex> l(_state_l);
|
||||
auto nw = _state.find(networkId);
|
||||
if (nw != _state.end()) {
|
||||
auto m = nw->second.members.find(memberId);
|
||||
if (m != nw->second.members.end()) {
|
||||
m->second.lastOnlineTime = OSUtils::now();
|
||||
if (physicalAddress)
|
||||
m->second.lastOnlineAddress = physicalAddress;
|
||||
m->second.lastOnlineDirty = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace ZeroTier
|
102
controller/LFDB.hpp
Normal file
102
controller/LFDB.hpp
Normal file
|
@ -0,0 +1,102 @@
|
|||
/*
|
||||
* ZeroTier One - Network Virtualization Everywhere
|
||||
* Copyright (C) 2011-2019 ZeroTier, Inc. https://www.zerotier.com/
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*
|
||||
* --
|
||||
*
|
||||
* You can be released from the requirements of the license by purchasing
|
||||
* a commercial license. Buying such a license is mandatory as soon as you
|
||||
* develop commercial closed-source software that incorporates or links
|
||||
* directly against ZeroTier software without disclosing the source code
|
||||
* of your own application.
|
||||
*/
|
||||
|
||||
#ifndef ZT_CONTROLLER_LFDB_HPP
|
||||
#define ZT_CONTROLLER_LFDB_HPP
|
||||
|
||||
#include "DB.hpp"
|
||||
|
||||
#include <mutex>
|
||||
#include <string>
|
||||
#include <unordered_map>
|
||||
#include <atomic>
|
||||
|
||||
namespace ZeroTier {
|
||||
|
||||
/**
|
||||
* DB implementation for controller that stores data in LF
|
||||
*/
|
||||
class LFDB : public DB
|
||||
{
|
||||
public:
|
||||
/**
|
||||
* @param myId This controller's identity
|
||||
* @param path Base path for ZeroTier node itself
|
||||
* @param lfOwnerPrivate LF owner private in PEM format
|
||||
* @param lfOwnerPublic LF owner public in @base62 format
|
||||
* @param lfNodeHost LF node host
|
||||
* @param lfNodePort LF node http (not https) port
|
||||
* @param storeOnlineState If true, store online/offline state and IP info in LF (a lot of data, only for private networks!)
|
||||
*/
|
||||
LFDB(const Identity &myId,const char *path,const char *lfOwnerPrivate,const char *lfOwnerPublic,const char *lfNodeHost,int lfNodePort,bool storeOnlineState);
|
||||
virtual ~LFDB();
|
||||
|
||||
virtual bool waitForReady();
|
||||
virtual bool isReady();
|
||||
virtual bool save(nlohmann::json &record,bool notifyListeners);
|
||||
virtual void eraseNetwork(const uint64_t networkId);
|
||||
virtual void eraseMember(const uint64_t networkId,const uint64_t memberId);
|
||||
virtual void nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const InetAddress &physicalAddress);
|
||||
|
||||
protected:
|
||||
const Identity _myId;
|
||||
|
||||
std::string _lfOwnerPrivate,_lfOwnerPublic;
|
||||
std::string _lfNodeHost;
|
||||
int _lfNodePort;
|
||||
|
||||
struct _MemberState
|
||||
{
|
||||
_MemberState() :
|
||||
lastOnlineAddress(),
|
||||
lastOnlineTime(0),
|
||||
dirty(false),
|
||||
lastOnlineDirty(false) {}
|
||||
InetAddress lastOnlineAddress;
|
||||
int64_t lastOnlineTime;
|
||||
bool dirty;
|
||||
bool lastOnlineDirty;
|
||||
};
|
||||
struct _NetworkState
|
||||
{
|
||||
_NetworkState() :
|
||||
members(),
|
||||
dirty(false) {}
|
||||
std::unordered_map<uint64_t,_MemberState> members;
|
||||
bool dirty;
|
||||
};
|
||||
std::unordered_map<uint64_t,_NetworkState> _state;
|
||||
std::mutex _state_l;
|
||||
|
||||
std::atomic_bool _running;
|
||||
std::atomic_bool _ready;
|
||||
std::thread _syncThread;
|
||||
bool _storeOnlineState;
|
||||
};
|
||||
|
||||
} // namespace ZeroTier
|
||||
|
||||
#endif
|
|
@ -24,9 +24,11 @@
|
|||
* of your own application.
|
||||
*/
|
||||
|
||||
#include "PostgreSQL.hpp"
|
||||
|
||||
#ifdef ZT_CONTROLLER_USE_LIBPQ
|
||||
|
||||
#include "PostgreSQL.hpp"
|
||||
#include "../node/Constants.hpp"
|
||||
#include "EmbeddedNetworkController.hpp"
|
||||
#include "RabbitMQ.hpp"
|
||||
#include "../version.h"
|
||||
|
@ -37,8 +39,11 @@
|
|||
#include <amqp_tcp_socket.h>
|
||||
|
||||
using json = nlohmann::json;
|
||||
|
||||
namespace {
|
||||
|
||||
static const int DB_MINIMUM_VERSION = 5;
|
||||
|
||||
static const char *_timestr()
|
||||
{
|
||||
time_t t = time(0);
|
||||
|
@ -56,6 +61,7 @@ static const char *_timestr()
|
|||
return ts;
|
||||
}
|
||||
|
||||
/*
|
||||
std::string join(const std::vector<std::string> &elements, const char * const separator)
|
||||
{
|
||||
switch(elements.size()) {
|
||||
|
@ -70,21 +76,56 @@ std::string join(const std::vector<std::string> &elements, const char * const se
|
|||
return os.str();
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
}
|
||||
} // anonymous namespace
|
||||
|
||||
using namespace ZeroTier;
|
||||
|
||||
PostgreSQL::PostgreSQL(EmbeddedNetworkController *const nc, const Identity &myId, const char *path, int listenPort, MQConfig *mqc)
|
||||
: DB(nc, myId, path)
|
||||
, _ready(0)
|
||||
PostgreSQL::PostgreSQL(const Identity &myId, const char *path, int listenPort, MQConfig *mqc)
|
||||
: DB()
|
||||
, _myId(myId)
|
||||
, _myAddress(myId.address())
|
||||
, _ready(0)
|
||||
, _connected(1)
|
||||
, _run(1)
|
||||
, _waitNoticePrinted(false)
|
||||
, _run(1)
|
||||
, _waitNoticePrinted(false)
|
||||
, _listenPort(listenPort)
|
||||
, _mqc(mqc)
|
||||
{
|
||||
_connString = std::string(path) + " application_name=controller_" +_myAddressStr;
|
||||
char myAddress[64];
|
||||
_myAddressStr = myId.address().toString(myAddress);
|
||||
_connString = std::string(path) + " application_name=controller_" + _myAddressStr;
|
||||
|
||||
// Database Schema Version Check
|
||||
PGconn *conn = getPgConn();
|
||||
if (PQstatus(conn) != CONNECTION_OK) {
|
||||
fprintf(stderr, "Bad Database Connection: %s", PQerrorMessage(conn));
|
||||
exit(1);
|
||||
}
|
||||
|
||||
PGresult *res = PQexec(conn, "SELECT version FROM ztc_database");
|
||||
if (PQresultStatus(res) != PGRES_TUPLES_OK) {
|
||||
fprintf(stderr, "Error determining database version");
|
||||
exit(1);
|
||||
}
|
||||
|
||||
if (PQntuples(res) != 1) {
|
||||
fprintf(stderr, "Invalid number of db version tuples returned.");
|
||||
exit(1);
|
||||
}
|
||||
|
||||
int dbVersion = std::stoi(PQgetvalue(res, 0, 0));
|
||||
|
||||
if (dbVersion < DB_MINIMUM_VERSION) {
|
||||
fprintf(stderr, "Central database schema version too low. This controller version requires a minimum schema version of %d. Please upgrade your Central instance", DB_MINIMUM_VERSION);
|
||||
exit(1);
|
||||
}
|
||||
|
||||
PQclear(res);
|
||||
res = NULL;
|
||||
PQfinish(conn);
|
||||
conn = NULL;
|
||||
|
||||
_readyLock.lock();
|
||||
_heartbeatThread = std::thread(&PostgreSQL::heartbeat, this);
|
||||
|
@ -130,12 +171,38 @@ bool PostgreSQL::isReady()
|
|||
return ((_ready == 2)&&(_connected));
|
||||
}
|
||||
|
||||
void PostgreSQL::save(nlohmann::json *orig, nlohmann::json &record)
|
||||
bool PostgreSQL::save(nlohmann::json &record,bool notifyListeners)
|
||||
{
|
||||
bool modified = false;
|
||||
try {
|
||||
if (!record.is_object()) {
|
||||
return;
|
||||
if (!record.is_object())
|
||||
return false;
|
||||
const std::string objtype = record["objtype"];
|
||||
if (objtype == "network") {
|
||||
const uint64_t nwid = OSUtils::jsonIntHex(record["id"],0ULL);
|
||||
if (nwid) {
|
||||
nlohmann::json old;
|
||||
get(nwid,old);
|
||||
if ((!old.is_object())||(old != record)) {
|
||||
record["revision"] = OSUtils::jsonInt(record["revision"],0ULL) + 1ULL;
|
||||
_commitQueue.post(std::pair<nlohmann::json,bool>(record,notifyListeners));
|
||||
modified = true;
|
||||
}
|
||||
}
|
||||
} else if (objtype == "member") {
|
||||
const uint64_t nwid = OSUtils::jsonIntHex(record["nwid"],0ULL);
|
||||
const uint64_t id = OSUtils::jsonIntHex(record["id"],0ULL);
|
||||
if ((id)&&(nwid)) {
|
||||
nlohmann::json network,old;
|
||||
get(nwid,network,id,old);
|
||||
if ((!old.is_object())||(old != record)) {
|
||||
record["revision"] = OSUtils::jsonInt(record["revision"],0ULL) + 1ULL;
|
||||
_commitQueue.post(std::pair<nlohmann::json,bool>(record,notifyListeners));
|
||||
modified = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
/*
|
||||
waitForReady();
|
||||
if (orig) {
|
||||
if (*orig != record) {
|
||||
|
@ -146,11 +213,13 @@ void PostgreSQL::save(nlohmann::json *orig, nlohmann::json &record)
|
|||
record["revision"] = 1;
|
||||
_commitQueue.post(new nlohmann::json(record));
|
||||
}
|
||||
*/
|
||||
} catch (std::exception &e) {
|
||||
fprintf(stderr, "Error on PostgreSQL::save: %s\n", e.what());
|
||||
} catch (...) {
|
||||
fprintf(stderr, "Unknown error on PostgreSQL::save\n");
|
||||
}
|
||||
return modified;
|
||||
}
|
||||
|
||||
void PostgreSQL::eraseNetwork(const uint64_t networkId)
|
||||
|
@ -158,21 +227,23 @@ void PostgreSQL::eraseNetwork(const uint64_t networkId)
|
|||
char tmp2[24];
|
||||
waitForReady();
|
||||
Utils::hex(networkId, tmp2);
|
||||
json *tmp = new json();
|
||||
(*tmp)["id"] = tmp2;
|
||||
(*tmp)["objtype"] = "_delete_network";
|
||||
std::pair<nlohmann::json,bool> tmp;
|
||||
tmp.first["id"] = tmp2;
|
||||
tmp.first["objtype"] = "_delete_network";
|
||||
tmp.second = true;
|
||||
_commitQueue.post(tmp);
|
||||
}
|
||||
|
||||
void PostgreSQL::eraseMember(const uint64_t networkId, const uint64_t memberId)
|
||||
{
|
||||
char tmp2[24];
|
||||
json *tmp = new json();
|
||||
std::pair<nlohmann::json,bool> tmp;
|
||||
Utils::hex(networkId, tmp2);
|
||||
(*tmp)["nwid"] = tmp2;
|
||||
tmp.first["nwid"] = tmp2;
|
||||
Utils::hex(memberId, tmp2);
|
||||
(*tmp)["id"] = tmp2;
|
||||
(*tmp)["objtype"] = "_delete_member";
|
||||
tmp.first["id"] = tmp2;
|
||||
tmp.first["objtype"] = "_delete_member";
|
||||
tmp.second = true;
|
||||
_commitQueue.post(tmp);
|
||||
}
|
||||
|
||||
|
@ -563,8 +634,8 @@ void PostgreSQL::heartbeat()
|
|||
"public_identity = EXCLUDED.public_identity, v_major = EXCLUDED.v_major, v_minor = EXCLUDED.v_minor, "
|
||||
"v_rev = EXCLUDED.v_rev, v_build = EXCLUDED.v_rev, host_port = EXCLUDED.host_port, "
|
||||
"use_rabbitmq = EXCLUDED.use_rabbitmq",
|
||||
10, // number of parameters
|
||||
NULL, // oid field. ignore
|
||||
10, // number of parameters
|
||||
NULL, // oid field. ignore
|
||||
values, // values for substitution
|
||||
NULL, // lengths in bytes of each value
|
||||
NULL, // binary?
|
||||
|
@ -685,7 +756,7 @@ void PostgreSQL::_membersWatcher_RabbitMQ() {
|
|||
fprintf(stderr, "RABBITMQ ERROR member change: %s\n", e.what());
|
||||
} catch(...) {
|
||||
fprintf(stderr, "RABBITMQ ERROR member change: unknown error\n");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -802,18 +873,18 @@ void PostgreSQL::commitThread()
|
|||
exit(1);
|
||||
}
|
||||
|
||||
json *config = nullptr;
|
||||
while(_commitQueue.get(config)&(_run == 1)) {
|
||||
if (!config) {
|
||||
std::pair<nlohmann::json,bool> qitem;
|
||||
while(_commitQueue.get(qitem)&(_run == 1)) {
|
||||
if (!qitem.first.is_object()) {
|
||||
continue;
|
||||
}
|
||||
if (PQstatus(conn) == CONNECTION_BAD) {
|
||||
fprintf(stderr, "ERROR: Connection to database failed: %s\n", PQerrorMessage(conn));
|
||||
PQfinish(conn);
|
||||
delete config;
|
||||
exit(1);
|
||||
}
|
||||
try {
|
||||
try {
|
||||
nlohmann::json *config = &(qitem.first);
|
||||
const std::string objtype = (*config)["objtype"];
|
||||
if (objtype == "member") {
|
||||
try {
|
||||
|
@ -968,12 +1039,12 @@ void PostgreSQL::commitThread()
|
|||
nlohmann::json memOrig;
|
||||
|
||||
nlohmann::json memNew(*config);
|
||||
|
||||
|
||||
get(nwidInt, nwOrig, memberidInt, memOrig);
|
||||
|
||||
_memberChanged(memOrig, memNew, (this->_ready>=2));
|
||||
_memberChanged(memOrig, memNew, qitem.second);
|
||||
} else {
|
||||
fprintf(stderr, "Can't notify of change. Error parsing nwid or memberid: %lu-%lu\n", nwidInt, memberidInt);
|
||||
fprintf(stderr, "Can't notify of change. Error parsing nwid or memberid: %llu-%llu\n", (unsigned long long)nwidInt, (unsigned long long)memberidInt);
|
||||
}
|
||||
|
||||
} catch (std::exception &e) {
|
||||
|
@ -1194,16 +1265,14 @@ void PostgreSQL::commitThread()
|
|||
|
||||
get(nwidInt, nwOrig);
|
||||
|
||||
_networkChanged(nwOrig, nwNew, true);
|
||||
_networkChanged(nwOrig, nwNew, qitem.second);
|
||||
} else {
|
||||
fprintf(stderr, "Can't notify network changed: %lu\n", nwidInt);
|
||||
fprintf(stderr, "Can't notify network changed: %llu\n", (unsigned long long)nwidInt);
|
||||
}
|
||||
|
||||
} catch (std::exception &e) {
|
||||
fprintf(stderr, "ERROR: Error updating member: %s\n", e.what());
|
||||
}
|
||||
} else if (objtype == "trace") {
|
||||
fprintf(stderr, "ERROR: Trace not yet implemented");
|
||||
} else if (objtype == "_delete_network") {
|
||||
try {
|
||||
std::string networkId = (*config)["nwid"];
|
||||
|
@ -1260,8 +1329,6 @@ void PostgreSQL::commitThread()
|
|||
} catch (std::exception &e) {
|
||||
fprintf(stderr, "ERROR: Error getting objtype: %s\n", e.what());
|
||||
}
|
||||
delete config;
|
||||
config = nullptr;
|
||||
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(10));
|
||||
}
|
||||
|
@ -1283,9 +1350,9 @@ void PostgreSQL::onlineNotificationThread()
|
|||
}
|
||||
_connected = 1;
|
||||
|
||||
int64_t lastUpdatedNetworkStatus = 0;
|
||||
//int64_t lastUpdatedNetworkStatus = 0;
|
||||
std::unordered_map< std::pair<uint64_t,uint64_t>,int64_t,_PairHasher > lastOnlineCumulative;
|
||||
|
||||
|
||||
while (_run == 1) {
|
||||
if (PQstatus(conn) != CONNECTION_OK) {
|
||||
fprintf(stderr, "ERROR: Online Notification thread lost connection to Postgres.");
|
||||
|
@ -1389,129 +1456,6 @@ void PostgreSQL::onlineNotificationThread()
|
|||
PQclear(res);
|
||||
}
|
||||
|
||||
const int64_t now = OSUtils::now();
|
||||
if ((now - lastUpdatedNetworkStatus) > 10000) {
|
||||
lastUpdatedNetworkStatus = now;
|
||||
|
||||
std::vector<std::pair<uint64_t, std::shared_ptr<_Network>>> networks;
|
||||
{
|
||||
std::lock_guard<std::mutex> l(_networks_l);
|
||||
for (auto i = _networks.begin(); i != _networks.end(); ++i) {
|
||||
networks.push_back(*i);
|
||||
}
|
||||
}
|
||||
|
||||
std::stringstream networkUpdate;
|
||||
networkUpdate << "INSERT INTO ztc_network_status (network_id, bridge_count, authorized_member_count, online_member_count, total_member_count, last_modified) VALUES ";
|
||||
bool nwFirstRun = true;
|
||||
bool networkAdded = false;
|
||||
for (auto i = networks.begin(); i != networks.end(); ++i) {
|
||||
char tmp[64];
|
||||
Utils::hex(i->first, tmp);
|
||||
|
||||
std::string networkId(tmp);
|
||||
|
||||
std::vector<std::string> &_notUsed = updateMap[networkId];
|
||||
(void)_notUsed;
|
||||
|
||||
uint64_t authMemberCount = 0;
|
||||
uint64_t totalMemberCount = 0;
|
||||
uint64_t onlineMemberCount = 0;
|
||||
uint64_t bridgeCount = 0;
|
||||
uint64_t ts = now;
|
||||
{
|
||||
std::lock_guard<std::mutex> l2(i->second->lock);
|
||||
authMemberCount = i->second->authorizedMembers.size();
|
||||
totalMemberCount = i->second->members.size();
|
||||
bridgeCount = i->second->activeBridgeMembers.size();
|
||||
for (auto m=i->second->members.begin(); m != i->second->members.end(); ++m) {
|
||||
auto lo = lastOnlineCumulative.find(std::pair<uint64_t,uint64_t>(i->first, m->first));
|
||||
if (lo != lastOnlineCumulative.end()) {
|
||||
if ((now - lo->second) <= (ZT_NETWORK_AUTOCONF_DELAY * 2)) {
|
||||
++onlineMemberCount;
|
||||
} else {
|
||||
lastOnlineCumulative.erase(lo);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const char *nvals[1] = {
|
||||
networkId.c_str()
|
||||
};
|
||||
|
||||
res = PQexecParams(conn,
|
||||
"SELECT id FROM ztc_network WHERE id = $1",
|
||||
1,
|
||||
NULL,
|
||||
nvals,
|
||||
NULL,
|
||||
NULL,
|
||||
0);
|
||||
|
||||
if (PQresultStatus(res) != PGRES_TUPLES_OK) {
|
||||
fprintf(stderr, "Network lookup failed: %s", PQerrorMessage(conn));
|
||||
PQclear(res);
|
||||
continue;
|
||||
}
|
||||
|
||||
int nrows = PQntuples(res);
|
||||
PQclear(res);
|
||||
|
||||
if (nrows == 1) {
|
||||
std::string bc = std::to_string(bridgeCount);
|
||||
std::string amc = std::to_string(authMemberCount);
|
||||
std::string omc = std::to_string(onlineMemberCount);
|
||||
std::string tmc = std::to_string(totalMemberCount);
|
||||
std::string timestamp = std::to_string(ts);
|
||||
|
||||
if (nwFirstRun) {
|
||||
nwFirstRun = false;
|
||||
} else {
|
||||
networkUpdate << ", ";
|
||||
}
|
||||
|
||||
networkUpdate << "('" << networkId << "', " << bc << ", " << amc << ", " << omc << ", " << tmc << ", "
|
||||
<< "TO_TIMESTAMP(" << timestamp << "::double precision/1000))";
|
||||
|
||||
networkAdded = true;
|
||||
|
||||
} else if (nrows > 1) {
|
||||
fprintf(stderr, "Number of networks > 1?!?!?");
|
||||
continue;
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
networkUpdate << " ON CONFLICT (network_id) DO UPDATE SET bridge_count = EXCLUDED.bridge_count, "
|
||||
<< "authorized_member_count = EXCLUDED.authorized_member_count, online_member_count = EXCLUDED.online_member_count, "
|
||||
<< "total_member_count = EXCLUDED.total_member_count, last_modified = EXCLUDED.last_modified";
|
||||
if (networkAdded) {
|
||||
res = PQexec(conn, networkUpdate.str().c_str());
|
||||
if (PQresultStatus(res) != PGRES_COMMAND_OK) {
|
||||
fprintf(stderr, "Error during multiple network upsert: %s", PQresultErrorMessage(res));
|
||||
}
|
||||
PQclear(res);
|
||||
}
|
||||
}
|
||||
|
||||
// for (auto it = updateMap.begin(); it != updateMap.end(); ++it) {
|
||||
// std::string networkId = it->first;
|
||||
// std::vector<std::string> members = it->second;
|
||||
// std::stringstream queryBuilder;
|
||||
|
||||
// std::string membersStr = ::join(members, ",");
|
||||
|
||||
// queryBuilder << "NOTIFY controller, '" << networkId << ":" << membersStr << "'";
|
||||
// std::string query = queryBuilder.str();
|
||||
|
||||
// PGresult *res = PQexec(conn,query.c_str());
|
||||
// if (PQresultStatus(res) != PGRES_COMMAND_OK) {
|
||||
// fprintf(stderr, "ERROR: Error sending NOTIFY: %s\n", PQresultErrorMessage(res));
|
||||
// }
|
||||
// PQclear(res);
|
||||
// }
|
||||
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(10));
|
||||
}
|
||||
fprintf(stderr, "%s: Fell out of run loop in onlineNotificationThread\n", _myAddressStr.c_str());
|
||||
|
@ -1522,7 +1466,8 @@ void PostgreSQL::onlineNotificationThread()
|
|||
}
|
||||
}
|
||||
|
||||
PGconn *PostgreSQL::getPgConn(OverrideMode m) {
|
||||
PGconn *PostgreSQL::getPgConn(OverrideMode m)
|
||||
{
|
||||
if (m == ALLOW_PGBOUNCER_OVERRIDE) {
|
||||
char *connStr = getenv("PGBOUNCER_CONNSTR");
|
||||
if (connStr != NULL) {
|
||||
|
@ -1536,4 +1481,5 @@ PGconn *PostgreSQL::getPgConn(OverrideMode m) {
|
|||
|
||||
return PQconnectdb(_connString.c_str());
|
||||
}
|
||||
|
||||
#endif //ZT_CONTROLLER_USE_LIBPQ
|
||||
|
|
|
@ -23,22 +23,21 @@
|
|||
* directly against ZeroTier software without disclosing the source code
|
||||
* of your own application.
|
||||
*/
|
||||
|
||||
|
||||
#include "DB.hpp"
|
||||
|
||||
#ifdef ZT_CONTROLLER_USE_LIBPQ
|
||||
|
||||
#ifndef ZT_CONTROLLER_LIBPQ_HPP
|
||||
#define ZT_CONTROLLER_LIBPQ_HPP
|
||||
|
||||
#include "DB.hpp"
|
||||
|
||||
#define ZT_CENTRAL_CONTROLLER_COMMIT_THREADS 4
|
||||
|
||||
extern "C" {
|
||||
typedef struct pg_conn PGconn;
|
||||
typedef struct pg_conn PGconn;
|
||||
}
|
||||
|
||||
namespace ZeroTier
|
||||
{
|
||||
namespace ZeroTier {
|
||||
|
||||
struct MQConfig;
|
||||
|
||||
|
@ -51,69 +50,70 @@ struct MQConfig;
|
|||
class PostgreSQL : public DB
|
||||
{
|
||||
public:
|
||||
PostgreSQL(EmbeddedNetworkController *const nc, const Identity &myId, const char *path, int listenPort, MQConfig *mqc = NULL);
|
||||
virtual ~PostgreSQL();
|
||||
PostgreSQL(const Identity &myId, const char *path, int listenPort, MQConfig *mqc = NULL);
|
||||
virtual ~PostgreSQL();
|
||||
|
||||
virtual bool waitForReady();
|
||||
virtual bool isReady();
|
||||
virtual void save(nlohmann::json *orig, nlohmann::json &record);
|
||||
virtual void eraseNetwork(const uint64_t networkId);
|
||||
virtual void eraseMember(const uint64_t networkId, const uint64_t memberId);
|
||||
virtual void nodeIsOnline(const uint64_t networkId, const uint64_t memberId, const InetAddress &physicalAddress);
|
||||
virtual bool waitForReady();
|
||||
virtual bool isReady();
|
||||
virtual bool save(nlohmann::json &record,bool notifyListeners);
|
||||
virtual void eraseNetwork(const uint64_t networkId);
|
||||
virtual void eraseMember(const uint64_t networkId, const uint64_t memberId);
|
||||
virtual void nodeIsOnline(const uint64_t networkId, const uint64_t memberId, const InetAddress &physicalAddress);
|
||||
|
||||
protected:
|
||||
struct _PairHasher
|
||||
struct _PairHasher
|
||||
{
|
||||
inline std::size_t operator()(const std::pair<uint64_t,uint64_t> &p) const { return (std::size_t)(p.first ^ p.second); }
|
||||
};
|
||||
|
||||
private:
|
||||
void initializeNetworks(PGconn *conn);
|
||||
void initializeMembers(PGconn *conn);
|
||||
void heartbeat();
|
||||
void membersDbWatcher();
|
||||
void _membersWatcher_Postgres(PGconn *conn);
|
||||
void _membersWatcher_RabbitMQ();
|
||||
void networksDbWatcher();
|
||||
void _networksWatcher_Postgres(PGconn *conn);
|
||||
void _networksWatcher_RabbitMQ();
|
||||
void initializeNetworks(PGconn *conn);
|
||||
void initializeMembers(PGconn *conn);
|
||||
void heartbeat();
|
||||
void membersDbWatcher();
|
||||
void _membersWatcher_Postgres(PGconn *conn);
|
||||
void _membersWatcher_RabbitMQ();
|
||||
void networksDbWatcher();
|
||||
void _networksWatcher_Postgres(PGconn *conn);
|
||||
void _networksWatcher_RabbitMQ();
|
||||
|
||||
void commitThread();
|
||||
void onlineNotificationThread();
|
||||
|
||||
void commitThread();
|
||||
void onlineNotificationThread();
|
||||
enum OverrideMode {
|
||||
ALLOW_PGBOUNCER_OVERRIDE = 0,
|
||||
NO_OVERRIDE = 1
|
||||
};
|
||||
|
||||
enum OverrideMode {
|
||||
ALLOW_PGBOUNCER_OVERRIDE = 0,
|
||||
NO_OVERRIDE = 1
|
||||
};
|
||||
PGconn * getPgConn( OverrideMode m = ALLOW_PGBOUNCER_OVERRIDE );
|
||||
|
||||
PGconn * getPgConn( OverrideMode m = ALLOW_PGBOUNCER_OVERRIDE );
|
||||
const Identity _myId;
|
||||
const Address _myAddress;
|
||||
std::string _myAddressStr;
|
||||
std::string _connString;
|
||||
|
||||
std::string _connString;
|
||||
BlockingQueue< std::pair<nlohmann::json,bool> > _commitQueue;
|
||||
|
||||
BlockingQueue<nlohmann::json *> _commitQueue;
|
||||
|
||||
|
||||
std::thread _heartbeatThread;
|
||||
std::thread _membersDbWatcher;
|
||||
std::thread _networksDbWatcher;
|
||||
std::thread _commitThread[ZT_CENTRAL_CONTROLLER_COMMIT_THREADS];
|
||||
std::thread _onlineNotificationThread;
|
||||
std::thread _heartbeatThread;
|
||||
std::thread _membersDbWatcher;
|
||||
std::thread _networksDbWatcher;
|
||||
std::thread _commitThread[ZT_CENTRAL_CONTROLLER_COMMIT_THREADS];
|
||||
std::thread _onlineNotificationThread;
|
||||
|
||||
std::unordered_map< std::pair<uint64_t,uint64_t>,std::pair<int64_t,InetAddress>,_PairHasher > _lastOnline;
|
||||
|
||||
mutable std::mutex _lastOnline_l;
|
||||
mutable std::mutex _readyLock;
|
||||
std::atomic<int> _ready, _connected, _run;
|
||||
mutable volatile bool _waitNoticePrinted;
|
||||
mutable std::mutex _lastOnline_l;
|
||||
mutable std::mutex _readyLock;
|
||||
std::atomic<int> _ready, _connected, _run;
|
||||
mutable volatile bool _waitNoticePrinted;
|
||||
|
||||
int _listenPort;
|
||||
int _listenPort;
|
||||
|
||||
MQConfig *_mqc;
|
||||
MQConfig *_mqc;
|
||||
};
|
||||
|
||||
}
|
||||
} // namespace ZeroTier
|
||||
|
||||
#endif // ZT_CONTROLLER_LIBPQ_HPP
|
||||
|
||||
#endif // ZT_CONTROLLER_USE_LIBPQ
|
||||
#endif // ZT_CONTROLLER_USE_LIBPQ
|
||||
|
|
|
@ -1,3 +1,30 @@
|
|||
/*
|
||||
* ZeroTier One - Network Virtualization Everywhere
|
||||
* Copyright (C) 2011-2019 ZeroTier, Inc. https://www.zerotier.com/
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*
|
||||
* --
|
||||
*
|
||||
* You can be released from the requirements of the license by purchasing
|
||||
* a commercial license. Buying such a license is mandatory as soon as you
|
||||
* develop commercial closed-source software that incorporates or links
|
||||
* directly against ZeroTier software without disclosing the source code
|
||||
* of your own application.
|
||||
*/
|
||||
|
||||
|
||||
#include "RabbitMQ.hpp"
|
||||
|
||||
#ifdef ZT_CONTROLLER_USE_LIBPQ
|
||||
|
@ -11,95 +38,95 @@ namespace ZeroTier
|
|||
{
|
||||
|
||||
RabbitMQ::RabbitMQ(MQConfig *cfg, const char *queueName)
|
||||
: _mqc(cfg)
|
||||
, _qName(queueName)
|
||||
, _socket(NULL)
|
||||
, _status(0)
|
||||
: _mqc(cfg)
|
||||
, _qName(queueName)
|
||||
, _socket(NULL)
|
||||
, _status(0)
|
||||
{
|
||||
}
|
||||
|
||||
RabbitMQ::~RabbitMQ()
|
||||
{
|
||||
amqp_channel_close(_conn, _channel, AMQP_REPLY_SUCCESS);
|
||||
amqp_connection_close(_conn, AMQP_REPLY_SUCCESS);
|
||||
amqp_destroy_connection(_conn);
|
||||
amqp_channel_close(_conn, _channel, AMQP_REPLY_SUCCESS);
|
||||
amqp_connection_close(_conn, AMQP_REPLY_SUCCESS);
|
||||
amqp_destroy_connection(_conn);
|
||||
}
|
||||
|
||||
void RabbitMQ::init()
|
||||
{
|
||||
struct timeval tval;
|
||||
memset(&tval, 0, sizeof(struct timeval));
|
||||
tval.tv_sec = 5;
|
||||
struct timeval tval;
|
||||
memset(&tval, 0, sizeof(struct timeval));
|
||||
tval.tv_sec = 5;
|
||||
|
||||
fprintf(stderr, "Initializing RabbitMQ %s\n", _qName);
|
||||
_conn = amqp_new_connection();
|
||||
_socket = amqp_tcp_socket_new(_conn);
|
||||
if (!_socket) {
|
||||
throw std::runtime_error("Can't create socket for RabbitMQ");
|
||||
}
|
||||
|
||||
_status = amqp_socket_open_noblock(_socket, _mqc->host, _mqc->port, &tval);
|
||||
if (_status) {
|
||||
throw std::runtime_error("Can't connect to RabbitMQ");
|
||||
}
|
||||
|
||||
amqp_rpc_reply_t r = amqp_login(_conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN,
|
||||
_mqc->username, _mqc->password);
|
||||
if (r.reply_type != AMQP_RESPONSE_NORMAL) {
|
||||
throw std::runtime_error("RabbitMQ Login Error");
|
||||
}
|
||||
fprintf(stderr, "Initializing RabbitMQ %s\n", _qName);
|
||||
_conn = amqp_new_connection();
|
||||
_socket = amqp_tcp_socket_new(_conn);
|
||||
if (!_socket) {
|
||||
throw std::runtime_error("Can't create socket for RabbitMQ");
|
||||
}
|
||||
|
||||
_status = amqp_socket_open_noblock(_socket, _mqc->host, _mqc->port, &tval);
|
||||
if (_status) {
|
||||
throw std::runtime_error("Can't connect to RabbitMQ");
|
||||
}
|
||||
|
||||
amqp_rpc_reply_t r = amqp_login(_conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN,
|
||||
_mqc->username, _mqc->password);
|
||||
if (r.reply_type != AMQP_RESPONSE_NORMAL) {
|
||||
throw std::runtime_error("RabbitMQ Login Error");
|
||||
}
|
||||
|
||||
static int chan = 0;
|
||||
static int chan = 0;
|
||||
{
|
||||
Mutex::Lock l(_chan_m);
|
||||
_channel = ++chan;
|
||||
_channel = ++chan;
|
||||
}
|
||||
amqp_channel_open(_conn, _channel);
|
||||
r = amqp_get_rpc_reply(_conn);
|
||||
if(r.reply_type != AMQP_RESPONSE_NORMAL) {
|
||||
throw std::runtime_error("Error opening communication channel");
|
||||
}
|
||||
|
||||
_q = amqp_queue_declare(_conn, _channel, amqp_cstring_bytes(_qName), 0, 0, 0, 0, amqp_empty_table);
|
||||
r = amqp_get_rpc_reply(_conn);
|
||||
if (r.reply_type != AMQP_RESPONSE_NORMAL) {
|
||||
throw std::runtime_error("Error declaring queue " + std::string(_qName));
|
||||
}
|
||||
amqp_channel_open(_conn, _channel);
|
||||
r = amqp_get_rpc_reply(_conn);
|
||||
if(r.reply_type != AMQP_RESPONSE_NORMAL) {
|
||||
throw std::runtime_error("Error opening communication channel");
|
||||
}
|
||||
|
||||
_q = amqp_queue_declare(_conn, _channel, amqp_cstring_bytes(_qName), 0, 0, 0, 0, amqp_empty_table);
|
||||
r = amqp_get_rpc_reply(_conn);
|
||||
if (r.reply_type != AMQP_RESPONSE_NORMAL) {
|
||||
throw std::runtime_error("Error declaring queue " + std::string(_qName));
|
||||
}
|
||||
|
||||
amqp_basic_consume(_conn, _channel, amqp_cstring_bytes(_qName), amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
|
||||
r = amqp_get_rpc_reply(_conn);
|
||||
if (r.reply_type != AMQP_RESPONSE_NORMAL) {
|
||||
throw std::runtime_error("Error consuming queue " + std::string(_qName));
|
||||
}
|
||||
fprintf(stderr, "RabbitMQ Init OK %s\n", _qName);
|
||||
amqp_basic_consume(_conn, _channel, amqp_cstring_bytes(_qName), amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
|
||||
r = amqp_get_rpc_reply(_conn);
|
||||
if (r.reply_type != AMQP_RESPONSE_NORMAL) {
|
||||
throw std::runtime_error("Error consuming queue " + std::string(_qName));
|
||||
}
|
||||
fprintf(stderr, "RabbitMQ Init OK %s\n", _qName);
|
||||
}
|
||||
|
||||
std::string RabbitMQ::consume()
|
||||
{
|
||||
amqp_rpc_reply_t res;
|
||||
amqp_envelope_t envelope;
|
||||
amqp_maybe_release_buffers(_conn);
|
||||
amqp_rpc_reply_t res;
|
||||
amqp_envelope_t envelope;
|
||||
amqp_maybe_release_buffers(_conn);
|
||||
|
||||
struct timeval timeout;
|
||||
timeout.tv_sec = 1;
|
||||
timeout.tv_usec = 0;
|
||||
struct timeval timeout;
|
||||
timeout.tv_sec = 1;
|
||||
timeout.tv_usec = 0;
|
||||
|
||||
res = amqp_consume_message(_conn, &envelope, &timeout, 0);
|
||||
if (res.reply_type != AMQP_RESPONSE_NORMAL) {
|
||||
if (res.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION && res.library_error == AMQP_STATUS_TIMEOUT) {
|
||||
// timeout waiting for message. Return empty string
|
||||
return "";
|
||||
} else {
|
||||
throw std::runtime_error("Error getting message");
|
||||
}
|
||||
}
|
||||
res = amqp_consume_message(_conn, &envelope, &timeout, 0);
|
||||
if (res.reply_type != AMQP_RESPONSE_NORMAL) {
|
||||
if (res.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION && res.library_error == AMQP_STATUS_TIMEOUT) {
|
||||
// timeout waiting for message. Return empty string
|
||||
return "";
|
||||
} else {
|
||||
throw std::runtime_error("Error getting message");
|
||||
}
|
||||
}
|
||||
|
||||
std::string msg(
|
||||
(const char*)envelope.message.body.bytes,
|
||||
envelope.message.body.len
|
||||
);
|
||||
amqp_destroy_envelope(&envelope);
|
||||
return msg;
|
||||
std::string msg(
|
||||
(const char*)envelope.message.body.bytes,
|
||||
envelope.message.body.len
|
||||
);
|
||||
amqp_destroy_envelope(&envelope);
|
||||
return msg;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -23,16 +23,19 @@
|
|||
* directly against ZeroTier software without disclosing the source code
|
||||
* of your own application.
|
||||
*/
|
||||
|
||||
#ifndef ZT_CONTROLLER_RABBITMQ_HPP
|
||||
#define ZT_CONTROLLER_RABBITMQ_HPP
|
||||
|
||||
#include "DB.hpp"
|
||||
|
||||
namespace ZeroTier
|
||||
{
|
||||
struct MQConfig {
|
||||
const char *host;
|
||||
int port;
|
||||
const char *username;
|
||||
const char *password;
|
||||
const char *host;
|
||||
int port;
|
||||
const char *username;
|
||||
const char *password;
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -49,26 +52,25 @@ namespace ZeroTier
|
|||
|
||||
class RabbitMQ {
|
||||
public:
|
||||
RabbitMQ(MQConfig *cfg, const char *queueName);
|
||||
~RabbitMQ();
|
||||
RabbitMQ(MQConfig *cfg, const char *queueName);
|
||||
~RabbitMQ();
|
||||
|
||||
void init();
|
||||
void init();
|
||||
|
||||
std::string consume();
|
||||
std::string consume();
|
||||
|
||||
private:
|
||||
MQConfig *_mqc;
|
||||
const char *_qName;
|
||||
MQConfig *_mqc;
|
||||
const char *_qName;
|
||||
|
||||
amqp_socket_t *_socket;
|
||||
amqp_connection_state_t _conn;
|
||||
amqp_queue_declare_ok_t *_q;
|
||||
int _status;
|
||||
amqp_socket_t *_socket;
|
||||
amqp_connection_state_t _conn;
|
||||
amqp_queue_declare_ok_t *_q;
|
||||
int _status;
|
||||
|
||||
int _channel;
|
||||
int _channel;
|
||||
|
||||
Mutex _chan_m;
|
||||
|
||||
};
|
||||
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue