Introduced basic multipath support

This commit is contained in:
Joseph Henry 2018-05-01 16:32:15 -07:00
parent 836d897aec
commit 6a2ba4baca
17 changed files with 1314 additions and 45 deletions

View file

@ -39,6 +39,9 @@
#include "SharedPtr.hpp"
#include "AtomicCounter.hpp"
#include "Utils.hpp"
#include "RingBuffer.hpp"
#include "../osdep/Phy.hpp"
/**
* Maximum return value of preferenceRank()
@ -55,6 +58,7 @@ class RuntimeEnvironment;
class Path
{
friend class SharedPtr<Path>;
Phy<Path *> *_phy;
public:
/**
@ -93,22 +97,71 @@ public:
_lastOut(0),
_lastIn(0),
_lastTrustEstablishedPacketReceived(0),
_lastPathQualityComputeTime(0),
_localSocket(-1),
_latency(0xffff),
_addr(),
_ipScope(InetAddress::IP_SCOPE_NONE)
_ipScope(InetAddress::IP_SCOPE_NONE),
_currentPacketSampleCounter(0),
_meanPacketErrorRatio(0.0),
_meanLatency(0.0),
_lastLatencyUpdate(0),
_jitter(0.0),
_lastPathQualitySampleTime(0),
_lastComputedQuality(0.0),
_lastPathQualityEstimate(0),
_meanAge(0.0),
_meanThroughput(0.0),
_packetLossRatio(0)
{
memset(_ifname, 0, sizeof(_ifname));
memset(_addrString, 0, sizeof(_addrString));
_throughputSamples = new RingBuffer<uint64_t>(ZT_PATH_QUALITY_METRIC_WIN_SZ);
_ageSamples = new RingBuffer<uint64_t>(ZT_PATH_QUALITY_METRIC_WIN_SZ);
_latencySamples = new RingBuffer<uint32_t>(ZT_PATH_QUALITY_METRIC_WIN_SZ);
_errSamples = new RingBuffer<float>(ZT_PATH_QUALITY_METRIC_WIN_SZ);
}
Path(const int64_t localSocket,const InetAddress &addr) :
_lastOut(0),
_lastIn(0),
_lastTrustEstablishedPacketReceived(0),
_lastPathQualityComputeTime(0),
_localSocket(localSocket),
_latency(0xffff),
_addr(addr),
_ipScope(addr.ipScope())
_ipScope(addr.ipScope()),
_currentPacketSampleCounter(0),
_meanPacketErrorRatio(0.0),
_meanLatency(0.0),
_lastLatencyUpdate(0),
_jitter(0.0),
_lastPathQualitySampleTime(0),
_lastComputedQuality(0.0),
_lastPathQualityEstimate(0),
_meanAge(0.0),
_meanThroughput(0.0),
_packetLossRatio(0)
{
memset(_ifname, 0, sizeof(_ifname));
memset(_addrString, 0, sizeof(_addrString));
_throughputSamples = new RingBuffer<uint64_t>(ZT_PATH_QUALITY_METRIC_WIN_SZ);
_ageSamples = new RingBuffer<uint64_t>(ZT_PATH_QUALITY_METRIC_WIN_SZ);
_latencySamples = new RingBuffer<uint32_t>(ZT_PATH_QUALITY_METRIC_WIN_SZ);
_errSamples = new RingBuffer<float>(ZT_PATH_QUALITY_METRIC_WIN_SZ);
}
~Path()
{
delete _throughputSamples;
delete _ageSamples;
delete _latencySamples;
delete _errSamples;
_throughputSamples = NULL;
_ageSamples = NULL;
_latencySamples = NULL;
_errSamples = NULL;
}
/**
@ -147,12 +200,17 @@ public:
*
* @param l Measured latency
*/
inline void updateLatency(const unsigned int l)
inline void updateLatency(const unsigned int l, int64_t now)
{
unsigned int pl = _latency;
if (pl < 0xffff)
if (pl < 0xffff) {
_latency = (pl + l) / 2;
else _latency = l;
}
else {
_latency = l;
}
_lastLatencyUpdate = now;
_latencySamples->push(l);
}
/**
@ -240,11 +298,180 @@ public:
return (((age < (ZT_PATH_HEARTBEAT_PERIOD + 5000)) ? l : (l + 0xffff + age)) * (long)((ZT_INETADDRESS_MAX_SCOPE - _ipScope) + 1));
}
/**
* @return An estimate of path quality -- higher is better.
*/
inline float computeQuality(const int64_t now)
{
float latency_contrib = _meanLatency ? 1.0 / _meanLatency : 0;
float jitter_contrib = _jitter ? 1.0 / _jitter : 0;
float throughput_contrib = _meanThroughput ? _meanThroughput / 1000000 : 0; // in Mbps
float age_contrib = _meanAge > 0 ? (float)sqrt(_meanAge) : 1;
float error_contrib = 1.0 - _meanPacketErrorRatio;
float sum = (latency_contrib + jitter_contrib + throughput_contrib + error_contrib) / age_contrib;
_lastComputedQuality = sum * (long)((_ipScope) + 1);
return _lastComputedQuality;
}
/**
* Since quality estimates can become expensive we should cache the most recent result for traffic allocation
* algorithms which may need to reference this value multiple times through the course of their execution.
*/
inline float lastComputedQuality() {
return _lastComputedQuality;
}
/**
* @return A pointer to a cached copy of the human-readable name of the interface this Path's localSocket is bound to
*/
inline char *getName() { return _ifname; }
/**
* @return Estimated throughput in bps of this link
*/
inline uint64_t getThroughput() { return _phy->getThroughput((PhySocket *)((uintptr_t)_localSocket)); }
/**
* @return Packet delay varience
*/
inline float jitter() { return _jitter; }
/**
* @return Previously-computed mean latency
*/
inline float meanLatency() { return _meanLatency; }
/**
* @return Packet loss rate
*/
inline float packetLossRatio() { return _packetLossRatio; }
/**
* @return Mean packet error ratio
*/
inline float meanPacketErrorRatio() { return _meanPacketErrorRatio; }
/**
* @return Current packet error ratio (possibly incomplete sample set)
*/
inline float currentPacketErrorRatio() {
int errorsPerSample = 0;
for (int i=0; i<_currentPacketSampleCounter; i++) {
if (_packetValidity[i] == false) {
errorsPerSample++;
}
}
return (float)errorsPerSample / (float)ZT_PATH_ERROR_SAMPLE_WIN_SZ;
}
/**
* @return Whether the Path's local socket is in a CLOSED state
*/
inline bool isClosed() { return _phy->isClosed((PhySocket *)((uintptr_t)_localSocket)); }
/**
* @return The state of a Path's local socket
*/
inline int getState() { return _phy->getState((PhySocket *)((uintptr_t)_localSocket)); }
/**
* @return Whether this socket may have been erased by the virtual physical link layer
*/
inline bool isValidState() { return _phy->isValidState((PhySocket *)((uintptr_t)_localSocket)); }
/**
* @return Whether the path quality monitors have collected enough data to provide a quality value
* TODO: expand this
*/
inline bool monitorsReady() {
return _latencySamples->count() && _ageSamples->count() && _throughputSamples->count();
}
/**
* @return A pointer to a cached copy of the address string for this Path (For debugging only)
*/
inline char *getAddressString() { return _addrString; }
/**
* Handle path sampling, computation of quality estimates, and other periodic tasks
* @param now Current time
*/
inline void measureLink(int64_t now) {
// Sample path properties and store them in a continuously-revolving buffer
if (now - _lastPathQualitySampleTime > ZT_PATH_QUALITY_SAMPLE_INTERVAL) {
_lastPathQualitySampleTime = now;
_throughputSamples->push(getThroughput()); // Thoughtput in bits/s
_ageSamples->push(now - _lastIn); // Age (time since last received packet)
if (now - _lastLatencyUpdate > ZT_PATH_LATENCY_SAMPLE_INTERVAL) {
_lastLatencyUpdate = now;
// Record 0 bp/s. Since we're using this to detect possible packet loss
updateLatency(0, now);
}
}
// Compute statistical values for use in link quality estimates
if (now - _lastPathQualityComputeTime > ZT_PATH_QUALITY_COMPUTE_INTERVAL) {
_lastPathQualityComputeTime = now;
// Cache Path address string
address().toString(_addrString);
_phy->getIfName((PhySocket *)((uintptr_t)_localSocket), _ifname, ZT_PATH_INTERFACE_NAME_SZ); // Cache Interface name
// Derived values
if (_throughputSamples->count()) {
_packetLossRatio = (float)_throughputSamples->zeroCount() / (float)_throughputSamples->count();
}
_meanThroughput = _throughputSamples->mean();
_meanAge = _ageSamples->mean();
_meanLatency = _latencySamples->mean();
// Jitter
// SEE: RFC 3393, RFC 4689
_jitter = _latencySamples->stddev();
_meanPacketErrorRatio = _errSamples->mean(); // Packet Error Ratio (PER)
}
// Periodically compute a path quality estimate
if (now - _lastPathQualityEstimate > ZT_PATH_QUALITY_ESTIMATE_INTERVAL) {
computeQuality(now);
}
}
/**
* Record whether a packet is considered invalid by MAC/compression/cipher checks. This
* could be an indication of a bit error. This function will keep a running counter of
* up to a given window size and with each counter overflow it will compute a mean error rate
* and store that in a continuously shifting sample window.
*
* @param isValid Whether the packet in question is considered invalid
*/
inline void recordPacket(bool isValid) {
if (_currentPacketSampleCounter < ZT_PATH_ERROR_SAMPLE_WIN_SZ) {
_packetValidity[_currentPacketSampleCounter] = isValid;
_currentPacketSampleCounter++;
}
else {
// Sample array is full, compute an mean and stick it in the ring buffer for trend analysis
_errSamples->push(currentPacketErrorRatio());
_currentPacketSampleCounter=0;
}
}
/**
* @return The mean age (in ms) of this link
*/
inline float meanAge() { return _meanAge; }
/**
* @return The mean throughput (in bits/s) of this link
*/
inline float meanThroughput() { return _meanThroughput; }
/**
* @return True if this path is alive (receiving heartbeats)
*/
inline bool alive(const int64_t now) const { return ((now - _lastIn) < (ZT_PATH_HEARTBEAT_PERIOD + 5000)); }
/**
* @return True if this path hasn't received a packet in a "significant" amount of time
*/
inline bool stale(const int64_t now) const { return ((now - _lastIn) > ZT_LINK_SPEED_TEST_INTERVAL * 10); }
/**
* @return True if this path needs a heartbeat
*/
@ -269,11 +496,39 @@ private:
volatile int64_t _lastOut;
volatile int64_t _lastIn;
volatile int64_t _lastTrustEstablishedPacketReceived;
volatile int64_t _lastPathQualityComputeTime;
int64_t _localSocket;
volatile unsigned int _latency;
InetAddress _addr;
InetAddress::IpScope _ipScope; // memoize this since it's a computed value checked often
AtomicCounter __refCount;
// Packet Error Ratio (PER)
int _packetValidity[ZT_PATH_ERROR_SAMPLE_WIN_SZ];
int _currentPacketSampleCounter;
volatile float _meanPacketErrorRatio;
// Latency and Jitter
volatile float _meanLatency;
int64_t _lastLatencyUpdate;
volatile float _jitter;
int64_t _lastPathQualitySampleTime;
float _lastComputedQuality;
int64_t _lastPathQualityEstimate;
float _meanAge;
float _meanThroughput;
// Circular buffers used to efficiently store large time series
RingBuffer<uint64_t> *_throughputSamples;
RingBuffer<uint32_t> *_latencySamples;
RingBuffer<uint64_t> *_ageSamples;
RingBuffer<float> *_errSamples;
float _packetLossRatio;
char _ifname[ZT_PATH_INTERFACE_NAME_SZ];
char _addrString[256];
};
} // namespace ZeroTier