This commit is contained in:
Adam Ierymenko 2017-06-30 17:32:07 -07:00
parent 1a40f35fd4
commit baa10c2995
12 changed files with 326 additions and 148 deletions

View file

@ -312,8 +312,8 @@ class OneServiceImpl;
static int SnodeVirtualNetworkConfigFunction(ZT_Node *node,void *uptr,void *tptr,uint64_t nwid,void **nuptr,enum ZT_VirtualNetworkConfigOperation op,const ZT_VirtualNetworkConfig *nwconf);
static void SnodeEventCallback(ZT_Node *node,void *uptr,void *tptr,enum ZT_Event event,const void *metaData);
static void SnodeStatePutFunction(ZT_Node *node,void *uptr,void *tptr,enum ZT_StateObjectType type,uint64_t id,const void *data,int len);
static int SnodeStateGetFunction(ZT_Node *node,void *uptr,void *tptr,enum ZT_StateObjectType type,uint64_t id,void *data,unsigned int maxlen);
static void SnodeStatePutFunction(ZT_Node *node,void *uptr,void *tptr,enum ZT_StateObjectType type,const uint64_t id[2],const void *data,int len);
static int SnodeStateGetFunction(ZT_Node *node,void *uptr,void *tptr,enum ZT_StateObjectType type,const uint64_t id[2],void *data,unsigned int maxlen);
static int SnodeWirePacketSendFunction(ZT_Node *node,void *uptr,void *tptr,const struct sockaddr_storage *localAddr,const struct sockaddr_storage *addr,const void *data,unsigned int len,unsigned int ttl);
static void SnodeVirtualNetworkFrameFunction(ZT_Node *node,void *uptr,void *tptr,uint64_t nwid,void **nuptr,uint64_t sourceMac,uint64_t destMac,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len);
static int SnodePathCheckFunction(ZT_Node *node,void *uptr,void *tptr,uint64_t ztaddr,const struct sockaddr_storage *localAddr,const struct sockaddr_storage *remoteAddr);
@ -1220,34 +1220,20 @@ public:
res["planetWorldId"] = planet.id();
res["planetWorldTimestamp"] = planet.timestamp();
/*
#ifdef ZT_ENABLE_CLUSTER
json cj;
ZT_ClusterStatus cs;
_node->clusterStatus(&cs);
if (cs.clusterSize >= 1) {
json cja = json::array();
for(unsigned int i=0;i<cs.clusterSize;++i) {
json cjm;
cjm["id"] = (int)cs.members[i].id;
cjm["msSinceLastHeartbeat"] = cs.members[i].msSinceLastHeartbeat;
cjm["alive"] = (bool)(cs.members[i].alive != 0);
cjm["x"] = cs.members[i].x;
cjm["y"] = cs.members[i].y;
cjm["z"] = cs.members[i].z;
cjm["load"] = cs.members[i].load;
cjm["peers"] = cs.members[i].peers;
cja.push_back(cjm);
{
json cj(json::object());
Mutex::Lock _l(_tcpConnections_m);
Mutex::Lock _l2(_localConfig_m);
for(std::vector<InetAddress>::const_iterator ca(_clusterBackplaneAddresses.begin());ca!=_clusterBackplaneAddresses.end();++ca) {
uint64_t up = 0;
for(std::vector<TcpConnection *>::const_iterator c(_tcpConnections.begin());c!=_tcpConnections.end();++c) {
if (((*c)->remoteAddr == *ca)&&((*c)->clusterMemberId)&&((*c)->lastReceive > up))
up = (*c)->lastReceive;
}
cj[ca->toString()] = up;
}
cj["members"] = cja;
cj["myId"] = (int)cs.myId;
cj["clusterSize"] = cs.clusterSize;
res["cluster"] = cj;
}
res["cluster"] = cj;
#else
res["cluster"] = json();
#endif
*/
scode = 200;
} else if (ps[0] == "moon") {
@ -1877,16 +1863,15 @@ public:
return false;
}
void replicateStateObject(const ZT_StateObjectType type,const uint64_t id,const void *const data,const unsigned int len,TcpConnection *tc)
void replicateStateObject(const ZT_StateObjectType type,const uint64_t id[2],const void *const data,const unsigned int len,TcpConnection *tc)
{
char buf[34];
char buf[42];
Mutex::Lock _l2(tc->writeq_m);
if (tc->writeq.length() == 0)
_phy.setNotifyWritable(tc->sock,true);
const unsigned int mlen = len + 34;
const unsigned int mlen = len + 42;
tc->writeq.push_back((char)((mlen >> 16) & 0xff));
tc->writeq.push_back((char)((mlen >> 8) & 0xff));
@ -1895,24 +1880,32 @@ public:
Utils::getSecureRandom(buf,16);
buf[24] = (char)CLUSTER_MESSAGE_STATE_OBJECT;
buf[25] = (char)type;
buf[26] = (char)((id >> 56) & 0xff);
buf[27] = (char)((id >> 48) & 0xff);
buf[28] = (char)((id >> 40) & 0xff);
buf[29] = (char)((id >> 32) & 0xff);
buf[30] = (char)((id >> 24) & 0xff);
buf[31] = (char)((id >> 16) & 0xff);
buf[32] = (char)((id >> 8) & 0xff);
buf[33] = (char)(id & 0xff);
buf[26] = (char)((id[0] >> 56) & 0xff);
buf[27] = (char)((id[0] >> 48) & 0xff);
buf[28] = (char)((id[0] >> 40) & 0xff);
buf[29] = (char)((id[0] >> 32) & 0xff);
buf[30] = (char)((id[0] >> 24) & 0xff);
buf[31] = (char)((id[0] >> 16) & 0xff);
buf[32] = (char)((id[0] >> 8) & 0xff);
buf[33] = (char)(id[0] & 0xff);
buf[34] = (char)((id[1] >> 56) & 0xff);
buf[35] = (char)((id[1] >> 48) & 0xff);
buf[36] = (char)((id[1] >> 40) & 0xff);
buf[37] = (char)((id[1] >> 32) & 0xff);
buf[38] = (char)((id[1] >> 24) & 0xff);
buf[39] = (char)((id[1] >> 16) & 0xff);
buf[40] = (char)((id[1] >> 8) & 0xff);
buf[41] = (char)(id[1] & 0xff);
const unsigned long startpos = (unsigned long)tc->writeq.length();
tc->writeq.append(buf,34);
tc->writeq.append(buf,42);
tc->writeq.append(reinterpret_cast<const char *>(data),len);
char *const outdata = const_cast<char *>(tc->writeq.data()) + startpos;
encryptClusterMessage(outdata,mlen);
}
void replicateStateObjectToCluster(const ZT_StateObjectType type,const uint64_t id,const void *const data,const unsigned int len,const uint64_t everyoneBut)
void replicateStateObjectToCluster(const ZT_StateObjectType type,const uint64_t id[2],const void *const data,const unsigned int len,const uint64_t everyoneBut)
{
std::vector<uint64_t> sentTo;
if (everyoneBut)
@ -1927,7 +1920,7 @@ public:
}
}
void writeStateObject(enum ZT_StateObjectType type,uint64_t id,const void *data,int len)
void writeStateObject(enum ZT_StateObjectType type,const uint64_t id[2],const void *data,int len)
{
char p[4096];
bool secure = false;
@ -1940,17 +1933,17 @@ public:
secure = true;
break;
case ZT_STATE_OBJECT_PEER_IDENTITY:
Utils::ztsnprintf(p,sizeof(p),"%s" ZT_PATH_SEPARATOR_S "iddb.d/%.10llx",_homePath.c_str(),(unsigned long long)id);
Utils::ztsnprintf(p,sizeof(p),"%s" ZT_PATH_SEPARATOR_S "iddb.d/%.10llx",_homePath.c_str(),(unsigned long long)id[0]);
break;
case ZT_STATE_OBJECT_NETWORK_CONFIG:
Utils::ztsnprintf(p,sizeof(p),"%s" ZT_PATH_SEPARATOR_S "networks.d/%.16llx.conf",_homePath.c_str(),(unsigned long long)id);
Utils::ztsnprintf(p,sizeof(p),"%s" ZT_PATH_SEPARATOR_S "networks.d/%.16llx.conf",_homePath.c_str(),(unsigned long long)id[0]);
secure = true;
break;
case ZT_STATE_OBJECT_PLANET:
Utils::ztsnprintf(p,sizeof(p),"%s" ZT_PATH_SEPARATOR_S "planet",_homePath.c_str());
break;
case ZT_STATE_OBJECT_MOON:
Utils::ztsnprintf(p,sizeof(p),"%s" ZT_PATH_SEPARATOR_S "moons.d/%.16llx.moon",_homePath.c_str(),(unsigned long long)id);
Utils::ztsnprintf(p,sizeof(p),"%s" ZT_PATH_SEPARATOR_S "moons.d/%.16llx.moon",_homePath.c_str(),(unsigned long long)id[0]);
break;
default:
p[0] = (char)0;
@ -1985,8 +1978,12 @@ public:
if (OSUtils::readFile((_homePath + ZT_PATH_SEPARATOR_S + *f).c_str(),buf)) {
if (f->length() == 21) {
const uint64_t nwid = Utils::hexStrToU64(f->substr(0,16).c_str());
if (nwid)
replicateStateObject(ZT_STATE_OBJECT_NETWORK_CONFIG,nwid,buf.data(),(int)buf.length(),tc);
if (nwid) {
uint64_t tmp[2];
tmp[0] = nwid;
tmp[1] = 0;
replicateStateObject(ZT_STATE_OBJECT_NETWORK_CONFIG,tmp,buf.data(),(int)buf.length(),tc);
}
}
}
}
@ -1996,8 +1993,12 @@ public:
if (OSUtils::readFile((_homePath + ZT_PATH_SEPARATOR_S + *f).c_str(),buf)) {
if (f->length() == 21) {
const uint64_t moonId = Utils::hexStrToU64(f->substr(0,16).c_str());
if (moonId)
replicateStateObject(ZT_STATE_OBJECT_MOON,moonId,buf.data(),(int)buf.length(),tc);
if (moonId) {
uint64_t tmp[2];
tmp[0] = moonId;
tmp[1] = 0;
replicateStateObject(ZT_STATE_OBJECT_MOON,tmp,buf.data(),(int)buf.length(),tc);
}
}
}
}
@ -2313,8 +2314,9 @@ public:
break;
case CLUSTER_MESSAGE_STATE_OBJECT:
if (mlen >= (25 + 9)) { // type + object ID + [data]
const uint64_t objId = (
if (mlen >= 42) { // type + object ID + [data]
uint64_t objId[2];
objId[0] = (
((uint64_t)data[26] << 56) |
((uint64_t)data[27] << 48) |
((uint64_t)data[28] << 40) |
@ -2324,9 +2326,19 @@ public:
((uint64_t)data[32] << 8) |
(uint64_t)data[33]
);
if (_node->processStateUpdate((void *)0,(ZT_StateObjectType)data[25],objId,data + 34,(unsigned int)(mlen - 34)) == ZT_RESULT_OK) {
writeStateObject((ZT_StateObjectType)data[25],objId,data + 34,(unsigned int)(mlen - 34));
replicateStateObjectToCluster((ZT_StateObjectType)data[25],objId,data + 34,(unsigned int)(mlen - 34),tc->clusterMemberId);
objId[1] = (
((uint64_t)data[34] << 56) |
((uint64_t)data[35] << 48) |
((uint64_t)data[36] << 40) |
((uint64_t)data[37] << 32) |
((uint64_t)data[38] << 24) |
((uint64_t)data[39] << 16) |
((uint64_t)data[40] << 8) |
(uint64_t)data[41]
);
if (_node->processStateUpdate((void *)0,(ZT_StateObjectType)data[25],objId[0],data + 42,(unsigned int)(mlen - 42)) == ZT_RESULT_OK) {
writeStateObject((ZT_StateObjectType)data[25],objId,data + 42,(unsigned int)(mlen - 42));
replicateStateObjectToCluster((ZT_StateObjectType)data[25],objId,data + 42,(unsigned int)(mlen - 42),tc->clusterMemberId);
}
}
break;
@ -2543,13 +2555,13 @@ public:
}
}
inline void nodeStatePutFunction(enum ZT_StateObjectType type,uint64_t id,const void *data,int len)
inline void nodeStatePutFunction(enum ZT_StateObjectType type,const uint64_t id[2],const void *data,int len)
{
writeStateObject(type,id,data,len);
replicateStateObjectToCluster(type,id,data,len,0);
}
inline int nodeStateGetFunction(enum ZT_StateObjectType type,uint64_t id,void *data,unsigned int maxlen)
inline int nodeStateGetFunction(enum ZT_StateObjectType type,const uint64_t id[2],void *data,unsigned int maxlen)
{
char p[4096];
switch(type) {
@ -2866,9 +2878,9 @@ static int SnodeVirtualNetworkConfigFunction(ZT_Node *node,void *uptr,void *tptr
{ return reinterpret_cast<OneServiceImpl *>(uptr)->nodeVirtualNetworkConfigFunction(nwid,nuptr,op,nwconf); }
static void SnodeEventCallback(ZT_Node *node,void *uptr,void *tptr,enum ZT_Event event,const void *metaData)
{ reinterpret_cast<OneServiceImpl *>(uptr)->nodeEventCallback(event,metaData); }
static void SnodeStatePutFunction(ZT_Node *node,void *uptr,void *tptr,enum ZT_StateObjectType type,uint64_t id,const void *data,int len)
static void SnodeStatePutFunction(ZT_Node *node,void *uptr,void *tptr,enum ZT_StateObjectType type,const uint64_t id[2],const void *data,int len)
{ reinterpret_cast<OneServiceImpl *>(uptr)->nodeStatePutFunction(type,id,data,len); }
static int SnodeStateGetFunction(ZT_Node *node,void *uptr,void *tptr,enum ZT_StateObjectType type,uint64_t id,void *data,unsigned int maxlen)
static int SnodeStateGetFunction(ZT_Node *node,void *uptr,void *tptr,enum ZT_StateObjectType type,const uint64_t id[2],void *data,unsigned int maxlen)
{ return reinterpret_cast<OneServiceImpl *>(uptr)->nodeStateGetFunction(type,id,data,maxlen); }
static int SnodeWirePacketSendFunction(ZT_Node *node,void *uptr,void *tptr,const struct sockaddr_storage *localAddr,const struct sockaddr_storage *addr,const void *data,unsigned int len,unsigned int ttl)
{ return reinterpret_cast<OneServiceImpl *>(uptr)->nodeWirePacketSendFunction(localAddr,addr,data,len,ttl); }