Improved multipath link monitoring

This commit is contained in:
Joseph Henry 2022-09-20 14:27:34 -07:00
parent 0797adf223
commit bc521504ca
No known key found for this signature in database
GPG key ID: C45B33FF5EBC9344
9 changed files with 232 additions and 255 deletions

View file

@ -28,6 +28,8 @@ uint8_t Bond::_defaultPolicy = ZT_BOND_POLICY_NONE;
Phy<Bond*>* Bond::_phy;
Binder* Bond::_binder;
Mutex Bond::_bonds_m;
Mutex Bond::_links_m;
@ -158,13 +160,13 @@ void Bond::destroyBond(uint64_t peerId)
SharedPtr<Link> Bond::getLinkBySocket(const std::string& policyAlias, uint64_t localSocket, bool createIfNeeded = false)
{
Mutex::Lock _l(_links_m);
char ifname[64] = { 0 };
_phy->getIfName((PhySocket*)((uintptr_t)localSocket), ifname, sizeof(ifname) - 1);
char ifname[ZT_MAX_PHYSIFNAME] = {};
_binder->getIfName((PhySocket*)((uintptr_t)localSocket), ifname, sizeof(ifname) - 1);
std::string ifnameStr(ifname);
auto search = _interfaceToLinkMap[policyAlias].find(ifnameStr);
if (search == _interfaceToLinkMap[policyAlias].end()) {
if (createIfNeeded) {
SharedPtr<Link> s = new Link(ifnameStr, 0, 0, true, ZT_BOND_SLAVE_MODE_PRIMARY, "", 0.0);
SharedPtr<Link> s = new Link(ifnameStr, 0, 0, true, ZT_BOND_SLAVE_MODE_PRIMARY, "");
_interfaceToLinkMap[policyAlias].insert(std::pair<std::string, SharedPtr<Link> >(ifnameStr, s));
return s;
}
@ -250,6 +252,12 @@ void Bond::nominatePathToBond(const SharedPtr<Path>& path, int64_t now)
}
}
if (! alreadyPresent) {
SharedPtr<Link> link = getLink(path);
if (link) {
std::string ifnameStr = std::string(link->ifname());
memset(path->_ifname, 0x0, ZT_MAX_PHYSIFNAME);
memcpy(path->_ifname, ifnameStr.c_str(), std::min((int)ifnameStr.length(), ZT_MAX_PHYSIFNAME));
}
/**
* Find somewhere to stick it
*/
@ -523,6 +531,7 @@ int32_t Bond::generateQoSPacket(int pathIdx, int64_t now, char* qosBuffer)
std::map<uint64_t, uint64_t>::iterator it = _paths[pathIdx].qosStatsIn.begin();
int i = 0;
int numRecords = std::min(_paths[pathIdx].packetsReceivedSinceLastQoS, ZT_QOS_TABLE_SIZE);
debug("numRecords=%3d, packetsReceivedSinceLastQoS=%3d, _paths[pathIdx].qosStatsIn.size()=%3lu", numRecords, _paths[pathIdx].packetsReceivedSinceLastQoS, _paths[pathIdx].qosStatsIn.size());
while (i < numRecords && it != _paths[pathIdx].qosStatsIn.end()) {
uint64_t id = it->first;
memcpy(qosBuffer, &id, sizeof(uint64_t));
@ -800,8 +809,8 @@ void Bond::sendQOS_MEASUREMENT(void* tPtr, int pathIdx, int64_t localSocket, con
Packet outp(_peer->_id.address(), RR->identity.address(), Packet::VERB_QOS_MEASUREMENT);
char qosData[ZT_QOS_MAX_PACKET_SIZE];
int16_t len = generateQoSPacket(pathIdx, _now, qosData);
debug("sending QOS via link %s (len=%d)", pathToStr(_paths[pathIdx].p).c_str(), len);
if (len) {
debug("sending QOS via link %s (len=%d)", pathToStr(_paths[pathIdx].p).c_str(), len);
outp.append(qosData, len);
if (atAddress) {
outp.armor(_peer->key(), false, _peer->aesKeysIfSupported());
@ -905,6 +914,7 @@ void Bond::curateBond(int64_t now, bool rebuildBond)
SharedPtr<Link> link = getLink(_paths[i].p);
if (! link) {
log("link is no longer valid, removing from bond");
_paths[i].p->_valid = false;
_paths[i] = NominatedPath();
_paths[i].p = SharedPtr<Path>();
continue;
@ -1109,6 +1119,7 @@ void Bond::curateBond(int64_t now, bool rebuildBond)
if (_policy == ZT_BOND_POLICY_BALANCE_RR) {
// Cause a RR reset since the current index might no longer be valid
_rrPacketsSentOnCurrLink = _packetsPerLink;
_rrIdx = 0;
}
}
}
@ -1166,9 +1177,13 @@ void Bond::estimatePathQuality(int64_t now)
_paths[i].p->_packetLossRatio = _paths[i].packetLossRatio;
_paths[i].p->_packetErrorRatio = _paths[i].packetErrorRatio;
_paths[i].p->_bonded = _paths[i].bonded;
_paths[i].p->_givenLinkSpeed = 0;//_paths[i].givenLinkSpeed;
_paths[i].p->_eligible = _paths[i].eligible;
// _valid is written elsewhere
_paths[i].p->_allocation = _paths[i].allocation;
SharedPtr<Link> link = RR->bc->getLinkBySocket(_policyAlias, _paths[i].p->localSocket());
if (link) {
_paths[i].p->_givenLinkSpeed = link->speed();
}
//_paths[i].packetErrorRatio = 1.0 - (_paths[i].packetValiditySamples.count() ? _paths[i].packetValiditySamples.mean() : 1.0);
// Drain unacknowledged QoS records
@ -1725,10 +1740,11 @@ void Bond::setBondParameters(int policy, SharedPtr<Bond> templateBond, bool useT
_policy = (policy <= ZT_BOND_POLICY_NONE || policy > ZT_BOND_POLICY_BALANCE_AWARE) ? _defaultPolicy : policy;
// Check if non-leaf to prevent spamming infrastructure
ZT_PeerRole role;
if (_peer) {
ZT_PeerRole role = RR->topology->role(_peer->address());
_isLeaf = (role != ZT_PEER_ROLE_PLANET && role != ZT_PEER_ROLE_MOON);
role = RR->topology->role(_peer->address());
}
_isLeaf = _peer ? (role != ZT_PEER_ROLE_PLANET && role != ZT_PEER_ROLE_MOON) : false;
// Flows

View file

@ -14,6 +14,7 @@
#ifndef ZT_BOND_HPP
#define ZT_BOND_HPP
#include "../osdep/Binder.hpp"
#include "../osdep/Phy.hpp"
#include "Packet.hpp"
#include "Path.hpp"
@ -122,7 +123,7 @@ class Link {
* @param failoverToLinkStr
* @param userSpecifiedAlloc
*/
Link(std::string ifnameStr, uint8_t ipvPref, uint32_t speed, bool enabled, uint8_t mode, std::string failoverToLinkStr, float userSpecifiedAlloc)
Link(std::string ifnameStr, uint8_t ipvPref, uint32_t speed, bool enabled, uint8_t mode, std::string failoverToLinkStr)
: _ifnameStr(ifnameStr)
, _ipvPref(ipvPref)
, _speed(speed)
@ -130,7 +131,6 @@ class Link {
, _enabled(enabled)
, _mode(mode)
, _failoverToLinkStr(failoverToLinkStr)
, _userSpecifiedAlloc(userSpecifiedAlloc)
, _isUserSpecified(false)
{
}
@ -287,11 +287,6 @@ class Link {
*/
std::string _failoverToLinkStr;
/**
* User-specified allocation
*/
float _userSpecifiedAlloc;
/**
* Whether or not this link was created as a result of manual user specification. This is
* important to know because certain policy decisions are dependent on whether the user
@ -328,6 +323,14 @@ class Bond {
return ! _bondPolicyTemplates.empty() || _defaultPolicy;
}
/**
* Sets the Binder instance that the Bond uses to look up interface data
*/
static void setBinder(Binder* b)
{
    // Only the pointer is stored in the class-wide _binder member; nothing is validated or copied here.
    Bond::_binder = b;
}
/**
* @param basePolicyName Bonding policy name (See ZeroTierOne.h)
* @return The bonding policy code for a given human-readable bonding policy name
@ -461,7 +464,7 @@ class Bond {
* @param createIfNeeded Whether a Link object is created if the name wasn't previously in the link map
* @return Physical link definition
*/
static SharedPtr<Link> getLinkBySocket(const std::string& policyAlias, uint64_t localSocket, bool createIfNeeded);
SharedPtr<Link> getLinkBySocket(const std::string& policyAlias, uint64_t localSocket, bool createIfNeeded);
/**
* Gets a reference to a physical link definition given its human-readable system name.
@ -840,14 +843,6 @@ class Bond {
_maxAcceptablePacketErrorRatio = errorRatio;
}
/**
* @param minAlloc Minimum acceptable allocation, given as a fraction that is scaled by 255 for storage.
*/
void setMinAcceptableAllocation(float minAlloc)
{
    // minAlloc is a fraction; it is scaled onto 0-255 and stored in an 8-bit member.
    // Clamp to [0, 1] before scaling: converting a float whose truncated value does not
    // fit in uint8_t is undefined behavior. NaN fails the first comparison and maps to 0.
    const float clamped = (minAlloc > 0.0f) ? ((minAlloc < 1.0f) ? minAlloc : 1.0f) : 0.0f;
    _minAcceptableAllocation = (uint8_t)(clamped * 255);
}
/**
* @return Whether the user has defined links for use on this bond
*/
@ -970,14 +965,6 @@ class Bond {
return _failoverInterval;
}
/**
* @param strategy Strategy that the bond uses to re-assign protocol flows.
*/
inline void setFlowRebalanceStrategy(uint32_t strategy)
{
_flowRebalanceStrategy = strategy;
}
/**
* @param strategy Strategy that the bond uses to probe for path aliveness and quality
*/
@ -1150,26 +1137,8 @@ class Bond {
*/
bool abForciblyRotateLink();
/**
* @param now Current time
* @return All known paths to this peer
*/
inline std::vector<SharedPtr<Path> > paths(const int64_t now) const
{
    // Collects every non-null path pointer from the fixed-size _paths table.
    // The `now` argument is accepted but not consulted here.
    std::vector<SharedPtr<Path> > known;
    // _paths_m is held for the whole scan.
    Mutex::Lock guard(_paths_m);
    for (unsigned int idx = 0; idx < ZT_MAX_PEER_NETWORK_PATHS; ++idx) {
        if (_paths[idx].p) {
            known.push_back(_paths[idx].p);
        }
    }
    return known;
}
/**
* Emit message to tracing system but with added timestamp and subsystem info
*
*/
void log(const char* fmt, ...)
#ifdef __GNUC__
@ -1201,7 +1170,6 @@ class Bond {
/**
* Emit message to tracing system but with added timestamp and subsystem info
*
*/
void debug(const char* fmt, ...)
#ifdef __GNUC__
@ -1412,7 +1380,6 @@ class Bond {
{
p = path;
whenNominated = now;
p->_bondingMetricPtr = (void*)this;
}
};
@ -1487,6 +1454,8 @@ class Bond {
std::string _policyAlias; // Custom name given by the user to this bond type.
static Binder* _binder;
/**
* Set of indices corresponding to paths currently included in the bond proper. This
* may only be updated during a call to curateBond(). The reason for this is so that
@ -1518,7 +1487,6 @@ class Bond {
// balance-aware
uint64_t _totalBondUnderload;
uint8_t _flowRebalanceStrategy;
// dynamic link monitoring
uint8_t _linkMonitorStrategy;
@ -1546,7 +1514,6 @@ class Bond {
uint16_t _maxAcceptableLatency;
uint16_t _maxAcceptableMeanLatency;
uint16_t _maxAcceptablePacketDelayVariance;
uint8_t _minAcceptableAllocation;
/**
* Link state reporting

View file

@ -496,15 +496,30 @@ ZT_PeerList *Node::peers() const
SharedPtr<Path> bestp(pi->second->getAppropriatePath(_now,false));
p->pathCount = 0;
for(std::vector< SharedPtr<Path> >::iterator path(paths.begin());path!=paths.end();++path) {
memcpy(&(p->paths[p->pathCount].address),&((*path)->address()),sizeof(struct sockaddr_storage));
p->paths[p->pathCount].localSocket = (*path)->localSocket();
p->paths[p->pathCount].lastSend = (*path)->lastOut();
p->paths[p->pathCount].lastReceive = (*path)->lastIn();
p->paths[p->pathCount].trustedPathId = RR->topology->getOutboundPathTrust((*path)->address());
p->paths[p->pathCount].expired = 0;
p->paths[p->pathCount].preferred = ((*path) == bestp) ? 1 : 0;
p->paths[p->pathCount].scope = (*path)->ipScope();
++p->pathCount;
if((*path)->valid()) {
memcpy(&(p->paths[p->pathCount].address),&((*path)->address()),sizeof(struct sockaddr_storage));
p->paths[p->pathCount].localSocket = (*path)->localSocket();
p->paths[p->pathCount].lastSend = (*path)->lastOut();
p->paths[p->pathCount].lastReceive = (*path)->lastIn();
p->paths[p->pathCount].trustedPathId = RR->topology->getOutboundPathTrust((*path)->address());
p->paths[p->pathCount].expired = 0;
p->paths[p->pathCount].preferred = ((*path) == bestp) ? 1 : 0;
p->paths[p->pathCount].scope = (*path)->ipScope();
if (pi->second->bond()) {
p->paths[p->pathCount].latencyMean = (*path)->latencyMean();
p->paths[p->pathCount].latencyVariance = (*path)->latencyVariance();
p->paths[p->pathCount].packetLossRatio = (*path)->packetLossRatio();
p->paths[p->pathCount].packetErrorRatio = (*path)->packetErrorRatio();
p->paths[p->pathCount].allocation = (*path)->allocation();
p->paths[p->pathCount].linkSpeed = (*path)->givenLinkSpeed();
p->paths[p->pathCount].bonded = (*path)->bonded();
p->paths[p->pathCount].eligible = (*path)->eligible();
std::string ifname = std::string((*path)->ifname());
memset(p->paths[p->pathCount].ifname, 0x0, std::min((int)ifname.length() + 1, ZT_MAX_PHYSIFNAME));
memcpy(p->paths[p->pathCount].ifname, ifname.c_str(), std::min((int)ifname.length(), ZT_MAX_PHYSIFNAME));
}
++p->pathCount;
}
}
if (pi->second->bond()) {
p->isBonded = pi->second->bond();

View file

@ -85,6 +85,15 @@ public:
_lastTrustEstablishedPacketReceived(0),
_lastEchoRequestReceived(0),
_localSocket(-1),
_latencyMean(0.0),
_latencyVariance(0.0),
_packetLossRatio(0.0),
_packetErrorRatio(0.0),
_valid(true),
_eligible(false),
_bonded(false),
_givenLinkSpeed(0),
_allocation(0),
_latency(0xffff),
_addr(),
_ipScope(InetAddress::IP_SCOPE_NONE)
@ -96,6 +105,15 @@ public:
_lastTrustEstablishedPacketReceived(0),
_lastEchoRequestReceived(0),
_localSocket(localSocket),
_latencyMean(0.0),
_latencyVariance(0.0),
_packetLossRatio(0.0),
_packetErrorRatio(0.0),
_valid(true),
_eligible(false),
_bonded(false),
_givenLinkSpeed(0),
_allocation(0),
_latency(0xffff),
_addr(addr),
_ipScope(addr.ipScope())
@ -300,6 +318,17 @@ public:
*/
inline unsigned int packetErrorRatio() const
{
    // NOTE(review): _packetErrorRatio is declared as a float, so this conversion drops the
    // fractional part — confirm that an integer return type is really intended.
    return static_cast<unsigned int>(_packetErrorRatio);
}
/**
* @return Whether this path is valid as reported by the bonding layer. The bonding layer
* actually checks with Phy to see if the interface is still up
*/
inline unsigned int valid() const
{
    // The stored flag is a bool; it is reported to callers as 0 or 1.
    return _valid ? 1u : 0u;
}
/**
* @return Whether this path is eligible for use in a bond as reported by the bonding layer
*/
inline unsigned int eligible() const
{
    // The stored flag is a bool; it is reported to callers as 0 or 1.
    return _eligible ? 1u : 0u;
}
/**
* @return Whether this path is bonded as reported by the bonding layer
*/
@ -313,27 +342,36 @@ public:
/**
* @return Traffic allocation as reported by the bonding layer
*/
inline unsigned int allocation() const { return _allocation; }
inline unsigned char allocation() const { return _allocation; }
void *_bondingMetricPtr;
/**
* @return Physical interface name that this path lives on
*/
char *ifname() {
    // Hands back the internal _ifname buffer itself, not a copy.
    return &_ifname[0];
}
private:
char _ifname[ZT_MAX_PHYSIFNAME] = { };
volatile int64_t _lastOut;
volatile int64_t _lastIn;
volatile int64_t _lastTrustEstablishedPacketReceived;
int64_t _lastEchoRequestReceived;
int64_t _localSocket;
volatile float _latencyMean;
volatile float _latencyVariance;
volatile float _packetLossRatio;
volatile float _packetErrorRatio;
volatile bool _valid;
volatile bool _eligible;
volatile bool _bonded;
volatile int64_t _givenLinkSpeed;
volatile int8_t _allocation;
int64_t _lastEchoRequestReceived;
int64_t _localSocket;
volatile uint32_t _givenLinkSpeed;
volatile uint8_t _allocation;
volatile unsigned int _latency;
InetAddress _addr;