mirror of
https://github.com/ossrs/srs.git
synced 2025-02-13 20:01:56 +00:00
1955 lines
65 KiB
C++
1955 lines
65 KiB
C++
/*
|
|
* SRT - Secure, Reliable, Transport
|
|
* Copyright (c) 2018 Haivision Systems Inc.
|
|
*
|
|
* This Source Code Form is subject to the terms of the Mozilla Public
|
|
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
|
*
|
|
*/
|
|
|
|
/*****************************************************************************
|
|
Copyright (c) 2001 - 2011, The Board of Trustees of the University of Illinois.
|
|
All rights reserved.
|
|
|
|
Redistribution and use in source and binary forms, with or without
|
|
modification, are permitted provided that the following conditions are
|
|
met:
|
|
|
|
* Redistributions of source code must retain the above
|
|
copyright notice, this list of conditions and the
|
|
following disclaimer.
|
|
|
|
* Redistributions in binary form must reproduce the
|
|
above copyright notice, this list of conditions
|
|
and the following disclaimer in the documentation
|
|
and/or other materials provided with the distribution.
|
|
|
|
* Neither the name of the University of Illinois
|
|
nor the names of its contributors may be used to
|
|
endorse or promote products derived from this
|
|
software without specific prior written permission.
|
|
|
|
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
|
|
IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
|
|
THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
|
|
PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
|
|
CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
|
|
EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
|
|
PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
|
|
PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
|
|
LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
|
|
NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
|
|
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
|
*****************************************************************************/
|
|
|
|
/*****************************************************************************
|
|
written by
|
|
Yunhong Gu, last updated 03/12/2011
|
|
modified by
|
|
Haivision Systems Inc.
|
|
*****************************************************************************/
|
|
|
|
#include <cstring>
|
|
#include <cmath>
|
|
#include "buffer.h"
|
|
#include "packet.h"
|
|
#include "core.h" // provides some constants
|
|
#include "logging.h"
|
|
|
|
using namespace std;
|
|
using namespace srt_logging;
|
|
|
|
/// Creates a send buffer made of `size` packet blocks, each of them backed
/// by `mss` bytes of a single physical buffer.
///
/// @param size  initial number of blocks in the circular block list
/// @param mss   number of payload bytes reserved per block
CSndBuffer::CSndBuffer(int size, int mss)
    : m_BufLock()
    , m_pBlock(NULL)
    , m_pFirstBlock(NULL)
    , m_pCurrBlock(NULL)
    , m_pLastBlock(NULL)
    , m_pBuffer(NULL)
    , m_iNextMsgNo(1)
    , m_iSize(size)
    , m_iMSS(mss)
    , m_iCount(0)
    , m_iBytesCount(0)
    , m_ullLastOriginTime_us(0)
#ifdef SRT_ENABLE_SNDBUFSZ_MAVG
    , m_LastSamplingTime(0)
    , m_iCountMAvg(0)
    , m_iBytesCountMAvg(0)
    , m_TimespanMAvg(0)
#endif
    , m_iInRatePktsCount(0)
    , m_iInRateBytesCount(0)
    , m_InRateStartTime(0)
    , m_InRatePeriod(INPUTRATE_FAST_START_US)   // 0.5 sec (fast start)
    , m_iInRateBps(INPUTRATE_INITIAL_BYTESPS)
{
    // initial physical buffer of "size"
    m_pBuffer = new Buffer;
    m_pBuffer->m_pcData = new char [m_iSize * m_iMSS];
    m_pBuffer->m_iSize = m_iSize;
    m_pBuffer->m_pNext = NULL;

    // circular linked list for out bound packets
    m_pBlock = new Block;
    Block* pb = m_pBlock;
    for (int i = 1; i < m_iSize; ++ i)
    {
        pb->m_pNext = new Block;
        pb->m_iMsgNoBitset = 0;
        pb = pb->m_pNext;
    }
    // The loop above clears the bitset of every block except the one it
    // stops on; clear that one too so that no block of the ring carries
    // an indeterminate message number.
    pb->m_iMsgNoBitset = 0;
    pb->m_pNext = m_pBlock; // close the ring

    // Assign to every block its own m_iMSS-sized slice of the physical buffer.
    pb = m_pBlock;
    char* pc = m_pBuffer->m_pcData;
    for (int i = 0; i < m_iSize; ++ i)
    {
        pb->m_pcData = pc;
        pb = pb->m_pNext;
        pc += m_iMSS;
    }

    // Empty buffer: all three cursors start on the same block.
    m_pFirstBlock = m_pCurrBlock = m_pLastBlock = m_pBlock;

    pthread_mutex_init(&m_BufLock, NULL);
}
|
|
|
|
/// Releases the block ring, every physical buffer and the buffer mutex.
CSndBuffer::~CSndBuffer()
{
    // Walk the ring starting right after the anchor block and free each
    // block; the anchor itself is released last.
    for (Block* cur = m_pBlock->m_pNext; cur != m_pBlock; )
    {
        Block* const following = cur->m_pNext;
        delete cur;
        cur = following;
    }
    delete m_pBlock;

    // Free the chain of physical buffers together with their payload areas.
    Buffer* chunk = m_pBuffer;
    while (chunk != NULL)
    {
        Buffer* const following = chunk->m_pNext;
        delete [] chunk->m_pcData;
        delete chunk;
        chunk = following;
    }
    m_pBuffer = NULL;

    pthread_mutex_destroy(&m_BufLock);
}
|
|
|
|
void CSndBuffer::addBuffer(const char* data, int len, int ttl, bool order, uint64_t srctime, ref_t<int32_t> r_msgno)
|
|
{
|
|
int32_t& msgno = *r_msgno;
|
|
|
|
int size = len / m_iMSS;
|
|
if ((len % m_iMSS) != 0)
|
|
size ++;
|
|
|
|
HLOGC(mglog.Debug, log << "addBuffer: size=" << m_iCount << " reserved=" << m_iSize << " needs=" << size << " buffers for " << len << " bytes");
|
|
|
|
// dynamically increase sender buffer
|
|
while (size + m_iCount >= m_iSize)
|
|
{
|
|
HLOGC(mglog.Debug, log << "addBuffer: ... still lacking " << (size + m_iCount - m_iSize) << " buffers...");
|
|
increase();
|
|
}
|
|
|
|
const uint64_t time = CTimer::getTime();
|
|
int32_t inorder = order ? MSGNO_PACKET_INORDER::mask : 0;
|
|
|
|
HLOGC(dlog.Debug, log << CONID() << "addBuffer: adding "
|
|
<< size << " packets (" << len << " bytes) to send, msgno=" << m_iNextMsgNo
|
|
<< (inorder ? "" : " NOT") << " in order");
|
|
|
|
Block* s = m_pLastBlock;
|
|
msgno = m_iNextMsgNo;
|
|
for (int i = 0; i < size; ++ i)
|
|
{
|
|
int pktlen = len - i * m_iMSS;
|
|
if (pktlen > m_iMSS)
|
|
pktlen = m_iMSS;
|
|
|
|
HLOGC(dlog.Debug, log << "addBuffer: spreading from=" << (i*m_iMSS) << " size=" << pktlen << " TO BUFFER:" << (void*)s->m_pcData);
|
|
memcpy(s->m_pcData, data + i * m_iMSS, pktlen);
|
|
s->m_iLength = pktlen;
|
|
|
|
s->m_iMsgNoBitset = m_iNextMsgNo | inorder;
|
|
if (i == 0)
|
|
s->m_iMsgNoBitset |= PacketBoundaryBits(PB_FIRST);
|
|
if (i == size - 1)
|
|
s->m_iMsgNoBitset |= PacketBoundaryBits(PB_LAST);
|
|
// NOTE: if i is neither 0 nor size-1, it resuls with PB_SUBSEQUENT.
|
|
// if i == 0 == size-1, it results with PB_SOLO.
|
|
// Packets assigned to one message can be:
|
|
// [PB_FIRST] [PB_SUBSEQUENT] [PB_SUBSEQUENT] [PB_LAST] - 4 packets per message
|
|
// [PB_FIRST] [PB_LAST] - 2 packets per message
|
|
// [PB_SOLO] - 1 packet per message
|
|
|
|
s->m_ullSourceTime_us = srctime;
|
|
s->m_ullOriginTime_us = time;
|
|
s->m_iTTL = ttl;
|
|
|
|
// XXX unchecked condition: s->m_pNext == NULL.
|
|
// Should never happen, as the call to increase() should ensure enough buffers.
|
|
SRT_ASSERT(s->m_pNext);
|
|
s = s->m_pNext;
|
|
}
|
|
m_pLastBlock = s;
|
|
|
|
CGuard::enterCS(m_BufLock);
|
|
m_iCount += size;
|
|
|
|
m_iBytesCount += len;
|
|
m_ullLastOriginTime_us = time;
|
|
|
|
updateInputRate(time, size, len);
|
|
|
|
#ifdef SRT_ENABLE_SNDBUFSZ_MAVG
|
|
updAvgBufSize(time);
|
|
#endif
|
|
|
|
CGuard::leaveCS(m_BufLock);
|
|
|
|
|
|
// MSGNO_SEQ::mask has a form: 00000011111111...
|
|
// At least it's known that it's from some index inside til the end (to bit 0).
|
|
// If this value has been reached in a step of incrementation, it means that the
|
|
// maximum value has been reached. Casting to int32_t to ensure the same sign
|
|
// in comparison, although it's far from reaching the sign bit.
|
|
|
|
m_iNextMsgNo ++;
|
|
if (m_iNextMsgNo == int32_t(MSGNO_SEQ::mask))
|
|
m_iNextMsgNo = 1;
|
|
}
|
|
|
|
void CSndBuffer::setInputRateSmpPeriod(int period)
|
|
{
|
|
m_InRatePeriod = (uint64_t)period; //(usec) 0=no input rate calculation
|
|
}
|
|
|
|
void CSndBuffer::updateInputRate(uint64_t time, int pkts, int bytes)
|
|
{
|
|
//no input rate calculation
|
|
if (m_InRatePeriod == 0)
|
|
return;
|
|
|
|
if (m_InRateStartTime == 0)
|
|
{
|
|
m_InRateStartTime = time;
|
|
return;
|
|
}
|
|
|
|
m_iInRatePktsCount += pkts;
|
|
m_iInRateBytesCount += bytes;
|
|
|
|
// Trigger early update in fast start mode
|
|
const bool early_update = (m_InRatePeriod < INPUTRATE_RUNNING_US)
|
|
&& (m_iInRatePktsCount > INPUTRATE_MAX_PACKETS);
|
|
|
|
const uint64_t period_us = (time - m_InRateStartTime);
|
|
if (early_update || period_us > m_InRatePeriod)
|
|
{
|
|
//Required Byte/sec rate (payload + headers)
|
|
m_iInRateBytesCount += (m_iInRatePktsCount * CPacket::SRT_DATA_HDR_SIZE);
|
|
m_iInRateBps = (int)(((int64_t)m_iInRateBytesCount * 1000000) / period_us);
|
|
HLOGC(dlog.Debug, log << "updateInputRate: pkts:" << m_iInRateBytesCount << " bytes:" << m_iInRatePktsCount
|
|
<< " rate=" << (m_iInRateBps*8)/1000
|
|
<< "kbps interval=" << period_us);
|
|
m_iInRatePktsCount = 0;
|
|
m_iInRateBytesCount = 0;
|
|
m_InRateStartTime = time;
|
|
|
|
setInputRateSmpPeriod(INPUTRATE_RUNNING_US);
|
|
}
|
|
}
|
|
|
|
|
|
/// Reads up to `len` bytes from `ifs` directly into the buffer blocks,
/// storing them as one in-order message with TTL -1.
///
/// Reading stops early when the stream is in a bad/fail/eof state or when
/// a read yields no bytes. Only the packets that were actually filled are
/// added to the packet counter, so that the counter stays in step with the
/// distance m_pLastBlock was advanced.
///
/// @param ifs  stream to read from
/// @param len  maximum number of bytes to read
/// @return number of bytes actually stored
int CSndBuffer::addBufferFromFile(fstream& ifs, int len)
{
    int size = len / m_iMSS;
    if ((len % m_iMSS) != 0)
        size ++;

    HLOGC(mglog.Debug, log << "addBufferFromFile: size=" << m_iCount << " reserved=" << m_iSize << " needs=" << size << " buffers for " << len << " bytes");

    // dynamically increase sender buffer
    while (size + m_iCount >= m_iSize)
    {
        HLOGC(mglog.Debug, log << "addBufferFromFile: ... still lacking " << (size + m_iCount - m_iSize) << " buffers...");
        increase();
    }

    HLOGC(dlog.Debug, log << CONID() << "addBufferFromFile: adding "
            << size << " packets (" << len << " bytes) to send, msgno=" << m_iNextMsgNo);

    Block* s = m_pLastBlock;
    int total = 0;
    int stored = 0; // number of blocks actually filled with data
    for (int i = 0; i < size; ++ i)
    {
        if (ifs.bad() || ifs.fail() || ifs.eof())
            break;

        int pktlen = len - i * m_iMSS;
        if (pktlen > m_iMSS)
            pktlen = m_iMSS;

        HLOGC(dlog.Debug, log << "addBufferFromFile: reading from=" << (i*m_iMSS) << " size=" << pktlen << " TO BUFFER:" << (void*)s->m_pcData);
        ifs.read(s->m_pcData, pktlen);
        // The stream may deliver fewer bytes than requested; use what it gave.
        if ((pktlen = int(ifs.gcount())) <= 0)
            break;

        // currently file transfer is only available in streaming mode, message is always in order, ttl = infinite
        s->m_iMsgNoBitset = m_iNextMsgNo | MSGNO_PACKET_INORDER::mask;
        if (i == 0)
            s->m_iMsgNoBitset |= PacketBoundaryBits(PB_FIRST);
        if (i == size - 1)
            s->m_iMsgNoBitset |= PacketBoundaryBits(PB_LAST);
        // NOTE: PB_FIRST | PB_LAST == PB_SOLO.
        // none of PB_FIRST & PB_LAST == PB_SUBSEQUENT.

        s->m_iLength = pktlen;
        s->m_iTTL = -1;
        s = s->m_pNext;

        total += pktlen;
        ++stored;
    }
    m_pLastBlock = s;

    CGuard::enterCS(m_BufLock);
    // Count only the blocks that were filled: when the loop above ends
    // early, fewer than "size" blocks lie before the new m_pLastBlock.
    m_iCount += stored;
    m_iBytesCount += total;

    CGuard::leaveCS(m_BufLock);

    m_iNextMsgNo ++;
    if (m_iNextMsgNo == int32_t(MSGNO_SEQ::mask))
        m_iNextMsgNo = 1;

    return total;
}
|
|
|
|
int CSndBuffer::readData(char** data, int32_t& msgno_bitset, uint64_t& srctime, int kflgs)
|
|
{
|
|
// No data to read
|
|
if (m_pCurrBlock == m_pLastBlock)
|
|
return 0;
|
|
|
|
// Make the packet REFLECT the data stored in the buffer.
|
|
*data = m_pCurrBlock->m_pcData;
|
|
int readlen = m_pCurrBlock->m_iLength;
|
|
|
|
// XXX This is probably done because the encryption should happen
|
|
// just once, and so this sets the encryption flags to both msgno bitset
|
|
// IN THE PACKET and IN THE BLOCK. This is probably to make the encryption
|
|
// happen at the time when scheduling a new packet to send, but the packet
|
|
// must remain in the send buffer until it's ACKed. For the case of rexmit
|
|
// the packet will be taken "as is" (that is, already encrypted).
|
|
//
|
|
// The problem is in the order of things:
|
|
// 0. When the application stores the data, some of the flags for PH_MSGNO are set.
|
|
// 1. The readData() is called to get the original data sent by the application.
|
|
// 2. The data are original and must be encrypted. They WILL BE encrypted, later.
|
|
// 3. So far we are in readData() so the encryption flags must be updated NOW because
|
|
// later we won't have access to the block's data.
|
|
// 4. After exiting from readData(), the packet is being encrypted. It's immediately
|
|
// sent, however the data must remain in the sending buffer until they are ACKed.
|
|
// 5. In case when rexmission is needed, the second overloaded version of readData
|
|
// is being called, and the buffer + PH_MSGNO value is extracted. All interesting
|
|
// flags must be present and correct at that time.
|
|
//
|
|
// The only sensible way to fix this problem is to encrypt the packet not after
|
|
// extracting from here, but when the packet is stored into CSndBuffer. The appropriate
|
|
// flags for PH_MSGNO will be applied directly there. Then here the value for setting
|
|
// PH_MSGNO will be set as is.
|
|
|
|
if (kflgs == -1)
|
|
{
|
|
HLOGC(dlog.Debug, log << CONID() << " CSndBuffer: ERROR: encryption required and not possible. NOT SENDING.");
|
|
readlen = 0;
|
|
}
|
|
else
|
|
{
|
|
m_pCurrBlock->m_iMsgNoBitset |= MSGNO_ENCKEYSPEC::wrap(kflgs);
|
|
}
|
|
msgno_bitset = m_pCurrBlock->m_iMsgNoBitset;
|
|
|
|
srctime =
|
|
m_pCurrBlock->m_ullSourceTime_us ? m_pCurrBlock->m_ullSourceTime_us :
|
|
m_pCurrBlock->m_ullOriginTime_us;
|
|
|
|
m_pCurrBlock = m_pCurrBlock->m_pNext;
|
|
|
|
HLOGC(dlog.Debug, log << CONID() << "CSndBuffer: extracting packet size=" << readlen << " to send");
|
|
|
|
return readlen;
|
|
}
|
|
|
|
int CSndBuffer::readData(char** data, const int offset, int32_t& msgno_bitset, uint64_t& srctime, int& msglen)
|
|
{
|
|
CGuard bufferguard(m_BufLock);
|
|
|
|
Block* p = m_pFirstBlock;
|
|
|
|
// XXX Suboptimal procedure to keep the blocks identifiable
|
|
// by sequence number. Consider using some circular buffer.
|
|
for (int i = 0; i < offset; ++ i)
|
|
p = p->m_pNext;
|
|
|
|
// Check if the block that is the next candidate to send (m_pCurrBlock pointing) is stale.
|
|
|
|
// If so, then inform the caller that it should first take care of the whole
|
|
// message (all blocks with that message id). Shift the m_pCurrBlock pointer
|
|
// to the position past the last of them. Then return -1 and set the
|
|
// msgno_bitset return reference to the message id that should be dropped as
|
|
// a whole.
|
|
|
|
// After taking care of that, the caller should immediately call this function again,
|
|
// this time possibly in order to find the real data to be sent.
|
|
|
|
// if found block is stale
|
|
// (This is for messages that have declared TTL - messages that fail to be sent
|
|
// before the TTL defined time comes, will be dropped).
|
|
if ((p->m_iTTL >= 0) && ((CTimer::getTime() - p->m_ullOriginTime_us) / 1000 > (uint64_t)p->m_iTTL))
|
|
{
|
|
int32_t msgno = p->getMsgSeq();
|
|
msglen = 1;
|
|
p = p->m_pNext;
|
|
bool move = false;
|
|
while (msgno == p->getMsgSeq())
|
|
{
|
|
if (p == m_pCurrBlock)
|
|
move = true;
|
|
p = p->m_pNext;
|
|
if (move)
|
|
m_pCurrBlock = p;
|
|
msglen ++;
|
|
}
|
|
|
|
HLOGC(dlog.Debug, log << "CSndBuffer::readData: due to TTL exceeded, " << msglen << " messages to drop, up to " << msgno);
|
|
|
|
// If readData returns -1, then msgno_bitset is understood as a Message ID to drop.
|
|
// This means that in this case it should be written by the message sequence value only
|
|
// (not the whole 4-byte bitset written at PH_MSGNO).
|
|
msgno_bitset = msgno;
|
|
return -1;
|
|
}
|
|
|
|
*data = p->m_pcData;
|
|
int readlen = p->m_iLength;
|
|
|
|
// XXX Here the value predicted to be applied to PH_MSGNO field is extracted.
|
|
// As this function is predicted to extract the data to send as a rexmited packet,
|
|
// the packet must be in the form ready to send - so, in case of encryption,
|
|
// encrypted, and with all ENC flags already set. So, the first call to send
|
|
// the packet originally (the other overload of this function) must set these
|
|
// flags.
|
|
msgno_bitset = p->m_iMsgNoBitset;
|
|
|
|
srctime =
|
|
p->m_ullSourceTime_us ? p->m_ullSourceTime_us :
|
|
p->m_ullOriginTime_us;
|
|
|
|
HLOGC(dlog.Debug, log << CONID() << "CSndBuffer: extracting packet size=" << readlen << " to send [REXMIT]");
|
|
|
|
return readlen;
|
|
}
|
|
|
|
void CSndBuffer::ackData(int offset)
|
|
{
|
|
CGuard bufferguard(m_BufLock);
|
|
|
|
bool move = false;
|
|
for (int i = 0; i < offset; ++ i)
|
|
{
|
|
m_iBytesCount -= m_pFirstBlock->m_iLength;
|
|
if (m_pFirstBlock == m_pCurrBlock)
|
|
move = true;
|
|
m_pFirstBlock = m_pFirstBlock->m_pNext;
|
|
}
|
|
if (move)
|
|
m_pCurrBlock = m_pFirstBlock;
|
|
|
|
m_iCount -= offset;
|
|
|
|
#ifdef SRT_ENABLE_SNDBUFSZ_MAVG
|
|
updAvgBufSize(CTimer::getTime());
|
|
#endif
|
|
|
|
CTimer::triggerEvent();
|
|
}
|
|
|
|
int CSndBuffer::getCurrBufSize() const
|
|
{
|
|
return m_iCount;
|
|
}
|
|
|
|
#ifdef SRT_ENABLE_SNDBUFSZ_MAVG
|
|
|
|
/// Reports the averaged buffer occupancy.
///
/// @param r_bytes  [out] averaged number of bytes
/// @param r_tsp    [out] averaged timespan
/// @return averaged number of packets
int CSndBuffer::getAvgBufSize(ref_t<int> r_bytes, ref_t<int> r_tsp)
{
    // The lock keeps packets, bytes and timespan consistent with each other.
    CGuard bufferguard(m_BufLock);

    // Refresh the averages first: there may have been no add/ack
    // activity (and so no update) lately.
    updAvgBufSize(CTimer::getTime());

    *r_bytes = m_iBytesCountMAvg;
    *r_tsp = m_TimespanMAvg;
    return m_iCountMAvg;
}
|
|
|
|
void CSndBuffer::updAvgBufSize(uint64_t now)
|
|
{
|
|
const uint64_t elapsed_ms = (now - m_LastSamplingTime) / 1000; //ms since last sampling
|
|
|
|
if ((1000000 / SRT_MAVG_SAMPLING_RATE) / 1000 > elapsed_ms)
|
|
return;
|
|
|
|
if (1000 < elapsed_ms)
|
|
{
|
|
/* No sampling in last 1 sec, initialize average */
|
|
m_iCountMAvg = getCurrBufSize(Ref(m_iBytesCountMAvg), Ref(m_TimespanMAvg));
|
|
m_LastSamplingTime = now;
|
|
}
|
|
else //((1000000 / SRT_MAVG_SAMPLING_RATE) / 1000 <= elapsed_ms)
|
|
{
|
|
/*
|
|
* weight last average value between -1 sec and last sampling time (LST)
|
|
* and new value between last sampling time and now
|
|
* |elapsed_ms|
|
|
* +----------------------------------+-------+
|
|
* -1 LST 0(now)
|
|
*/
|
|
int instspan;
|
|
int bytescount;
|
|
int count = getCurrBufSize(Ref(bytescount), Ref(instspan));
|
|
|
|
HLOGC(dlog.Debug, log << "updAvgBufSize: " << elapsed_ms
|
|
<< ": " << count << " " << bytescount
|
|
<< " " << instspan << "ms");
|
|
|
|
m_iCountMAvg = (int)(((count * (1000 - elapsed_ms)) + (count * elapsed_ms)) / 1000);
|
|
m_iBytesCountMAvg = (int)(((bytescount * (1000 - elapsed_ms)) + (bytescount * elapsed_ms)) / 1000);
|
|
m_TimespanMAvg = (int)(((instspan * (1000 - elapsed_ms)) + (instspan * elapsed_ms)) / 1000);
|
|
m_LastSamplingTime = now;
|
|
}
|
|
}
|
|
|
|
#endif /* SRT_ENABLE_SNDBUFSZ_MAVG */
|
|
|
|
/// Reports the instantaneous buffer occupancy.
///
/// @param bytes     [out] number of bytes currently stored
/// @param timespan  [out] milliseconds between the origin time of the first
///                  block and the last recorded origin time, plus 1; 0 when
///                  the buffer is empty
/// @return number of packets currently stored
int CSndBuffer::getCurrBufSize(ref_t<int> bytes, ref_t<int> timespan)
{
    *bytes = m_iBytesCount;

    if (0 < m_iCount)
    {
        // With few packets the span can be below 1 ms, and with a single
        // packet it is exactly 0 - hence the extra millisecond for any
        // non-empty buffer.
        const uint64_t span_us = m_ullLastOriginTime_us - m_pFirstBlock->m_ullOriginTime_us;
        *timespan = int(span_us / 1000) + 1;
    }
    else
    {
        *timespan = 0;
    }

    return m_iCount;
}
|
|
|
|
int CSndBuffer::dropLateData(int &bytes, uint64_t latetime)
|
|
{
|
|
int dpkts = 0;
|
|
int dbytes = 0;
|
|
bool move = false;
|
|
|
|
CGuard bufferguard(m_BufLock);
|
|
for (int i = 0; i < m_iCount && m_pFirstBlock->m_ullOriginTime_us < latetime; ++ i)
|
|
{
|
|
dpkts++;
|
|
dbytes += m_pFirstBlock->m_iLength;
|
|
|
|
if (m_pFirstBlock == m_pCurrBlock) move = true;
|
|
m_pFirstBlock = m_pFirstBlock->m_pNext;
|
|
}
|
|
if (move) m_pCurrBlock = m_pFirstBlock;
|
|
m_iCount -= dpkts;
|
|
|
|
m_iBytesCount -= dbytes;
|
|
bytes = dbytes;
|
|
|
|
#ifdef SRT_ENABLE_SNDBUFSZ_MAVG
|
|
updAvgBufSize(CTimer::getTime());
|
|
#endif /* SRT_ENABLE_SNDBUFSZ_MAVG */
|
|
|
|
// CTimer::triggerEvent();
|
|
return(dpkts);
|
|
}
|
|
|
|
void CSndBuffer::increase()
|
|
{
|
|
int unitsize = m_pBuffer->m_iSize;
|
|
|
|
// new physical buffer
|
|
Buffer* nbuf = NULL;
|
|
try
|
|
{
|
|
nbuf = new Buffer;
|
|
nbuf->m_pcData = new char [unitsize * m_iMSS];
|
|
}
|
|
catch (...)
|
|
{
|
|
delete nbuf;
|
|
throw CUDTException(MJ_SYSTEMRES, MN_MEMORY, 0);
|
|
}
|
|
nbuf->m_iSize = unitsize;
|
|
nbuf->m_pNext = NULL;
|
|
|
|
// insert the buffer at the end of the buffer list
|
|
Buffer* p = m_pBuffer;
|
|
while (p->m_pNext != NULL)
|
|
p = p->m_pNext;
|
|
p->m_pNext = nbuf;
|
|
|
|
// new packet blocks
|
|
Block* nblk = NULL;
|
|
try
|
|
{
|
|
nblk = new Block;
|
|
}
|
|
catch (...)
|
|
{
|
|
delete nblk;
|
|
throw CUDTException(MJ_SYSTEMRES, MN_MEMORY, 0);
|
|
}
|
|
Block* pb = nblk;
|
|
for (int i = 1; i < unitsize; ++ i)
|
|
{
|
|
pb->m_pNext = new Block;
|
|
pb = pb->m_pNext;
|
|
}
|
|
|
|
// insert the new blocks onto the existing one
|
|
pb->m_pNext = m_pLastBlock->m_pNext;
|
|
m_pLastBlock->m_pNext = nblk;
|
|
|
|
pb = nblk;
|
|
char* pc = nbuf->m_pcData;
|
|
for (int i = 0; i < unitsize; ++ i)
|
|
{
|
|
pb->m_pcData = pc;
|
|
pb = pb->m_pNext;
|
|
pc += m_iMSS;
|
|
}
|
|
|
|
m_iSize += unitsize;
|
|
|
|
HLOGC(dlog.Debug, log << "CSndBuffer: BUFFER FULL - adding " << (unitsize*m_iMSS) << " bytes spread to " << unitsize << " blocks"
|
|
<< " (total size: " << m_iSize << " bytes)");
|
|
|
|
}
|
|
|
|
////////////////////////////////////////////////////////////////////////////////
|
|
|
|
/*
|
|
* RcvBuffer (circular buffer):
|
|
*
|
|
* |<------------------- m_iSize ----------------------------->|
|
|
* | |<--- acked pkts -->|<--- m_iMaxPos --->| |
|
|
* | | | | |
|
|
* +---+---+---+---+---+---+---+---+---+---+---+---+---+ +---+
|
|
* | 0 | 0 | 1 | 1 | 1 | 0 | 1 | 1 | 1 | 1 | 0 | 1 | 0 |...| 0 | m_pUnit[]
|
|
* +---+---+---+---+---+---+---+---+---+---+---+---+---+ +---+
|
|
* | | | |
|
|
* | | \__last pkt received
|
|
* | \___ m_iLastAckPos: last ack sent
|
|
* \___ m_iStartPos: first message to read
|
|
*
|
|
* m_pUnit[i]->m_iFlag: 0:free, 1:good, 2:passack, 3:dropped
|
|
*
|
|
* thread safety:
|
|
* m_iStartPos: CUDT::m_RecvLock
|
|
* m_iLastAckPos: CUDT::m_AckLock
|
|
* m_iMaxPos: none? (modified on add and ack)
|
|
*/
|
|
|
|
|
|
// XXX Init values moved to in-class.
|
|
//const uint32_t CRcvBuffer::TSBPD_WRAP_PERIOD = (30*1000000); //30 seconds (in usec)
|
|
//const int CRcvBuffer::TSBPD_DRIFT_MAX_VALUE = 5000; // usec
|
|
//const int CRcvBuffer::TSBPD_DRIFT_MAX_SAMPLES = 1000; // ACK-ACK packets
|
|
#ifdef SRT_DEBUG_TSBPD_DRIFT
|
|
//const int CRcvBuffer::TSBPD_DRIFT_PRT_SAMPLES = 200; // ACK-ACK packets
|
|
#endif
|
|
|
|
/// Creates a receive buffer with room for `bufsize_pkts` units.
///
/// @param queue         unit queue the stored units are handed back to
/// @param bufsize_pkts  number of slots in the unit table
CRcvBuffer::CRcvBuffer(CUnitQueue* queue, int bufsize_pkts)
    : m_pUnit(NULL)
    , m_iSize(bufsize_pkts)
    , m_pUnitQueue(queue)
    , m_iStartPos(0)
    , m_iLastAckPos(0)
    , m_iMaxPos(0)
    , m_iNotch(0)
    , m_BytesCountLock()
    , m_iBytesCount(0)
    , m_iAckedPktsCount(0)
    , m_iAckedBytesCount(0)
    , m_iAvgPayloadSz(7*188)
    , m_bTsbPdMode(false)
    , m_uTsbPdDelay(0)
    , m_ullTsbPdTimeBase(0)
    , m_bTsbPdWrapCheck(false)
    //, m_iTsbPdDrift(0)
    //, m_TsbPdDriftSum(0)
    //, m_iTsbPdDriftNbSamples(0)
#ifdef SRT_ENABLE_RCVBUFSZ_MAVG
    , m_LastSamplingTime(0)
    , m_TimespanMAvg(0)
    , m_iCountMAvg(0)
    , m_iBytesCountMAvg(0)
#endif
{
    // Every slot of the unit table starts out empty.
    m_pUnit = new CUnit* [m_iSize];
    int slot = 0;
    while (slot < m_iSize)
    {
        m_pUnit[slot] = NULL;
        ++slot;
    }

#ifdef SRT_DEBUG_TSBPD_DRIFT
    memset(m_TsbPdDriftHisto100us, 0, sizeof(m_TsbPdDriftHisto100us));
    memset(m_TsbPdDriftHisto1ms, 0, sizeof(m_TsbPdDriftHisto1ms));
#endif

    pthread_mutex_init(&m_BytesCountLock, NULL);
}
|
|
|
|
/// Hands every unit still stored back to the unit queue, then releases the
/// unit table and the byte-counter mutex.
CRcvBuffer::~CRcvBuffer()
{
    for (int slot = 0; slot < m_iSize; ++slot)
    {
        CUnit* const unit = m_pUnit[slot];
        if (unit == NULL)
            continue; // empty slot, nothing to give back

        m_pUnitQueue->makeUnitFree(unit);
    }

    delete [] m_pUnit;

    pthread_mutex_destroy(&m_BytesCountLock);
}
|
|
|
|
void CRcvBuffer::countBytes(int pkts, int bytes, bool acked)
|
|
{
|
|
/*
|
|
* Byte counter changes from both sides (Recv & Ack) of the buffer
|
|
* so the higher level lock is not enough for thread safe op.
|
|
*
|
|
* pkts are...
|
|
* added (bytes>0, acked=false),
|
|
* acked (bytes>0, acked=true),
|
|
* removed (bytes<0, acked=n/a)
|
|
*/
|
|
CGuard cg(m_BytesCountLock);
|
|
|
|
if (!acked) //adding new pkt in RcvBuffer
|
|
{
|
|
m_iBytesCount += bytes; /* added or removed bytes from rcv buffer */
|
|
if (bytes > 0) /* Assuming one pkt when adding bytes */
|
|
m_iAvgPayloadSz = ((m_iAvgPayloadSz * (100 - 1)) + bytes) / 100;
|
|
}
|
|
else // acking/removing pkts to/from buffer
|
|
{
|
|
m_iAckedPktsCount += pkts; /* acked or removed pkts from rcv buffer */
|
|
m_iAckedBytesCount += bytes; /* acked or removed bytes from rcv buffer */
|
|
|
|
if (bytes < 0) m_iBytesCount += bytes; /* removed bytes from rcv buffer */
|
|
}
|
|
}
|
|
|
|
/// Stores a received unit `offset` slots after the last acknowledged
/// position.
///
/// @param unit    unit to store; must not be NULL
/// @param offset  distance of the target slot from m_iLastAckPos
/// @return 0 on success; -1 when `offset` does not fit into the available
///         space or when the target slot is already occupied
int CRcvBuffer::addData(CUnit* unit, int offset)
{
    SRT_ASSERT(unit != NULL);

    const int avail = getAvailBufSize();
    if (offset >= avail)
        return -1;

    // m_iMaxPos is extended before the occupancy check below.
    if (m_iMaxPos <= offset)
        m_iMaxPos = offset + 1;

    const int pos = (m_iLastAckPos + offset) % m_iSize;
    if (m_pUnit[pos] != NULL)
    {
        HLOGC(dlog.Debug, log << "addData: unit %" << unit->m_Packet.m_iSeqNo
                << " rejected, already exists");
        return -1;
    }

    m_pUnit[pos] = unit;
    countBytes(1, (int) unit->m_Packet.getLength());

    m_pUnitQueue->makeUnitGood(unit);

    HLOGC(dlog.Debug, log << "addData: unit %" << unit->m_Packet.m_iSeqNo
            << " accepted, off=" << offset << " POS=" << pos);
    return 0;
}
|
|
|
|
int CRcvBuffer::readBuffer(char* data, int len)
|
|
{
|
|
int p = m_iStartPos;
|
|
int lastack = m_iLastAckPos;
|
|
int rs = len;
|
|
#if ENABLE_HEAVY_LOGGING
|
|
char* begin = data;
|
|
#endif
|
|
|
|
const uint64_t now = (m_bTsbPdMode ? CTimer::getTime() : uint64_t());
|
|
|
|
HLOGC(dlog.Debug, log << CONID() << "readBuffer: start=" << p << " lastack=" << lastack);
|
|
while ((p != lastack) && (rs > 0))
|
|
{
|
|
if (m_pUnit[p] == NULL)
|
|
{
|
|
LOGC(dlog.Error, log << CONID() << " IPE readBuffer on null packet pointer");
|
|
return -1;
|
|
}
|
|
|
|
if (m_bTsbPdMode)
|
|
{
|
|
HLOGC(dlog.Debug, log << CONID() << "readBuffer: chk if time2play: NOW=" << now << " PKT TS=" << getPktTsbPdTime(m_pUnit[p]->m_Packet.getMsgTimeStamp()));
|
|
if ((getPktTsbPdTime(m_pUnit[p]->m_Packet.getMsgTimeStamp()) > now))
|
|
break; /* too early for this unit, return whatever was copied */
|
|
}
|
|
|
|
int unitsize = (int) m_pUnit[p]->m_Packet.getLength() - m_iNotch;
|
|
if (unitsize > rs)
|
|
unitsize = rs;
|
|
|
|
HLOGC(dlog.Debug, log << CONID() << "readBuffer: copying buffer #" << p
|
|
<< " targetpos=" << int(data-begin) << " sourcepos=" << m_iNotch << " size=" << unitsize << " left=" << (unitsize-rs));
|
|
memcpy(data, m_pUnit[p]->m_Packet.m_pcData + m_iNotch, unitsize);
|
|
data += unitsize;
|
|
|
|
if ((rs > unitsize) || (rs == int(m_pUnit[p]->m_Packet.getLength()) - m_iNotch))
|
|
{
|
|
CUnit* tmp = m_pUnit[p];
|
|
m_pUnit[p] = NULL;
|
|
m_pUnitQueue->makeUnitFree(tmp);
|
|
|
|
if (++ p == m_iSize)
|
|
p = 0;
|
|
|
|
m_iNotch = 0;
|
|
}
|
|
else
|
|
m_iNotch += rs;
|
|
|
|
rs -= unitsize;
|
|
}
|
|
|
|
/* we removed acked bytes form receive buffer */
|
|
countBytes(-1, -(len - rs), true);
|
|
m_iStartPos = p;
|
|
|
|
return len - rs;
|
|
}
|
|
|
|
/// Writes up to `len` bytes of acknowledged data into `ofs`.
///
/// Units that are written completely are handed back to the unit queue.
/// When a unit is written only partially, m_iNotch remembers how far into
/// it the writing got. Writing stops if the stream reports a failure.
///
/// An empty slot among the acknowledged units is not expected; if one is
/// met it is reported and skipped instead of being dereferenced.
///
/// @param ofs  stream to write to
/// @param len  maximum number of bytes to write
/// @return the number of bytes consumed from the buffer
int CRcvBuffer::readBufferToFile(fstream& ofs, int len)
{
    int p = m_iStartPos;
    int lastack = m_iLastAckPos;
    int rs = len;

    while ((p != lastack) && (rs > 0))
    {
        if (m_pUnit[p] == NULL)
        {
            // There is no packet to take the data from. Step over the
            // slot; a partial-read offset, if any, referred to that slot
            // and does not apply to the next unit.
            LOGC(dlog.Error, log << CONID() << " IPE readBufferToFile on null packet pointer");
            if (++ p == m_iSize)
                p = 0;
            m_iNotch = 0;
            continue;
        }

        int unitsize = (int) m_pUnit[p]->m_Packet.getLength() - m_iNotch;
        if (unitsize > rs)
            unitsize = rs;

        ofs.write(m_pUnit[p]->m_Packet.m_pcData + m_iNotch, unitsize);
        if (ofs.fail())
            break;

        // The unit is finished when more is wanted than it could give, or
        // when exactly its remainder was wanted.
        if ((rs > unitsize) || (rs == int(m_pUnit[p]->m_Packet.getLength()) - m_iNotch))
        {
            CUnit* tmp = m_pUnit[p];
            m_pUnit[p] = NULL;
            m_pUnitQueue->makeUnitFree(tmp);

            if (++ p == m_iSize)
                p = 0;

            m_iNotch = 0;
        }
        else
            m_iNotch += rs; // partially written unit: continue here next time

        rs -= unitsize;
    }

    /* we removed acked bytes form receive buffer */
    countBytes(-1, -(len - rs), true);
    m_iStartPos = p;

    return len - rs;
}
|
|
|
|
void CRcvBuffer::ackData(int len)
|
|
{
|
|
SRT_ASSERT(len < m_iSize);
|
|
SRT_ASSERT(len > 0);
|
|
|
|
{
|
|
int pkts = 0;
|
|
int bytes = 0;
|
|
for (int i = m_iLastAckPos, n = (m_iLastAckPos + len) % m_iSize; i != n; i = (i + 1) % m_iSize)
|
|
{
|
|
if (m_pUnit[i] == NULL)
|
|
continue;
|
|
|
|
pkts++;
|
|
bytes += (int) m_pUnit[i]->m_Packet.getLength();
|
|
}
|
|
if (pkts > 0) countBytes(pkts, bytes, true);
|
|
}
|
|
m_iLastAckPos = (m_iLastAckPos + len) % m_iSize;
|
|
m_iMaxPos -= len;
|
|
if (m_iMaxPos < 0)
|
|
m_iMaxPos = 0;
|
|
|
|
CTimer::triggerEvent();
|
|
}
|
|
|
|
void CRcvBuffer::skipData(int len)
|
|
{
|
|
/*
|
|
* Caller need protect both AckLock and RecvLock
|
|
* to move both m_iStartPos and m_iLastAckPost
|
|
*/
|
|
if (m_iStartPos == m_iLastAckPos)
|
|
m_iStartPos = (m_iStartPos + len) % m_iSize;
|
|
m_iLastAckPos = (m_iLastAckPos + len) % m_iSize;
|
|
m_iMaxPos -= len;
|
|
if (m_iMaxPos < 0)
|
|
m_iMaxPos = 0;
|
|
}
|
|
|
|
/// Finds the first packet available in the buffer and tells if it is ready
/// to play.
///
/// The acknowledged packets are checked first, through getRcvReadyMsg().
/// If that call reports nothing and leaves the time at 0, the not yet
/// acknowledged range (m_iMaxPos slots starting at m_iLastAckPos) is
/// scanned for the first GOOD unit, which may be preceded by holes left by
/// missing packets.
///
/// @param r_tsbpdtime  [out] play time of the reported packet; 0 if no
///                     packet was found
/// @param r_passack    [out] false when the answer came from the
///                     acknowledged packets, true when the not yet
///                     acknowledged range was examined
/// @param r_skipseqno  [out] -1, or the sequence number of the first GOOD
///                     packet when it is ready to play but preceded by
///                     holes (the caller may then drop the missing ones)
/// @param r_curpktseq  [out] sequence number of the reported packet: set by
///                     getRcvReadyMsg(), or together with r_skipseqno
/// @return true if the reported packet is ready to play
bool CRcvBuffer::getRcvFirstMsg(ref_t<uint64_t> r_tsbpdtime, ref_t<bool> r_passack, ref_t<int32_t> r_skipseqno, ref_t<int32_t> r_curpktseq)
{
    int32_t& skip_to_seq = *r_skipseqno;
    bool& past_ack = *r_passack;
    skip_to_seq = -1;
    past_ack = false;

    /* Check the acknowledged packets */
    if (getRcvReadyMsg(r_tsbpdtime, r_curpktseq))
    {
        HLOGC(dlog.Debug, log << "getRcvFirstMsg: ready CONTIG packet: %" << (*r_curpktseq));
        return true;
    }

    if (*r_tsbpdtime != 0)
    {
        HLOGC(dlog.Debug, log << "getRcvFirstMsg: no packets found");
        return false;
    }

    // From here on: getRcvReadyMsg() returned false and the time is 0.
    // Two outcomes are possible:
    //  - m_iMaxPos == 0: nothing more in the buffer;
    //      time stays 0, passack=true, skipseqno=-1, returns false
    //  - m_iMaxPos > 0: there are packets that arrived after a lost one;
    //      time=PKT.TS, passack=true, returns whether PKT.TS <= now
    //
    // XXX Suspected weakness of this algorithm: getRcvReadyMsg() already
    // walks through the units, so it should be able to report the next
    // available packet itself, along with whether it is the exactly next
    // sequence and whether it is ready to play. The loop below would then
    // be unnecessary.

    /*
     * No acked packets ready but caller want to know next packet to wait for
     * Check the not yet acked packets that may be stuck by missing packet(s).
     */
    *r_tsbpdtime = 0; // redundant, for clarity
    past_ack = true;
    bool hole_seen = false;

    const int scan_end = (m_iLastAckPos + m_iMaxPos) % m_iSize;
    for (int i = m_iLastAckPos; i != scan_end; i = (i + 1) % m_iSize)
    {
        CUnit* const unit = m_pUnit[i];
        const bool is_good = (unit != NULL) && (unit->m_iFlag == CUnit::GOOD);
        if (!is_good)
        {
            /* There are packets in the sequence not received yet */
            hole_seen = true;
            HLOGC(dlog.Debug, log << "getRcvFirstMsg: empty hole at *" << i);
            // Keep looking, now only for the first valid packet after the hole(s).
            continue;
        }

        /* We got the 1st valid packet */
        // Whatever its state, the function returns here: the scan never
        // goes past the first valid packet.
        *r_tsbpdtime = getPktTsbPdTime(unit->m_Packet.getMsgTimeStamp());
        if (*r_tsbpdtime > CTimer::getTime())
        {
            HLOGC(dlog.Debug, log << "getRcvFirstMsg: found NOT READY packet, nSKIPPED: "
                    << ((i - m_iLastAckPos + m_iSize) % m_iSize));
            return false;
        }

        /* Packet ready to play */
        if (hole_seen)
        {
            /*
             * Packet stuck on non-acked side because of missing packets.
             * Tell 1st valid packet seqno so caller can skip (drop) the missing packets.
             */
            skip_to_seq = unit->m_Packet.m_iSeqNo;
            *r_curpktseq = skip_to_seq;
        }
        // If no hole was seen, this is the very first packet of the range,
        // i.e. the one at m_iLastAckPos.

        HLOGC(dlog.Debug, log << "getRcvFirstMsg: found ready packet, nSKIPPED: "
                << ((i - m_iLastAckPos + m_iSize) % m_iSize));
        return true;
    }

    HLOGC(dlog.Debug, log << "getRcvFirstMsg: found NO PACKETS");
    return false;
}
|
|
|
|
// Looks through the acknowledged region [m_iStartPos, m_iLastAckPos) for the
// first packet that can be delivered.
//
// Empty slots are stepped over. Units that are not GOOD, or GOOD units that
// are due but still carry an encryption flag, are returned to the unit queue.
// In each of these cases m_iStartPos is advanced past the slot.
//
// tsbpdtime  [out] reset to 0 on entry; set to the play time of the first
//                  GOOD packet encountered.
// curpktseq  [out] sequence number of the last non-empty slot examined.
//
// Returns true when a GOOD, non-encrypted packet whose play time has come was
// found; false when the first GOOD packet is not due yet or when the region
// was exhausted.
bool CRcvBuffer::getRcvReadyMsg(ref_t<uint64_t> tsbpdtime, ref_t<int32_t> curpktseq)
{
    *tsbpdtime = 0;

    IF_HEAVY_LOGGING(const char* reason = "NOT RECEIVED");

    // The end of the walk is sampled once, before the loop starts.
    const int ackpos = m_iLastAckPos;

    for (int pos = m_iStartPos; pos != ackpos; pos = (pos + 1) % m_iSize)
    {
        // Nothing stored here (skipped/dropped earlier): just move the start.
        if (m_pUnit[pos] == NULL)
        {
            HLOGC(mglog.Debug, log << "getRcvReadyMsg: POS=" << pos
                    << " +" << ((pos - m_iStartPos + m_iSize) % m_iSize)
                    << " SKIPPED - no unit there");
            if (++m_iStartPos == m_iSize)
                m_iStartPos = 0;
            continue;
        }

        *curpktseq = m_pUnit[pos]->m_Packet.getSeqNo();

        if (m_pUnit[pos]->m_iFlag == CUnit::GOOD)
        {
            *tsbpdtime = getPktTsbPdTime(m_pUnit[pos]->m_Packet.getMsgTimeStamp());
            const int64_t towait = (*tsbpdtime - CTimer::getTime());

            // The first GOOD packet decides: if it is early, nothing is ready.
            if (towait > 0)
            {
                HLOGC(mglog.Debug, log << "getRcvReadyMsg: POS=" << pos
                        << " +" << ((pos - m_iStartPos + m_iSize) % m_iSize)
                        << " pkt %" << curpktseq.get()
                        << " NOT ready to play (only in " << (towait/1000.0) << "ms)");
                return false;
            }

            if (m_pUnit[pos]->m_Packet.getMsgCryptoFlags() == EK_NOENC)
            {
                HLOGC(mglog.Debug, log << "getRcvReadyMsg: POS=" << pos
                        << " +" << ((pos - m_iStartPos + m_iSize) % m_iSize)
                        << " pkt %" << curpktseq.get()
                        << " ready to play (delayed " << (-towait/1000.0) << "ms)");
                return true;
            }

            // Still flagged as encrypted: the packet was not decrypted and is
            // discarded below.
            IF_HEAVY_LOGGING(reason = "DECRYPTION FAILED");
        }
        else
        {
            HLOGC(mglog.Debug, log << "getRcvReadyMsg: POS=" << pos
                    << " +" << ((pos - m_iStartPos + m_iSize) % m_iSize)
                    << " SKIPPED - unit not good");
        }

        // Reaching this point means the unit cannot be delivered: take its
        // bytes off the counters and hand the unit back to the queue.
        HLOGC(mglog.Debug, log << "getRcvReadyMsg: POS=" << pos << " FREED");
        const int rmbytes = (int)m_pUnit[pos]->m_Packet.getLength();
        countBytes(-1, -rmbytes, true);

        CUnit* discarded = m_pUnit[pos];
        m_pUnit[pos] = NULL;
        m_pUnitQueue->makeUnitFree(discarded);

        if (++m_iStartPos == m_iSize)
            m_iStartPos = 0;
    }

    HLOGC(mglog.Debug, log << "getRcvReadyMsg: nothing to deliver: " << reason);
    return false;
}
|
|
|
|
|
|
/*
|
|
* Return receivable data status (packet timestamp ready to play if TsbPd mode)
|
|
* Return playtime (tsbpdtime) of 1st packet in queue, ready to play or not
|
|
*
|
|
* Return data ready to be received (packet timestamp ready to play if TsbPd mode)
|
|
* Using getRcvDataSize() to know if there is something to read as it was widely
|
|
* used in the code (core.cpp) is expensive in TsbPD mode, hence this simpler function
|
|
* that only check if first packet in queue is ready.
|
|
*/
|
|
bool CRcvBuffer::isRcvDataReady(ref_t<uint64_t> tsbpdtime, ref_t<int32_t> curpktseq)
|
|
{
|
|
*tsbpdtime = 0;
|
|
|
|
if (m_bTsbPdMode)
|
|
{
|
|
CPacket* pkt = getRcvReadyPacket();
|
|
if (!pkt)
|
|
return false;
|
|
|
|
/*
|
|
* Acknowledged data is available,
|
|
* Only say ready if time to deliver.
|
|
* Report the timestamp, ready or not.
|
|
*/
|
|
*curpktseq = pkt->getSeqNo();
|
|
*tsbpdtime = getPktTsbPdTime(pkt->getMsgTimeStamp());
|
|
|
|
return (*tsbpdtime <= CTimer::getTime());
|
|
}
|
|
|
|
return isRcvDataAvailable();
|
|
}
|
|
|
|
// XXX This function may be called only after checking
|
|
// if m_bTsbPdMode.
|
|
CPacket* CRcvBuffer::getRcvReadyPacket()
|
|
{
|
|
for (int i = m_iStartPos, n = m_iLastAckPos; i != n; i = (i + 1) % m_iSize)
|
|
{
|
|
/*
|
|
* Skip missing packets that did not arrive in time.
|
|
*/
|
|
if ( m_pUnit[i] && m_pUnit[i]->m_iFlag == CUnit::GOOD )
|
|
return &m_pUnit[i]->m_Packet;
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
#if ENABLE_HEAVY_LOGGING
|
|
// This function is for debug purposes only and it's called only
|
|
// from within HLOG* macros.
|
|
void CRcvBuffer::reportBufferStats()
|
|
{
|
|
int nmissing = 0;
|
|
int32_t low_seq= -1, high_seq = -1;
|
|
int32_t low_ts = 0, high_ts = 0;
|
|
|
|
for (int i = m_iStartPos, n = m_iLastAckPos; i != n; i = (i + 1) % m_iSize)
|
|
{
|
|
if ( m_pUnit[i] && m_pUnit[i]->m_iFlag == CUnit::GOOD )
|
|
{
|
|
low_seq = m_pUnit[i]->m_Packet.m_iSeqNo;
|
|
low_ts = m_pUnit[i]->m_Packet.m_iTimeStamp;
|
|
break;
|
|
}
|
|
++nmissing;
|
|
}
|
|
|
|
// Not sure if a packet MUST BE at the last ack pos position, so check, just in case.
|
|
int n = m_iLastAckPos;
|
|
if (m_pUnit[n] && m_pUnit[n]->m_iFlag == CUnit::GOOD)
|
|
{
|
|
high_ts = m_pUnit[n]->m_Packet.m_iTimeStamp;
|
|
high_seq = m_pUnit[n]->m_Packet.m_iSeqNo;
|
|
}
|
|
else
|
|
{
|
|
// Possibilities are:
|
|
// m_iStartPos == m_iLastAckPos, high_ts == low_ts, defined.
|
|
// No packet: low_ts == 0, so high_ts == 0, too.
|
|
high_ts = low_ts;
|
|
}
|
|
// The 32-bit timestamps are relative and roll over oftten; what
|
|
// we really need is the timestamp difference. The only place where
|
|
// we can ask for the time base is the upper time because when trying
|
|
// to receive the time base for the lower time we'd break the requirement
|
|
// for monotonic clock.
|
|
|
|
uint64_t upper_time = high_ts;
|
|
uint64_t lower_time = low_ts;
|
|
|
|
if (lower_time > upper_time)
|
|
upper_time += uint64_t(CPacket::MAX_TIMESTAMP)+1;
|
|
|
|
int32_t timespan = upper_time - lower_time;
|
|
int seqspan = 0;
|
|
if (low_seq != -1 && high_seq != -1)
|
|
{
|
|
seqspan = CSeqNo::seqoff(low_seq, high_seq);
|
|
}
|
|
|
|
LOGC(dlog.Debug, log << "RCV BUF STATS: seqspan=%(" << low_seq << "-" << high_seq << ":" << seqspan << ") missing=" << nmissing << "pkts");
|
|
LOGC(dlog.Debug, log << "RCV BUF STATS: timespan=" << timespan << "us (lo=" << FormatTime(lower_time) << " hi=" << FormatTime(upper_time) << ")");
|
|
}
|
|
|
|
#endif // ENABLE_HEAVY_LOGGING
|
|
|
|
bool CRcvBuffer::isRcvDataReady()
|
|
{
|
|
uint64_t tsbpdtime;
|
|
int32_t seq;
|
|
|
|
return isRcvDataReady(Ref(tsbpdtime), Ref(seq));
|
|
}
|
|
|
|
int CRcvBuffer::getAvailBufSize() const
|
|
{
|
|
// One slot must be empty in order to tell the difference between "empty buffer" and "full buffer"
|
|
return m_iSize - getRcvDataSize() - 1;
|
|
}
|
|
|
|
int CRcvBuffer::getRcvDataSize() const
|
|
{
|
|
if (m_iLastAckPos >= m_iStartPos)
|
|
return m_iLastAckPos - m_iStartPos;
|
|
|
|
return m_iSize + m_iLastAckPos - m_iStartPos;
|
|
}
|
|
|
|
int CRcvBuffer::debugGetSize() const
|
|
{
|
|
// Does exactly the same as getRcvDataSize, but
|
|
// it should be used FOR INFORMATIONAL PURPOSES ONLY.
|
|
// The source values might be changed in another thread
|
|
// during the calculation, although worst case the
|
|
// resulting value may differ to the real buffer size by 1.
|
|
int from = m_iStartPos, to = m_iLastAckPos;
|
|
int size = to - from;
|
|
if (size < 0)
|
|
size += m_iSize;
|
|
|
|
return size;
|
|
}
|
|
|
|
|
|
bool CRcvBuffer::empty() const
|
|
{
|
|
// This will not always return the intended value,
|
|
// that is, it may return false when the buffer really is
|
|
// empty - but it will return true then in one of next calls.
|
|
// This function will be always called again at some point
|
|
// if it returned false, and on true the connection
|
|
// is going to be broken - so this behavior is acceptable.
|
|
return m_iStartPos == m_iLastAckPos;
|
|
}
|
|
|
|
|
|
#ifdef SRT_ENABLE_RCVBUFSZ_MAVG
|
|
/* Return moving average of acked data pkts, bytes, and timespan (ms) of the receive buffer */
|
|
int CRcvBuffer::getRcvAvgDataSize(int &bytes, int ×pan)
|
|
{
|
|
timespan = m_TimespanMAvg;
|
|
bytes = m_iBytesCountMAvg;
|
|
return(m_iCountMAvg);
|
|
}
|
|
|
|
/* Update moving average of acked data pkts, bytes, and timespan (ms) of the receive buffer */
|
|
void CRcvBuffer::updRcvAvgDataSize(uint64_t now)
|
|
{
|
|
const uint64_t elapsed_ms = (now - m_LastSamplingTime) / 1000; //ms since last sampling
|
|
|
|
if ((1000000 / SRT_MAVG_SAMPLING_RATE) / 1000 > elapsed_ms)
|
|
return; /* Last sampling too recent, skip */
|
|
|
|
if (1000 < elapsed_ms)
|
|
{
|
|
/* No sampling in last 1 sec, initialize/reset moving average */
|
|
m_iCountMAvg = getRcvDataSize(m_iBytesCountMAvg, m_TimespanMAvg);
|
|
m_LastSamplingTime = now;
|
|
|
|
HLOGC(dlog.Debug, log << "getRcvDataSize: " << m_iCountMAvg << " " << m_iBytesCountMAvg
|
|
<< " " << m_TimespanMAvg << " ms elapsed_ms: " << elapsed_ms << " ms");
|
|
}
|
|
else if ((1000000 / SRT_MAVG_SAMPLING_RATE) / 1000 <= elapsed_ms)
|
|
{
|
|
/*
|
|
* Weight last average value between -1 sec and last sampling time (LST)
|
|
* and new value between last sampling time and now
|
|
* |elapsed_ms|
|
|
* +----------------------------------+-------+
|
|
* -1 LST 0(now)
|
|
*/
|
|
int instspan;
|
|
int bytescount;
|
|
int count = getRcvDataSize(bytescount, instspan);
|
|
|
|
m_iCountMAvg = (int)(((count * (1000 - elapsed_ms)) + (count * elapsed_ms)) / 1000);
|
|
m_iBytesCountMAvg = (int)(((bytescount * (1000 - elapsed_ms)) + (bytescount * elapsed_ms)) / 1000);
|
|
m_TimespanMAvg = (int)(((instspan * (1000 - elapsed_ms)) + (instspan * elapsed_ms)) / 1000);
|
|
m_LastSamplingTime = now;
|
|
|
|
HLOGC(dlog.Debug, log << "getRcvDataSize: " << count << " " << bytescount << " " << instspan
|
|
<< " ms elapsed_ms: " << elapsed_ms << " ms");
|
|
}
|
|
}
|
|
#endif /* SRT_ENABLE_RCVBUFSZ_MAVG */
|
|
|
|
/* Return acked data pkts, bytes, and timespan (ms) of the receive buffer */
|
|
int CRcvBuffer::getRcvDataSize(int &bytes, int ×pan)
|
|
{
|
|
timespan = 0;
|
|
if (m_bTsbPdMode)
|
|
{
|
|
// Get a valid startpos.
|
|
// Skip invalid entries in the beginning, if any.
|
|
int startpos = m_iStartPos;
|
|
for (; startpos != m_iLastAckPos; startpos = (startpos + 1) % m_iSize)
|
|
{
|
|
if ((NULL != m_pUnit[startpos]) && (CUnit::GOOD == m_pUnit[startpos]->m_iFlag))
|
|
break;
|
|
}
|
|
|
|
int endpos = m_iLastAckPos;
|
|
|
|
if (m_iLastAckPos != startpos)
|
|
{
|
|
/*
|
|
* |<--- DataSpan ---->|<- m_iMaxPos ->|
|
|
* +---+---+---+---+---+---+---+---+---+---+---+---
|
|
* | | 1 | 1 | 1 | 0 | 0 | 1 | 1 | 0 | 1 | | m_pUnits[]
|
|
* +---+---+---+---+---+---+---+---+---+---+---+---
|
|
* | |
|
|
* \_ m_iStartPos \_ m_iLastAckPos
|
|
*
|
|
* m_pUnits[startpos] shall be valid (->m_iFlag==CUnit::GOOD).
|
|
* If m_pUnits[m_iLastAckPos-1] is not valid (NULL or ->m_iFlag!=CUnit::GOOD),
|
|
* it means m_pUnits[m_iLastAckPos] is valid since a valid unit is needed to skip.
|
|
* Favor m_pUnits[m_iLastAckPos] if valid over [m_iLastAckPos-1] to include the whole acked interval.
|
|
*/
|
|
if ((m_iMaxPos <= 0)
|
|
|| (!m_pUnit[m_iLastAckPos])
|
|
|| (m_pUnit[m_iLastAckPos]->m_iFlag != CUnit::GOOD))
|
|
{
|
|
endpos = (m_iLastAckPos == 0 ? m_iSize - 1 : m_iLastAckPos - 1);
|
|
}
|
|
|
|
if ((NULL != m_pUnit[endpos]) && (NULL != m_pUnit[startpos]))
|
|
{
|
|
const uint64_t startstamp = getPktTsbPdTime(m_pUnit[startpos]->m_Packet.getMsgTimeStamp());
|
|
const uint64_t endstamp = getPktTsbPdTime(m_pUnit[endpos]->m_Packet.getMsgTimeStamp());
|
|
/*
|
|
* There are sampling conditions where spantime is < 0 (big unsigned value).
|
|
* It has been observed after changing the SRT latency from 450 to 200 on the sender.
|
|
*
|
|
* Possible packet order corruption when dropping packet,
|
|
* cause by bad thread protection when adding packet in queue
|
|
* was later discovered and fixed. Security below kept.
|
|
*
|
|
* DateTime RecvRate LostRate DropRate AvailBw RTT RecvBufs PdDelay
|
|
* 2014-12-08T15:04:25-0500 4712 110 0 96509 33.710 393 450
|
|
* 2014-12-08T15:04:35-0500 4512 95 0 107771 33.493 1496542976 200
|
|
* 2014-12-08T15:04:40-0500 4213 106 3 107352 53.657 9499425 200
|
|
* 2014-12-08T15:04:45-0500 4575 104 0 102194 53.614 59666 200
|
|
* 2014-12-08T15:04:50-0500 4475 124 0 100543 53.526 505 200
|
|
*/
|
|
if (endstamp > startstamp)
|
|
timespan = (int)((endstamp - startstamp) / 1000);
|
|
}
|
|
/*
|
|
* Timespan can be less then 1000 us (1 ms) if few packets.
|
|
* Also, if there is only one pkt in buffer, the time difference will be 0.
|
|
* Therefore, always add 1 ms if not empty.
|
|
*/
|
|
if (0 < m_iAckedPktsCount)
|
|
timespan += 1;
|
|
}
|
|
}
|
|
HLOGF(dlog.Debug, "getRcvDataSize: %6d %6d %6d ms\n", m_iAckedPktsCount, m_iAckedBytesCount, timespan);
|
|
bytes = m_iAckedBytesCount;
|
|
return m_iAckedPktsCount;
|
|
}
|
|
|
|
int CRcvBuffer::getRcvAvgPayloadSize() const
|
|
{
|
|
return m_iAvgPayloadSz;
|
|
}
|
|
|
|
void CRcvBuffer::dropMsg(int32_t msgno, bool using_rexmit_flag)
|
|
{
|
|
for (int i = m_iStartPos, n = (m_iLastAckPos + m_iMaxPos) % m_iSize; i != n; i = (i + 1) % m_iSize)
|
|
if ((m_pUnit[i] != NULL)
|
|
&& (m_pUnit[i]->m_Packet.getMsgSeq(using_rexmit_flag) == msgno))
|
|
m_pUnit[i]->m_iFlag = CUnit::DROPPED;
|
|
}
|
|
|
|
uint64_t CRcvBuffer::getTsbPdTimeBase(uint32_t timestamp_us)
|
|
{
|
|
/*
|
|
* Packet timestamps wrap around every 01h11m35s (32-bit in usec)
|
|
* When added to the peer start time (base time),
|
|
* wrapped around timestamps don't provide a valid local packet delevery time.
|
|
*
|
|
* A wrap check period starts 30 seconds before the wrap point.
|
|
* In this period, timestamps smaller than 30 seconds are considered to have wrapped around (then adjusted).
|
|
* The wrap check period ends 30 seconds after the wrap point, afterwhich time base has been adjusted.
|
|
*/
|
|
uint64_t carryover = 0;
|
|
|
|
// This function should generally return the timebase for the given timestamp_us.
|
|
// It's assumed that the timestamp_us, for which this function is being called,
|
|
// is received as monotonic clock. This function then traces the changes in the
|
|
// timestamps passed as argument and catches the moment when the 64-bit timebase
|
|
// should be increased by a "segment length" (MAX_TIMESTAMP+1).
|
|
|
|
// The checks will be provided for the following split:
|
|
// [INITIAL30][FOLLOWING30]....[LAST30] <-- == CPacket::MAX_TIMESTAMP
|
|
//
|
|
// The following actions should be taken:
|
|
// 1. Check if this is [LAST30]. If so, ENTER TSBPD-wrap-check state
|
|
// 2. Then, it should turn into [INITIAL30] at some point. If so, use carryover MAX+1.
|
|
// 3. Then it should switch to [FOLLOWING30]. If this is detected,
|
|
// - EXIT TSBPD-wrap-check state
|
|
// - save the carryover as the current time base.
|
|
|
|
if (m_bTsbPdWrapCheck)
|
|
{
|
|
// Wrap check period.
|
|
|
|
if (timestamp_us < TSBPD_WRAP_PERIOD)
|
|
{
|
|
carryover = uint64_t(CPacket::MAX_TIMESTAMP) + 1;
|
|
}
|
|
//
|
|
else if ((timestamp_us >= TSBPD_WRAP_PERIOD)
|
|
&& (timestamp_us <= (TSBPD_WRAP_PERIOD * 2)))
|
|
{
|
|
/* Exiting wrap check period (if for packet delivery head) */
|
|
m_bTsbPdWrapCheck = false;
|
|
m_ullTsbPdTimeBase += uint64_t(CPacket::MAX_TIMESTAMP) + 1;
|
|
tslog.Debug("tsbpd wrap period ends");
|
|
}
|
|
}
|
|
// Check if timestamp_us is in the last 30 seconds before reaching the MAX_TIMESTAMP.
|
|
else if (timestamp_us > (CPacket::MAX_TIMESTAMP - TSBPD_WRAP_PERIOD))
|
|
{
|
|
/* Approching wrap around point, start wrap check period (if for packet delivery head) */
|
|
m_bTsbPdWrapCheck = true;
|
|
tslog.Debug("tsbpd wrap period begins");
|
|
}
|
|
|
|
return (m_ullTsbPdTimeBase + carryover);
|
|
}
|
|
|
|
uint64_t CRcvBuffer::getPktTsbPdTime(uint32_t timestamp)
|
|
{
|
|
return(getTsbPdTimeBase(timestamp) + m_uTsbPdDelay + timestamp + m_DriftTracer.drift());
|
|
}
|
|
|
|
int CRcvBuffer::setRcvTsbPdMode(uint64_t timebase, uint32_t delay)
|
|
{
|
|
m_bTsbPdMode = true;
|
|
m_bTsbPdWrapCheck = false;
|
|
|
|
// Timebase passed here comes is calculated as:
|
|
// >>> CTimer::getTime() - ctrlpkt->m_iTimeStamp
|
|
// where ctrlpkt is the packet with SRT_CMD_HSREQ message.
|
|
//
|
|
// This function is called in the HSREQ reception handler only.
|
|
m_ullTsbPdTimeBase = timebase;
|
|
// XXX Seems like this may not work correctly.
|
|
// At least this solution this way won't work with application-supplied
|
|
// timestamps. For that case the timestamps should be taken exclusively
|
|
// from the data packets because in case of application-supplied timestamps
|
|
// they come from completely different server and undergo different rules
|
|
// of network latency and drift.
|
|
m_uTsbPdDelay = delay;
|
|
return 0;
|
|
}
|
|
|
|
#ifdef SRT_DEBUG_TSBPD_DRIFT
|
|
void CRcvBuffer::printDriftHistogram(int64_t iDrift)
|
|
{
|
|
/*
|
|
* Build histogram of drift values
|
|
* First line (ms): <=-10.0 -9.0 ... -1.0 - 0.0 + 1.0 ... 9.0 >=10.0
|
|
* Second line (ms): -0.9 ... -0.1 - 0.0 + 0.1 ... 0.9
|
|
* 0 0 0 0 0 0 0 0 0 0 - 0 + 0 0 0 1 0 0 0 0 0 0
|
|
* 0 0 0 0 0 0 0 0 0 - 0 + 0 0 0 0 0 0 0 0 0
|
|
*/
|
|
iDrift /= 100; // uSec to 100 uSec (0.1ms)
|
|
if (-10 < iDrift && iDrift < 10)
|
|
{
|
|
/* Fill 100us histogram -900 .. 900 us 100 us increments */
|
|
m_TsbPdDriftHisto100us[10 + iDrift]++;
|
|
}
|
|
else
|
|
{
|
|
/* Fill 1ms histogram <=-10.0, -9.0 .. 9.0, >=10.0 ms in 1 ms increments */
|
|
iDrift /= 10; // 100uSec to 1ms
|
|
if (-10 < iDrift && iDrift < 10) m_TsbPdDriftHisto1ms[10 + iDrift]++;
|
|
else if (iDrift <= -10) m_TsbPdDriftHisto1ms[0]++;
|
|
else m_TsbPdDriftHisto1ms[20]++;
|
|
}
|
|
|
|
if ((m_iTsbPdDriftNbSamples % TSBPD_DRIFT_PRT_SAMPLES) == 0)
|
|
{
|
|
int *histo = m_TsbPdDriftHisto1ms;
|
|
|
|
fprintf(stderr, "%4d %4d %4d %4d %4d %4d %4d %4d %4d %4d - %4d + ",
|
|
histo[0],histo[1],histo[2],histo[3],histo[4],
|
|
histo[5],histo[6],histo[7],histo[8],histo[9],histo[10]);
|
|
fprintf(stderr, "%4d %4d %4d %4d %4d %4d %4d %4d %4d %4d\n",
|
|
histo[11],histo[12],histo[13],histo[14],histo[15],
|
|
histo[16],histo[17],histo[18],histo[19],histo[20]);
|
|
|
|
histo = m_TsbPdDriftHisto100us;
|
|
fprintf(stderr, " %4d %4d %4d %4d %4d %4d %4d %4d %4d - %4d + ",
|
|
histo[1],histo[2],histo[3],histo[4],histo[5],
|
|
histo[6],histo[7],histo[8],histo[9],histo[10]);
|
|
fprintf(stderr, "%4d %4d %4d %4d %4d %4d %4d %4d %4d\n",
|
|
histo[11],histo[12],histo[13],histo[14],histo[15],
|
|
histo[16],histo[17],histo[18],histo[19]);
|
|
}
|
|
}
|
|
|
|
void CRcvBuffer::printDriftOffset(int tsbPdOffset, int tsbPdDriftAvg)
|
|
{
|
|
char szTime[32] = {};
|
|
uint64_t now = CTimer::getTime();
|
|
time_t tnow = (time_t)(now/1000000);
|
|
strftime(szTime, sizeof(szTime), "%H:%M:%S", localtime(&tnow));
|
|
fprintf(stderr, "%s.%03d: tsbpd offset=%d drift=%d usec\n",
|
|
szTime, (int)((now%1000000)/1000), tsbPdOffset, tsbPdDriftAvg);
|
|
memset(m_TsbPdDriftHisto100us, 0, sizeof(m_TsbPdDriftHisto100us));
|
|
memset(m_TsbPdDriftHisto1ms, 0, sizeof(m_TsbPdDriftHisto1ms));
|
|
}
|
|
#endif /* SRT_DEBUG_TSBPD_DRIFT */
|
|
|
|
void CRcvBuffer::addRcvTsbPdDriftSample(uint32_t timestamp, pthread_mutex_t& mutex_to_lock)
|
|
{
|
|
if (!m_bTsbPdMode) // Not checked unless in TSBPD mode
|
|
return;
|
|
/*
|
|
* TsbPD time drift correction
|
|
* TsbPD time slowly drift over long period depleting decoder buffer or raising latency
|
|
* Re-evaluate the time adjustment value using a receiver control packet (ACK-ACK).
|
|
* ACK-ACK timestamp is RTT/2 ago (in sender's time base)
|
|
* Data packet have origin time stamp which is older when retransmitted so not suitable for this.
|
|
*
|
|
* Every TSBPD_DRIFT_MAX_SAMPLES packets, the average drift is calculated
|
|
* if -TSBPD_DRIFT_MAX_VALUE < avgTsbPdDrift < TSBPD_DRIFT_MAX_VALUE uSec, pass drift value to RcvBuffer to adjust delevery time.
|
|
* if outside this range, adjust this->TsbPdTimeOffset and RcvBuffer->TsbPdTimeBase by +-TSBPD_DRIFT_MAX_VALUE uSec
|
|
* to maintain TsbPdDrift values in reasonable range (-5ms .. +5ms).
|
|
*/
|
|
|
|
// Note important thing: this function is being called _EXCLUSIVELY_ in the handler
|
|
// of UMSG_ACKACK command reception. This means that the timestamp used here comes
|
|
// from the CONTROL domain, not DATA domain (timestamps from DATA domain may be
|
|
// either schedule time or a time supplied by the application).
|
|
|
|
int64_t iDrift = CTimer::getTime() - (getTsbPdTimeBase(timestamp) + timestamp);
|
|
|
|
CGuard::enterCS(mutex_to_lock);
|
|
|
|
bool updated = m_DriftTracer.update(iDrift);
|
|
|
|
#ifdef SRT_DEBUG_TSBPD_DRIFT
|
|
printDriftHistogram(iDrift);
|
|
#endif /* SRT_DEBUG_TSBPD_DRIFT */
|
|
|
|
if ( updated )
|
|
{
|
|
#ifdef SRT_DEBUG_TSBPD_DRIFT
|
|
printDriftOffset(m_DriftTracer.overdrift(), m_DriftTracer.drift());
|
|
#endif /* SRT_DEBUG_TSBPD_DRIFT */
|
|
|
|
#if ENABLE_HEAVY_LOGGING
|
|
uint64_t oldbase = m_ullTsbPdTimeBase;
|
|
#endif
|
|
m_ullTsbPdTimeBase += m_DriftTracer.overdrift();
|
|
|
|
HLOGC(dlog.Debug, log << "DRIFT=" << (iDrift/1000.0) << "ms AVG="
|
|
<< (m_DriftTracer.drift()/1000.0) << "ms, TB: "
|
|
<< FormatTime(oldbase) << " UPDATED TO: " << FormatTime(m_ullTsbPdTimeBase));
|
|
}
|
|
else
|
|
{
|
|
HLOGC(dlog.Debug, log << "DRIFT=" << (iDrift/1000.0) << "ms TB REMAINS: " << FormatTime(m_ullTsbPdTimeBase));
|
|
}
|
|
|
|
CGuard::leaveCS(mutex_to_lock);
|
|
}
|
|
|
|
int CRcvBuffer::readMsg(char* data, int len)
|
|
{
|
|
SRT_MSGCTRL dummy = srt_msgctrl_default;
|
|
return readMsg(data, len, Ref(dummy));
|
|
}
|
|
|
|
|
|
int CRcvBuffer::readMsg(char* data, int len, ref_t<SRT_MSGCTRL> r_msgctl)
|
|
{
|
|
SRT_MSGCTRL& msgctl = *r_msgctl;
|
|
int p, q;
|
|
bool passack;
|
|
bool empty = true;
|
|
uint64_t& rplaytime = msgctl.srctime;
|
|
|
|
#ifdef ENABLE_HEAVY_LOGGING
|
|
reportBufferStats();
|
|
#endif
|
|
|
|
if (m_bTsbPdMode)
|
|
{
|
|
passack = false;
|
|
int seq = 0;
|
|
|
|
if (getRcvReadyMsg(Ref(rplaytime), Ref(seq)))
|
|
{
|
|
empty = false;
|
|
|
|
// In TSBPD mode you always read one message
|
|
// at a time and a message always fits in one UDP packet,
|
|
// so in one "unit".
|
|
p = q = m_iStartPos;
|
|
|
|
#ifdef SRT_DEBUG_TSBPD_OUTJITTER
|
|
uint64_t now = CTimer::getTime();
|
|
if ((now - rplaytime)/10 < 10)
|
|
m_ulPdHisto[0][(now - rplaytime)/10]++;
|
|
else if ((now - rplaytime)/100 < 10)
|
|
m_ulPdHisto[1][(now - rplaytime)/100]++;
|
|
else if ((now - rplaytime)/1000 < 10)
|
|
m_ulPdHisto[2][(now - rplaytime)/1000]++;
|
|
else
|
|
m_ulPdHisto[3][1]++;
|
|
#endif /* SRT_DEBUG_TSBPD_OUTJITTER */
|
|
}
|
|
}
|
|
else
|
|
{
|
|
rplaytime = 0;
|
|
if (scanMsg(Ref(p), Ref(q), Ref(passack)))
|
|
empty = false;
|
|
|
|
}
|
|
|
|
if (empty)
|
|
return 0;
|
|
|
|
// This should happen just once. By 'empty' condition
|
|
// we have a guarantee that m_pUnit[p] exists and is valid.
|
|
CPacket& pkt1 = m_pUnit[p]->m_Packet;
|
|
|
|
// This returns the sequence number and message number to
|
|
// the API caller.
|
|
msgctl.pktseq = pkt1.getSeqNo();
|
|
msgctl.msgno = pkt1.getMsgSeq();
|
|
|
|
SRT_ASSERT(len > 0);
|
|
int rs = len > 0 ? len : 0;
|
|
while (p != (q + 1) % m_iSize)
|
|
{
|
|
const int pktlen = (int)m_pUnit[p]->m_Packet.getLength();
|
|
// When unitsize is less than pktlen, only a fragment is copied to the output 'data',
|
|
// but still the whole packet is removed from the receiver buffer.
|
|
if (pktlen > 0)
|
|
countBytes(-1, -pktlen, true);
|
|
|
|
const int unitsize = ((rs >= 0) && (pktlen > rs)) ? rs : pktlen;
|
|
|
|
HLOGC(mglog.Debug, log << "readMsg: checking unit POS=" << p);
|
|
|
|
if (unitsize > 0)
|
|
{
|
|
memcpy(data, m_pUnit[p]->m_Packet.m_pcData, unitsize);
|
|
data += unitsize;
|
|
rs -= unitsize;
|
|
|
|
#if ENABLE_HEAVY_LOGGING
|
|
{
|
|
static uint64_t prev_now;
|
|
static uint64_t prev_srctime;
|
|
CPacket& pkt = m_pUnit[p]->m_Packet;
|
|
|
|
int32_t seq = pkt.m_iSeqNo;
|
|
|
|
uint64_t nowtime = CTimer::getTime();
|
|
//CTimer::rdtsc(nowtime);
|
|
uint64_t srctime = getPktTsbPdTime(m_pUnit[p]->m_Packet.getMsgTimeStamp());
|
|
|
|
int64_t timediff = nowtime - srctime;
|
|
int64_t nowdiff = prev_now ? (nowtime - prev_now) : 0;
|
|
uint64_t srctimediff = prev_srctime ? (srctime - prev_srctime) : 0;
|
|
|
|
HLOGC(dlog.Debug, log << CONID() << "readMsg: DELIVERED seq=" << seq
|
|
<< " from POS=" << p << " T="
|
|
<< FormatTime(srctime) << " in " << (timediff/1000.0)
|
|
<< "ms - TIME-PREVIOUS: PKT: " << (srctimediff/1000.0)
|
|
<< " LOCAL: " << (nowdiff/1000.0)
|
|
<< " !" << BufferStamp(pkt.data(), pkt.size()));
|
|
|
|
prev_now = nowtime;
|
|
prev_srctime = srctime;
|
|
}
|
|
#endif
|
|
}
|
|
else
|
|
{
|
|
HLOGC(dlog.Debug, log << CONID() << "readMsg: SKIPPED POS=" << p << " - ZERO SIZE UNIT");
|
|
}
|
|
|
|
if (!passack)
|
|
{
|
|
HLOGC(dlog.Debug, log << CONID() << "readMsg: FREEING UNIT POS=" << p);
|
|
CUnit* tmp = m_pUnit[p];
|
|
m_pUnit[p] = NULL;
|
|
m_pUnitQueue->makeUnitFree(tmp);
|
|
}
|
|
else
|
|
{
|
|
HLOGC(dlog.Debug, log << CONID() << "readMsg: PASSACK UNIT POS=" << p);
|
|
m_pUnit[p]->m_iFlag = CUnit::PASSACK;
|
|
}
|
|
|
|
if (++ p == m_iSize)
|
|
p = 0;
|
|
}
|
|
|
|
if (!passack)
|
|
m_iStartPos = (q + 1) % m_iSize;
|
|
|
|
return len - rs;
|
|
}
|
|
|
|
|
|
// Locates the next message that can be read in message (non-TsbPd) mode.
//
// r_p      [out] position of the first packet of the message, -1 if none.
// r_q      [out] position of the last packet of the message (or where the
//          search stopped).
// passack  [out] true when the search went up to or beyond m_iLastAckPos.
//
// Returns true when [p, q] delimits a message to deliver. This is either a
// complete message, or a partial one when an unterminated message occupies
// the whole buffer.
//
// Phase 1 cleans the head of the acked region: starting at m_iStartPos, empty
// slots are skipped and units are freed until a GOOD unit marked PB_FIRST is
// found whose following units are all GOOD up to PB_LAST or up to
// m_iLastAckPos, or until the acked region is exhausted.
// Phase 2 walks q forward from m_iStartPos looking for a run of GOOD units
// from PB_FIRST to PB_LAST (or a single PB_SOLO).
bool CRcvBuffer::scanMsg(ref_t<int> r_p, ref_t<int> r_q, ref_t<bool> passack)
{
    int& p = *r_p;
    int& q = *r_q;

    // Nothing acked and nothing received past the ack position.
    if ((m_iStartPos == m_iLastAckPos) && (m_iMaxPos <= 0))
    {
        HLOGC(mglog.Debug, log << "scanMsg: empty buffer");
        return false;
    }

    int rmpkts = 0;
    int rmbytes = 0;

    // Phase 1: skip all bad msgs at the beginning.
    while (m_iStartPos != m_iLastAckPos)
    {
        // Roll up to the first valid unit.
        if (!m_pUnit[m_iStartPos])
        {
            if (++ m_iStartPos == m_iSize)
                m_iStartPos = 0;
            continue;
        }

        // Note: PB_FIRST | PB_LAST == PB_SOLO, so testing the PB_FIRST bit
        // accepts both a first and a solo packet.
        if ( m_pUnit[m_iStartPos]->m_iFlag == CUnit::GOOD
                && m_pUnit[m_iStartPos]->m_Packet.getMsgBoundary() & PB_FIRST )
        {
            // Look ahead through the message. Expected shapes are
            //   [PB_FIRST] [PB_SUBSEQUENT]... [PB_LAST]   or   [PB_SOLO]
            // A NULL slot or a unit that is not GOOD on the way makes the
            // message unusable.
            bool usable = true;

            for (int i = m_iStartPos; i != m_iLastAckPos; i = (i + 1) % m_iSize)
            {
                if (!m_pUnit[i] || m_pUnit[i]->m_iFlag != CUnit::GOOD)
                {
                    usable = false;
                    break;
                }

                // Likewise, the PB_LAST bit is set for last OR solo.
                if ( m_pUnit[i]->m_Packet.getMsgBoundary() & PB_LAST )
                    break;
            }

            if (usable)
                break;
        }

        // The unit at the head is not the start of a usable message: release it.
        CUnit* stale = m_pUnit[m_iStartPos];
        m_pUnit[m_iStartPos] = NULL;
        rmpkts++;
        rmbytes += (int) stale->m_Packet.getLength();
        m_pUnitQueue->makeUnitFree(stale);

        if (++ m_iStartPos == m_iSize)
            m_iStartPos = 0;
    }

    /* we removed bytes from the receive buffer */
    countBytes(-rmpkts, -rmbytes, true);

    // The loop above ends either with m_iStartPos == m_iLastAckPos (which
    // makes passack true below), or with m_iStartPos at the head of a message
    // judged usable. All units that lay before m_iStartPos have been removed.

    p = -1;           // message head
    q = m_iStartPos;  // message tail
    *passack = m_iStartPos == m_iLastAckPos;
    bool found = false;

    // XXX Would be nice to make some very thorough refactoring here.

    // Phase 2: roll q forward from m_iStartPos. The counter below is not used
    // inside the loop; it only bounds the number of steps to the acked size
    // plus m_iMaxPos.
    const int maxsteps = m_iMaxPos + getRcvDataSize();
    for (int step = 0; step < maxsteps; ++ step)
    {
        if (m_pUnit[q] && m_pUnit[q]->m_iFlag == CUnit::GOOD)
        {
            // PB_FIRST or PB_SOLO: a message starts here.
            if (m_pUnit[q]->m_Packet.getMsgBoundary() & PB_FIRST)
                p = q;

            // PB_LAST or PB_SOLO: a message ends here, provided its head was seen.
            if ((m_pUnit[q]->m_Packet.getMsgBoundary() & PB_LAST) && p != -1)
                found = true;

            // PB_SUBSEQUENT: nothing to do, keep rolling towards the last one.
        }
        else
        {
            // a hole in this message, not valid, restart search
            p = -1;
        }

        // 'found' is set when this iteration hit the closing packet of a message.
        if (found)
        {
            // the msg has to be ack'ed or it is allowed to read out of order, and was not read before
            if (!*passack || !m_pUnit[q]->m_Packet.getMsgOrderFlag())
            {
                HLOGC(mglog.Debug, log << "scanMsg: found next-to-broken message, delivering OUT OF ORDER.");
                break;
            }

            found = false;
        }

        if (++ q == m_iSize)
            q = 0;

        if (q == m_iLastAckPos)
            *passack = true;
    }

    if (found)
    {
        HLOGC(mglog.Debug, log << "scanMsg: extracted message p=" << p << " q=" << q << " (" << ((q-p+m_iSize+1)%m_iSize) << " packets)");
        return true;
    }

    // No complete message. If a message head was seen and the search came all
    // the way round to the slot just before it, the message is larger than
    // the receiver buffer: return the part that is there.
    if ((p != -1) && ((q + 1) % m_iSize == p))
    {
        HLOGC(mglog.Debug, log << "scanMsg: BUFFER FULL and message is INCOMPLETE. Returning PARTIAL MESSAGE.");
        return true;
    }

    HLOGC(mglog.Debug, log << "scanMsg: PARTIAL or NO MESSAGE found: p=" << p << " q=" << q);
    return false;
}
|