RingBuffer<> is now templated with size, buffer is now static.

This commit is contained in:
Adam Ierymenko 2019-03-22 14:39:52 -07:00
parent af3ec000a0
commit d530356055
6 changed files with 95 additions and 247 deletions

View file

@ -123,7 +123,8 @@ public:
_lastComputedThroughputDistCoeff(0.0),
_lastAllocation(0)
{
prepareBuffers();
memset(_ifname, 0, 16);
memset(_addrString, 0, sizeof(_addrString));
}
Path(const int64_t localSocket,const InetAddress &addr) :
@ -155,22 +156,11 @@ public:
_lastComputedThroughputDistCoeff(0.0),
_lastAllocation(0)
{
prepareBuffers();
memset(_ifname, 0, 16);
memset(_addrString, 0, sizeof(_addrString));
_phy->getIfName((PhySocket *)((uintptr_t)_localSocket), _ifname, 16);
}
~Path()
{
delete _throughputSamples;
delete _latencySamples;
delete _packetValiditySamples;
delete _throughputDisturbanceSamples;
_throughputSamples = NULL;
_latencySamples = NULL;
_packetValiditySamples = NULL;
_throughputDisturbanceSamples = NULL;
}
/**
* Called when a packet is received from this remote path, regardless of content
*
@ -216,7 +206,7 @@ public:
else {
_latency = l;
}
_latencySamples->push(l);
_latencySamples.push(l);
}
/**
@ -345,7 +335,7 @@ public:
_inQoSRecords[packetId] = now;
_packetsReceivedSinceLastQoS++;
}
_packetValiditySamples->push(true);
_packetValiditySamples.push(true);
}
}
@ -362,7 +352,7 @@ public:
int64_t timeSinceThroughputEstimate = (now - _lastThroughputEstimation);
if (timeSinceThroughputEstimate >= ZT_PATH_THROUGHPUT_MEASUREMENT_INTERVAL) {
uint64_t throughput = (float)(_bytesAckedSinceLastThroughputEstimation * 8) / ((float)timeSinceThroughputEstimate / (float)1000);
_throughputSamples->push(throughput);
_throughputSamples.push(throughput);
_maxLifetimeThroughput = throughput > _maxLifetimeThroughput ? throughput : _maxLifetimeThroughput;
_lastThroughputEstimation = now;
_bytesAckedSinceLastThroughputEstimation = 0;
@ -564,7 +554,7 @@ public:
* Record an invalid incoming packet. This packet failed MAC/compression/cipher checks and will now
* contribute to a Packet Error Ratio (PER).
*/
inline void recordInvalidPacket() { _packetValiditySamples->push(false); }
inline void recordInvalidPacket() { _packetValiditySamples.push(false); }
/**
* @return A pointer to a cached copy of the address string for this Path (For debugging only)
@ -582,35 +572,43 @@ public:
*
* @param now Current time
*/
inline void processBackgroundPathMeasurements(int64_t now) {
inline void processBackgroundPathMeasurements(const int64_t now)
{
if (now - _lastPathQualityComputeTime > ZT_PATH_QUALITY_COMPUTE_INTERVAL) {
Mutex::Lock _l(_statistics_m);
_lastPathQualityComputeTime = now;
address().toString(_addrString);
_lastComputedMeanLatency = _latencySamples->mean();
_lastComputedPacketDelayVariance = _latencySamples->stddev(); // Similar to "jitter" (SEE: RFC 3393, RFC 4689)
_lastComputedMeanThroughput = (uint64_t)_throughputSamples->mean();
_lastComputedMeanLatency = _latencySamples.mean();
_lastComputedPacketDelayVariance = _latencySamples.stddev(); // Similar to "jitter" (SEE: RFC 3393, RFC 4689)
_lastComputedMeanThroughput = (uint64_t)_throughputSamples.mean();
// If no packet validity samples, assume PER==0
_lastComputedPacketErrorRatio = 1 - (_packetValiditySamples->count() ? _packetValiditySamples->mean() : 1);
_lastComputedPacketErrorRatio = 1 - (_packetValiditySamples.count() ? _packetValiditySamples.mean() : 1);
// Compute path stability
// Normalize measurements with wildly different ranges into a reasonable range
float normalized_pdv = Utils::normalize(_lastComputedPacketDelayVariance, 0, ZT_PATH_MAX_PDV, 0, 10);
float normalized_la = Utils::normalize(_lastComputedMeanLatency, 0, ZT_PATH_MAX_MEAN_LATENCY, 0, 10);
float throughput_cv = _throughputSamples->mean() > 0 ? _throughputSamples->stddev() / _throughputSamples->mean() : 1;
float throughput_cv = _throughputSamples.mean() > 0 ? _throughputSamples.stddev() / _throughputSamples.mean() : 1;
// Form an exponential cutoff and apply contribution weights
float pdv_contrib = exp((-1)*normalized_pdv) * ZT_PATH_CONTRIB_PDV;
float latency_contrib = exp((-1)*normalized_la) * ZT_PATH_CONTRIB_LATENCY;
// Throughput Disturbance Coefficient
float throughput_disturbance_contrib = exp((-1)*throughput_cv) * ZT_PATH_CONTRIB_THROUGHPUT_DISTURBANCE;
_throughputDisturbanceSamples->push(throughput_cv);
_lastComputedThroughputDistCoeff = _throughputDisturbanceSamples->mean();
_throughputDisturbanceSamples.push(throughput_cv);
_lastComputedThroughputDistCoeff = _throughputDisturbanceSamples.mean();
// Obey user-defined ignored contributions
pdv_contrib = ZT_PATH_CONTRIB_PDV > 0.0 ? pdv_contrib : 1;
latency_contrib = ZT_PATH_CONTRIB_LATENCY > 0.0 ? latency_contrib : 1;
throughput_disturbance_contrib = ZT_PATH_CONTRIB_THROUGHPUT_DISTURBANCE > 0.0 ? throughput_disturbance_contrib : 1;
// Stability
_lastComputedStability = pdv_contrib + latency_contrib + throughput_disturbance_contrib;
_lastComputedStability *= 1 - _lastComputedPacketErrorRatio;
// Prevent QoS records from sticking around for too long
std::map<uint64_t,uint64_t>::iterator it = _outQoSRecords.begin();
while (it != _outQoSRecords.end()) {
@ -647,18 +645,6 @@ public:
*/
inline int64_t lastTrustEstablishedPacketReceived() const { return _lastTrustEstablishedPacketReceived; }
/**
* Initialize statistical buffers
*/
inline void prepareBuffers() {
_throughputSamples = new RingBuffer<uint64_t>(ZT_PATH_QUALITY_METRIC_WIN_SZ);
_latencySamples = new RingBuffer<uint32_t>(ZT_PATH_QUALITY_METRIC_WIN_SZ);
_packetValiditySamples = new RingBuffer<bool>(ZT_PATH_QUALITY_METRIC_WIN_SZ);
_throughputDisturbanceSamples = new RingBuffer<float>(ZT_PATH_QUALITY_METRIC_WIN_SZ);
memset(_ifname, 0, 16);
memset(_addrString, 0, sizeof(_addrString));
}
private:
Mutex _statistics_m;
@ -672,9 +658,9 @@ private:
InetAddress::IpScope _ipScope; // memoize this since it's a computed value checked often
AtomicCounter __refCount;
std::map<uint64_t, uint64_t> _outQoSRecords; // id:egress_time
std::map<uint64_t, uint64_t> _inQoSRecords; // id:now
std::map<uint64_t, uint16_t> _inACKRecords; // id:len
std::map<uint64_t,uint64_t> _outQoSRecords; // id:egress_time
std::map<uint64_t,uint64_t> _inQoSRecords; // id:now
std::map<uint64_t,uint16_t> _inACKRecords; // id:len
int64_t _lastAck;
int64_t _lastThroughputEstimation;
@ -702,16 +688,14 @@ private:
float _lastComputedThroughputDistCoeff;
unsigned char _lastAllocation;
// cached human-readable strings for tracing purposes
char _ifname[16];
char _addrString[256];
RingBuffer<uint64_t> *_throughputSamples;
RingBuffer<uint32_t> *_latencySamples;
RingBuffer<bool> *_packetValiditySamples;
RingBuffer<float> *_throughputDisturbanceSamples;
RingBuffer<uint64_t,ZT_PATH_QUALITY_METRIC_WIN_SZ> _throughputSamples;
RingBuffer<uint32_t,ZT_PATH_QUALITY_METRIC_WIN_SZ> _latencySamples;
RingBuffer<bool,ZT_PATH_QUALITY_METRIC_WIN_SZ> _packetValiditySamples;
RingBuffer<float,ZT_PATH_QUALITY_METRIC_WIN_SZ> _throughputDisturbanceSamples;
};
} // namespace ZeroTier