This commit is contained in:
Adam Ierymenko 2017-07-06 11:45:22 -07:00
parent 84748aab51
commit f18158a52d
17 changed files with 198 additions and 981 deletions

View file

@ -59,8 +59,6 @@
#include "../osdep/ManagedRoute.hpp"
#include "OneService.hpp"
#include "ClusterGeoIpService.hpp"
#include "ClusterDefinition.hpp"
#include "SoftwareUpdater.hpp"
#ifdef __WINDOWS__
@ -157,9 +155,6 @@ namespace ZeroTier { typedef BSDEthernetTap EthernetTap; }
// Maximum write buffer size for outgoing TCP connections (sanity limit)
#define ZT_TCP_MAX_WRITEQ_SIZE 33554432
// How often to check TCP connections and cluster links and send status to cluster peers
#define ZT_TCP_CHECK_PERIOD 15000
// TCP activity timeout
#define ZT_TCP_ACTIVITY_TIMEOUT 60000
@ -311,9 +306,9 @@ static int SnodeVirtualNetworkConfigFunction(ZT_Node *node,void *uptr,void *tptr
static void SnodeEventCallback(ZT_Node *node,void *uptr,void *tptr,enum ZT_Event event,const void *metaData);
static void SnodeStatePutFunction(ZT_Node *node,void *uptr,void *tptr,enum ZT_StateObjectType type,const uint64_t id[2],const void *data,int len);
static int SnodeStateGetFunction(ZT_Node *node,void *uptr,void *tptr,enum ZT_StateObjectType type,const uint64_t id[2],void *data,unsigned int maxlen);
static int SnodeWirePacketSendFunction(ZT_Node *node,void *uptr,void *tptr,const struct sockaddr_storage *localAddr,const struct sockaddr_storage *addr,const void *data,unsigned int len,unsigned int ttl);
static int SnodeWirePacketSendFunction(ZT_Node *node,void *uptr,void *tptr,int64_t localSocket,const struct sockaddr_storage *addr,const void *data,unsigned int len,unsigned int ttl);
static void SnodeVirtualNetworkFrameFunction(ZT_Node *node,void *uptr,void *tptr,uint64_t nwid,void **nuptr,uint64_t sourceMac,uint64_t destMac,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len);
static int SnodePathCheckFunction(ZT_Node *node,void *uptr,void *tptr,uint64_t ztaddr,const struct sockaddr_storage *localAddr,const struct sockaddr_storage *remoteAddr);
static int SnodePathCheckFunction(ZT_Node *node,void *uptr,void *tptr,uint64_t ztaddr,int64_t localSocket,const struct sockaddr_storage *remoteAddr);
static int SnodePathLookupFunction(ZT_Node *node,void *uptr,void *tptr,uint64_t ztaddr,int family,struct sockaddr_storage *result);
static void StapFrameHandler(void *uptr,void *tptr,uint64_t nwid,const MAC &from,const MAC &to,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len);
@ -362,8 +357,7 @@ struct TcpConnection
TCP_UNCATEGORIZED_INCOMING, // uncategorized incoming connection
TCP_HTTP_INCOMING,
TCP_HTTP_OUTGOING,
TCP_TUNNEL_OUTGOING, // TUNNELED mode proxy outbound connection
TCP_CLUSTER_BACKPLANE
TCP_TUNNEL_OUTGOING // TUNNELED mode proxy outbound connection
} type;
OneServiceImpl *parent;
@ -380,29 +374,11 @@ struct TcpConnection
std::string status;
std::map< std::string,std::string > headers;
// Used for cluster backplane connections
uint64_t clusterMemberId;
unsigned int clusterMemberVersionMajor;
unsigned int clusterMemberVersionMinor;
unsigned int clusterMemberVersionRev;
std::vector< InetAddress > clusterMemberLocalAddresses;
Mutex clusterMemberLocalAddresses_m;
std::string readq;
std::string writeq;
Mutex writeq_m;
};
/**
* Message types for cluster backplane communication
*/
enum ClusterMessageType
{
CLUSTER_MESSAGE_STATUS = 0,
CLUSTER_MESSAGE_STATE_OBJECT = 1,
CLUSTER_MESSAGE_PROXY_SEND = 2
};
class OneServiceImpl : public OneService
{
public:
@ -421,8 +397,6 @@ public:
bool _updateAutoApply;
unsigned int _primaryPort;
volatile unsigned int _udpPortPickerCounter;
uint64_t _clusterMemberId;
uint8_t _clusterKey[32]; // secret key for cluster backplane config
// Local configuration and memo-ized information from it
json _localConfig;
@ -434,7 +408,6 @@ public:
std::vector< InetAddress > _globalV6Blacklist;
std::vector< InetAddress > _allowManagementFrom;
std::vector< std::string > _interfacePrefixBlacklist;
std::vector< InetAddress > _clusterBackplaneAddresses;
Mutex _localConfig_m;
/*
@ -518,7 +491,6 @@ public:
,_updateAutoApply(false)
,_primaryPort(port)
,_udpPortPickerCounter(0)
,_clusterMemberId(0)
,_lastDirectReceiveFromGlobal(0)
#ifdef ZT_TCP_FALLBACK_RELAY
,_lastSendToGlobalV4(0)
@ -754,23 +726,6 @@ public:
}
}
// Derive the cluster's shared secret backplane encryption key by hashing its shared secret identity
{
uint8_t tmp[64];
uint8_t sk[ZT_C25519_PRIVATE_KEY_LEN + 4];
memcpy(sk,_node->identity().privateKeyPair().priv.data,ZT_C25519_PRIVATE_KEY_LEN);
sk[ZT_C25519_PRIVATE_KEY_LEN] = 0xab;
sk[ZT_C25519_PRIVATE_KEY_LEN + 1] = 0xcd;
sk[ZT_C25519_PRIVATE_KEY_LEN + 2] = 0xef;
sk[ZT_C25519_PRIVATE_KEY_LEN + 3] = 0xab; // add an arbitrary nonce, just because
SHA512::hash(tmp,sk,ZT_C25519_PRIVATE_KEY_LEN + 4);
memcpy(_clusterKey,tmp,32);
}
// Assign a random non-zero cluster member ID to identify vs. other cluster members
Utils::getSecureRandom(&_clusterMemberId,sizeof(_clusterMemberId));
if (!_clusterMemberId) _clusterMemberId = 1;
// Main I/O loop
_nextBackgroundTaskDeadline = 0;
uint64_t clockShouldBe = OSUtils::now();
@ -779,7 +734,6 @@ public:
uint64_t lastBindRefresh = 0;
uint64_t lastUpdateCheck = clockShouldBe;
uint64_t lastLocalInterfaceAddressCheck = (clockShouldBe - ZT_LOCAL_INTERFACE_CHECK_INTERVAL) + 15000; // do this in 15s to give portmapper time to configure and other things time to settle
uint64_t lastTcpCheck = 0;
for(;;) {
_run_m.lock();
if (!_run) {
@ -873,58 +827,6 @@ public:
_node->addLocalInterfaceAddress(reinterpret_cast<const struct sockaddr_storage *>(&(*i)));
}
// Check TCP connections and cluster links
if ((now - lastTcpCheck) >= ZT_TCP_CHECK_PERIOD) {
lastTcpCheck = now;
// Send status to active cluster links and close overflowed and dead ones
std::vector<PhySocket *> toClose;
std::vector<InetAddress> clusterLinksUp;
{
Mutex::Lock _l(_tcpConnections_m);
for(std::vector<TcpConnection *>::const_iterator c(_tcpConnections.begin());c!=_tcpConnections.end();++c) {
TcpConnection *const tc = *c;
tc->writeq_m.lock();
const unsigned long wql = (unsigned long)tc->writeq.length();
tc->writeq_m.unlock();
if ((tc->sock)&&((wql > ZT_TCP_MAX_WRITEQ_SIZE)||((now - tc->lastReceive) > ZT_TCP_ACTIVITY_TIMEOUT))) {
toClose.push_back(tc->sock);
} else if ((tc->type == TcpConnection::TCP_CLUSTER_BACKPLANE)&&(tc->clusterMemberId)) {
clusterLinksUp.push_back(tc->remoteAddr);
sendMyCurrentClusterState(tc);
}
}
}
for(std::vector<PhySocket *>::iterator s(toClose.begin());s!=toClose.end();++s)
_phy.close(*s,true);
// Attempt to connect to cluster links we don't have an active connection to
{
Mutex::Lock _l(_localConfig_m);
for(std::vector<InetAddress>::const_iterator ca(_clusterBackplaneAddresses.begin());ca!=_clusterBackplaneAddresses.end();++ca) {
if ( (std::find(clusterLinksUp.begin(),clusterLinksUp.end(),*ca) == clusterLinksUp.end()) && (!_binder.isBoundLocalInterfaceAddress(*ca)) ) {
TcpConnection *tc = new TcpConnection();
{
Mutex::Lock _l(_tcpConnections_m);
_tcpConnections.push_back(tc);
}
tc->type = TcpConnection::TCP_CLUSTER_BACKPLANE;
tc->remoteAddr = *ca;
tc->lastReceive = OSUtils::now();
tc->parent = this;
tc->sock = (PhySocket *)0; // set in connect handler
tc->messageSize = 0;
tc->clusterMemberId = 0; // not known yet
bool connected = false;
_phy.tcpConnect(reinterpret_cast<const struct sockaddr *>(&(*ca)),connected,(void *)tc,true);
}
}
}
}
const unsigned long delay = (dl > now) ? (unsigned long)(dl - now) : 100;
clockShouldBe = now + (uint64_t)delay;
_phy.poll(delay);
@ -1211,21 +1113,6 @@ public:
res["planetWorldId"] = planet.id();
res["planetWorldTimestamp"] = planet.timestamp();
{
json cj(json::object());
Mutex::Lock _l(_tcpConnections_m);
Mutex::Lock _l2(_localConfig_m);
for(std::vector<InetAddress>::const_iterator ca(_clusterBackplaneAddresses.begin());ca!=_clusterBackplaneAddresses.end();++ca) {
uint64_t up = 0;
for(std::vector<TcpConnection *>::const_iterator c(_tcpConnections.begin());c!=_tcpConnections.end();++c) {
if (((*c)->remoteAddr == *ca)&&((*c)->clusterMemberId)&&((*c)->lastReceive > up))
up = (*c)->lastReceive;
}
cj[ca->toString()] = up;
}
res["cluster"] = cj;
}
scode = 200;
} else if (ps[0] == "moon") {
std::vector<World> moons(_node->moons());
@ -1576,16 +1463,6 @@ public:
}
}
json &cl = settings["cluster"];
_clusterBackplaneAddresses.clear();
if (cl.is_array()) {
for(unsigned long i=0;i<cl.size();++i) {
const InetAddress cip(OSUtils::jsonString(cl[i],""));
if ((cip.ss_family == AF_INET)||(cip.ss_family == AF_INET6))
_clusterBackplaneAddresses.push_back(cip);
}
}
json &controllerDbHttpHost = settings["controllerDbHttpHost"];
json &controllerDbHttpPort = settings["controllerDbHttpPort"];
json &controllerDbHttpPath = settings["controllerDbHttpPath"];
@ -1754,250 +1631,6 @@ public:
}
}
// =========================================================================
// Cluster messaging functions
// =========================================================================
// mlen must be at least 24
void encryptClusterMessage(char *data,unsigned int mlen)
{
uint8_t key[32];
memcpy(key,_clusterKey,32);
for(int i=0;i<8;++i) key[i] ^= data[i];
Salsa20 s20(key,data + 8);
uint8_t macKey[32];
uint8_t mac[16];
memset(macKey,0,32);
s20.crypt12(macKey,macKey,32);
s20.crypt12(data + 24,data + 24,mlen - 24);
Poly1305::compute(mac,data + 24,mlen - 24,macKey);
memcpy(data + 16,mac,8);
}
void announceStatusToClusterMember(TcpConnection *tc)
{
try {
Buffer<8194> buf;
buf.appendRandom(16);
buf.addSize(8); // space for MAC
buf.append((uint8_t)CLUSTER_MESSAGE_STATUS);
buf.append(_clusterMemberId);
buf.append((uint16_t)ZEROTIER_ONE_VERSION_MAJOR);
buf.append((uint16_t)ZEROTIER_ONE_VERSION_MINOR);
buf.append((uint16_t)ZEROTIER_ONE_VERSION_REVISION);
std::vector<InetAddress> lif(_binder.allBoundLocalInterfaceAddresses());
buf.append((uint16_t)lif.size());
for(std::vector<InetAddress>::const_iterator i(lif.begin());i!=lif.end();++i)
i->serialize(buf);
Mutex::Lock _l(tc->writeq_m);
if (tc->writeq.length() == 0)
_phy.setNotifyWritable(tc->sock,true);
const unsigned int mlen = buf.size();
tc->writeq.push_back((char)((mlen >> 16) & 0xff));
tc->writeq.push_back((char)((mlen >> 8) & 0xff));
tc->writeq.push_back((char)(mlen & 0xff));
char *const data = reinterpret_cast<char *>(buf.unsafeData());
encryptClusterMessage(data,mlen);
tc->writeq.append(data,mlen);
} catch ( ... ) {
fprintf(stderr,"WARNING: unexpected exception announcing status to cluster members" ZT_EOL_S);
}
}
bool proxySendViaCluster(const InetAddress &fromAddress,const InetAddress &dest,const void *data,unsigned int len,unsigned int ttl)
{
Mutex::Lock _l(_tcpConnections_m);
for(std::vector<TcpConnection *>::const_iterator c(_tcpConnections.begin());c!=_tcpConnections.end();++c) {
TcpConnection *const tc = *c;
if ((tc->type == TcpConnection::TCP_CLUSTER_BACKPLANE)&&(tc->clusterMemberId)) {
Mutex::Lock _l2(tc->clusterMemberLocalAddresses_m);
for(std::vector<InetAddress>::const_iterator i(tc->clusterMemberLocalAddresses.begin());i!=tc->clusterMemberLocalAddresses.end();++i) {
if (*i == fromAddress) {
Buffer<1024> buf;
buf.appendRandom(16);
buf.addSize(8); // space for MAC
buf.append((uint8_t)CLUSTER_MESSAGE_PROXY_SEND);
buf.append((uint8_t)ttl);
dest.serialize(buf);
fromAddress.serialize(buf);
Mutex::Lock _l3(tc->writeq_m);
if (tc->writeq.length() == 0)
_phy.setNotifyWritable(tc->sock,true);
const unsigned int mlen = buf.size() + len;
tc->writeq.push_back((char)((mlen >> 16) & 0xff));
tc->writeq.push_back((char)((mlen >> 8) & 0xff));
tc->writeq.push_back((char)(mlen & 0xff));
const unsigned long startpos = (unsigned long)tc->writeq.length();
tc->writeq.append(reinterpret_cast<const char *>(buf.data()),buf.size());
tc->writeq.append(reinterpret_cast<const char *>(data),len);
char *const outdata = const_cast<char *>(tc->writeq.data()) + startpos;
encryptClusterMessage(outdata,mlen);
return true;
}
}
}
}
return false;
}
void replicateStateObject(const ZT_StateObjectType type,const uint64_t id[2],const void *const data,const unsigned int len,TcpConnection *tc)
{
char buf[42];
Mutex::Lock _l2(tc->writeq_m);
if (tc->writeq.length() == 0)
_phy.setNotifyWritable(tc->sock,true);
const unsigned int mlen = len + 42;
tc->writeq.push_back((char)((mlen >> 16) & 0xff));
tc->writeq.push_back((char)((mlen >> 8) & 0xff));
tc->writeq.push_back((char)(mlen & 0xff));
Utils::getSecureRandom(buf,16);
buf[24] = (char)CLUSTER_MESSAGE_STATE_OBJECT;
buf[25] = (char)type;
buf[26] = (char)((id[0] >> 56) & 0xff);
buf[27] = (char)((id[0] >> 48) & 0xff);
buf[28] = (char)((id[0] >> 40) & 0xff);
buf[29] = (char)((id[0] >> 32) & 0xff);
buf[30] = (char)((id[0] >> 24) & 0xff);
buf[31] = (char)((id[0] >> 16) & 0xff);
buf[32] = (char)((id[0] >> 8) & 0xff);
buf[33] = (char)(id[0] & 0xff);
buf[34] = (char)((id[1] >> 56) & 0xff);
buf[35] = (char)((id[1] >> 48) & 0xff);
buf[36] = (char)((id[1] >> 40) & 0xff);
buf[37] = (char)((id[1] >> 32) & 0xff);
buf[38] = (char)((id[1] >> 24) & 0xff);
buf[39] = (char)((id[1] >> 16) & 0xff);
buf[40] = (char)((id[1] >> 8) & 0xff);
buf[41] = (char)(id[1] & 0xff);
const unsigned long startpos = (unsigned long)tc->writeq.length();
tc->writeq.append(buf,42);
tc->writeq.append(reinterpret_cast<const char *>(data),len);
char *const outdata = const_cast<char *>(tc->writeq.data()) + startpos;
encryptClusterMessage(outdata,mlen);
tc->writeq.append(outdata,mlen);
}
void writeStateObject(enum ZT_StateObjectType type,const uint64_t id[2],const void *data,int len)
{
char buf[65535];
char p[1024];
FILE *f;
bool secure = false;
switch(type) {
case ZT_STATE_OBJECT_IDENTITY_PUBLIC:
Utils::ztsnprintf(p,sizeof(p),"%s" ZT_PATH_SEPARATOR_S "identity.public",_homePath.c_str());
break;
case ZT_STATE_OBJECT_IDENTITY_SECRET:
Utils::ztsnprintf(p,sizeof(p),"%s" ZT_PATH_SEPARATOR_S "identity.secret",_homePath.c_str());
secure = true;
break;
//case ZT_STATE_OBJECT_PEER_STATE:
// break;
case ZT_STATE_OBJECT_NETWORK_CONFIG:
Utils::ztsnprintf(p,sizeof(p),"%s" ZT_PATH_SEPARATOR_S "networks.d/%.16llx.conf",_homePath.c_str(),(unsigned long long)id[0]);
secure = true;
break;
//case ZT_STATE_OBJECT_NETWORK_MEMBERSHIP:
// break;
case ZT_STATE_OBJECT_PLANET:
Utils::ztsnprintf(p,sizeof(p),"%s" ZT_PATH_SEPARATOR_S "planet",_homePath.c_str());
break;
case ZT_STATE_OBJECT_MOON:
Utils::ztsnprintf(p,sizeof(p),"%s" ZT_PATH_SEPARATOR_S "moons.d/%.16llx.moon",_homePath.c_str(),(unsigned long long)id[0]);
break;
default:
p[0] = (char)0;
break;
}
if (p[0]) {
if (len >= 0) {
// Check to see if we've already written this first. This reduces
// redundant writes and I/O overhead on most platforms and has
// little effect on others.
f = fopen(p,"r");
bool redundant = false;
if (f) {
long l = (long)fread(buf,1,sizeof(buf),f);
fclose(f);
redundant = ((l == (long)len)&&(memcmp(data,buf,l) == 0));
}
if (!redundant) {
f = fopen(p,"w");
if (f) {
if (fwrite(data,len,1,f) != 1)
fprintf(stderr,"WARNING: unable to write to file: %s (I/O error)" ZT_EOL_S,p);
fclose(f);
if (secure)
OSUtils::lockDownFile(p,false);
} else {
fprintf(stderr,"WARNING: unable to write to file: %s (unable to open)" ZT_EOL_S,p);
}
}
} else {
OSUtils::rm(p);
}
}
}
void sendMyCurrentClusterState(TcpConnection *tc)
{
// We currently don't need to dump everything. Networks and moons are most important.
// The rest will get caught up rapidly due to constant peer updates, etc.
std::string buf;
std::vector<std::string> l(OSUtils::listDirectory((_homePath + ZT_PATH_SEPARATOR_S + "networks.d").c_str(),false));
for(std::vector<std::string>::const_iterator f(l.begin());f!=l.end();++f) {
buf.clear();
if (OSUtils::readFile((_homePath + ZT_PATH_SEPARATOR_S + *f).c_str(),buf)) {
if (f->length() == 21) {
const uint64_t nwid = Utils::hexStrToU64(f->substr(0,16).c_str());
if (nwid) {
uint64_t tmp[2];
tmp[0] = nwid;
tmp[1] = 0;
replicateStateObject(ZT_STATE_OBJECT_NETWORK_CONFIG,tmp,buf.data(),(int)buf.length(),tc);
}
}
}
}
l = OSUtils::listDirectory((_homePath + ZT_PATH_SEPARATOR_S + "moons.d").c_str(),false);
for(std::vector<std::string>::const_iterator f(l.begin());f!=l.end();++f) {
buf.clear();
if (OSUtils::readFile((_homePath + ZT_PATH_SEPARATOR_S + *f).c_str(),buf)) {
if (f->length() == 21) {
const uint64_t moonId = Utils::hexStrToU64(f->substr(0,16).c_str());
if (moonId) {
uint64_t tmp[2];
tmp[0] = moonId;
tmp[1] = 0;
replicateStateObject(ZT_STATE_OBJECT_MOON,tmp,buf.data(),(int)buf.length(),tc);
}
}
}
}
}
// =========================================================================
// Handlers for Node and Phy<> callbacks
// =========================================================================
@ -2010,7 +1643,7 @@ public:
const ZT_ResultCode rc = _node->processWirePacket(
(void *)0,
OSUtils::now(),
reinterpret_cast<const struct sockaddr_storage *>(localAddr),
(int64_t)((uintptr_t)sock),
(const struct sockaddr_storage *)from, // Phy<> uses sockaddr_storage, so it'll always be that big
data,
len,
@ -2044,13 +1677,6 @@ public:
_phy.close(_tcpFallbackTunnel->sock);
_tcpFallbackTunnel = tc;
_phy.streamSend(sock,ZT_TCP_TUNNEL_HELLO,sizeof(ZT_TCP_TUNNEL_HELLO));
} else if (tc->type == TcpConnection::TCP_CLUSTER_BACKPLANE) {
{
Mutex::Lock _l(tc->writeq_m);
tc->writeq.push_back((char)0x93); // identifies type of connection as cluster backplane
}
announceStatusToClusterMember(tc);
_phy.setNotifyWritable(sock,true);
} else {
_phy.close(sock,true);
}
@ -2106,31 +1732,6 @@ public:
case TcpConnection::TCP_UNCATEGORIZED_INCOMING:
switch(reinterpret_cast<uint8_t *>(data)[0]) {
// 0x93 is first byte of cluster backplane connections
case 0x93: {
// We only allow this from cluster backplane IPs. We also authenticate
// each packet cryptographically, so this is just a first line of defense.
bool allow = false;
{
Mutex::Lock _l(_localConfig_m);
for(std::vector< InetAddress >::const_iterator i(_clusterBackplaneAddresses.begin());i!=_clusterBackplaneAddresses.end();++i) {
if (tc->remoteAddr.ipsEqual(*i)) {
allow = true;
break;
}
}
}
if (allow) {
tc->type = TcpConnection::TCP_CLUSTER_BACKPLANE;
tc->clusterMemberId = 0; // unknown, waiting for first status message
announceStatusToClusterMember(tc);
if (len > 1)
phyOnTcpData(sock,uptr,reinterpret_cast<uint8_t *>(data) + 1,len - 1);
} else {
_phy.close(sock);
}
} break;
// HTTP: GET, PUT, POST, HEAD
case 'G':
case 'P':
@ -2223,7 +1824,7 @@ public:
const ZT_ResultCode rc = _node->processWirePacket(
(void *)0,
OSUtils::now(),
reinterpret_cast<struct sockaddr_storage *>(&fakeTcpLocalInterfaceAddress),
-1,
reinterpret_cast<struct sockaddr_storage *>(&from),
data,
plen,
@ -2248,114 +1849,6 @@ public:
}
return;
case TcpConnection::TCP_CLUSTER_BACKPLANE:
tc->readq.append((const char *)data,len);
if (tc->readq.length() >= 28) { // got 3-byte message size + 16-byte IV + 8-byte MAC + 1-byte type (encrypted)
uint8_t *data = reinterpret_cast<uint8_t *>(const_cast<char *>(tc->readq.data()));
unsigned long mlen = ( ((unsigned long)data[0] << 16) | ((unsigned long)data[1] << 8) | (unsigned long)data[2] );
if ((mlen < 25)||(mlen > ZT_TCP_MAX_WRITEQ_SIZE)) {
_phy.close(sock);
return;
} else if (tc->readq.length() >= (mlen + 3)) { // got entire message
data += 3;
uint8_t key[32];
memcpy(key,_clusterKey,32);
for(int i=0;i<8;++i) key[i] ^= data[i]; // first 8 bytes of IV get XORed with key
Salsa20 s20(key,data + 8); // last 8 bytes of IV are fed into Salsa20 directly as its 64-bit IV
uint8_t macKey[32];
uint8_t mac[16];
memset(macKey,0,32);
s20.crypt12(macKey,macKey,32);
Poly1305::compute(mac,data + 24,mlen - 24,macKey);
if (!Utils::secureEq(mac,data + 16,8)) {
_phy.close(sock);
return;
}
s20.crypt12(data + 24,data + 24,mlen - 24);
switch((ClusterMessageType)data[24]) {
case CLUSTER_MESSAGE_STATUS:
if (mlen > (25 + 16)) {
Buffer<4096> tmp(data + 25,mlen - 25);
try {
const uint64_t cmid = tmp.at<uint64_t>(0);
if (cmid == _clusterMemberId) { // shouldn't happen, but don't allow self-to-self
_phy.close(sock);
return;
}
if (!tc->clusterMemberId) {
tc->clusterMemberId = cmid;
sendMyCurrentClusterState(tc);
}
tc->clusterMemberVersionMajor = tmp.at<uint16_t>(8);
tc->clusterMemberVersionMinor = tmp.at<uint16_t>(10);
tc->clusterMemberVersionRev = tmp.at<uint16_t>(12);
const unsigned int clusterMemberLocalAddressCount = tmp.at<uint16_t>(14);
std::vector<InetAddress> la;
unsigned int ptr = 16;
for(unsigned int k=0;k<clusterMemberLocalAddressCount;++k) {
la.push_back(InetAddress());
ptr += la.back().deserialize(tmp,ptr);
}
{
Mutex::Lock _l2(tc->clusterMemberLocalAddresses_m);
tc->clusterMemberLocalAddresses.swap(la);
}
} catch ( ... ) {}
}
break;
case CLUSTER_MESSAGE_STATE_OBJECT:
if (mlen > 42) { // type + object ID + [data]
uint64_t objId[2];
objId[0] = (
((uint64_t)data[26] << 56) |
((uint64_t)data[27] << 48) |
((uint64_t)data[28] << 40) |
((uint64_t)data[29] << 32) |
((uint64_t)data[30] << 24) |
((uint64_t)data[31] << 16) |
((uint64_t)data[32] << 8) |
(uint64_t)data[33]
);
objId[1] = (
((uint64_t)data[34] << 56) |
((uint64_t)data[35] << 48) |
((uint64_t)data[36] << 40) |
((uint64_t)data[37] << 32) |
((uint64_t)data[38] << 24) |
((uint64_t)data[39] << 16) |
((uint64_t)data[40] << 8) |
(uint64_t)data[41]
);
if (_node->processStateUpdate((void *)0,(ZT_StateObjectType)data[25],objId,data + 42,(unsigned int)(mlen - 42)) == ZT_RESULT_OK)
writeStateObject((ZT_StateObjectType)data[25],objId,data + 42,(unsigned int)(mlen - 42));
}
break;
case CLUSTER_MESSAGE_PROXY_SEND:
if (mlen > 25) {
Buffer<4096> tmp(data + 25,mlen - 25);
try {
InetAddress dest,src;
const unsigned int ttl = (unsigned int)tmp[0];
unsigned int ptr = 1;
ptr += dest.deserialize(tmp);
ptr += src.deserialize(tmp,ptr);
if (ptr < tmp.size())
_binder.udpSend(_phy,src,dest,reinterpret_cast<const uint8_t *>(tmp.data()) + ptr,tmp.size() - ptr,ttl);
} catch ( ... ) {}
}
break;
}
tc->readq.erase(tc->readq.begin(),tc->readq.begin() + mlen);
}
}
return;
}
} catch ( ... ) {
_phy.close(sock);
@ -2549,18 +2042,57 @@ public:
inline void nodeStatePutFunction(enum ZT_StateObjectType type,const uint64_t id[2],const void *data,int len)
{
writeStateObject(type,id,data,len);
char p[1024];
FILE *f;
bool secure = false;
std::vector<uint64_t> sentTo;
{
Mutex::Lock _l(_tcpConnections_m);
for(std::vector<TcpConnection *>::const_iterator ci(_tcpConnections.begin());ci!=_tcpConnections.end();++ci) {
TcpConnection *const c = *ci;
if ((c->type == TcpConnection::TCP_CLUSTER_BACKPLANE)&&(c->clusterMemberId != 0)&&(std::find(sentTo.begin(),sentTo.end(),c->clusterMemberId) == sentTo.end())) {
sentTo.push_back(c->clusterMemberId);
replicateStateObject(type,id,data,len,c);
}
switch(type) {
case ZT_STATE_OBJECT_IDENTITY_PUBLIC:
Utils::ztsnprintf(p,sizeof(p),"%s" ZT_PATH_SEPARATOR_S "identity.public",_homePath.c_str());
break;
case ZT_STATE_OBJECT_IDENTITY_SECRET:
Utils::ztsnprintf(p,sizeof(p),"%s" ZT_PATH_SEPARATOR_S "identity.secret",_homePath.c_str());
secure = true;
break;
case ZT_STATE_OBJECT_PLANET:
Utils::ztsnprintf(p,sizeof(p),"%s" ZT_PATH_SEPARATOR_S "planet",_homePath.c_str());
break;
case ZT_STATE_OBJECT_MOON:
Utils::ztsnprintf(p,sizeof(p),"%s" ZT_PATH_SEPARATOR_S "moons.d/%.16llx.moon",_homePath.c_str(),(unsigned long long)id[0]);
break;
case ZT_STATE_OBJECT_NETWORK_CONFIG:
Utils::ztsnprintf(p,sizeof(p),"%s" ZT_PATH_SEPARATOR_S "networks.d/%.16llx.conf",_homePath.c_str(),(unsigned long long)id[0]);
secure = true;
break;
default:
return;
}
if (len >= 0) {
// Check to see if we've already written this first. This reduces
// redundant writes and I/O overhead on most platforms and has
// little effect on others.
f = fopen(p,"r");
if (f) {
char buf[65535];
long l = (long)fread(buf,1,sizeof(buf),f);
fclose(f);
if ((l == (long)len)&&(memcmp(data,buf,l) == 0))
return;
}
f = fopen(p,"w");
if (f) {
if (fwrite(data,len,1,f) != 1)
fprintf(stderr,"WARNING: unable to write to file: %s (I/O error)" ZT_EOL_S,p);
fclose(f);
if (secure)
OSUtils::lockDownFile(p,false);
} else {
fprintf(stderr,"WARNING: unable to write to file: %s (unable to open)" ZT_EOL_S,p);
}
} else {
OSUtils::rm(p);
}
}
@ -2596,7 +2128,7 @@ public:
return -1;
}
inline int nodeWirePacketSendFunction(const struct sockaddr_storage *localAddr,const struct sockaddr_storage *addr,const void *data,unsigned int len,unsigned int ttl)
inline int nodeWirePacketSendFunction(const int64_t localSocket,const struct sockaddr_storage *addr,const void *data,unsigned int len,unsigned int ttl)
{
#ifdef ZT_TCP_FALLBACK_RELAY
if (addr->ss_family == AF_INET) {
@ -2646,20 +2178,13 @@ public:
// proxy fallback, which is slow.
#endif // ZT_TCP_FALLBACK_RELAY
switch (_binder.udpSend(_phy,*(reinterpret_cast<const InetAddress *>(localAddr)),*(reinterpret_cast<const InetAddress *>(addr)),data,len,ttl)) {
case -1: // local bound address not found, so see if a cluster peer owns it
if (localAddr->ss_family != 0) {
return (proxySendViaCluster(*(reinterpret_cast<const InetAddress *>(localAddr)),*(reinterpret_cast<const InetAddress *>(addr)),data,len,ttl)) ? 0 : -1;
} else {
return -1; // failure
}
break;
case 0: // failure
return -1;
default: // success
return 0;
if ((localSocket != 0)&&(localSocket != -1)) {
if ((ttl)&&(addr->ss_family == AF_INET)) _phy.setIp4UdpTtl((PhySocket *)((uintptr_t)localSocket),ttl);
const bool r = _phy.udpSend((PhySocket *)((uintptr_t)localSocket),(const struct sockaddr *)addr,data,len);
if ((ttl)&&(addr->ss_family == AF_INET)) _phy.setIp4UdpTtl((PhySocket *)((uintptr_t)localSocket),255);
return ((r) ? 0 : -1);
} else {
return ((_binder.udpSendAll(_phy,addr,data,len,ttl)) ? 0 : -1);
}
}
@ -2671,7 +2196,7 @@ public:
n->tap->put(MAC(sourceMac),MAC(destMac),etherType,data,len);
}
inline int nodePathCheckFunction(uint64_t ztaddr,const struct sockaddr_storage *localAddr,const struct sockaddr_storage *remoteAddr)
inline int nodePathCheckFunction(uint64_t ztaddr,const int64_t localSocket,const struct sockaddr_storage *remoteAddr)
{
// Make sure we're not trying to do ZeroTier-over-ZeroTier
{
@ -2882,12 +2407,12 @@ static void SnodeStatePutFunction(ZT_Node *node,void *uptr,void *tptr,enum ZT_St
{ reinterpret_cast<OneServiceImpl *>(uptr)->nodeStatePutFunction(type,id,data,len); }
static int SnodeStateGetFunction(ZT_Node *node,void *uptr,void *tptr,enum ZT_StateObjectType type,const uint64_t id[2],void *data,unsigned int maxlen)
{ return reinterpret_cast<OneServiceImpl *>(uptr)->nodeStateGetFunction(type,id,data,maxlen); }
static int SnodeWirePacketSendFunction(ZT_Node *node,void *uptr,void *tptr,const struct sockaddr_storage *localAddr,const struct sockaddr_storage *addr,const void *data,unsigned int len,unsigned int ttl)
{ return reinterpret_cast<OneServiceImpl *>(uptr)->nodeWirePacketSendFunction(localAddr,addr,data,len,ttl); }
static int SnodeWirePacketSendFunction(ZT_Node *node,void *uptr,void *tptr,int64_t localSocket,const struct sockaddr_storage *addr,const void *data,unsigned int len,unsigned int ttl)
{ return reinterpret_cast<OneServiceImpl *>(uptr)->nodeWirePacketSendFunction(localSocket,addr,data,len,ttl); }
static void SnodeVirtualNetworkFrameFunction(ZT_Node *node,void *uptr,void *tptr,uint64_t nwid,void **nuptr,uint64_t sourceMac,uint64_t destMac,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len)
{ reinterpret_cast<OneServiceImpl *>(uptr)->nodeVirtualNetworkFrameFunction(nwid,nuptr,sourceMac,destMac,etherType,vlanId,data,len); }
static int SnodePathCheckFunction(ZT_Node *node,void *uptr,void *tptr,uint64_t ztaddr,const struct sockaddr_storage *localAddr,const struct sockaddr_storage *remoteAddr)
{ return reinterpret_cast<OneServiceImpl *>(uptr)->nodePathCheckFunction(ztaddr,localAddr,remoteAddr); }
static int SnodePathCheckFunction(ZT_Node *node,void *uptr,void *tptr,uint64_t ztaddr,int64_t localSocket,const struct sockaddr_storage *remoteAddr)
{ return reinterpret_cast<OneServiceImpl *>(uptr)->nodePathCheckFunction(ztaddr,localSocket,remoteAddr); }
static int SnodePathLookupFunction(ZT_Node *node,void *uptr,void *tptr,uint64_t ztaddr,int family,struct sockaddr_storage *result)
{ return reinterpret_cast<OneServiceImpl *>(uptr)->nodePathLookupFunction(ztaddr,family,result); }
static void StapFrameHandler(void *uptr,void *tptr,uint64_t nwid,const MAC &from,const MAC &to,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len)