Port multipath improvements to newer version
This commit is contained in:
parent
024649c175
commit
2e6cda38f6
9 changed files with 120 additions and 47 deletions
|
@ -15,10 +15,10 @@
|
|||
|
||||
#include "Switch.hpp"
|
||||
|
||||
#include <cinttypes> // for PRId64, etc. macros
|
||||
#include <cmath>
|
||||
#include <cstdio>
|
||||
#include <string>
|
||||
#include <cinttypes> // for PRId64, etc. macros
|
||||
|
||||
// FIXME: remove this suppression and actually fix warnings
|
||||
#ifdef __GNUC__
|
||||
|
@ -108,7 +108,7 @@ bool Bond::setAllMtuByTuple(uint16_t mtu, const std::string& ifStr, const std::s
|
|||
std::map<int64_t, SharedPtr<Bond> >::iterator bondItr = _bonds.begin();
|
||||
bool found = false;
|
||||
while (bondItr != _bonds.end()) {
|
||||
if (bondItr->second->setMtuByTuple(mtu,ifStr,ipStr)) {
|
||||
if (bondItr->second->setMtuByTuple(mtu, ifStr, ipStr)) {
|
||||
found = true;
|
||||
}
|
||||
++bondItr;
|
||||
|
@ -154,11 +154,13 @@ SharedPtr<Bond> Bond::createBond(const RuntimeEnvironment* renv, const SharedPtr
|
|||
bond = new Bond(renv, _bondPolicyTemplates[_defaultPolicyStr].ptr(), peer);
|
||||
bond->debug("new default custom bond (based on %s)", bond->getPolicyStrByCode(bond->policy()).c_str());
|
||||
}
|
||||
} else {
|
||||
}
|
||||
else {
|
||||
if (! _bondPolicyTemplates[_policyTemplateAssignments[identity]]) {
|
||||
bond = new Bond(renv, _defaultPolicy, peer);
|
||||
bond->debug("peer-specific bond, was specified as %s but the bond definition was not found, using default %s", _policyTemplateAssignments[identity].c_str(), getPolicyStrByCode(_defaultPolicy).c_str());
|
||||
} else {
|
||||
}
|
||||
else {
|
||||
bond = new Bond(renv, _bondPolicyTemplates[_policyTemplateAssignments[identity]].ptr(), peer);
|
||||
bond->debug("new default bond");
|
||||
}
|
||||
|
@ -227,10 +229,12 @@ SharedPtr<Link> Bond::getLinkBySocket(const std::string& policyAlias, uint64_t l
|
|||
SharedPtr<Link> s = new Link(ifnameStr, 0, 0, 0, true, ZT_BOND_SLAVE_MODE_PRIMARY, "");
|
||||
_interfaceToLinkMap[policyAlias].insert(std::pair<std::string, SharedPtr<Link> >(ifnameStr, s));
|
||||
return s;
|
||||
} else {
|
||||
}
|
||||
else {
|
||||
return SharedPtr<Link>();
|
||||
}
|
||||
} else {
|
||||
}
|
||||
else {
|
||||
return search->second;
|
||||
}
|
||||
}
|
||||
|
@ -340,6 +344,7 @@ void Bond::nominatePathToBond(const SharedPtr<Path>& path, int64_t now)
|
|||
_paths[i].ipvPref = sl->ipvPref();
|
||||
_paths[i].mode = sl->mode();
|
||||
_paths[i].enabled = sl->enabled();
|
||||
_paths[i].localPort = _phy->getLocalPort((PhySocket*)((uintptr_t)path->localSocket()));
|
||||
_paths[i].onlyPathOnLink = ! bFoundCommonLink;
|
||||
}
|
||||
}
|
||||
|
@ -397,7 +402,8 @@ SharedPtr<Path> Bond::getAppropriatePath(int64_t now, int32_t flowId)
|
|||
_rrPacketsSentOnCurrLink = 0;
|
||||
if (_numBondedPaths == 1 || _rrIdx >= (ZT_MAX_PEER_NETWORK_PATHS - 1)) {
|
||||
_rrIdx = 0;
|
||||
} else {
|
||||
}
|
||||
else {
|
||||
int _tempIdx = _rrIdx;
|
||||
for (int searchCount = 0; searchCount < (_numBondedPaths - 1); searchCount++) {
|
||||
_tempIdx = (_tempIdx == (_numBondedPaths - 1)) ? 0 : _tempIdx + 1;
|
||||
|
@ -427,7 +433,8 @@ SharedPtr<Path> Bond::getAppropriatePath(int64_t now, int32_t flowId)
|
|||
if (likely(it != _flows.end())) {
|
||||
it->second->lastActivity = now;
|
||||
return _paths[it->second->assignedPath].p;
|
||||
} else {
|
||||
}
|
||||
else {
|
||||
unsigned char entropy;
|
||||
Utils::getSecureRandom(&entropy, 1);
|
||||
SharedPtr<Flow> flow = createFlow(ZT_MAX_PEER_NETWORK_PATHS, flowId, entropy, now);
|
||||
|
@ -505,7 +512,8 @@ void Bond::recordIncomingPacket(const SharedPtr<Path>& path, uint64_t packetId,
|
|||
_paths[pathIdx].qosStatsIn[packetId] = now;
|
||||
++(_paths[pathIdx].packetsReceivedSinceLastQoS);
|
||||
//_paths[pathIdx].packetValiditySamples.push(true);
|
||||
} else {
|
||||
}
|
||||
else {
|
||||
// debug("QoS buffer full, will not record information");
|
||||
}
|
||||
/*
|
||||
|
@ -532,7 +540,8 @@ void Bond::recordIncomingPacket(const SharedPtr<Path>& path, uint64_t packetId,
|
|||
SharedPtr<Flow> flow;
|
||||
if (! _flows.count(flowId)) {
|
||||
flow = createFlow(pathIdx, flowId, 0, now);
|
||||
} else {
|
||||
}
|
||||
else {
|
||||
flow = _flows[flowId];
|
||||
}
|
||||
if (flow) {
|
||||
|
@ -618,7 +627,8 @@ bool Bond::assignFlowToBondedPath(SharedPtr<Flow>& flow, int64_t now, bool reass
|
|||
|
||||
if (reassign) {
|
||||
log("attempting to re-assign out-flow %04x previously on idx %d (%u / %zu flows)", flow->id, flow->assignedPath, _paths[_realIdxMap[flow->assignedPath]].assignedFlowCount, _flows.size());
|
||||
} else {
|
||||
}
|
||||
else {
|
||||
debug("attempting to assign flow for the first time");
|
||||
}
|
||||
|
||||
|
@ -632,7 +642,8 @@ bool Bond::assignFlowToBondedPath(SharedPtr<Flow>& flow, int64_t now, bool reass
|
|||
|
||||
if (reassign) {
|
||||
bondedIdx = (flow->assignedPath + offset) % (_numBondedPaths);
|
||||
} else {
|
||||
}
|
||||
else {
|
||||
bondedIdx = abs((int)((entropy + offset) % (_numBondedPaths)));
|
||||
}
|
||||
// debug("idx=%d, offset=%d, randomCap=%f, actualCap=%f", bondedIdx, offset, randomLinkCapacity, _paths[_realIdxMap[bondedIdx]].relativeLinkCapacity);
|
||||
|
@ -655,7 +666,8 @@ bool Bond::assignFlowToBondedPath(SharedPtr<Flow>& flow, int64_t now, bool reass
|
|||
flow->assignPath(_realIdxMap[bondedIdx], now);
|
||||
++(_paths[_realIdxMap[bondedIdx]].assignedFlowCount);
|
||||
// debug(" ABLE to find optimal link %f idx %d", _paths[_realIdxMap[bondedIdx]].relativeQuality, bondedIdx);
|
||||
} else {
|
||||
}
|
||||
else {
|
||||
// We were (unable) to find a path that didn't violate at least one quality requirement, will choose next best option
|
||||
flow->assignPath(_realIdxMap[nextBestQualIdx], now);
|
||||
++(_paths[_realIdxMap[nextBestQualIdx]].assignedFlowCount);
|
||||
|
@ -715,11 +727,13 @@ void Bond::forgetFlowsWhenNecessary(uint64_t age, bool oldest, int64_t now)
|
|||
debug("forget flow %04x (age %" PRId64 ") (%u / %zu)", it->first, it->second->age(now), _paths[it->second->assignedPath].assignedFlowCount, (_flows.size() - 1));
|
||||
_paths[it->second->assignedPath].assignedFlowCount--;
|
||||
it = _flows.erase(it);
|
||||
} else {
|
||||
}
|
||||
else {
|
||||
++it;
|
||||
}
|
||||
}
|
||||
} else if (oldest) { // Remove single oldest by natural expiration
|
||||
}
|
||||
else if (oldest) { // Remove single oldest by natural expiration
|
||||
uint64_t maxAge = 0;
|
||||
while (it != _flows.end()) {
|
||||
if (it->second->age(now) > maxAge) {
|
||||
|
@ -766,7 +780,8 @@ void Bond::processIncomingPathNegotiationRequest(uint64_t now, SharedPtr<Path>&
|
|||
if (_peer->_id.address().toInt() > RR->node->identity().address().toInt()) {
|
||||
debug("agree with peer to use alternate link %s/%s\n", link->ifname().c_str(), pathStr);
|
||||
_negotiatedPathIdx = pathIdx;
|
||||
} else {
|
||||
}
|
||||
else {
|
||||
debug("ignore petition from peer to use alternate link %s/%s\n", link->ifname().c_str(), pathStr);
|
||||
}
|
||||
}
|
||||
|
@ -881,7 +896,8 @@ void Bond::sendQOS_MEASUREMENT(void* tPtr, int pathIdx, int64_t localSocket, con
|
|||
if (atAddress) {
|
||||
outp.armor(_peer->key(), false, _peer->aesKeysIfSupported());
|
||||
RR->node->putPacket(tPtr, localSocket, atAddress, outp.data(), outp.size());
|
||||
} else {
|
||||
}
|
||||
else {
|
||||
RR->sw->send(tPtr, outp, false);
|
||||
}
|
||||
Metrics::pkt_qos_out++;
|
||||
|
@ -1222,7 +1238,8 @@ void Bond::estimatePathQuality(int64_t now)
|
|||
if ((now - it->second) >= qosRecordTimeout) {
|
||||
it = _paths[i].qosStatsOut.erase(it);
|
||||
++numDroppedQosOutRecords;
|
||||
} else {
|
||||
}
|
||||
else {
|
||||
++it;
|
||||
}
|
||||
}
|
||||
|
@ -1250,7 +1267,8 @@ void Bond::estimatePathQuality(int64_t now)
|
|||
if ((now - it->second) >= qosRecordTimeout) {
|
||||
it = _paths[i].qosStatsIn.erase(it);
|
||||
++numDroppedQosInRecords;
|
||||
} else {
|
||||
}
|
||||
else {
|
||||
++it;
|
||||
}
|
||||
}
|
||||
|
@ -1327,10 +1345,10 @@ void Bond::estimatePathQuality(int64_t now)
|
|||
continue;
|
||||
}
|
||||
// Compute/Smooth average of real-world observations
|
||||
if (_paths[i].latencySamples.count() == ZT_QOS_SHORTTERM_SAMPLE_WIN_SIZE) {
|
||||
if (_paths[i].latencySamples.count() >= ZT_QOS_SHORTTERM_SAMPLE_WIN_MIN_REQ_SIZE) {
|
||||
_paths[i].latency = _paths[i].latencySamples.mean();
|
||||
}
|
||||
if (_paths[i].latencySamples.count() == ZT_QOS_SHORTTERM_SAMPLE_WIN_SIZE) {
|
||||
if (_paths[i].latencySamples.count() >= ZT_QOS_SHORTTERM_SAMPLE_WIN_MIN_REQ_SIZE) {
|
||||
_paths[i].latencyVariance = _paths[i].latencySamples.stddev();
|
||||
}
|
||||
|
||||
|
@ -1344,6 +1362,7 @@ void Bond::estimatePathQuality(int64_t now)
|
|||
//_paths[i].packetErrorRatio = 1.0 - (_paths[i].packetValiditySamples.count() ? _paths[i].packetValiditySamples.mean() : 1.0);
|
||||
// _valid is written elsewhere
|
||||
_paths[i].p->_relativeQuality = _paths[i].relativeQuality;
|
||||
_paths[i].p->_localPort = _paths[i].localPort;
|
||||
}
|
||||
|
||||
// Flag links for avoidance
|
||||
|
@ -1370,7 +1389,8 @@ void Bond::estimatePathQuality(int64_t now)
|
|||
shouldAvoid = true;
|
||||
}
|
||||
_paths[i].shouldAvoid = shouldAvoid;
|
||||
} else {
|
||||
}
|
||||
else {
|
||||
if (! shouldAvoid) {
|
||||
log("no longer avoiding link %s", pathToStr(_paths[i].p).c_str());
|
||||
_paths[i].shouldAvoid = false;
|
||||
|
@ -1482,7 +1502,8 @@ void Bond::processActiveBackupTasks(void* tPtr, int64_t now)
|
|||
_lastBondStatusLog = now;
|
||||
if (_abPathIdx == ZT_MAX_PEER_NETWORK_PATHS) {
|
||||
log("no active link");
|
||||
} else if (_paths[_abPathIdx].p) {
|
||||
}
|
||||
else if (_paths[_abPathIdx].p) {
|
||||
log("active link is %s, failover queue size is %zu", pathToStr(_paths[_abPathIdx].p).c_str(), _abFailoverQueue.size());
|
||||
}
|
||||
if (_abFailoverQueue.empty()) {
|
||||
|
@ -1590,7 +1611,8 @@ void Bond::processActiveBackupTasks(void* tPtr, int64_t now)
|
|||
log("link %s is ineligible, removing from failover queue (%zu links remain in queue)", pathToStr(_paths[_abPathIdx].p).c_str(), _abFailoverQueue.size());
|
||||
}
|
||||
continue;
|
||||
} else {
|
||||
}
|
||||
else {
|
||||
++it;
|
||||
}
|
||||
}
|
||||
|
@ -1739,7 +1761,8 @@ void Bond::processActiveBackupTasks(void* tPtr, int64_t now)
|
|||
if (! _abFailoverQueue.empty()) {
|
||||
dequeueNextActiveBackupPath(now);
|
||||
log("active link switched to %s", pathToStr(_paths[_abPathIdx].p).c_str());
|
||||
} else {
|
||||
}
|
||||
else {
|
||||
log("failover queue is empty, no links to choose from");
|
||||
}
|
||||
}
|
||||
|
@ -1785,7 +1808,8 @@ void Bond::processActiveBackupTasks(void* tPtr, int64_t now)
|
|||
dequeueNextActiveBackupPath(now);
|
||||
_lastPathNegotiationCheck = now;
|
||||
log("switch negotiated link %s (select mode: optimize)", pathToStr(_paths[_abPathIdx].p).c_str());
|
||||
} else {
|
||||
}
|
||||
else {
|
||||
// Try to find a better path and automatically switch to it -- not too often, though.
|
||||
if ((now - _lastActiveBackupPathChange) > ZT_BOND_OPTIMIZE_INTERVAL) {
|
||||
if (! _abFailoverQueue.empty()) {
|
||||
|
@ -1901,7 +1925,7 @@ void Bond::setBondParameters(int policy, SharedPtr<Bond> templateBond, bool useT
|
|||
}
|
||||
|
||||
if (! _isLeaf) {
|
||||
_policy = ZT_BOND_POLICY_ACTIVE_BACKUP;
|
||||
_policy = ZT_BOND_POLICY_NONE;
|
||||
}
|
||||
|
||||
// Timer geometry
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue