Refactor incoming packet (rxQueue/fragmentQueue) to eliminate variable length queues and merge queues. This is both faster and saves memory.

This commit is contained in:
Adam Ierymenko 2016-03-18 14:16:07 -07:00
parent 9f31cbd8b8
commit d6a1868d0a
6 changed files with 317 additions and 258 deletions

View file

@ -26,8 +26,6 @@ namespace ZeroTier {
DeferredPackets::DeferredPackets(const RuntimeEnvironment *renv) :
RR(renv),
_readPtr(0),
_writePtr(0),
_waiting(0),
_die(false)
{
@ -37,39 +35,45 @@ DeferredPackets::~DeferredPackets()
{
_q_m.lock();
_die = true;
while (_waiting > 0) {
_q_m.unlock();
_q_m.unlock();
for(;;) {
_q_s.post();
_q_m.lock();
if (_waiting <= 0) {
_q_m.unlock();
break;
} else {
_q_m.unlock();
}
}
}
bool DeferredPackets::enqueue(IncomingPacket *pkt)
{
_q_m.lock();
const unsigned long p = _writePtr % ZT_DEFFEREDPACKETS_MAX;
if (_q[p]) {
_q_m.unlock();
return false;
} else {
_q[p].setToUnsafe(pkt);
++_writePtr;
_q_m.unlock();
_q_s.post();
return true;
{
Mutex::Lock _l(_q_m);
if (_q.size() >= ZT_DEFFEREDPACKETS_MAX)
return false;
_q.push_back(*pkt);
}
_q_s.post();
return true;
}
int DeferredPackets::process()
{
SharedPtr<IncomingPacket> pkt;
std::list<IncomingPacket> pkt;
_q_m.lock();
if (_die) {
_q_m.unlock();
return -1;
}
while (_readPtr == _writePtr) {
while (_q.empty()) {
++_waiting;
_q_m.unlock();
_q_s.wait();
@ -80,10 +84,16 @@ int DeferredPackets::process()
return -1;
}
}
pkt.swap(_q[_readPtr++ % ZT_DEFFEREDPACKETS_MAX]);
// Move item from _q list to a dummy list here to avoid copying packet
pkt.splice(pkt.end(),_q,_q.begin());
_q_m.unlock();
pkt->tryDecode(RR,true);
try {
pkt.front().tryDecode(RR,true);
} catch ( ... ) {} // drop invalids
return 1;
}