Partial implementation of ZT_MULTIPATH_ACTIVE_BACKUP

This commit is contained in:
Joseph Henry 2019-08-20 16:19:20 -07:00
parent 963113b86d
commit b0a91c0187
3 changed files with 83 additions and 12 deletions

@@ -347,7 +347,7 @@ void Peer::computeAggregateAllocation(int64_t now)
+ (fmaxf(1.0f, relThroughput[i]) * (float)ZT_PATH_CONTRIB_THROUGHPUT)
+ relScope * (float)ZT_PATH_CONTRIB_SCOPE;
relQuality *= age_contrib;
// Arbitrary cutoffs
// Clamp values
relQuality = relQuality > (1.00f / 100.0f) ? relQuality : 0.0f;
relQuality = relQuality < (99.0f / 100.0f) ? relQuality : 1.0f;
totalRelativeQuality += relQuality;
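The two ternaries above implement the "clamp": a path contributing less than 1% of relative quality is zeroed out, and one above 99% is rounded up to the whole link. A minimal standalone sketch of the same rule, with illustrative names not taken from the codebase:

#include <cstdio>

// Zero out negligible contributors and saturate dominant ones
static float clampRelQuality(float q)
{
    if (q <= 0.01f) return 0.0f; // below 1%: no meaningful contribution
    if (q >= 0.99f) return 1.0f; // above 99%: effectively the whole link
    return q;
}

int main()
{
    const float samples[] = { 0.005f, 0.25f, 0.995f };
    for (int i = 0; i < 3; i++)
        printf("%.3f -> %.3f\n", samples[i], clampRelQuality(samples[i]));
    return 0;
}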
@@ -357,7 +357,6 @@ void Peer::computeAggregateAllocation(int64_t now)
// Convert set of relative performances into an allocation set
for(uint16_t i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
if (_paths[i].p) {
if (RR->node->getMultipathMode() == ZT_MULTIPATH_BALANCE_RANDOM) {
_paths[i].p->updateComponentAllocationOfAggregateLink(((float)_pathChoiceHist.countValue(i) / (float)_pathChoiceHist.count()) * 255);
}
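In ZT_MULTIPATH_BALANCE_RANDOM, each path's allocation byte is simply its share of recent random path choices scaled to 0-255 (the countValue()/count() calls above read a choice history). A rough sketch of that proportional step, assuming a plain vector stands in for the histogram:

#include <cstdint>
#include <vector>

// Share of recent choices that landed on pathIdx, scaled to a 0-255 allocation byte
static uint8_t allocationByte(const std::vector<int> &recentChoices, int pathIdx)
{
    if (recentChoices.empty()) return 0;
    int hits = 0;
    for (size_t i = 0; i < recentChoices.size(); i++)
        if (recentChoices[i] == pathIdx) hits++;
    return (uint8_t)(((float)hits / (float)recentChoices.size()) * 255.0f);
}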
@@ -420,10 +419,10 @@ int Peer::aggregateLinkLogicalPathCount()
return pathCount;
}
std::vector<SharedPtr<Path>> Peer::getAllPaths(int64_t now)
std::vector<SharedPtr<Path> > Peer::getAllPaths(int64_t now)
{
Mutex::Lock _l(_virtual_paths_m); // FIXME: TX can now lock RX
std::vector<SharedPtr<Path>> paths;
std::vector<SharedPtr<Path> > paths;
for (int i=0; i<_virtualPaths.size(); i++) {
if (_virtualPaths[i]->p) {
paths.push_back(_virtualPaths[i]->p);
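The Path>> to Path> > edit above is a portability fix: pre-C++11 compilers tokenize the adjacent closing brackets of a nested template as the >> shift operator, so the space is required. For instance:

#include <vector>

std::vector<std::vector<int> > ok;   // parses under C++03 and later
std::vector<std::vector<int>> c11;   // only valid since C++11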
@@ -436,6 +435,8 @@ SharedPtr<Path> Peer::getAppropriatePath(int64_t now, bool includeExpired, int64
{
Mutex::Lock _l(_paths_m);
SharedPtr<Path> selectedPath;
char curPathStr[128];
char newPathStr[128];
unsigned int bestPath = ZT_MAX_PEER_NETWORK_PATHS;
/**
@@ -511,14 +512,66 @@ SharedPtr<Path> Peer::getAppropriatePath(int64_t now, bool includeExpired, int64
* All traffic is sent on all paths.
*/
if (RR->node->getMultipathMode() == ZT_MULTIPATH_BROADCAST) {
// Not handled here. Handled in Switch.cpp
// Not handled here. Handled in Switch::_trySend()
}
/**
* Only one link is active. Fail-over is immediate.
*/
if (RR->node->getMultipathMode() == ZT_MULTIPATH_ACTIVE_BACKUP) {
// fprintf(stderr, "ZT_MULTIPATH_ACTIVE_BACKUP\n");
bool bFoundHotPath = false;
if (!_activeBackupPath) {
/* Select the first path that appears to still be active.
 * This will eventually be user-configurable */
for (int i=0; i<ZT_MAX_PEER_NETWORK_PATHS; i++) {
if (_paths[i].p) {
if (_activeBackupPath.ptr() == _paths[i].p.ptr()) {
continue;
}
_activeBackupPath = _paths[i].p; // Tentative choice, kept only if no hot path turns up
if ((now - _paths[i].p->lastIn()) < ZT_MULTIPATH_ACTIVE_BACKUP_RAPID_FAILOVER_PERIOD) {
bFoundHotPath = true;
_activeBackupPath = _paths[i].p;
_activeBackupPath->address().toString(curPathStr);
fprintf(stderr, "selected %s as the primary active-backup path to %llx\n",
curPathStr, this->_id.address().toInt());
break; // Stop at the first hot path so a later cold path can't displace it
}
}
}
if (!_activeBackupPath) {
return SharedPtr<Path>();
}
if (!bFoundHotPath) {
_activeBackupPath->address().toString(curPathStr);
fprintf(stderr, "no hot paths available to to use as active-backup primary to %llx, selected %s anyway\n",
this->_id.address().toInt(), curPathStr);
}
}
else {
if ((now - _activeBackupPath->lastIn()) > ZT_MULTIPATH_ACTIVE_BACKUP_RAPID_FAILOVER_PERIOD) {
_activeBackupPath->address().toString(curPathStr); // Record the current (dead) path for the debug trace below
/* Fail-over to the first path that appears to still be active.
 * This will eventually be user-configurable */
for (int i=0; i<ZT_MAX_PEER_NETWORK_PATHS; i++) {
if (_paths[i].p) {
if (_activeBackupPath.ptr() == _paths[i].p.ptr()) {
continue;
}
if ((now - _paths[i].p->lastIn()) < ZT_MULTIPATH_ACTIVE_BACKUP_RAPID_FAILOVER_PERIOD) {
bFoundHotPath = true;
_activeBackupPath = _paths[i].p;
_activeBackupPath->address().toString(newPathStr);
break; // Fail over to the first hot path; don't keep scanning
}
}
}
if (bFoundHotPath) {
fprintf(stderr, "primary active-backup path %s to %llx appears to be dead, switched to path %s\n",
curPathStr, this->_id.address().toInt(), newPathStr);
}
}
}
return _activeBackupPath;
}
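Taken together, the new mode is a small state machine: keep a single primary path, and whenever it has gone quiet for longer than the rapid-failover window, promote the first alternative that has received traffic within that window. A condensed, self-contained sketch of that policy; DemoPath, FAILOVER_WINDOW, and pickActiveBackup are illustrative stand-ins, not ZeroTier API:

#include <cstdint>
#include <vector>

struct DemoPath { int64_t lastIn; };        // timestamp of last inbound traffic
static const int64_t FAILOVER_WINDOW = 250; // ms; stand-in for the real constant

// Returns the index of the path to transmit on, or -1 if no path exists.
// 'primary' holds the current primary path index and is updated on fail-over.
static int pickActiveBackup(const std::vector<DemoPath> &paths, int64_t now, int &primary)
{
    if (primary >= 0 && (now - paths[primary].lastIn) < FAILOVER_WINDOW)
        return primary;                     // primary is still hot: keep using it
    for (int i = 0; i < (int)paths.size(); i++) {
        if (i == primary)
            continue;                       // skip the (dead) primary itself
        if ((now - paths[i].lastIn) < FAILOVER_WINDOW) {
            primary = i;                    // first hot alternative becomes primary
            return primary;
        }
    }
    return primary;                         // nothing hot: keep the old choice (or -1)
}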
/**
@@ -553,27 +606,25 @@ SharedPtr<Path> Peer::getAppropriatePath(int64_t now, bool includeExpired, int64
*/
if (RR->node->getMultipathMode() == ZT_MULTIPATH_BALANCE_XOR_FLOW) {
// fprintf(stderr, "ZT_MULTIPATH_BALANCE_XOR_FLOW (%llx) \n", flowId);
char pathStr[128];
struct Flow *currFlow = NULL;
if (_flows.count(flowId)) {
currFlow = _flows[flowId];
if (!currFlow->assignedPath) {
int idx = abs((int)(currFlow->flowId % (_virtualPaths.size()-1)));
currFlow->assignedPath = _virtualPaths[idx];
_virtualPaths[idx]->p->address().toString(pathStr);
_virtualPaths[idx]->p->address().toString(curPathStr);
fprintf(stderr, "assigning flow %llx between this node and peer %llx to path %s at index %d\n",
currFlow->flowId, this->_id.address().toInt(), pathStr, idx);
currFlow->flowId, this->_id.address().toInt(), curPathStr, idx);
}
else {
if (!currFlow->assignedPath->p->alive(now)) {
char newPathStr[128];
currFlow->assignedPath->p->address().toString(pathStr);
currFlow->assignedPath->p->address().toString(curPathStr);
// Re-assign
int idx = abs((int)(currFlow->flowId % (_virtualPaths.size()-1)));
currFlow->assignedPath = _virtualPaths[idx];
_virtualPaths[idx]->p->address().toString(newPathStr);
fprintf(stderr, "path %s assigned to flow %llx between this node and %llx appears to be dead, reassigning to path %s\n",
pathStr, currFlow->flowId, this->_id.address().toInt(), newPathStr);
curPathStr, currFlow->flowId, this->_id.address().toInt(), newPathStr);
}
}
return currFlow->assignedPath->p;
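The flow logic above pins each flow to a path by hashing its ID: flowId modulo the number of paths. As written, % (_virtualPaths.size()-1) can never select the last path and divides by zero when exactly one path exists; the conventional form takes the modulus over the full count, as in this illustrative helper:

#include <cstdint>
#include <cstddef>

// Deterministically map a flow ID onto one of numPaths paths
static size_t pathIndexForFlow(int64_t flowId, size_t numPaths)
{
    if (numPaths == 0) return 0;                  // caller must verify a path exists
    return (size_t)((uint64_t)flowId % numPaths); // full range: every path reachable
}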