1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-02-13 11:51:57 +00:00
srs/trunk/3rdparty/srt-1-fit/srtcore/buffer.cpp
2021-05-16 08:23:26 +08:00

1955 lines
65 KiB
C++

/*
* SRT - Secure, Reliable, Transport
* Copyright (c) 2018 Haivision Systems Inc.
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*
*/
/*****************************************************************************
Copyright (c) 2001 - 2011, The Board of Trustees of the University of Illinois.
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
* Redistributions of source code must retain the above
copyright notice, this list of conditions and the
following disclaimer.
* Redistributions in binary form must reproduce the
above copyright notice, this list of conditions
and the following disclaimer in the documentation
and/or other materials provided with the distribution.
* Neither the name of the University of Illinois
nor the names of its contributors may be used to
endorse or promote products derived from this
software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*****************************************************************************/
/*****************************************************************************
written by
Yunhong Gu, last updated 03/12/2011
modified by
Haivision Systems Inc.
*****************************************************************************/
#include <cstring>
#include <cmath>
#include "buffer.h"
#include "packet.h"
#include "core.h" // provides some constants
#include "logging.h"
using namespace std;
using namespace srt_logging;
CSndBuffer::CSndBuffer(int size, int mss)
: m_BufLock()
, m_pBlock(NULL)
, m_pFirstBlock(NULL)
, m_pCurrBlock(NULL)
, m_pLastBlock(NULL)
, m_pBuffer(NULL)
, m_iNextMsgNo(1)
, m_iSize(size)
, m_iMSS(mss)
, m_iCount(0)
, m_iBytesCount(0)
, m_ullLastOriginTime_us(0)
#ifdef SRT_ENABLE_SNDBUFSZ_MAVG
, m_LastSamplingTime(0)
, m_iCountMAvg(0)
, m_iBytesCountMAvg(0)
, m_TimespanMAvg(0)
#endif
, m_iInRatePktsCount(0)
, m_iInRateBytesCount(0)
, m_InRateStartTime(0)
, m_InRatePeriod(INPUTRATE_FAST_START_US) // 0.5 sec (fast start)
, m_iInRateBps(INPUTRATE_INITIAL_BYTESPS)
{
// initial physical buffer of "size"
m_pBuffer = new Buffer;
m_pBuffer->m_pcData = new char [m_iSize * m_iMSS];
m_pBuffer->m_iSize = m_iSize;
m_pBuffer->m_pNext = NULL;
// circular linked list for out bound packets
m_pBlock = new Block;
Block* pb = m_pBlock;
for (int i = 1; i < m_iSize; ++ i)
{
pb->m_pNext = new Block;
pb->m_iMsgNoBitset = 0;
pb = pb->m_pNext;
}
pb->m_pNext = m_pBlock;
pb = m_pBlock;
char* pc = m_pBuffer->m_pcData;
for (int i = 0; i < m_iSize; ++ i)
{
pb->m_pcData = pc;
pb = pb->m_pNext;
pc += m_iMSS;
}
m_pFirstBlock = m_pCurrBlock = m_pLastBlock = m_pBlock;
pthread_mutex_init(&m_BufLock, NULL);
}
CSndBuffer::~CSndBuffer()
{
Block* pb = m_pBlock->m_pNext;
while (pb != m_pBlock)
{
Block* temp = pb;
pb = pb->m_pNext;
delete temp;
}
delete m_pBlock;
while (m_pBuffer != NULL)
{
Buffer* temp = m_pBuffer;
m_pBuffer = m_pBuffer->m_pNext;
delete [] temp->m_pcData;
delete temp;
}
pthread_mutex_destroy(&m_BufLock);
}
void CSndBuffer::addBuffer(const char* data, int len, int ttl, bool order, uint64_t srctime, ref_t<int32_t> r_msgno)
{
int32_t& msgno = *r_msgno;
int size = len / m_iMSS;
if ((len % m_iMSS) != 0)
size ++;
HLOGC(mglog.Debug, log << "addBuffer: size=" << m_iCount << " reserved=" << m_iSize << " needs=" << size << " buffers for " << len << " bytes");
// dynamically increase sender buffer
while (size + m_iCount >= m_iSize)
{
HLOGC(mglog.Debug, log << "addBuffer: ... still lacking " << (size + m_iCount - m_iSize) << " buffers...");
increase();
}
const uint64_t time = CTimer::getTime();
int32_t inorder = order ? MSGNO_PACKET_INORDER::mask : 0;
HLOGC(dlog.Debug, log << CONID() << "addBuffer: adding "
<< size << " packets (" << len << " bytes) to send, msgno=" << m_iNextMsgNo
<< (inorder ? "" : " NOT") << " in order");
Block* s = m_pLastBlock;
msgno = m_iNextMsgNo;
for (int i = 0; i < size; ++ i)
{
int pktlen = len - i * m_iMSS;
if (pktlen > m_iMSS)
pktlen = m_iMSS;
HLOGC(dlog.Debug, log << "addBuffer: spreading from=" << (i*m_iMSS) << " size=" << pktlen << " TO BUFFER:" << (void*)s->m_pcData);
memcpy(s->m_pcData, data + i * m_iMSS, pktlen);
s->m_iLength = pktlen;
s->m_iMsgNoBitset = m_iNextMsgNo | inorder;
if (i == 0)
s->m_iMsgNoBitset |= PacketBoundaryBits(PB_FIRST);
if (i == size - 1)
s->m_iMsgNoBitset |= PacketBoundaryBits(PB_LAST);
// NOTE: if i is neither 0 nor size-1, it resuls with PB_SUBSEQUENT.
// if i == 0 == size-1, it results with PB_SOLO.
// Packets assigned to one message can be:
// [PB_FIRST] [PB_SUBSEQUENT] [PB_SUBSEQUENT] [PB_LAST] - 4 packets per message
// [PB_FIRST] [PB_LAST] - 2 packets per message
// [PB_SOLO] - 1 packet per message
s->m_ullSourceTime_us = srctime;
s->m_ullOriginTime_us = time;
s->m_iTTL = ttl;
// XXX unchecked condition: s->m_pNext == NULL.
// Should never happen, as the call to increase() should ensure enough buffers.
SRT_ASSERT(s->m_pNext);
s = s->m_pNext;
}
m_pLastBlock = s;
CGuard::enterCS(m_BufLock);
m_iCount += size;
m_iBytesCount += len;
m_ullLastOriginTime_us = time;
updateInputRate(time, size, len);
#ifdef SRT_ENABLE_SNDBUFSZ_MAVG
updAvgBufSize(time);
#endif
CGuard::leaveCS(m_BufLock);
// MSGNO_SEQ::mask has a form: 00000011111111...
// At least it's known that it's from some index inside til the end (to bit 0).
// If this value has been reached in a step of incrementation, it means that the
// maximum value has been reached. Casting to int32_t to ensure the same sign
// in comparison, although it's far from reaching the sign bit.
m_iNextMsgNo ++;
if (m_iNextMsgNo == int32_t(MSGNO_SEQ::mask))
m_iNextMsgNo = 1;
}
void CSndBuffer::setInputRateSmpPeriod(int period)
{
m_InRatePeriod = (uint64_t)period; //(usec) 0=no input rate calculation
}
void CSndBuffer::updateInputRate(uint64_t time, int pkts, int bytes)
{
//no input rate calculation
if (m_InRatePeriod == 0)
return;
if (m_InRateStartTime == 0)
{
m_InRateStartTime = time;
return;
}
m_iInRatePktsCount += pkts;
m_iInRateBytesCount += bytes;
// Trigger early update in fast start mode
const bool early_update = (m_InRatePeriod < INPUTRATE_RUNNING_US)
&& (m_iInRatePktsCount > INPUTRATE_MAX_PACKETS);
const uint64_t period_us = (time - m_InRateStartTime);
if (early_update || period_us > m_InRatePeriod)
{
//Required Byte/sec rate (payload + headers)
m_iInRateBytesCount += (m_iInRatePktsCount * CPacket::SRT_DATA_HDR_SIZE);
m_iInRateBps = (int)(((int64_t)m_iInRateBytesCount * 1000000) / period_us);
HLOGC(dlog.Debug, log << "updateInputRate: pkts:" << m_iInRateBytesCount << " bytes:" << m_iInRatePktsCount
<< " rate=" << (m_iInRateBps*8)/1000
<< "kbps interval=" << period_us);
m_iInRatePktsCount = 0;
m_iInRateBytesCount = 0;
m_InRateStartTime = time;
setInputRateSmpPeriod(INPUTRATE_RUNNING_US);
}
}
int CSndBuffer::addBufferFromFile(fstream& ifs, int len)
{
int size = len / m_iMSS;
if ((len % m_iMSS) != 0)
size ++;
HLOGC(mglog.Debug, log << "addBufferFromFile: size=" << m_iCount << " reserved=" << m_iSize << " needs=" << size << " buffers for " << len << " bytes");
// dynamically increase sender buffer
while (size + m_iCount >= m_iSize)
{
HLOGC(mglog.Debug, log << "addBufferFromFile: ... still lacking " << (size + m_iCount - m_iSize) << " buffers...");
increase();
}
HLOGC(dlog.Debug, log << CONID() << "addBufferFromFile: adding "
<< size << " packets (" << len << " bytes) to send, msgno=" << m_iNextMsgNo);
Block* s = m_pLastBlock;
int total = 0;
for (int i = 0; i < size; ++ i)
{
if (ifs.bad() || ifs.fail() || ifs.eof())
break;
int pktlen = len - i * m_iMSS;
if (pktlen > m_iMSS)
pktlen = m_iMSS;
HLOGC(dlog.Debug, log << "addBufferFromFile: reading from=" << (i*m_iMSS) << " size=" << pktlen << " TO BUFFER:" << (void*)s->m_pcData);
ifs.read(s->m_pcData, pktlen);
if ((pktlen = int(ifs.gcount())) <= 0)
break;
// currently file transfer is only available in streaming mode, message is always in order, ttl = infinite
s->m_iMsgNoBitset = m_iNextMsgNo | MSGNO_PACKET_INORDER::mask;
if (i == 0)
s->m_iMsgNoBitset |= PacketBoundaryBits(PB_FIRST);
if (i == size - 1)
s->m_iMsgNoBitset |= PacketBoundaryBits(PB_LAST);
// NOTE: PB_FIRST | PB_LAST == PB_SOLO.
// none of PB_FIRST & PB_LAST == PB_SUBSEQUENT.
s->m_iLength = pktlen;
s->m_iTTL = -1;
s = s->m_pNext;
total += pktlen;
}
m_pLastBlock = s;
CGuard::enterCS(m_BufLock);
m_iCount += size;
m_iBytesCount += total;
CGuard::leaveCS(m_BufLock);
m_iNextMsgNo ++;
if (m_iNextMsgNo == int32_t(MSGNO_SEQ::mask))
m_iNextMsgNo = 1;
return total;
}
int CSndBuffer::readData(char** data, int32_t& msgno_bitset, uint64_t& srctime, int kflgs)
{
// No data to read
if (m_pCurrBlock == m_pLastBlock)
return 0;
// Make the packet REFLECT the data stored in the buffer.
*data = m_pCurrBlock->m_pcData;
int readlen = m_pCurrBlock->m_iLength;
// XXX This is probably done because the encryption should happen
// just once, and so this sets the encryption flags to both msgno bitset
// IN THE PACKET and IN THE BLOCK. This is probably to make the encryption
// happen at the time when scheduling a new packet to send, but the packet
// must remain in the send buffer until it's ACKed. For the case of rexmit
// the packet will be taken "as is" (that is, already encrypted).
//
// The problem is in the order of things:
// 0. When the application stores the data, some of the flags for PH_MSGNO are set.
// 1. The readData() is called to get the original data sent by the application.
// 2. The data are original and must be encrypted. They WILL BE encrypted, later.
// 3. So far we are in readData() so the encryption flags must be updated NOW because
// later we won't have access to the block's data.
// 4. After exiting from readData(), the packet is being encrypted. It's immediately
// sent, however the data must remain in the sending buffer until they are ACKed.
// 5. In case when rexmission is needed, the second overloaded version of readData
// is being called, and the buffer + PH_MSGNO value is extracted. All interesting
// flags must be present and correct at that time.
//
// The only sensible way to fix this problem is to encrypt the packet not after
// extracting from here, but when the packet is stored into CSndBuffer. The appropriate
// flags for PH_MSGNO will be applied directly there. Then here the value for setting
// PH_MSGNO will be set as is.
if (kflgs == -1)
{
HLOGC(dlog.Debug, log << CONID() << " CSndBuffer: ERROR: encryption required and not possible. NOT SENDING.");
readlen = 0;
}
else
{
m_pCurrBlock->m_iMsgNoBitset |= MSGNO_ENCKEYSPEC::wrap(kflgs);
}
msgno_bitset = m_pCurrBlock->m_iMsgNoBitset;
srctime =
m_pCurrBlock->m_ullSourceTime_us ? m_pCurrBlock->m_ullSourceTime_us :
m_pCurrBlock->m_ullOriginTime_us;
m_pCurrBlock = m_pCurrBlock->m_pNext;
HLOGC(dlog.Debug, log << CONID() << "CSndBuffer: extracting packet size=" << readlen << " to send");
return readlen;
}
int CSndBuffer::readData(char** data, const int offset, int32_t& msgno_bitset, uint64_t& srctime, int& msglen)
{
CGuard bufferguard(m_BufLock);
Block* p = m_pFirstBlock;
// XXX Suboptimal procedure to keep the blocks identifiable
// by sequence number. Consider using some circular buffer.
for (int i = 0; i < offset; ++ i)
p = p->m_pNext;
// Check if the block that is the next candidate to send (m_pCurrBlock pointing) is stale.
// If so, then inform the caller that it should first take care of the whole
// message (all blocks with that message id). Shift the m_pCurrBlock pointer
// to the position past the last of them. Then return -1 and set the
// msgno_bitset return reference to the message id that should be dropped as
// a whole.
// After taking care of that, the caller should immediately call this function again,
// this time possibly in order to find the real data to be sent.
// if found block is stale
// (This is for messages that have declared TTL - messages that fail to be sent
// before the TTL defined time comes, will be dropped).
if ((p->m_iTTL >= 0) && ((CTimer::getTime() - p->m_ullOriginTime_us) / 1000 > (uint64_t)p->m_iTTL))
{
int32_t msgno = p->getMsgSeq();
msglen = 1;
p = p->m_pNext;
bool move = false;
while (msgno == p->getMsgSeq())
{
if (p == m_pCurrBlock)
move = true;
p = p->m_pNext;
if (move)
m_pCurrBlock = p;
msglen ++;
}
HLOGC(dlog.Debug, log << "CSndBuffer::readData: due to TTL exceeded, " << msglen << " messages to drop, up to " << msgno);
// If readData returns -1, then msgno_bitset is understood as a Message ID to drop.
// This means that in this case it should be written by the message sequence value only
// (not the whole 4-byte bitset written at PH_MSGNO).
msgno_bitset = msgno;
return -1;
}
*data = p->m_pcData;
int readlen = p->m_iLength;
// XXX Here the value predicted to be applied to PH_MSGNO field is extracted.
// As this function is predicted to extract the data to send as a rexmited packet,
// the packet must be in the form ready to send - so, in case of encryption,
// encrypted, and with all ENC flags already set. So, the first call to send
// the packet originally (the other overload of this function) must set these
// flags.
msgno_bitset = p->m_iMsgNoBitset;
srctime =
p->m_ullSourceTime_us ? p->m_ullSourceTime_us :
p->m_ullOriginTime_us;
HLOGC(dlog.Debug, log << CONID() << "CSndBuffer: extracting packet size=" << readlen << " to send [REXMIT]");
return readlen;
}
void CSndBuffer::ackData(int offset)
{
CGuard bufferguard(m_BufLock);
bool move = false;
for (int i = 0; i < offset; ++ i)
{
m_iBytesCount -= m_pFirstBlock->m_iLength;
if (m_pFirstBlock == m_pCurrBlock)
move = true;
m_pFirstBlock = m_pFirstBlock->m_pNext;
}
if (move)
m_pCurrBlock = m_pFirstBlock;
m_iCount -= offset;
#ifdef SRT_ENABLE_SNDBUFSZ_MAVG
updAvgBufSize(CTimer::getTime());
#endif
CTimer::triggerEvent();
}
int CSndBuffer::getCurrBufSize() const
{
return m_iCount;
}
#ifdef SRT_ENABLE_SNDBUFSZ_MAVG
int CSndBuffer::getAvgBufSize(ref_t<int> r_bytes, ref_t<int> r_tsp)
{
int& bytes = *r_bytes;
int& timespan = *r_tsp;
CGuard bufferguard(m_BufLock); /* Consistency of pkts vs. bytes vs. spantime */
/* update stats in case there was no add/ack activity lately */
updAvgBufSize(CTimer::getTime());
bytes = m_iBytesCountMAvg;
timespan = m_TimespanMAvg;
return(m_iCountMAvg);
}
void CSndBuffer::updAvgBufSize(uint64_t now)
{
const uint64_t elapsed_ms = (now - m_LastSamplingTime) / 1000; //ms since last sampling
if ((1000000 / SRT_MAVG_SAMPLING_RATE) / 1000 > elapsed_ms)
return;
if (1000 < elapsed_ms)
{
/* No sampling in last 1 sec, initialize average */
m_iCountMAvg = getCurrBufSize(Ref(m_iBytesCountMAvg), Ref(m_TimespanMAvg));
m_LastSamplingTime = now;
}
else //((1000000 / SRT_MAVG_SAMPLING_RATE) / 1000 <= elapsed_ms)
{
/*
* weight last average value between -1 sec and last sampling time (LST)
* and new value between last sampling time and now
* |elapsed_ms|
* +----------------------------------+-------+
* -1 LST 0(now)
*/
int instspan;
int bytescount;
int count = getCurrBufSize(Ref(bytescount), Ref(instspan));
HLOGC(dlog.Debug, log << "updAvgBufSize: " << elapsed_ms
<< ": " << count << " " << bytescount
<< " " << instspan << "ms");
m_iCountMAvg = (int)(((count * (1000 - elapsed_ms)) + (count * elapsed_ms)) / 1000);
m_iBytesCountMAvg = (int)(((bytescount * (1000 - elapsed_ms)) + (bytescount * elapsed_ms)) / 1000);
m_TimespanMAvg = (int)(((instspan * (1000 - elapsed_ms)) + (instspan * elapsed_ms)) / 1000);
m_LastSamplingTime = now;
}
}
#endif /* SRT_ENABLE_SNDBUFSZ_MAVG */
int CSndBuffer::getCurrBufSize(ref_t<int> bytes, ref_t<int> timespan)
{
*bytes = m_iBytesCount;
/*
* Timespan can be less then 1000 us (1 ms) if few packets.
* Also, if there is only one pkt in buffer, the time difference will be 0.
* Therefore, always add 1 ms if not empty.
*/
*timespan = 0 < m_iCount ? int((m_ullLastOriginTime_us - m_pFirstBlock->m_ullOriginTime_us) / 1000) + 1 : 0;
return m_iCount;
}
int CSndBuffer::dropLateData(int &bytes, uint64_t latetime)
{
int dpkts = 0;
int dbytes = 0;
bool move = false;
CGuard bufferguard(m_BufLock);
for (int i = 0; i < m_iCount && m_pFirstBlock->m_ullOriginTime_us < latetime; ++ i)
{
dpkts++;
dbytes += m_pFirstBlock->m_iLength;
if (m_pFirstBlock == m_pCurrBlock) move = true;
m_pFirstBlock = m_pFirstBlock->m_pNext;
}
if (move) m_pCurrBlock = m_pFirstBlock;
m_iCount -= dpkts;
m_iBytesCount -= dbytes;
bytes = dbytes;
#ifdef SRT_ENABLE_SNDBUFSZ_MAVG
updAvgBufSize(CTimer::getTime());
#endif /* SRT_ENABLE_SNDBUFSZ_MAVG */
// CTimer::triggerEvent();
return(dpkts);
}
void CSndBuffer::increase()
{
int unitsize = m_pBuffer->m_iSize;
// new physical buffer
Buffer* nbuf = NULL;
try
{
nbuf = new Buffer;
nbuf->m_pcData = new char [unitsize * m_iMSS];
}
catch (...)
{
delete nbuf;
throw CUDTException(MJ_SYSTEMRES, MN_MEMORY, 0);
}
nbuf->m_iSize = unitsize;
nbuf->m_pNext = NULL;
// insert the buffer at the end of the buffer list
Buffer* p = m_pBuffer;
while (p->m_pNext != NULL)
p = p->m_pNext;
p->m_pNext = nbuf;
// new packet blocks
Block* nblk = NULL;
try
{
nblk = new Block;
}
catch (...)
{
delete nblk;
throw CUDTException(MJ_SYSTEMRES, MN_MEMORY, 0);
}
Block* pb = nblk;
for (int i = 1; i < unitsize; ++ i)
{
pb->m_pNext = new Block;
pb = pb->m_pNext;
}
// insert the new blocks onto the existing one
pb->m_pNext = m_pLastBlock->m_pNext;
m_pLastBlock->m_pNext = nblk;
pb = nblk;
char* pc = nbuf->m_pcData;
for (int i = 0; i < unitsize; ++ i)
{
pb->m_pcData = pc;
pb = pb->m_pNext;
pc += m_iMSS;
}
m_iSize += unitsize;
HLOGC(dlog.Debug, log << "CSndBuffer: BUFFER FULL - adding " << (unitsize*m_iMSS) << " bytes spread to " << unitsize << " blocks"
<< " (total size: " << m_iSize << " bytes)");
}
////////////////////////////////////////////////////////////////////////////////
/*
* RcvBuffer (circular buffer):
*
* |<------------------- m_iSize ----------------------------->|
* | |<--- acked pkts -->|<--- m_iMaxPos --->| |
* | | | | |
* +---+---+---+---+---+---+---+---+---+---+---+---+---+ +---+
* | 0 | 0 | 1 | 1 | 1 | 0 | 1 | 1 | 1 | 1 | 0 | 1 | 0 |...| 0 | m_pUnit[]
* +---+---+---+---+---+---+---+---+---+---+---+---+---+ +---+
* | | | |
* | | \__last pkt received
* | \___ m_iLastAckPos: last ack sent
* \___ m_iStartPos: first message to read
*
* m_pUnit[i]->m_iFlag: 0:free, 1:good, 2:passack, 3:dropped
*
* thread safety:
* m_iStartPos: CUDT::m_RecvLock
* m_iLastAckPos: CUDT::m_AckLock
* m_iMaxPos: none? (modified on add and ack
*/
// XXX Init values moved to in-class.
//const uint32_t CRcvBuffer::TSBPD_WRAP_PERIOD = (30*1000000); //30 seconds (in usec)
//const int CRcvBuffer::TSBPD_DRIFT_MAX_VALUE = 5000; // usec
//const int CRcvBuffer::TSBPD_DRIFT_MAX_SAMPLES = 1000; // ACK-ACK packets
#ifdef SRT_DEBUG_TSBPD_DRIFT
//const int CRcvBuffer::TSBPD_DRIFT_PRT_SAMPLES = 200; // ACK-ACK packets
#endif
CRcvBuffer::CRcvBuffer(CUnitQueue* queue, int bufsize_pkts):
m_pUnit(NULL),
m_iSize(bufsize_pkts),
m_pUnitQueue(queue),
m_iStartPos(0),
m_iLastAckPos(0),
m_iMaxPos(0),
m_iNotch(0)
,m_BytesCountLock()
,m_iBytesCount(0)
,m_iAckedPktsCount(0)
,m_iAckedBytesCount(0)
,m_iAvgPayloadSz(7*188)
,m_bTsbPdMode(false)
,m_uTsbPdDelay(0)
,m_ullTsbPdTimeBase(0)
,m_bTsbPdWrapCheck(false)
//,m_iTsbPdDrift(0)
//,m_TsbPdDriftSum(0)
//,m_iTsbPdDriftNbSamples(0)
#ifdef SRT_ENABLE_RCVBUFSZ_MAVG
,m_LastSamplingTime(0)
,m_TimespanMAvg(0)
,m_iCountMAvg(0)
,m_iBytesCountMAvg(0)
#endif
{
m_pUnit = new CUnit* [m_iSize];
for (int i = 0; i < m_iSize; ++ i)
m_pUnit[i] = NULL;
#ifdef SRT_DEBUG_TSBPD_DRIFT
memset(m_TsbPdDriftHisto100us, 0, sizeof(m_TsbPdDriftHisto100us));
memset(m_TsbPdDriftHisto1ms, 0, sizeof(m_TsbPdDriftHisto1ms));
#endif
pthread_mutex_init(&m_BytesCountLock, NULL);
}
CRcvBuffer::~CRcvBuffer()
{
for (int i = 0; i < m_iSize; ++ i)
{
if (m_pUnit[i] != NULL)
{
m_pUnitQueue->makeUnitFree(m_pUnit[i]);
}
}
delete [] m_pUnit;
pthread_mutex_destroy(&m_BytesCountLock);
}
void CRcvBuffer::countBytes(int pkts, int bytes, bool acked)
{
/*
* Byte counter changes from both sides (Recv & Ack) of the buffer
* so the higher level lock is not enough for thread safe op.
*
* pkts are...
* added (bytes>0, acked=false),
* acked (bytes>0, acked=true),
* removed (bytes<0, acked=n/a)
*/
CGuard cg(m_BytesCountLock);
if (!acked) //adding new pkt in RcvBuffer
{
m_iBytesCount += bytes; /* added or removed bytes from rcv buffer */
if (bytes > 0) /* Assuming one pkt when adding bytes */
m_iAvgPayloadSz = ((m_iAvgPayloadSz * (100 - 1)) + bytes) / 100;
}
else // acking/removing pkts to/from buffer
{
m_iAckedPktsCount += pkts; /* acked or removed pkts from rcv buffer */
m_iAckedBytesCount += bytes; /* acked or removed bytes from rcv buffer */
if (bytes < 0) m_iBytesCount += bytes; /* removed bytes from rcv buffer */
}
}
int CRcvBuffer::addData(CUnit* unit, int offset)
{
SRT_ASSERT(unit != NULL);
if (offset >= getAvailBufSize())
return -1;
const int pos = (m_iLastAckPos + offset) % m_iSize;
if (offset >= m_iMaxPos)
m_iMaxPos = offset + 1;
if (m_pUnit[pos] != NULL) {
HLOGC(dlog.Debug, log << "addData: unit %" << unit->m_Packet.m_iSeqNo
<< " rejected, already exists");
return -1;
}
m_pUnit[pos] = unit;
countBytes(1, (int) unit->m_Packet.getLength());
m_pUnitQueue->makeUnitGood(unit);
HLOGC(dlog.Debug, log << "addData: unit %" << unit->m_Packet.m_iSeqNo
<< " accepted, off=" << offset << " POS=" << pos);
return 0;
}
int CRcvBuffer::readBuffer(char* data, int len)
{
int p = m_iStartPos;
int lastack = m_iLastAckPos;
int rs = len;
#if ENABLE_HEAVY_LOGGING
char* begin = data;
#endif
const uint64_t now = (m_bTsbPdMode ? CTimer::getTime() : uint64_t());
HLOGC(dlog.Debug, log << CONID() << "readBuffer: start=" << p << " lastack=" << lastack);
while ((p != lastack) && (rs > 0))
{
if (m_pUnit[p] == NULL)
{
LOGC(dlog.Error, log << CONID() << " IPE readBuffer on null packet pointer");
return -1;
}
if (m_bTsbPdMode)
{
HLOGC(dlog.Debug, log << CONID() << "readBuffer: chk if time2play: NOW=" << now << " PKT TS=" << getPktTsbPdTime(m_pUnit[p]->m_Packet.getMsgTimeStamp()));
if ((getPktTsbPdTime(m_pUnit[p]->m_Packet.getMsgTimeStamp()) > now))
break; /* too early for this unit, return whatever was copied */
}
int unitsize = (int) m_pUnit[p]->m_Packet.getLength() - m_iNotch;
if (unitsize > rs)
unitsize = rs;
HLOGC(dlog.Debug, log << CONID() << "readBuffer: copying buffer #" << p
<< " targetpos=" << int(data-begin) << " sourcepos=" << m_iNotch << " size=" << unitsize << " left=" << (unitsize-rs));
memcpy(data, m_pUnit[p]->m_Packet.m_pcData + m_iNotch, unitsize);
data += unitsize;
if ((rs > unitsize) || (rs == int(m_pUnit[p]->m_Packet.getLength()) - m_iNotch))
{
CUnit* tmp = m_pUnit[p];
m_pUnit[p] = NULL;
m_pUnitQueue->makeUnitFree(tmp);
if (++ p == m_iSize)
p = 0;
m_iNotch = 0;
}
else
m_iNotch += rs;
rs -= unitsize;
}
/* we removed acked bytes form receive buffer */
countBytes(-1, -(len - rs), true);
m_iStartPos = p;
return len - rs;
}
int CRcvBuffer::readBufferToFile(fstream& ofs, int len)
{
int p = m_iStartPos;
int lastack = m_iLastAckPos;
int rs = len;
while ((p != lastack) && (rs > 0))
{
int unitsize = (int) m_pUnit[p]->m_Packet.getLength() - m_iNotch;
if (unitsize > rs)
unitsize = rs;
ofs.write(m_pUnit[p]->m_Packet.m_pcData + m_iNotch, unitsize);
if (ofs.fail())
break;
if ((rs > unitsize) || (rs == int(m_pUnit[p]->m_Packet.getLength()) - m_iNotch))
{
CUnit* tmp = m_pUnit[p];
m_pUnit[p] = NULL;
m_pUnitQueue->makeUnitFree(tmp);
if (++ p == m_iSize)
p = 0;
m_iNotch = 0;
}
else
m_iNotch += rs;
rs -= unitsize;
}
/* we removed acked bytes form receive buffer */
countBytes(-1, -(len - rs), true);
m_iStartPos = p;
return len - rs;
}
void CRcvBuffer::ackData(int len)
{
SRT_ASSERT(len < m_iSize);
SRT_ASSERT(len > 0);
{
int pkts = 0;
int bytes = 0;
for (int i = m_iLastAckPos, n = (m_iLastAckPos + len) % m_iSize; i != n; i = (i + 1) % m_iSize)
{
if (m_pUnit[i] == NULL)
continue;
pkts++;
bytes += (int) m_pUnit[i]->m_Packet.getLength();
}
if (pkts > 0) countBytes(pkts, bytes, true);
}
m_iLastAckPos = (m_iLastAckPos + len) % m_iSize;
m_iMaxPos -= len;
if (m_iMaxPos < 0)
m_iMaxPos = 0;
CTimer::triggerEvent();
}
void CRcvBuffer::skipData(int len)
{
/*
* Caller need protect both AckLock and RecvLock
* to move both m_iStartPos and m_iLastAckPost
*/
if (m_iStartPos == m_iLastAckPos)
m_iStartPos = (m_iStartPos + len) % m_iSize;
m_iLastAckPos = (m_iLastAckPos + len) % m_iSize;
m_iMaxPos -= len;
if (m_iMaxPos < 0)
m_iMaxPos = 0;
}
bool CRcvBuffer::getRcvFirstMsg(ref_t<uint64_t> r_tsbpdtime, ref_t<bool> r_passack, ref_t<int32_t> r_skipseqno, ref_t<int32_t> r_curpktseq)
{
int32_t& skipseqno = *r_skipseqno;
bool& passack = *r_passack;
skipseqno = -1;
passack = false;
// tsbpdtime will be retrieved by the below call
// Returned values:
// - tsbpdtime: real time when the packet is ready to play (whether ready to play or not)
// - passack: false (the report concerns a packet with an exactly next sequence)
// - skipseqno == -1: no packets to skip towards the first RTP
// - ppkt: that exactly packet that is reported (for debugging purposes)
// - @return: whether the reported packet is ready to play
/* Check the acknowledged packets */
if (getRcvReadyMsg(r_tsbpdtime, r_curpktseq))
{
HLOGC(dlog.Debug, log << "getRcvFirstMsg: ready CONTIG packet: %" << (*r_curpktseq));
return true;
}
else if (*r_tsbpdtime != 0)
{
HLOGC(dlog.Debug, log << "getRcvFirstMsg: no packets found");
return false;
}
// getRcvReadyMsg returned false and tsbpdtime == 0.
// Below this line we have only two options:
// - m_iMaxPos == 0, which means that no more packets are in the buffer
// - returned: tsbpdtime=0, passack=true, skipseqno=-1, ppkt=0, @return false
// - m_iMaxPos > 0, which means that there are packets arrived after a lost packet:
// - returned: tsbpdtime=PKT.TS, passack=true, skipseqno=PKT.SEQ, ppkt=PKT, @return LOCAL(PKT.TS) <= NOW
/*
* No acked packets ready but caller want to know next packet to wait for
* Check the not yet acked packets that may be stuck by missing packet(s).
*/
bool haslost = false;
*r_tsbpdtime = 0; // redundant, for clarity
passack = true;
// XXX SUSPECTED ISSUE with this algorithm:
// The above call to getRcvReadyMsg() should report as to whether:
// - there is an EXACTLY NEXT SEQUENCE packet
// - this packet is ready to play.
//
// Situations handled after the call are when:
// - there's the next sequence packet available and it is ready to play
// - there are no packets at all, ready to play or not
//
// So, the remaining situation is that THERE ARE PACKETS that follow
// the current sequence, but they are not ready to play. This includes
// packets that have the exactly next sequence and packets that jump
// over a lost packet.
//
// As the getRcvReadyMsg() function walks through the incoming units
// to see if there's anything that satisfies these conditions, it *SHOULD*
// be also capable of checking if the next available packet, if it is
// there, is the next sequence packet or not. Retrieving this exactly
// packet would be most useful, as the test for play-readiness and
// sequentiality can be done on it directly.
//
// When done so, the below loop would be completely unnecessary.
// Logical description of the below algorithm:
// 1. Check if the VERY FIRST PACKET is valid; if so then:
// - check if it's ready to play, return boolean value that marks it.
for (int i = m_iLastAckPos, n = (m_iLastAckPos + m_iMaxPos) % m_iSize; i != n; i = (i + 1) % m_iSize)
{
if ( !m_pUnit[i]
|| m_pUnit[i]->m_iFlag != CUnit::GOOD )
{
/* There are packets in the sequence not received yet */
haslost = true;
HLOGC(dlog.Debug, log << "getRcvFirstMsg: empty hole at *" << i);
}
else
{
/* We got the 1st valid packet */
*r_tsbpdtime = getPktTsbPdTime(m_pUnit[i]->m_Packet.getMsgTimeStamp());
if (*r_tsbpdtime <= CTimer::getTime())
{
/* Packet ready to play */
if (haslost)
{
/*
* Packet stuck on non-acked side because of missing packets.
* Tell 1st valid packet seqno so caller can skip (drop) the missing packets.
*/
skipseqno = m_pUnit[i]->m_Packet.m_iSeqNo;
*r_curpktseq = skipseqno;
}
HLOGC(dlog.Debug, log << "getRcvFirstMsg: found ready packet, nSKIPPED: "
<< ((i - m_iLastAckPos + m_iSize) % m_iSize));
// NOTE: if haslost is not set, it means that this is the VERY FIRST
// packet, that is, packet currently at pos = m_iLastAckPos. There's no
// possibility that it is so otherwise because:
// - if this first good packet is ready to play, THIS HERE RETURNS NOW.
// ...
return true;
}
HLOGC(dlog.Debug, log << "getRcvFirstMsg: found NOT READY packet, nSKIPPED: "
<< ((i - m_iLastAckPos + m_iSize) % m_iSize));
// ... and if this first good packet WASN'T ready to play, THIS HERE RETURNS NOW, TOO,
// just states that there's no ready packet to play.
// ...
return false;
}
// ... and if this first packet WASN'T GOOD, the loop continues, however since now
// the 'haslost' is set, which means that it continues only to find the first valid
// packet after stating that the very first packet isn't valid.
}
HLOGC(dlog.Debug, log << "getRcvFirstMsg: found NO PACKETS");
return false;
}
bool CRcvBuffer::getRcvReadyMsg(ref_t<uint64_t> tsbpdtime, ref_t<int32_t> curpktseq)
{
*tsbpdtime = 0;
IF_HEAVY_LOGGING(const char* reason = "NOT RECEIVED");
for (int i = m_iStartPos, n = m_iLastAckPos; i != n; i = (i + 1) % m_iSize)
{
bool freeunit = false;
/* Skip any invalid skipped/dropped packets */
if (m_pUnit[i] == NULL)
{
HLOGC(mglog.Debug, log << "getRcvReadyMsg: POS=" << i
<< " +" << ((i - m_iStartPos + m_iSize) % m_iSize)
<< " SKIPPED - no unit there");
if (++ m_iStartPos == m_iSize)
m_iStartPos = 0;
continue;
}
*curpktseq = m_pUnit[i]->m_Packet.getSeqNo();
if (m_pUnit[i]->m_iFlag != CUnit::GOOD)
{
HLOGC(mglog.Debug, log << "getRcvReadyMsg: POS=" << i
<< " +" << ((i - m_iStartPos + m_iSize) % m_iSize)
<< " SKIPPED - unit not good");
freeunit = true;
}
else
{
*tsbpdtime = getPktTsbPdTime(m_pUnit[i]->m_Packet.getMsgTimeStamp());
int64_t towait = (*tsbpdtime - CTimer::getTime());
if (towait > 0)
{
HLOGC(mglog.Debug, log << "getRcvReadyMsg: POS=" << i
<< " +" << ((i - m_iStartPos + m_iSize) % m_iSize)
<< " pkt %" << curpktseq.get()
<< " NOT ready to play (only in " << (towait/1000.0) << "ms)");
return false;
}
if (m_pUnit[i]->m_Packet.getMsgCryptoFlags() != EK_NOENC)
{
IF_HEAVY_LOGGING(reason = "DECRYPTION FAILED");
freeunit = true; /* packet not decrypted */
}
else
{
HLOGC(mglog.Debug, log << "getRcvReadyMsg: POS=" << i
<< " +" << ((i - m_iStartPos + m_iSize) % m_iSize)
<< " pkt %" << curpktseq.get()
<< " ready to play (delayed " << (-towait/1000.0) << "ms)");
return true;
}
}
if (freeunit)
{
HLOGC(mglog.Debug, log << "getRcvReadyMsg: POS=" << i << " FREED");
/* removed skipped, dropped, undecryptable bytes from rcv buffer */
const int rmbytes = (int)m_pUnit[i]->m_Packet.getLength();
countBytes(-1, -rmbytes, true);
CUnit* tmp = m_pUnit[i];
m_pUnit[i] = NULL;
m_pUnitQueue->makeUnitFree(tmp);
if (++m_iStartPos == m_iSize)
m_iStartPos = 0;
}
}
HLOGC(mglog.Debug, log << "getRcvReadyMsg: nothing to deliver: " << reason);
return false;
}
/*
* Return receivable data status (packet timestamp ready to play if TsbPd mode)
* Return playtime (tsbpdtime) of 1st packet in queue, ready to play or not
*
* Return data ready to be received (packet timestamp ready to play if TsbPd mode)
* Using getRcvDataSize() to know if there is something to read as it was widely
* used in the code (core.cpp) is expensive in TsbPD mode, hence this simpler function
* that only check if first packet in queue is ready.
*/
bool CRcvBuffer::isRcvDataReady(ref_t<uint64_t> tsbpdtime, ref_t<int32_t> curpktseq)
{
*tsbpdtime = 0;
if (m_bTsbPdMode)
{
CPacket* pkt = getRcvReadyPacket();
if (!pkt)
return false;
/*
* Acknowledged data is available,
* Only say ready if time to deliver.
* Report the timestamp, ready or not.
*/
*curpktseq = pkt->getSeqNo();
*tsbpdtime = getPktTsbPdTime(pkt->getMsgTimeStamp());
return (*tsbpdtime <= CTimer::getTime());
}
return isRcvDataAvailable();
}
// XXX This function may be called only after checking
// if m_bTsbPdMode.
CPacket* CRcvBuffer::getRcvReadyPacket()
{
for (int i = m_iStartPos, n = m_iLastAckPos; i != n; i = (i + 1) % m_iSize)
{
/*
* Skip missing packets that did not arrive in time.
*/
if ( m_pUnit[i] && m_pUnit[i]->m_iFlag == CUnit::GOOD )
return &m_pUnit[i]->m_Packet;
}
return 0;
}
#if ENABLE_HEAVY_LOGGING
// This function is for debug purposes only and it's called only
// from within HLOG* macros.
void CRcvBuffer::reportBufferStats()
{
int nmissing = 0;
int32_t low_seq= -1, high_seq = -1;
int32_t low_ts = 0, high_ts = 0;
for (int i = m_iStartPos, n = m_iLastAckPos; i != n; i = (i + 1) % m_iSize)
{
if ( m_pUnit[i] && m_pUnit[i]->m_iFlag == CUnit::GOOD )
{
low_seq = m_pUnit[i]->m_Packet.m_iSeqNo;
low_ts = m_pUnit[i]->m_Packet.m_iTimeStamp;
break;
}
++nmissing;
}
// Not sure if a packet MUST BE at the last ack pos position, so check, just in case.
int n = m_iLastAckPos;
if (m_pUnit[n] && m_pUnit[n]->m_iFlag == CUnit::GOOD)
{
high_ts = m_pUnit[n]->m_Packet.m_iTimeStamp;
high_seq = m_pUnit[n]->m_Packet.m_iSeqNo;
}
else
{
// Possibilities are:
// m_iStartPos == m_iLastAckPos, high_ts == low_ts, defined.
// No packet: low_ts == 0, so high_ts == 0, too.
high_ts = low_ts;
}
// The 32-bit timestamps are relative and roll over oftten; what
// we really need is the timestamp difference. The only place where
// we can ask for the time base is the upper time because when trying
// to receive the time base for the lower time we'd break the requirement
// for monotonic clock.
uint64_t upper_time = high_ts;
uint64_t lower_time = low_ts;
if (lower_time > upper_time)
upper_time += uint64_t(CPacket::MAX_TIMESTAMP)+1;
int32_t timespan = upper_time - lower_time;
int seqspan = 0;
if (low_seq != -1 && high_seq != -1)
{
seqspan = CSeqNo::seqoff(low_seq, high_seq);
}
LOGC(dlog.Debug, log << "RCV BUF STATS: seqspan=%(" << low_seq << "-" << high_seq << ":" << seqspan << ") missing=" << nmissing << "pkts");
LOGC(dlog.Debug, log << "RCV BUF STATS: timespan=" << timespan << "us (lo=" << FormatTime(lower_time) << " hi=" << FormatTime(upper_time) << ")");
}
#endif // ENABLE_HEAVY_LOGGING
bool CRcvBuffer::isRcvDataReady()
{
uint64_t tsbpdtime;
int32_t seq;
return isRcvDataReady(Ref(tsbpdtime), Ref(seq));
}
int CRcvBuffer::getAvailBufSize() const
{
// One slot must be empty in order to tell the difference between "empty buffer" and "full buffer"
return m_iSize - getRcvDataSize() - 1;
}
int CRcvBuffer::getRcvDataSize() const
{
if (m_iLastAckPos >= m_iStartPos)
return m_iLastAckPos - m_iStartPos;
return m_iSize + m_iLastAckPos - m_iStartPos;
}
int CRcvBuffer::debugGetSize() const
{
// Does exactly the same as getRcvDataSize, but
// it should be used FOR INFORMATIONAL PURPOSES ONLY.
// The source values might be changed in another thread
// during the calculation, although worst case the
// resulting value may differ to the real buffer size by 1.
int from = m_iStartPos, to = m_iLastAckPos;
int size = to - from;
if (size < 0)
size += m_iSize;
return size;
}
bool CRcvBuffer::empty() const
{
// This will not always return the intended value,
// that is, it may return false when the buffer really is
// empty - but it will return true then in one of next calls.
// This function will be always called again at some point
// if it returned false, and on true the connection
// is going to be broken - so this behavior is acceptable.
return m_iStartPos == m_iLastAckPos;
}
#ifdef SRT_ENABLE_RCVBUFSZ_MAVG
/* Return moving average of acked data pkts, bytes, and timespan (ms) of the receive buffer */
int CRcvBuffer::getRcvAvgDataSize(int &bytes, int &timespan)
{
timespan = m_TimespanMAvg;
bytes = m_iBytesCountMAvg;
return(m_iCountMAvg);
}
/* Update moving average of acked data pkts, bytes, and timespan (ms) of the receive buffer */
void CRcvBuffer::updRcvAvgDataSize(uint64_t now)
{
const uint64_t elapsed_ms = (now - m_LastSamplingTime) / 1000; //ms since last sampling
if ((1000000 / SRT_MAVG_SAMPLING_RATE) / 1000 > elapsed_ms)
return; /* Last sampling too recent, skip */
if (1000 < elapsed_ms)
{
/* No sampling in last 1 sec, initialize/reset moving average */
m_iCountMAvg = getRcvDataSize(m_iBytesCountMAvg, m_TimespanMAvg);
m_LastSamplingTime = now;
HLOGC(dlog.Debug, log << "getRcvDataSize: " << m_iCountMAvg << " " << m_iBytesCountMAvg
<< " " << m_TimespanMAvg << " ms elapsed_ms: " << elapsed_ms << " ms");
}
else if ((1000000 / SRT_MAVG_SAMPLING_RATE) / 1000 <= elapsed_ms)
{
/*
* Weight last average value between -1 sec and last sampling time (LST)
* and new value between last sampling time and now
* |elapsed_ms|
* +----------------------------------+-------+
* -1 LST 0(now)
*/
int instspan;
int bytescount;
int count = getRcvDataSize(bytescount, instspan);
m_iCountMAvg = (int)(((count * (1000 - elapsed_ms)) + (count * elapsed_ms)) / 1000);
m_iBytesCountMAvg = (int)(((bytescount * (1000 - elapsed_ms)) + (bytescount * elapsed_ms)) / 1000);
m_TimespanMAvg = (int)(((instspan * (1000 - elapsed_ms)) + (instspan * elapsed_ms)) / 1000);
m_LastSamplingTime = now;
HLOGC(dlog.Debug, log << "getRcvDataSize: " << count << " " << bytescount << " " << instspan
<< " ms elapsed_ms: " << elapsed_ms << " ms");
}
}
#endif /* SRT_ENABLE_RCVBUFSZ_MAVG */
/* Return acked data pkts, bytes, and timespan (ms) of the receive buffer */
int CRcvBuffer::getRcvDataSize(int &bytes, int &timespan)
{
timespan = 0;
if (m_bTsbPdMode)
{
// Get a valid startpos.
// Skip invalid entries in the beginning, if any.
int startpos = m_iStartPos;
for (; startpos != m_iLastAckPos; startpos = (startpos + 1) % m_iSize)
{
if ((NULL != m_pUnit[startpos]) && (CUnit::GOOD == m_pUnit[startpos]->m_iFlag))
break;
}
int endpos = m_iLastAckPos;
if (m_iLastAckPos != startpos)
{
/*
* |<--- DataSpan ---->|<- m_iMaxPos ->|
* +---+---+---+---+---+---+---+---+---+---+---+---
* | | 1 | 1 | 1 | 0 | 0 | 1 | 1 | 0 | 1 | | m_pUnits[]
* +---+---+---+---+---+---+---+---+---+---+---+---
* | |
* \_ m_iStartPos \_ m_iLastAckPos
*
* m_pUnits[startpos] shall be valid (->m_iFlag==CUnit::GOOD).
* If m_pUnits[m_iLastAckPos-1] is not valid (NULL or ->m_iFlag!=CUnit::GOOD),
* it means m_pUnits[m_iLastAckPos] is valid since a valid unit is needed to skip.
* Favor m_pUnits[m_iLastAckPos] if valid over [m_iLastAckPos-1] to include the whole acked interval.
*/
if ((m_iMaxPos <= 0)
|| (!m_pUnit[m_iLastAckPos])
|| (m_pUnit[m_iLastAckPos]->m_iFlag != CUnit::GOOD))
{
endpos = (m_iLastAckPos == 0 ? m_iSize - 1 : m_iLastAckPos - 1);
}
if ((NULL != m_pUnit[endpos]) && (NULL != m_pUnit[startpos]))
{
const uint64_t startstamp = getPktTsbPdTime(m_pUnit[startpos]->m_Packet.getMsgTimeStamp());
const uint64_t endstamp = getPktTsbPdTime(m_pUnit[endpos]->m_Packet.getMsgTimeStamp());
/*
* There are sampling conditions where spantime is < 0 (big unsigned value).
* It has been observed after changing the SRT latency from 450 to 200 on the sender.
*
* Possible packet order corruption when dropping packet,
* cause by bad thread protection when adding packet in queue
* was later discovered and fixed. Security below kept.
*
* DateTime RecvRate LostRate DropRate AvailBw RTT RecvBufs PdDelay
* 2014-12-08T15:04:25-0500 4712 110 0 96509 33.710 393 450
* 2014-12-08T15:04:35-0500 4512 95 0 107771 33.493 1496542976 200
* 2014-12-08T15:04:40-0500 4213 106 3 107352 53.657 9499425 200
* 2014-12-08T15:04:45-0500 4575 104 0 102194 53.614 59666 200
* 2014-12-08T15:04:50-0500 4475 124 0 100543 53.526 505 200
*/
if (endstamp > startstamp)
timespan = (int)((endstamp - startstamp) / 1000);
}
/*
* Timespan can be less then 1000 us (1 ms) if few packets.
* Also, if there is only one pkt in buffer, the time difference will be 0.
* Therefore, always add 1 ms if not empty.
*/
if (0 < m_iAckedPktsCount)
timespan += 1;
}
}
HLOGF(dlog.Debug, "getRcvDataSize: %6d %6d %6d ms\n", m_iAckedPktsCount, m_iAckedBytesCount, timespan);
bytes = m_iAckedBytesCount;
return m_iAckedPktsCount;
}
int CRcvBuffer::getRcvAvgPayloadSize() const
{
return m_iAvgPayloadSz;
}
void CRcvBuffer::dropMsg(int32_t msgno, bool using_rexmit_flag)
{
for (int i = m_iStartPos, n = (m_iLastAckPos + m_iMaxPos) % m_iSize; i != n; i = (i + 1) % m_iSize)
if ((m_pUnit[i] != NULL)
&& (m_pUnit[i]->m_Packet.getMsgSeq(using_rexmit_flag) == msgno))
m_pUnit[i]->m_iFlag = CUnit::DROPPED;
}
uint64_t CRcvBuffer::getTsbPdTimeBase(uint32_t timestamp_us)
{
/*
* Packet timestamps wrap around every 01h11m35s (32-bit in usec)
* When added to the peer start time (base time),
* wrapped around timestamps don't provide a valid local packet delevery time.
*
* A wrap check period starts 30 seconds before the wrap point.
* In this period, timestamps smaller than 30 seconds are considered to have wrapped around (then adjusted).
* The wrap check period ends 30 seconds after the wrap point, afterwhich time base has been adjusted.
*/
uint64_t carryover = 0;
// This function should generally return the timebase for the given timestamp_us.
// It's assumed that the timestamp_us, for which this function is being called,
// is received as monotonic clock. This function then traces the changes in the
// timestamps passed as argument and catches the moment when the 64-bit timebase
// should be increased by a "segment length" (MAX_TIMESTAMP+1).
// The checks will be provided for the following split:
// [INITIAL30][FOLLOWING30]....[LAST30] <-- == CPacket::MAX_TIMESTAMP
//
// The following actions should be taken:
// 1. Check if this is [LAST30]. If so, ENTER TSBPD-wrap-check state
// 2. Then, it should turn into [INITIAL30] at some point. If so, use carryover MAX+1.
// 3. Then it should switch to [FOLLOWING30]. If this is detected,
// - EXIT TSBPD-wrap-check state
// - save the carryover as the current time base.
if (m_bTsbPdWrapCheck)
{
// Wrap check period.
if (timestamp_us < TSBPD_WRAP_PERIOD)
{
carryover = uint64_t(CPacket::MAX_TIMESTAMP) + 1;
}
//
else if ((timestamp_us >= TSBPD_WRAP_PERIOD)
&& (timestamp_us <= (TSBPD_WRAP_PERIOD * 2)))
{
/* Exiting wrap check period (if for packet delivery head) */
m_bTsbPdWrapCheck = false;
m_ullTsbPdTimeBase += uint64_t(CPacket::MAX_TIMESTAMP) + 1;
tslog.Debug("tsbpd wrap period ends");
}
}
// Check if timestamp_us is in the last 30 seconds before reaching the MAX_TIMESTAMP.
else if (timestamp_us > (CPacket::MAX_TIMESTAMP - TSBPD_WRAP_PERIOD))
{
/* Approching wrap around point, start wrap check period (if for packet delivery head) */
m_bTsbPdWrapCheck = true;
tslog.Debug("tsbpd wrap period begins");
}
return (m_ullTsbPdTimeBase + carryover);
}
uint64_t CRcvBuffer::getPktTsbPdTime(uint32_t timestamp)
{
return(getTsbPdTimeBase(timestamp) + m_uTsbPdDelay + timestamp + m_DriftTracer.drift());
}
int CRcvBuffer::setRcvTsbPdMode(uint64_t timebase, uint32_t delay)
{
m_bTsbPdMode = true;
m_bTsbPdWrapCheck = false;
// Timebase passed here comes is calculated as:
// >>> CTimer::getTime() - ctrlpkt->m_iTimeStamp
// where ctrlpkt is the packet with SRT_CMD_HSREQ message.
//
// This function is called in the HSREQ reception handler only.
m_ullTsbPdTimeBase = timebase;
// XXX Seems like this may not work correctly.
// At least this solution this way won't work with application-supplied
// timestamps. For that case the timestamps should be taken exclusively
// from the data packets because in case of application-supplied timestamps
// they come from completely different server and undergo different rules
// of network latency and drift.
m_uTsbPdDelay = delay;
return 0;
}
#ifdef SRT_DEBUG_TSBPD_DRIFT
void CRcvBuffer::printDriftHistogram(int64_t iDrift)
{
/*
* Build histogram of drift values
* First line (ms): <=-10.0 -9.0 ... -1.0 - 0.0 + 1.0 ... 9.0 >=10.0
* Second line (ms): -0.9 ... -0.1 - 0.0 + 0.1 ... 0.9
* 0 0 0 0 0 0 0 0 0 0 - 0 + 0 0 0 1 0 0 0 0 0 0
* 0 0 0 0 0 0 0 0 0 - 0 + 0 0 0 0 0 0 0 0 0
*/
iDrift /= 100; // uSec to 100 uSec (0.1ms)
if (-10 < iDrift && iDrift < 10)
{
/* Fill 100us histogram -900 .. 900 us 100 us increments */
m_TsbPdDriftHisto100us[10 + iDrift]++;
}
else
{
/* Fill 1ms histogram <=-10.0, -9.0 .. 9.0, >=10.0 ms in 1 ms increments */
iDrift /= 10; // 100uSec to 1ms
if (-10 < iDrift && iDrift < 10) m_TsbPdDriftHisto1ms[10 + iDrift]++;
else if (iDrift <= -10) m_TsbPdDriftHisto1ms[0]++;
else m_TsbPdDriftHisto1ms[20]++;
}
if ((m_iTsbPdDriftNbSamples % TSBPD_DRIFT_PRT_SAMPLES) == 0)
{
int *histo = m_TsbPdDriftHisto1ms;
fprintf(stderr, "%4d %4d %4d %4d %4d %4d %4d %4d %4d %4d - %4d + ",
histo[0],histo[1],histo[2],histo[3],histo[4],
histo[5],histo[6],histo[7],histo[8],histo[9],histo[10]);
fprintf(stderr, "%4d %4d %4d %4d %4d %4d %4d %4d %4d %4d\n",
histo[11],histo[12],histo[13],histo[14],histo[15],
histo[16],histo[17],histo[18],histo[19],histo[20]);
histo = m_TsbPdDriftHisto100us;
fprintf(stderr, " %4d %4d %4d %4d %4d %4d %4d %4d %4d - %4d + ",
histo[1],histo[2],histo[3],histo[4],histo[5],
histo[6],histo[7],histo[8],histo[9],histo[10]);
fprintf(stderr, "%4d %4d %4d %4d %4d %4d %4d %4d %4d\n",
histo[11],histo[12],histo[13],histo[14],histo[15],
histo[16],histo[17],histo[18],histo[19]);
}
}
void CRcvBuffer::printDriftOffset(int tsbPdOffset, int tsbPdDriftAvg)
{
char szTime[32] = {};
uint64_t now = CTimer::getTime();
time_t tnow = (time_t)(now/1000000);
strftime(szTime, sizeof(szTime), "%H:%M:%S", localtime(&tnow));
fprintf(stderr, "%s.%03d: tsbpd offset=%d drift=%d usec\n",
szTime, (int)((now%1000000)/1000), tsbPdOffset, tsbPdDriftAvg);
memset(m_TsbPdDriftHisto100us, 0, sizeof(m_TsbPdDriftHisto100us));
memset(m_TsbPdDriftHisto1ms, 0, sizeof(m_TsbPdDriftHisto1ms));
}
#endif /* SRT_DEBUG_TSBPD_DRIFT */
void CRcvBuffer::addRcvTsbPdDriftSample(uint32_t timestamp, pthread_mutex_t& mutex_to_lock)
{
if (!m_bTsbPdMode) // Not checked unless in TSBPD mode
return;
/*
* TsbPD time drift correction
* TsbPD time slowly drift over long period depleting decoder buffer or raising latency
* Re-evaluate the time adjustment value using a receiver control packet (ACK-ACK).
* ACK-ACK timestamp is RTT/2 ago (in sender's time base)
* Data packet have origin time stamp which is older when retransmitted so not suitable for this.
*
* Every TSBPD_DRIFT_MAX_SAMPLES packets, the average drift is calculated
* if -TSBPD_DRIFT_MAX_VALUE < avgTsbPdDrift < TSBPD_DRIFT_MAX_VALUE uSec, pass drift value to RcvBuffer to adjust delevery time.
* if outside this range, adjust this->TsbPdTimeOffset and RcvBuffer->TsbPdTimeBase by +-TSBPD_DRIFT_MAX_VALUE uSec
* to maintain TsbPdDrift values in reasonable range (-5ms .. +5ms).
*/
// Note important thing: this function is being called _EXCLUSIVELY_ in the handler
// of UMSG_ACKACK command reception. This means that the timestamp used here comes
// from the CONTROL domain, not DATA domain (timestamps from DATA domain may be
// either schedule time or a time supplied by the application).
int64_t iDrift = CTimer::getTime() - (getTsbPdTimeBase(timestamp) + timestamp);
CGuard::enterCS(mutex_to_lock);
bool updated = m_DriftTracer.update(iDrift);
#ifdef SRT_DEBUG_TSBPD_DRIFT
printDriftHistogram(iDrift);
#endif /* SRT_DEBUG_TSBPD_DRIFT */
if ( updated )
{
#ifdef SRT_DEBUG_TSBPD_DRIFT
printDriftOffset(m_DriftTracer.overdrift(), m_DriftTracer.drift());
#endif /* SRT_DEBUG_TSBPD_DRIFT */
#if ENABLE_HEAVY_LOGGING
uint64_t oldbase = m_ullTsbPdTimeBase;
#endif
m_ullTsbPdTimeBase += m_DriftTracer.overdrift();
HLOGC(dlog.Debug, log << "DRIFT=" << (iDrift/1000.0) << "ms AVG="
<< (m_DriftTracer.drift()/1000.0) << "ms, TB: "
<< FormatTime(oldbase) << " UPDATED TO: " << FormatTime(m_ullTsbPdTimeBase));
}
else
{
HLOGC(dlog.Debug, log << "DRIFT=" << (iDrift/1000.0) << "ms TB REMAINS: " << FormatTime(m_ullTsbPdTimeBase));
}
CGuard::leaveCS(mutex_to_lock);
}
int CRcvBuffer::readMsg(char* data, int len)
{
SRT_MSGCTRL dummy = srt_msgctrl_default;
return readMsg(data, len, Ref(dummy));
}
int CRcvBuffer::readMsg(char* data, int len, ref_t<SRT_MSGCTRL> r_msgctl)
{
SRT_MSGCTRL& msgctl = *r_msgctl;
int p, q;
bool passack;
bool empty = true;
uint64_t& rplaytime = msgctl.srctime;
#ifdef ENABLE_HEAVY_LOGGING
reportBufferStats();
#endif
if (m_bTsbPdMode)
{
passack = false;
int seq = 0;
if (getRcvReadyMsg(Ref(rplaytime), Ref(seq)))
{
empty = false;
// In TSBPD mode you always read one message
// at a time and a message always fits in one UDP packet,
// so in one "unit".
p = q = m_iStartPos;
#ifdef SRT_DEBUG_TSBPD_OUTJITTER
uint64_t now = CTimer::getTime();
if ((now - rplaytime)/10 < 10)
m_ulPdHisto[0][(now - rplaytime)/10]++;
else if ((now - rplaytime)/100 < 10)
m_ulPdHisto[1][(now - rplaytime)/100]++;
else if ((now - rplaytime)/1000 < 10)
m_ulPdHisto[2][(now - rplaytime)/1000]++;
else
m_ulPdHisto[3][1]++;
#endif /* SRT_DEBUG_TSBPD_OUTJITTER */
}
}
else
{
rplaytime = 0;
if (scanMsg(Ref(p), Ref(q), Ref(passack)))
empty = false;
}
if (empty)
return 0;
// This should happen just once. By 'empty' condition
// we have a guarantee that m_pUnit[p] exists and is valid.
CPacket& pkt1 = m_pUnit[p]->m_Packet;
// This returns the sequence number and message number to
// the API caller.
msgctl.pktseq = pkt1.getSeqNo();
msgctl.msgno = pkt1.getMsgSeq();
SRT_ASSERT(len > 0);
int rs = len > 0 ? len : 0;
while (p != (q + 1) % m_iSize)
{
const int pktlen = (int)m_pUnit[p]->m_Packet.getLength();
// When unitsize is less than pktlen, only a fragment is copied to the output 'data',
// but still the whole packet is removed from the receiver buffer.
if (pktlen > 0)
countBytes(-1, -pktlen, true);
const int unitsize = ((rs >= 0) && (pktlen > rs)) ? rs : pktlen;
HLOGC(mglog.Debug, log << "readMsg: checking unit POS=" << p);
if (unitsize > 0)
{
memcpy(data, m_pUnit[p]->m_Packet.m_pcData, unitsize);
data += unitsize;
rs -= unitsize;
#if ENABLE_HEAVY_LOGGING
{
static uint64_t prev_now;
static uint64_t prev_srctime;
CPacket& pkt = m_pUnit[p]->m_Packet;
int32_t seq = pkt.m_iSeqNo;
uint64_t nowtime = CTimer::getTime();
//CTimer::rdtsc(nowtime);
uint64_t srctime = getPktTsbPdTime(m_pUnit[p]->m_Packet.getMsgTimeStamp());
int64_t timediff = nowtime - srctime;
int64_t nowdiff = prev_now ? (nowtime - prev_now) : 0;
uint64_t srctimediff = prev_srctime ? (srctime - prev_srctime) : 0;
HLOGC(dlog.Debug, log << CONID() << "readMsg: DELIVERED seq=" << seq
<< " from POS=" << p << " T="
<< FormatTime(srctime) << " in " << (timediff/1000.0)
<< "ms - TIME-PREVIOUS: PKT: " << (srctimediff/1000.0)
<< " LOCAL: " << (nowdiff/1000.0)
<< " !" << BufferStamp(pkt.data(), pkt.size()));
prev_now = nowtime;
prev_srctime = srctime;
}
#endif
}
else
{
HLOGC(dlog.Debug, log << CONID() << "readMsg: SKIPPED POS=" << p << " - ZERO SIZE UNIT");
}
if (!passack)
{
HLOGC(dlog.Debug, log << CONID() << "readMsg: FREEING UNIT POS=" << p);
CUnit* tmp = m_pUnit[p];
m_pUnit[p] = NULL;
m_pUnitQueue->makeUnitFree(tmp);
}
else
{
HLOGC(dlog.Debug, log << CONID() << "readMsg: PASSACK UNIT POS=" << p);
m_pUnit[p]->m_iFlag = CUnit::PASSACK;
}
if (++ p == m_iSize)
p = 0;
}
if (!passack)
m_iStartPos = (q + 1) % m_iSize;
return len - rs;
}
bool CRcvBuffer::scanMsg(ref_t<int> r_p, ref_t<int> r_q, ref_t<bool> passack)
{
int& p = *r_p;
int& q = *r_q;
// empty buffer
if ((m_iStartPos == m_iLastAckPos) && (m_iMaxPos <= 0))
{
HLOGC(mglog.Debug, log << "scanMsg: empty buffer");
return false;
}
int rmpkts = 0;
int rmbytes = 0;
//skip all bad msgs at the beginning
while (m_iStartPos != m_iLastAckPos)
{
// Roll up to the first valid unit
if (!m_pUnit[m_iStartPos])
{
if (++ m_iStartPos == m_iSize)
m_iStartPos = 0;
continue;
}
// Note: PB_FIRST | PB_LAST == PB_SOLO.
// testing if boundary() & PB_FIRST tests if the msg is first OR solo.
if ( m_pUnit[m_iStartPos]->m_iFlag == CUnit::GOOD
&& m_pUnit[m_iStartPos]->m_Packet.getMsgBoundary() & PB_FIRST )
{
bool good = true;
// look ahead for the whole message
// We expect to see either of:
// [PB_FIRST] [PB_SUBSEQUENT] [PB_SUBSEQUENT] [PB_LAST]
// [PB_SOLO]
// but not:
// [PB_FIRST] NULL ...
// [PB_FIRST] FREE/PASSACK/DROPPED...
// If the message didn't look as expected, interrupt this.
// This begins with a message starting at m_iStartPos
// up to m_iLastAckPos OR until the PB_LAST message is found.
// If any of the units on this way isn't good, this OUTER loop
// will be interrupted.
for (int i = m_iStartPos; i != m_iLastAckPos;)
{
if (!m_pUnit[i] || m_pUnit[i]->m_iFlag != CUnit::GOOD)
{
good = false;
break;
}
// Likewise, boundary() & PB_LAST will be satisfied for last OR solo.
if ( m_pUnit[i]->m_Packet.getMsgBoundary() & PB_LAST )
break;
if (++ i == m_iSize)
i = 0;
}
if (good)
break;
}
CUnit* tmp = m_pUnit[m_iStartPos];
m_pUnit[m_iStartPos] = NULL;
rmpkts++;
rmbytes += (int) tmp->m_Packet.getLength();
m_pUnitQueue->makeUnitFree(tmp);
if (++ m_iStartPos == m_iSize)
m_iStartPos = 0;
}
/* we removed bytes form receive buffer */
countBytes(-rmpkts, -rmbytes, true);
// Not sure if this is correct, but this above 'while' loop exits
// under the following conditions only:
// - m_iStartPos == m_iLastAckPos (that makes passack = true)
// - found at least GOOD unit with PB_FIRST and not all messages up to PB_LAST are good,
// in which case it returns with m_iStartPos <% m_iLastAckPos (earlier)
// Also all units that lied before m_iStartPos are removed.
p = -1; // message head
q = m_iStartPos; // message tail
*passack = m_iStartPos == m_iLastAckPos;
bool found = false;
// looking for the first message
//>>m_pUnit[size + m_iMaxPos] is not valid
// XXX Would be nice to make some very thorough refactoring here.
// This rolls by q variable from m_iStartPos up to m_iLastAckPos,
// actually from the first message up to the one with PB_LAST
// or PB_SOLO boundary.
// The 'i' variable used in this loop is just a stub, and the
// upper value is just to make it "virtually infinite, but with
// no exaggeration" (actually it makes sure that this loop does
// not roll more than around the whole cyclic container). This variable
// isn't used inside the loop at all.
for (int i = 0, n = m_iMaxPos + getRcvDataSize(); i < n; ++ i)
{
if (m_pUnit[q] && m_pUnit[q]->m_iFlag == CUnit::GOOD)
{
// Equivalent pseudocode:
// PacketBoundary bound = m_pUnit[q]->m_Packet.getMsgBoundary();
// if ( IsSet(bound, PB_FIRST) )
// p = q;
// if ( IsSet(bound, PB_LAST) && p != -1 )
// found = true;
//
// Not implemented this way because it uselessly check p for -1
// also after setting it explicitly.
switch (m_pUnit[q]->m_Packet.getMsgBoundary())
{
case PB_SOLO: // 11
p = q;
found = true;
break;
case PB_FIRST: // 10
p = q;
break;
case PB_LAST: // 01
if (p != -1)
found = true;
break;
case PB_SUBSEQUENT:
; // do nothing (caught first, rolling for last)
}
}
else
{
// a hole in this message, not valid, restart search
p = -1;
}
// 'found' is set when the current iteration hit a message with PB_LAST
// (including PB_SOLO since the very first message).
if (found)
{
// the msg has to be ack'ed or it is allowed to read out of order, and was not read before
if (!*passack || !m_pUnit[q]->m_Packet.getMsgOrderFlag())
{
HLOGC(mglog.Debug, log << "scanMsg: found next-to-broken message, delivering OUT OF ORDER.");
break;
}
found = false;
}
if (++ q == m_iSize)
q = 0;
if (q == m_iLastAckPos)
*passack = true;
}
// no msg found
if (!found)
{
// NOTE:
// This situation may only happen if:
// - Found a packet with PB_FIRST, so p = q at the moment when it was found
// - Possibly found following components of that message up to shifted q
// - Found no terminal packet (PB_LAST) for that message.
// if the message is larger than the receiver buffer, return part of the message
if ((p != -1) && ((q + 1) % m_iSize == p))
{
HLOGC(mglog.Debug, log << "scanMsg: BUFFER FULL and message is INCOMPLETE. Returning PARTIAL MESSAGE.");
found = true;
}
else
{
HLOGC(mglog.Debug, log << "scanMsg: PARTIAL or NO MESSAGE found: p=" << p << " q=" << q);
}
}
else
{
HLOGC(mglog.Debug, log << "scanMsg: extracted message p=" << p << " q=" << q << " (" << ((q-p+m_iSize+1)%m_iSize) << " packets)");
}
return found;
}