RethinkDB direct connectivity integration.

This commit is contained in:
Adam Ierymenko 2017-11-03 11:39:27 -07:00
parent 4e88c80a22
commit f5014d7d71
4 changed files with 477 additions and 254 deletions

View file

@ -19,6 +19,8 @@
#ifndef ZT_SQLITENETWORKCONTROLLER_HPP
#define ZT_SQLITENETWORKCONTROLLER_HPP
#define ZT_CONTROLLER_USE_RETHINKDB
#include <stdint.h>
#include <string>
@ -30,9 +32,7 @@
#include <unordered_map>
#include "../node/Constants.hpp"
#include "../node/NetworkController.hpp"
#include "../node/Mutex.hpp"
#include "../node/Utils.hpp"
#include "../node/Address.hpp"
#include "../node/InetAddress.hpp"
@ -44,18 +44,23 @@
#include "../ext/json/json.hpp"
#include "JSONDB.hpp"
#ifdef ZT_CONTROLLER_USE_RETHINKDB
#include "RethinkDB.hpp"
#endif
namespace ZeroTier {
#ifdef ZT_CONTROLLER_USE_RETHINKDB
typedef RethinkDB ControllerDB;
#endif
class Node;
class EmbeddedNetworkController : public NetworkController,NonCopyable
class EmbeddedNetworkController : public NetworkController
{
public:
/**
* @param node Parent node
* @param dbPath Path to store data
* @param dbPath Database path (file path or database credentials)
*/
EmbeddedNetworkController(Node *node,const char *dbPath);
virtual ~EmbeddedNetworkController();
@ -98,22 +103,7 @@ public:
void onNetworkMemberUpdate(const uint64_t networkId,const uint64_t memberId);
void onNetworkMemberDeauthorize(const uint64_t networkId,const uint64_t memberId);
void threadMain()
throw();
private:
struct _RQEntry
{
uint64_t nwid;
uint64_t requestPacketId;
InetAddress fromAddr;
Identity identity;
Dictionary<ZT_NETWORKCONFIG_METADATA_DICT_CAPACITY> metaData;
enum {
RQENTRY_TYPE_REQUEST = 0
} type;
};
void _request(uint64_t nwid,const InetAddress &fromAddr,uint64_t requestPacketId,const Identity &identity,const Dictionary<ZT_NETWORKCONFIG_METADATA_DICT_CAPACITY> &metaData);
void _startThreads();
@ -166,12 +156,20 @@ private:
}
network["objtype"] = "network";
}
inline void _addNetworkNonPersistedFields(nlohmann::json &network,int64_t now,const JSONDB::NetworkSummaryInfo &ns)
inline void _addNetworkNonPersistedFields(const uint64_t nwid,nlohmann::json &network,int64_t now,const ControllerDB::NetworkSummaryInfo &ns)
{
network["clock"] = now;
network["authorizedMemberCount"] = ns.authorizedMemberCount;
network["activeMemberCount"] = ns.activeMemberCount;
network["totalMemberCount"] = ns.totalMemberCount;
{
std::lock_guard<std::mutex> l(_memberStatus_l);
unsigned long ac = 0;
for(auto ms=_memberStatus.begin();ms!=_memberStatus.end();++ms) {
if ((ms->first.networkId == nwid)&&(ms->second.online(now)))
++ac;
}
network["activeMemberCount"] = ac;
}
}
inline void _removeNetworkNonPersistedFields(nlohmann::json &network)
{
@ -185,8 +183,10 @@ private:
inline void _addMemberNonPersistedFields(uint64_t nwid,uint64_t nodeId,nlohmann::json &member,int64_t now)
{
member["clock"] = now;
Mutex::Lock _l(_memberStatus_m);
member["online"] = _memberStatus[_MemberStatusKey(nwid,nodeId)].online(now);
{
std::lock_guard<std::mutex> l(_memberStatus_l);
member["online"] = _memberStatus[_MemberStatusKey(nwid,nodeId)].online(now);
}
}
inline void _removeMemberNonPersistedFields(nlohmann::json &member)
{
@ -197,23 +197,17 @@ private:
member.erase("lastRequestMetaData");
}
const int64_t _startTime;
volatile bool _running;
BlockingQueue<_RQEntry *> _queue;
std::vector<Thread> _threads;
volatile uint64_t _lastDumpedStatus;
Mutex _threads_m;
JSONDB _db;
Node *const _node;
std::string _path;
NetworkController::Sender *_sender;
Identity _signingId;
std::string _signingIdAddressString;
struct _RQEntry
{
uint64_t nwid;
uint64_t requestPacketId;
InetAddress fromAddr;
Identity identity;
Dictionary<ZT_NETWORKCONFIG_METADATA_DICT_CAPACITY> metaData;
enum {
RQENTRY_TYPE_REQUEST = 0
} type;
};
struct _MemberStatusKey
{
_MemberStatusKey() : networkId(0),nodeId(0) {}
@ -239,8 +233,19 @@ private:
return (std::size_t)(networkIdNodeId.networkId + networkIdNodeId.nodeId);
}
};
const int64_t _startTime;
Node *const _node;
std::string _path;
Identity _signingId;
std::string _signingIdAddressString;
NetworkController::Sender *_sender;
ControllerDB _db;
BlockingQueue< std::unique_ptr<_RQEntry> > _queue;
std::vector<std::thread> _threads;
std::mutex _threads_l;
std::unordered_map< _MemberStatusKey,_MemberStatus,_MemberStatusHash > _memberStatus;
Mutex _memberStatus_m;
std::mutex _memberStatus_l;
};
} // namespace ZeroTier