Formatting

This commit is contained in:
Joseph Henry 2020-05-14 20:09:25 -07:00
parent 4465952d11
commit 58d567c331
6 changed files with 271 additions and 137 deletions

View file

@@ -140,7 +140,7 @@ SharedPtr<Path> Bond::getAppropriatePath(int64_t now, int32_t flowId)
}
}
}
//fprintf(stderr, "resultant _rrIdx=%d\n", _rrIdx);
fprintf(stderr, "_rrIdx=%d\n", _rrIdx);
if (_paths[_bondedIdx[_rrIdx]]) {
return _paths[_bondedIdx[_rrIdx]];
}
@@ -246,7 +246,7 @@ void Bond::recordIncomingPacket(const SharedPtr<Path>& path, uint64_t packetId,
}
/**
* Learn new flows and pro-actively create entries for them in the bond so
* that the next time we send a packet out that is part of a flow we know
* that the next time we send a packet out that is part of a flow we know
* which path to use.
*/
if ((flowId != ZT_QOS_NO_FLOW)
@@ -385,7 +385,7 @@ SharedPtr<Flow> Bond::createFlow(const SharedPtr<Path> &path, int32_t flowId, un
}
if (_flows.size() >= ZT_FLOW_MAX_COUNT) {
fprintf(stderr, "max number of flows reached (%d), forcibly forgetting oldest flow\n", ZT_FLOW_MAX_COUNT);
forgetFlowsWhenNecessary(0,true,now);
forgetFlowsWhenNecessary(0,true,now);
}
SharedPtr<Flow> flow = new Flow(flowId, now);
_flows[flowId] = flow;
@@ -588,7 +588,7 @@ void Bond::sendQOS_MEASUREMENT(void *tPtr,const SharedPtr<Path> &path,const int6
} else {
RR->sw->send(tPtr,outp,false);
}
// Account for the fact that a VERB_QOS_MEASUREMENT was just sent. Reset timers.
// Account for the fact that a VERB_QOS_MEASUREMENT was just sent. Reset timers.
path->_packetsReceivedSinceLastQoS = 0;
path->_lastQoSMeasurement = now;
}
@@ -608,7 +608,7 @@ void Bond::processBackgroundTasks(void *tPtr, const int64_t now)
//fprintf(stderr, "_lastFrame=%llu, suggestedMonitorInterval=%d, _dynamicPathMonitorInterval=%d\n",
// (now-_lastFrame), suggestedMonitorInterval, _dynamicPathMonitorInterval);
}
if (_slaveMonitorStrategy == ZT_MULTIPATH_SLAVE_MONITOR_STRATEGY_DYNAMIC) {
_shouldCollectPathStatistics = true;
}
@@ -673,7 +673,7 @@ void Bond::processBackgroundTasks(void *tPtr, const int64_t now)
if (((now - _lastPathNegotiationCheck) > ZT_PATH_NEGOTIATION_CHECK_INTERVAL) && _allowPathNegotiation) {
_lastPathNegotiationCheck = now;
pathNegotiationCheck(tPtr, now);
}
}
}
void Bond::applyUserPrefs()
@@ -854,8 +854,8 @@ void Bond::estimatePathQuality(const int64_t now)
float plr[ZT_MAX_PEER_NETWORK_PATHS];
float per[ZT_MAX_PEER_NETWORK_PATHS];
float thr[ZT_MAX_PEER_NETWORK_PATHS];
float thm[ZT_MAX_PEER_NETWORK_PATHS];
float thv[ZT_MAX_PEER_NETWORK_PATHS];
float thm[ZT_MAX_PEER_NETWORK_PATHS];
float thv[ZT_MAX_PEER_NETWORK_PATHS];
float maxLAT = 0;
float maxPDV = 0;
@@ -867,7 +867,7 @@ void Bond::estimatePathQuality(const int64_t now)
float quality[ZT_MAX_PEER_NETWORK_PATHS];
uint8_t alloc[ZT_MAX_PEER_NETWORK_PATHS];
float totQuality = 0.0f;
memset(&lat, 0, sizeof(lat));
@@ -950,7 +950,7 @@ void Bond::estimatePathQuality(const int64_t now)
//fprintf(stdout, "EH %d: lat=%8.3f, ltm=%8.3f, pdv=%8.3f, plr=%5.3f, per=%5.3f, thr=%8f, thm=%5.3f, thv=%5.3f, avl=%5.3f, age=%8.2f, scp=%4d, q=%5.3f, qtot=%5.3f, ac=%d if=%s, path=%s\n",
// i, lat[i], ltm[i], pdv[i], plr[i], per[i], thr[i], thm[i], thv[i], avl[i], age[i], scp[i], quality[i], totQuality, alloc[i], getSlave(_paths[i])->ifname().c_str(), pathStr);
}
// Convert metrics to relative quantities and apply contribution weights
for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
@@ -966,7 +966,7 @@ void Bond::estimatePathQuality(const int64_t now)
totQuality += quality[i];
}
}
//
//
for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
if (_paths[i] && _paths[i]->bonded()) {
alloc[i] = std::ceil((quality[i] / totQuality) * (float)255);
@@ -1011,8 +1011,8 @@ void Bond::estimatePathQuality(const int64_t now)
if (_paths[i]) {
_paths[i]->address().toString(pathStr);
fprintf(stdout, "%s, %s, %8.3f, %8.3f, %8.3f, %5.3f, %5.3f, %5.3f, %8f, %5.3f, %5.3f, %d, %5.3f, %d, %d, %d, %d, %d, %d, ",
getSlave(_paths[i])->ifname().c_str(), pathStr, _paths[i]->latencyMean, lat[i],pdv[i], _paths[i]->packetLossRatio, plr[i],per[i],thr[i],thm[i],thv[i],(now - _paths[i]->lastIn()),quality[i],alloc[i],
_paths[i]->relativeByteLoad, _paths[i]->assignedFlowCount, _paths[i]->alive(now, true), _paths[i]->eligible(now,_ackSendInterval), _paths[i]->qosStatsOut.size());
getSlave(_paths[i])->ifname().c_str(), pathStr, _paths[i]->latencyMean, lat[i],pdv[i], _paths[i]->packetLossRatio, plr[i],per[i],thr[i],thm[i],thv[i],(now - _paths[i]->lastIn()),quality[i],alloc[i],
_paths[i]->relativeByteLoad, _paths[i]->assignedFlowCount, _paths[i]->alive(now, true), _paths[i]->eligible(now,_ackSendInterval), _paths[i]->qosStatsOut.size());
}
}
fprintf(stdout, "\n");
@@ -1022,7 +1022,144 @@ void Bond::estimatePathQuality(const int64_t now)
void Bond::processBalanceTasks(const int64_t now)
{
// Omitted
//fprintf(stderr, "processBalanceTasks\n");
char curPathStr[128];
if (_allowFlowHashing) {
/**
* Clean up and reset flows if necessary
*/
if ((now - _lastFlowExpirationCheck) > ZT_MULTIPATH_FLOW_CHECK_INTERVAL) {
Mutex::Lock _l(_flows_m);
forgetFlowsWhenNecessary(ZT_MULTIPATH_FLOW_EXPIRATION_INTERVAL,false,now);
_lastFlowExpirationCheck = now;
}
if ((now - _lastFlowStatReset) > ZT_FLOW_STATS_RESET_INTERVAL) {
Mutex::Lock _l(_flows_m);
_lastFlowStatReset = now;
std::map<int32_t,SharedPtr<Flow> >::iterator it = _flows.begin();
while (it != _flows.end()) {
it->second->resetByteCounts();
++it;
}
}
/**
* Re-allocate flows from dead paths
*/
if (_bondingPolicy== ZT_BONDING_POLICY_BALANCE_XOR || _bondingPolicy== ZT_BONDING_POLICY_BALANCE_AWARE) {
Mutex::Lock _l(_flows_m);
for (int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
if (!_paths[i]) {
continue;
}
if (!_paths[i]->eligible(now,_ackSendInterval) && _paths[i]->_shouldReallocateFlows) {
_paths[i]->address().toString(curPathStr);
fprintf(stderr, "%d reallocating flows from dead path %s on %s\n", (RR->node->now() - RR->bc->getBondStartTime()), curPathStr, getSlave(_paths[i])->ifname().c_str());
std::map<int32_t,SharedPtr<Flow> >::iterator flow_it = _flows.begin();
while (flow_it != _flows.end()) {
if (flow_it->second->assignedPath() == _paths[i]) {
if(assignFlowToBondedPath(flow_it->second, now)) {
_paths[i]->_assignedFlowCount--;
}
}
++flow_it;
}
_paths[i]->_shouldReallocateFlows = false;
}
}
}
}
/**
* Tasks specific to (Balance Round Robin)
*/
if (_bondingPolicy== ZT_BONDING_POLICY_BALANCE_RR) {
if (_allowFlowHashing) {
// TODO: Should ideally failover from (idx) to a random slave, this is so that (idx+1) isn't overloaded
}
else if (!_allowFlowHashing) {
// Nothing
}
}
/**
* Tasks specific to (Balance XOR)
*/
if (_bondingPolicy== ZT_BONDING_POLICY_BALANCE_XOR) {
// Nothing specific for XOR
}
/**
* Tasks specific to (Balance Aware)
*/
if ((_bondingPolicy== ZT_BONDING_POLICY_BALANCE_AWARE)) {
if (_allowFlowHashing) {
Mutex::Lock _l(_flows_m);
/**
* Re-balance flows in proportion to slave capacity (or when eligibility changes)
*/
if ((now - _lastFlowRebalance) > ZT_FLOW_REBALANCE_INTERVAL) {
/**
* Determine "load" for bonded paths
*/
uint64_t totalBytes = 0;
for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) { // first pass: compute absolute byte load and total
if (_paths[i] && _paths[i]->bonded()) {
_paths[i]->_byteLoad = 0;
std::map<int32_t,SharedPtr<Flow> >::iterator flow_it = _flows.begin();
while (flow_it != _flows.end()) {
if (flow_it->second->assignedPath() == _paths[i]) {
_paths[i]->_byteLoad += flow_it->second->totalBytes();
}
++flow_it;
}
totalBytes += _paths[i]->_byteLoad;
}
}
/**
* Determine "affinity" for bonded path
*/
//fprintf(stderr, "\n\n");
_totalBondUnderload = 0;
for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) { // second pass: compute relative byte loads and total imbalance
if (_paths[i] && _paths[i]->bonded()) {
if (totalBytes) {
uint8_t relativeByteLoad = std::ceil(((float)_paths[i]->_byteLoad / (float)totalBytes) * (float)255);
//fprintf(stderr, "lastComputedAllocation = %d\n", _paths[i]->allocation);
//fprintf(stderr, " relativeByteLoad = %d\n", relativeByteLoad);
_paths[i]->_relativeByteLoad = relativeByteLoad;
uint8_t relativeUnderload = std::max(0, (int)_paths[i]->_allocation - (int)relativeByteLoad);
//fprintf(stderr, " relativeUnderload = %d\n", relativeUnderload);
_totalBondUnderload += relativeUnderload;
//fprintf(stderr, " _totalBondUnderload = %d\n\n", _totalBondUnderload);
//_paths[i]->affinity = (relativeUnderload > 0 ? relativeUnderload : _paths[i]->_allocation);
}
else { // set everything to base values
_totalBondUnderload = 0;
//_paths[i]->affinity = 0;
}
}
}
//fprintf(stderr, "_totalBondUnderload=%d (end)\n\n", _totalBondUnderload);
/**
*
*/
//fprintf(stderr, "_lastFlowRebalance\n");
std::map<int32_t, SharedPtr<Flow> >::iterator it = _flows.begin();
while (it != _flows.end()) {
int32_t flowId = it->first;
SharedPtr<Flow> flow = it->second;
if ((now - flow->_lastPathReassignment) > ZT_FLOW_MIN_REBALANCE_INTERVAL) {
//fprintf(stdout, " could move : %x\n", flowId);
}
++it;
}
_lastFlowRebalance = now;
}
}
else if (!_allowFlowHashing) {
// Nothing
}
}
}
void Bond::dequeueNextActiveBackupPath(const uint64_t now)
@@ -1042,7 +1179,7 @@ void Bond::dequeueNextActiveBackupPath(const uint64_t now)
}
void Bond::processActiveBackupTasks(const int64_t now)
{
{
//fprintf(stderr, "%llu processActiveBackupTasks\n", (now - RR->bc->getBondStartTime()));
char pathStr[128]; char prevPathStr[128]; char curPathStr[128];
@@ -1058,7 +1195,7 @@ void Bond::processActiveBackupTasks(const int64_t now)
/**
* [Automatic mode]
* The user has not explicitly specified slaves or their failover schedule,
* the bonding policy will now select the first eligible path and set it as
* the bonding policy will now select the first eligible path and set it as
* its active backup path, if a substantially better path is detected the bonding
* policy will assign it as the new active backup path. If the path fails it will
* simply find the next eligible path.
@@ -1187,9 +1324,9 @@ void Bond::processActiveBackupTasks(const int64_t now)
}
SharedPtr<Slave> slave =RR->bc->getSlaveBySocket(_policyAlias, _paths[i]->localSocket());
_paths[i]->address().toString(pathStr);
int failoverScoreHandicap = _paths[i]->_failoverScore;
if (_paths[i]->preferred())
if (_paths[i]->preferred())
{
failoverScoreHandicap += ZT_MULTIPATH_FAILOVER_HANDICAP_PREFERRED;
//fprintf(stderr, "%s on %s ----> %d for preferred\n", pathStr, _paths[i]->ifname().c_str(), failoverScoreHandicap);
@@ -1264,7 +1401,7 @@ void Bond::processActiveBackupTasks(const int64_t now)
if (_paths[i].ptr() == negotiatedPath.ptr()) {
_paths[i]->_negotiated = true;
failoverScoreHandicap = ZT_MULTIPATH_FAILOVER_HANDICAP_NEGOTIATED;
} else {
} else {
_paths[i]->_negotiated = false;
}
_paths[i]->_failoverScore = _paths[i]->_allocation + failoverScoreHandicap;
@@ -1386,7 +1523,7 @@ void Bond::setReasonableDefaults(int policy)
_lastPathNegotiationReceived=0;
_lastBackgroundTaskCheck=0;
_lastPathNegotiationCheck=0;
_lastFlowStatReset=0;
_lastFlowExpirationCheck=0;
_localUtility=0;
@@ -1397,7 +1534,7 @@ void Bond::setReasonableDefaults(int policy)
_pathNegotiationCutoffCount=0;
_lastFlowRebalance=0;
_totalBondUnderload = 0;
//_maxAcceptableLatency
_maxAcceptablePacketDelayVariance = 50;
_maxAcceptablePacketLossRatio = 0.10;
@@ -1445,7 +1582,7 @@ void Bond::setReasonableDefaults(int policy)
case ZT_BONDING_POLICY_BALANCE_RR:
_failoverInterval = 5000;
_allowFlowHashing = false;
_packetsPerSlave = 8;
_packetsPerSlave = 512;
_slaveMonitorStrategy = ZT_MULTIPATH_SLAVE_MONITOR_STRATEGY_DYNAMIC;
_qualityWeights[ZT_QOS_LAT_IDX] = 0.4f;
_qualityWeights[ZT_QOS_LTM_IDX] = 0.0f;
@@ -1550,8 +1687,8 @@ void Bond::setUserQualityWeights(float weights[], int len)
bool Bond::relevant() {
return _peer->identity().address().toInt() == 0x16a03a3d03
|| _peer->identity().address().toInt() == 0x4410300d03
return _peer->identity().address().toInt() == 0x16a03a3d03
|| _peer->identity().address().toInt() == 0x4410300d03
|| _peer->identity().address().toInt() == 0x795cbf86fa;
}
@@ -1566,7 +1703,7 @@ void Bond::dumpInfo(const int64_t now)
//char oldPathStr[128];
char currPathStr[128];
if (!relevant()) {
if (!relevant()) {
return;
}
/*
@@ -1589,7 +1726,7 @@ void Bond::dumpInfo(const int64_t now)
}
_lastPrintTS = now;
_lastLogTS = now;
fprintf(stderr, "\n\n");
for(int i=0; i<ZT_MAX_PEER_NETWORK_PATHS; ++i) {