More clustering work.

This commit is contained in:
Adam Ierymenko 2017-06-01 12:33:05 -07:00
parent 76452b4e28
commit 6015b529a0
6 changed files with 323 additions and 185 deletions

View file

@ -33,6 +33,7 @@
#include "../version.h"
#include "Constants.hpp"
#include "SharedPtr.hpp"
#include "Node.hpp"
#include "RuntimeEnvironment.hpp"
#include "NetworkController.hpp"
@ -45,6 +46,7 @@
#include "Identity.hpp"
#include "SelfAwareness.hpp"
#include "Cluster.hpp"
#include "Network.hpp"
const struct sockaddr_storage ZT_SOCKADDR_NULL = {0};
@ -58,6 +60,7 @@ Node::Node(void *uptr,void *tptr,const struct ZT_Node_Callbacks *callbacks,uint6
_RR(this),
RR(&_RR),
_uPtr(uptr),
_networks(8),
_now(now),
_lastPingCheck(0),
_lastHousekeepingRun(0)
@ -74,20 +77,31 @@ Node::Node(void *uptr,void *tptr,const struct ZT_Node_Callbacks *callbacks,uint6
memset(_expectingRepliesTo,0,sizeof(_expectingRepliesTo));
memset(_lastIdentityVerification,0,sizeof(_lastIdentityVerification));
std::string idtmp(dataStoreGet(tptr,"identity.secret"));
if ((!idtmp.length())||(!RR->identity.fromString(idtmp))||(!RR->identity.hasPrivate())) {
TRACE("identity.secret not found, generating...");
RR->identity.generate();
idtmp = RR->identity.toString(true);
if (!dataStorePut(tptr,"identity.secret",idtmp,true))
throw std::runtime_error("unable to write identity.secret");
char tmp[512];
std::string tmp2;
int n = stateObjectGet(tptr,ZT_STATE_OBJECT_IDENTITY_SECRET,0,tmp,sizeof(tmp) - 1);
if (n > 0) {
tmp[n] = (char)0;
if (!RR->identity.fromString(tmp))
n = -1;
}
RR->publicIdentityStr = RR->identity.toString(false);
RR->secretIdentityStr = RR->identity.toString(true);
idtmp = dataStoreGet(tptr,"identity.public");
if (idtmp != RR->publicIdentityStr) {
if (!dataStorePut(tptr,"identity.public",RR->publicIdentityStr,false))
throw std::runtime_error("unable to write identity.public");
if (n <= 0) {
RR->identity.generate();
tmp2 = RR->identity.toString(true);
stateObjectPut(tptr,ZT_STATE_OBJECT_IDENTITY_SECRET,RR->identity.address().toInt(),tmp2.data(),(unsigned int)tmp2.length());
tmp2 = RR->identity.toString(false);
stateObjectPut(tptr,ZT_STATE_OBJECT_IDENTITY_PUBLIC,RR->identity.address().toInt(),tmp2.data(),(unsigned int)tmp2.length());
} else {
n = stateObjectGet(tptr,ZT_STATE_OBJECT_IDENTITY_PUBLIC,RR->identity.address().toInt(),tmp,sizeof(tmp) - 1);
if (n > 0) {
tmp[n] = (char)0;
if (RR->identity.toString(false) != tmp)
n = -1;
}
if (n <= 0) {
tmp2 = RR->identity.toString(false);
stateObjectPut(tptr,ZT_STATE_OBJECT_IDENTITY_PUBLIC,RR->identity.address().toInt(),tmp2.data(),(unsigned int)tmp2.length());
}
}
try {
@ -110,7 +124,7 @@ Node::~Node()
{
Mutex::Lock _l(_networks_m);
_networks.clear(); // ensure that networks are destroyed before shutdow
_networks.clear(); // destroy all networks before shutdown
delete RR->sa;
delete RR->topology;
@ -122,6 +136,88 @@ Node::~Node()
#endif
}
ZT_ResultCode Node::processStateUpdate(
void *tptr,
ZT_StateObjectType type,
uint64_t id,
const void *data,
unsigned int len)
{
ZT_ResultCode r = ZT_RESULT_OK_IGNORED;
switch(type) {
case ZT_STATE_OBJECT_PEER: {
} break;
case ZT_STATE_OBJECT_NETWORK_CONFIG:
if (len <= (ZT_NETWORKCONFIG_DICT_CAPACITY - 1)) {
if (len < 2) {
Mutex::Lock _l(_networks_m);
SharedPtr<Network> &nw = _networks[id];
if (!nw)
nw = SharedPtr<Network>(new Network(RR,tptr,id,(void *)0,(const NetworkConfig *)0));
} else {
Dictionary<ZT_NETWORKCONFIG_DICT_CAPACITY> *dict = new Dictionary<ZT_NETWORKCONFIG_DICT_CAPACITY>(reinterpret_cast<const char *>(data),len);
try {
NetworkConfig *nconf = new NetworkConfig();
try {
if (nconf->fromDictionary(*dict)) {
Mutex::Lock _l(_networks_m);
SharedPtr<Network> &nw = _networks[id];
if (nw) {
switch (nw->setConfiguration(tptr,*nconf,false)) {
default:
r = ZT_RESULT_ERROR_BAD_PARAMETER;
break;
case 1:
r = ZT_RESULT_OK_IGNORED;
break;
case 2:
r = ZT_RESULT_OK;
break;
}
} else {
nw = SharedPtr<Network>(new Network(RR,tptr,id,(void *)0,nconf));
}
} else {
r = ZT_RESULT_ERROR_BAD_PARAMETER;
}
} catch ( ... ) {
r = ZT_RESULT_ERROR_BAD_PARAMETER;
}
delete nconf;
} catch ( ... ) {
r = ZT_RESULT_ERROR_BAD_PARAMETER;
}
delete dict;
}
} else {
r = ZT_RESULT_ERROR_BAD_PARAMETER;
}
break;
case ZT_STATE_OBJECT_PLANET:
case ZT_STATE_OBJECT_MOON:
if (len <= ZT_WORLD_MAX_SERIALIZED_LENGTH) {
World w;
try {
w.deserialize(Buffer<ZT_WORLD_MAX_SERIALIZED_LENGTH>(data,len));
if (( (w.type() == World::TYPE_MOON)&&(type == ZT_STATE_OBJECT_MOON) )||( (w.type() == World::TYPE_PLANET)&&(type == ZT_STATE_OBJECT_PLANET) )) {
r = (RR->topology->addWorld(tptr,w,false)) ? ZT_RESULT_OK : ZT_RESULT_OK_IGNORED;
}
} catch ( ... ) {
r = ZT_RESULT_ERROR_BAD_PARAMETER;
}
} else {
r = ZT_RESULT_ERROR_BAD_PARAMETER;
}
break;
default: break;
}
return r;
}
ZT_ResultCode Node::processWirePacket(
void *tptr,
uint64_t now,
@ -311,7 +407,7 @@ ZT_ResultCode Node::join(uint64_t nwid,void *uptr,void *tptr)
Mutex::Lock _l(_networks_m);
SharedPtr<Network> &nw = _networks[nwid];
if (!nw)
nw = SharedPtr<Network>(new Network(RR,tptr,nwid,uptr));
nw = SharedPtr<Network>(new Network(RR,tptr,nwid,uptr,(const NetworkConfig *)0));
return ZT_RESULT_OK;
}
@ -319,7 +415,6 @@ ZT_ResultCode Node::leave(uint64_t nwid,void **uptr,void *tptr)
{
ZT_VirtualNetworkConfig ctmp;
void **nUserPtr = (void **)0;
{
Mutex::Lock _l(_networks_m);
SharedPtr<Network> *nw = _networks.get(nwid);
@ -330,12 +425,18 @@ ZT_ResultCode Node::leave(uint64_t nwid,void **uptr,void *tptr)
(*nw)->externalConfig(&ctmp);
(*nw)->destroy();
nUserPtr = (*nw)->userPtr();
_networks.erase(nwid);
}
if (nUserPtr)
RR->node->configureVirtualNetworkPort(tptr,nwid,nUserPtr,ZT_VIRTUAL_NETWORK_CONFIG_OPERATION_DESTROY,&ctmp);
{
Mutex::Lock _l(_networks_m);
_networks.erase(nwid);
}
RR->node->stateObjectDelete(tptr,ZT_STATE_OBJECT_NETWORK_CONFIG,nwid);
return ZT_RESULT_OK;
}
@ -579,20 +680,6 @@ void Node::clusterStatus(ZT_ClusterStatus *cs)
/* Node methods used only within node/ */
/****************************************************************************/
std::string Node::dataStoreGet(void *tPtr,const char *name)
{
char buf[1024];
std::string r;
unsigned long olen = 0;
do {
long n = _cb.dataStoreGetFunction(reinterpret_cast<ZT_Node *>(this),_uPtr,tPtr,name,buf,sizeof(buf),(unsigned long)r.length(),&olen);
if (n <= 0)
return std::string();
r.append(buf,n);
} while (r.length() < olen);
return r;
}
bool Node::shouldUsePathForZeroTierTraffic(void *tPtr,const Address &ztaddr,const InetAddress &localAddress,const InetAddress &remoteAddress)
{
if (!Path::isAddressValidForPath(remoteAddress))
@ -813,6 +900,23 @@ void ZT_Node_delete(ZT_Node *node)
} catch ( ... ) {}
}
enum ZT_ResultCode ZT_Node_processStateUpdate(
ZT_Node *node,
void *tptr,
ZT_StateObjectType type,
uint64_t id,
const void *data,
unsigned int len)
{
try {
return reinterpret_cast<ZeroTier::Node *>(node)->processStateUpdate(tptr,type,id,data,len);
} catch (std::bad_alloc &exc) {
return ZT_RESULT_FATAL_ERROR_OUT_OF_MEMORY;
} catch ( ... ) {
return ZT_RESULT_FATAL_ERROR_INTERNAL;
}
}
enum ZT_ResultCode ZT_Node_processWirePacket(
ZT_Node *node,
void *tptr,