Peer metrics (#1995)

* Adding peer metrics

still need to be wired up for use

* per peer packet metrics

* Fix crash from bad instantiation of histogram

* separate alive & dead path counts

* Add peer metric update block

* add peer latency values in doPingAndKeepalive

* prevent deadlock

* peer latency histogram actually works now

* cleanup

* capture counts of packets to specific peers

---------

Co-authored-by: Joseph Henry <joseph.henry@zerotier.com>
This commit is contained in:
Grant Limberg 2023-05-04 07:58:02 -07:00 committed by GitHub
parent 925599cab0
commit 74dc41c7c7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 102 additions and 53 deletions

View file

@ -28,11 +28,6 @@ namespace ZeroTier {
static unsigned char s_freeRandomByteCounter = 0;
char * peerIDString(const Identity &id) {
char out[16];
return id.address().toString(out);
}
Peer::Peer(const RuntimeEnvironment *renv,const Identity &myIdentity,const Identity &peerIdentity) :
RR(renv),
_lastReceive(0),
@ -55,7 +50,13 @@ Peer::Peer(const RuntimeEnvironment *renv,const Identity &myIdentity,const Ident
_directPathPushCutoffCount(0),
_echoRequestCutoffCount(0),
_localMultipathSupported(false),
_lastComputedAggregateMeanLatency(0)
_lastComputedAggregateMeanLatency(0),
_peer_latency{Metrics::peer_latency.Add({{"node_id", OSUtils::nodeIDStr(peerIdentity.address().toInt())}}, std::vector<uint64_t>{1,3,6,10,30,60,100,300,600,1000})},
_alive_path_count{Metrics::peer_path_count.Add({{"node_id", OSUtils::nodeIDStr(peerIdentity.address().toInt())},{"status","alive"}})},
_dead_path_count{Metrics::peer_path_count.Add({{"node_id", OSUtils::nodeIDStr(peerIdentity.address().toInt())},{"status","dead"}})},
_incoming_packet{Metrics::peer_incoming_packets.Add({{"node_id", OSUtils::nodeIDStr(peerIdentity.address().toInt())}})},
_outgoing_packet{Metrics::peer_outgoing_packets.Add({{"node_id", OSUtils::nodeIDStr(peerIdentity.address().toInt())}})},
_packet_errors{Metrics::peer_packet_errors.Add({{"node_id", OSUtils::nodeIDStr(peerIdentity.address().toInt())}})}
{
if (!myIdentity.agree(peerIdentity,_key)) {
throw ZT_EXCEPTION_INVALID_ARGUMENT;
@ -96,7 +97,7 @@ void Peer::received(
default:
break;
}
_incoming_packet++;
recordIncomingPacket(path, packetId, payloadLength, verb, flowId, now);
if (trustEstablished) {
@ -519,54 +520,70 @@ void Peer::performMultipathStateCheck(void *tPtr, int64_t now)
unsigned int Peer::doPingAndKeepalive(void *tPtr,int64_t now)
{
unsigned int sent = 0;
Mutex::Lock _l(_paths_m);
{
Mutex::Lock _l(_paths_m);
performMultipathStateCheck(tPtr, now);
performMultipathStateCheck(tPtr, now);
const bool sendFullHello = ((now - _lastSentFullHello) >= ZT_PEER_PING_PERIOD);
if (sendFullHello) {
_lastSentFullHello = now;
}
// Right now we only keep pinging links that have the maximum priority. The
// priority is used to track cluster redirections, meaning that when a cluster
// redirects us its redirect target links override all other links and we
// let those old links expire.
long maxPriority = 0;
for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
if (_paths[i].p) {
maxPriority = std::max(_paths[i].priority,maxPriority);
} else {
break;
const bool sendFullHello = ((now - _lastSentFullHello) >= ZT_PEER_PING_PERIOD);
if (sendFullHello) {
_lastSentFullHello = now;
}
}
bool deletionOccurred = false;
for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
if (_paths[i].p) {
// Clean expired and reduced priority paths
if ( ((now - _paths[i].lr) < ZT_PEER_PATH_EXPIRATION) && (_paths[i].priority == maxPriority) ) {
if ((sendFullHello)||(_paths[i].p->needsHeartbeat(now))) {
attemptToContactAt(tPtr,_paths[i].p->localSocket(),_paths[i].p->address(),now,sendFullHello);
_paths[i].p->sent(now);
sent |= (_paths[i].p->address().ss_family == AF_INET) ? 0x1 : 0x2;
}
// Right now we only keep pinging links that have the maximum priority. The
// priority is used to track cluster redirections, meaning that when a cluster
// redirects us its redirect target links override all other links and we
// let those old links expire.
long maxPriority = 0;
for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
if (_paths[i].p) {
maxPriority = std::max(_paths[i].priority,maxPriority);
} else {
_paths[i] = _PeerPath();
deletionOccurred = true;
break;
}
}
if (!_paths[i].p || deletionOccurred) {
for(unsigned int j=i;j<ZT_MAX_PEER_NETWORK_PATHS;++j) {
if (_paths[j].p && i != j) {
_paths[i] = _paths[j];
_paths[j] = _PeerPath();
break;
bool deletionOccurred = false;
for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
if (_paths[i].p) {
// Clean expired and reduced priority paths
if ( ((now - _paths[i].lr) < ZT_PEER_PATH_EXPIRATION) && (_paths[i].priority == maxPriority) ) {
if ((sendFullHello)||(_paths[i].p->needsHeartbeat(now))) {
attemptToContactAt(tPtr,_paths[i].p->localSocket(),_paths[i].p->address(),now,sendFullHello);
_paths[i].p->sent(now);
sent |= (_paths[i].p->address().ss_family == AF_INET) ? 0x1 : 0x2;
}
} else {
_paths[i] = _PeerPath();
deletionOccurred = true;
}
}
deletionOccurred = false;
if (!_paths[i].p || deletionOccurred) {
for(unsigned int j=i;j<ZT_MAX_PEER_NETWORK_PATHS;++j) {
if (_paths[j].p && i != j) {
_paths[i] = _paths[j];
_paths[j] = _PeerPath();
break;
}
}
deletionOccurred = false;
}
}
uint16_t alive_path_count_tmp = 0, dead_path_count_tmp = 0;
for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
if (_paths[i].p) {
if (_paths[i].p->alive(now)) {
alive_path_count_tmp++;
}
else {
dead_path_count_tmp++;
}
}
}
_alive_path_count = alive_path_count_tmp;
_dead_path_count = dead_path_count_tmp;
}
_peer_latency.Observe(latency(now));
return sent;
}
@ -641,6 +658,7 @@ void Peer::resetWithinScope(void *tPtr,InetAddress::IpScope scope,int inetAddres
void Peer::recordOutgoingPacket(const SharedPtr<Path> &path, const uint64_t packetId,
uint16_t payloadLength, const Packet::Verb verb, const int32_t flowId, int64_t now)
{
_outgoing_packet++;
if (_localMultipathSupported && _bond) {
_bond->recordOutgoingPacket(path, packetId, payloadLength, verb, flowId, now);
}
@ -648,6 +666,7 @@ void Peer::recordOutgoingPacket(const SharedPtr<Path> &path, const uint64_t pack
void Peer::recordIncomingInvalidPacket(const SharedPtr<Path>& path)
{
_packet_errors++;
if (_localMultipathSupported && _bond) {
_bond->recordIncomingInvalidPacket(path);
}