Work in progress...

This commit is contained in:
Adam Ierymenko 2019-08-22 13:06:08 -07:00
parent 0e18b14087
commit b66431bc29
No known key found for this signature in database
GPG key ID: 1657198823E52A61
15 changed files with 356 additions and 390 deletions

View file

@ -58,7 +58,7 @@ Node::Node(void *uptr,void *tptr,const struct ZT_Node_Callbacks *callbacks,int64
_uPtr(uptr),
_networks(8),
_now(now),
_lastPingCheck(0),
_lastPing(0),
_lastHousekeepingRun(0),
_lastMemoizedTraceSettings(0)
{
@ -184,101 +184,91 @@ ZT_ResultCode Node::processVirtualNetworkFrame(
} else return ZT_RESULT_ERROR_NETWORK_NOT_FOUND;
}
/*
// Function object used to traverse the peer list, check peer status, and ping
// those that need pinging.
struct _PingPeersThatNeedPing
struct _processBackgroundTasks_ping_eachRoot
{
inline _PingPeersThatNeedPing(const RuntimeEnvironment *renv,void *tPtr,Hashtable< Address,std::vector<InetAddress> > &alwaysContact,int64_t now) :
RR(renv),
_tPtr(tPtr),
_alwaysContact(alwaysContact),
_now(now),
_bestCurrentUpstream(RR->topology->getUpstreamPeer()),
online(false)
Hashtable< void *,bool > roots;
int64_t now;
void *tPtr;
bool online;
inline void operator()(const Root &root,const SharedPtr<Peer> &peer)
{
unsigned int v4SendCount = 0,v6SendCount = 0;
peer->ping(tPtr,now,v4SendCount,v6SendCount);
const InetAddress *contactAddrs[2];
unsigned int contactAddrCount = 0;
if (v4SendCount == 0) {
if (*(contactAddrs[contactAddrCount] = &(root.pickPhysical(AF_INET))))
++contactAddrCount;
}
if (v6SendCount == 0) {
if (*(contactAddrs[contactAddrCount] = &(root.pickPhysical(AF_INET6))))
++contactAddrCount;
}
for(unsigned int i=0;i<contactAddrCount;++i)
peer->sendHELLO(tPtr,-1,*contactAddrs[i],now);
if (!online)
online = ((now - peer->lastReceive()) <= ((ZT_PEER_PING_PERIOD * 2) + 5000));
roots.set((void *)peer.ptr(),true);
}
};
inline void operator()(Topology &t,const SharedPtr<Peer> &p)
struct _processBackgroundTasks_ping_eachPeer
{
int64_t now;
void *tPtr;
Hashtable< void *,bool > *roots;
inline void operator()(const SharedPtr<Peer> &peer)
{
const std::vector<InetAddress> *const alwaysContactEndpoints = _alwaysContact.get(p->address());
if (alwaysContactEndpoints) {
online |= p->isAlive(_now);
const unsigned int sent = p->doPingAndKeepalive(_tPtr,_now);
bool contacted = (sent != 0);
if ((sent & 0x1) == 0) { // bit 0x1 == IPv4 sent
for(unsigned long k=0,ptr=(unsigned long)Utils::random();k<(unsigned long)alwaysContactEndpoints->size();++k) {
const InetAddress &addr = (*alwaysContactEndpoints)[ptr++ % alwaysContactEndpoints->size()];
if (addr.ss_family == AF_INET) {
p->sendHELLO(_tPtr,-1,addr,_now);
contacted = true;
break;
}
}
}
if ((sent & 0x2) == 0) { // bit 0x2 == IPv6 sent
for(unsigned long k=0,ptr=(unsigned long)Utils::random();k<(unsigned long)alwaysContactEndpoints->size();++k) {
const InetAddress &addr = (*alwaysContactEndpoints)[ptr++ % alwaysContactEndpoints->size()];
if (addr.ss_family == AF_INET6) {
p->sendHELLO(_tPtr,-1,addr,_now);
contacted = true;
break;
}
}
}
if ((!contacted)&&(_bestCurrentUpstream)) {
const SharedPtr<Path> up(_bestCurrentUpstream->getAppropriatePath(_now,true));
if (up)
p->sendHELLO(_tPtr,up->localSocket(),up->address(),_now);
}
_alwaysContact.erase(p->address()); // after this we'll WHOIS all upstreams that remain
} else if (p->isActive(_now)) {
p->doPingAndKeepalive(_tPtr,_now);
if (!roots->contains((void *)peer.ptr())) {
unsigned int v4SendCount = 0,v6SendCount = 0;
peer->ping(tPtr,now,v4SendCount,v6SendCount);
}
}
const RuntimeEnvironment *RR;
void *_tPtr;
Hashtable< Address,std::vector<InetAddress> > &_alwaysContact;
const int64_t _now;
const SharedPtr<Peer> _bestCurrentUpstream;
bool online;
};
*/
ZT_ResultCode Node::processBackgroundTasks(void *tptr,int64_t now,volatile int64_t *nextBackgroundTaskDeadline)
{
_now = now;
Mutex::Lock bl(_backgroundTasksLock);
unsigned long timeUntilNextPingCheck = ZT_PING_CHECK_INVERVAL;
const int64_t timeSinceLastPingCheck = now - _lastPingCheck;
unsigned long timeUntilNextPing = ZT_PEER_PING_PERIOD;
const int64_t timeSinceLastPing = now - _lastPing;
if (timeSinceLastPing >= ZT_PEER_PING_PERIOD) {
_lastPing = now;
try {
_processBackgroundTasks_ping_eachRoot rf;
rf.now = now;
rf.tPtr = tptr;
rf.online = false;
RR->topology->eachRoot(rf);
_processBackgroundTasks_ping_eachPeer pf;
pf.now = now;
pf.tPtr = tptr;
pf.roots = &rf.roots;
RR->topology->eachPeer(pf);
if (rf.online != _online) {
_online = rf.online;
postEvent(tptr,_online ? ZT_EVENT_ONLINE : ZT_EVENT_OFFLINE);
}
} catch ( ... ) {
return ZT_RESULT_FATAL_ERROR_INTERNAL;
}
}
/*
if (timeSinceLastPingCheck >= ZT_PING_CHECK_INVERVAL) {
try {
_lastPingCheck = now;
// Clean up any old local controller auth memoizations. This is an
// optimization for network controllers to know whether to accept
// or trust nodes without doing an extra cert check.
{
_localControllerAuthorizations_m.lock();
Hashtable< _LocalControllerAuth,int64_t >::Iterator i(_localControllerAuthorizations);
_LocalControllerAuth *k = (_LocalControllerAuth *)0;
int64_t *v = (int64_t *)0;
while (i.next(k,v)) {
if ((*v - now) > (ZT_NETWORK_AUTOCONF_DELAY * 3)) {
_localControllerAuthorizations.erase(*k);
}
}
_localControllerAuthorizations_m.unlock();
}
/*
// (1) Get peers we should remain connected to and (2) get networks that need config.
Hashtable< Address,std::vector<InetAddress> > alwaysContact;
RR->topology->getAlwaysContact(alwaysContact);
@ -319,13 +309,13 @@ ZT_ResultCode Node::processBackgroundTasks(void *tptr,int64_t now,volatile int64
_online = pfunc.online;
if (oldOnline != _online)
postEvent(tptr,_online ? ZT_EVENT_ONLINE : ZT_EVENT_OFFLINE);
*/
} catch ( ... ) {
return ZT_RESULT_FATAL_ERROR_INTERNAL;
}
} else {
timeUntilNextPingCheck -= (unsigned long)timeSinceLastPingCheck;
}
*/
if ((now - _lastMemoizedTraceSettings) >= (ZT_HOUSEKEEPING_PERIOD / 4)) {
_lastMemoizedTraceSettings = now;
@ -335,6 +325,22 @@ ZT_ResultCode Node::processBackgroundTasks(void *tptr,int64_t now,volatile int64
if ((now - _lastHousekeepingRun) >= ZT_HOUSEKEEPING_PERIOD) {
_lastHousekeepingRun = now;
try {
// Clean up any old local controller auth memoizations. This is an
// optimization for network controllers to know whether to accept
// or trust nodes without doing an extra cert check.
{
_localControllerAuthorizations_m.lock();
Hashtable< _LocalControllerAuth,int64_t >::Iterator i(_localControllerAuthorizations);
_LocalControllerAuth *k = (_LocalControllerAuth *)0;
int64_t *v = (int64_t *)0;
while (i.next(k,v)) {
if ((*v - now) > (ZT_NETWORK_AUTOCONF_DELAY * 3)) {
_localControllerAuthorizations.erase(*k);
}
}
_localControllerAuthorizations_m.unlock();
}
RR->topology->doPeriodicTasks(now);
RR->sa->clean(now);
RR->mc->clean(now);
@ -344,7 +350,7 @@ ZT_ResultCode Node::processBackgroundTasks(void *tptr,int64_t now,volatile int64
}
try {
*nextBackgroundTaskDeadline = now + (int64_t)std::max(std::min(timeUntilNextPingCheck,RR->sw->doTimerTasks(tptr,now)),(unsigned long)ZT_CORE_TIMER_TASK_GRANULARITY);
*nextBackgroundTaskDeadline = now + (int64_t)std::max(std::min(timeUntilNextPing,RR->sw->doTimerTasks(tptr,now)),(unsigned long)ZT_CORE_TIMER_TASK_GRANULARITY);
} catch ( ... ) {
return ZT_RESULT_FATAL_ERROR_INTERNAL;
}
@ -527,9 +533,9 @@ void Node::freeQueryResult(void *qr)
int Node::addLocalInterfaceAddress(const struct sockaddr_storage *addr)
{
if (Path::isAddressValidForPath(*(reinterpret_cast<const InetAddress *>(addr)))) {
Mutex::Lock _l(_directPaths_m);
if (std::find(_directPaths.begin(),_directPaths.end(),*(reinterpret_cast<const InetAddress *>(addr))) == _directPaths.end()) {
_directPaths.push_back(*(reinterpret_cast<const InetAddress *>(addr)));
Mutex::Lock _l(_localInterfaceAddresses_m);
if (std::find(_localInterfaceAddresses.begin(),_localInterfaceAddresses.end(),*(reinterpret_cast<const InetAddress *>(addr))) == _localInterfaceAddresses.end()) {
_localInterfaceAddresses.push_back(*(reinterpret_cast<const InetAddress *>(addr)));
return 1;
}
}
@ -538,8 +544,8 @@ int Node::addLocalInterfaceAddress(const struct sockaddr_storage *addr)
void Node::clearLocalInterfaceAddresses()
{
Mutex::Lock _l(_directPaths_m);
_directPaths.clear();
Mutex::Lock _l(_localInterfaceAddresses_m);
_localInterfaceAddresses.clear();
}
int Node::sendUserMessage(void *tptr,uint64_t dest,uint64_t typeId,const void *data,unsigned int len)