1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

Upgrade libsrt to v1.5.3. v5.0.183 v6.0.81 (#3808)

fix https://github.com/ossrs/srs/issues/3155
Build srt-1-fit fails with `standard attributes in middle of
decl-specifiers` on GCC 12,Arch Linux.

See https://github.com/Haivision/srt/releases/tag/v1.5.3
This commit is contained in:
Haibo Chen 2023-09-21 22:23:56 +08:00 committed by GitHub
parent f9bba0a9b0
commit c5e067fb0b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
94 changed files with 5974 additions and 6273 deletions

View file

@ -226,12 +226,12 @@ CUDTGroup::SocketData* CUDTGroup::add(SocketData data)
data.sndstate = SRT_GST_PENDING;
data.rcvstate = SRT_GST_PENDING;
HLOGC(gmlog.Debug, log << "CUDTGroup::add: adding new member @" << data.id);
LOGC(gmlog.Note, log << "group/add: adding member @" << data.id << " into group $" << id());
m_Group.push_back(data);
gli_t end = m_Group.end();
if (m_iMaxPayloadSize == -1)
{
int plsize = data.ps->core().OPT_PayloadSize();
int plsize = (int)data.ps->core().OPT_PayloadSize();
HLOGC(gmlog.Debug,
log << "CUDTGroup::add: taking MAX payload size from socket @" << data.ps->m_SocketID << ": " << plsize
<< " " << (plsize ? "(explicit)" : "(unspecified = fallback to 1456)"));
@ -251,7 +251,6 @@ CUDTGroup::CUDTGroup(SRT_GROUP_TYPE gtype)
: m_Global(CUDT::uglobal())
, m_GroupID(-1)
, m_PeerGroupID(-1)
, m_bSyncOnMsgNo(false)
, m_type(gtype)
, m_listener()
, m_iBusy()
@ -336,6 +335,7 @@ void CUDTGroup::GroupContainer::erase(CUDTGroup::gli_t it)
}
}
m_List.erase(it);
--m_SizeCache;
}
void CUDTGroup::setOpt(SRT_SOCKOPT optName, const void* optval, int optlen)
@ -394,12 +394,6 @@ void CUDTGroup::setOpt(SRT_SOCKOPT optName, const void* optval, int optlen)
break;
case SRTO_CONGESTION:
// Currently no socket groups allow any other
// congestion control mode other than live.
LOGP(gmlog.Error, "group option: SRTO_CONGESTION is only allowed as 'live' and cannot be changed");
throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
default:
break;
}
@ -554,7 +548,7 @@ void CUDTGroup::deriveSettings(CUDT* u)
if (u->m_config.CryptoSecret.len)
{
string password((const char*)u->m_config.CryptoSecret.str, u->m_config.CryptoSecret.len);
m_config.push_back(ConfigItem(SRTO_PASSPHRASE, password.c_str(), password.size()));
m_config.push_back(ConfigItem(SRTO_PASSPHRASE, password.c_str(), (int)password.size()));
}
IM(SRTO_KMREFRESHRATE, uKmRefreshRatePkt);
@ -563,7 +557,7 @@ void CUDTGroup::deriveSettings(CUDT* u)
string cc = u->m_CongCtl.selected_name();
if (cc != "live")
{
m_config.push_back(ConfigItem(SRTO_CONGESTION, cc.c_str(), cc.size()));
m_config.push_back(ConfigItem(SRTO_CONGESTION, cc.c_str(), (int)cc.size()));
}
// NOTE: This is based on information extracted from the "semi-copy-constructor" of CUDT class.
@ -893,6 +887,16 @@ void CUDTGroup::close()
HLOGC(smlog.Debug, log << "group/close: IPE(NF): group member @" << ig->id << " already deleted");
continue;
}
// Make the socket closing BEFORE withdrawing its group membership
// because a socket created as a group member cannot be valid
// without the group.
// This is not true in case of non-managed groups, which
// only collect sockets, but also non-managed groups should not
// use common group buffering and tsbpd. Also currently there are
// no other groups than managed one.
s->setClosing();
s->m_GroupOf = NULL;
s->m_GroupMemberData = NULL;
HLOGC(smlog.Debug, log << "group/close: CUTTING OFF @" << ig->id << " (found as @" << s->m_SocketID << ") from the group");
@ -1219,12 +1223,10 @@ int CUDTGroup::sendBroadcast(const char* buf, int len, SRT_MSGCTRL& w_mc)
// and therefore take over the leading role in setting the ISN. If the
// second one fails, too, then the only remaining idle link will simply
// go with its own original sequence.
//
// On the opposite side the reader should know that the link is inactive
// so the first received payload activates it. Activation of an idle link
// means that the very first packet arriving is TAKEN AS A GOOD DEAL, that is,
// no LOSSREPORT is sent even if the sequence looks like a "jumped over".
// Only for activated links is the LOSSREPORT sent upon seqhole detection.
// On the opposite side, if the first packet arriving looks like a jump over,
// the corresponding LOSSREPORT is sent. For packets that are truly lost,
// the sender retransmits them, for packets that before ISN, DROPREQ is sent.
// Now we can go to the idle links and attempt to send the payload
// also over them.
@ -1714,7 +1716,7 @@ int CUDTGroup::getGroupData_LOCKED(SRT_SOCKGROUPDATA* pdata, size_t* psize)
copyGroupData(*d, (pdata[i]));
}
return m_Group.size();
return (int)m_Group.size();
}
// [[using locked(this->m_GroupLock)]]
@ -1888,7 +1890,7 @@ void CUDTGroup::recv_CollectAliveAndBroken(vector<CUDTSocket*>& alive, set<CUDTS
// Don't skip packets that are ahead because if we have a situation
// that all links are either "elephants" (do not report read readiness)
// and "kangaroos" (have already delivered an ahead packet) then
// omiting kangaroos will result in only elephants to be polled for
// omitting kangaroos will result in only elephants to be polled for
// reading. Due to the strict timing requirements and ensurance that
// TSBPD on every link will result in exactly the same delivery time
// for a packet of given sequence, having an elephant and kangaroo in
@ -2134,7 +2136,6 @@ static bool isValidSeqno(int32_t iBaseSeqno, int32_t iPktSeqno)
return false;
}
#ifdef ENABLE_NEW_RCVBUFFER
int CUDTGroup::recv(char* buf, int len, SRT_MSGCTRL& w_mc)
{
// First, acquire GlobControlLock to make sure all member sockets still exist
@ -2197,7 +2198,7 @@ int CUDTGroup::recv(char* buf, int len, SRT_MSGCTRL& w_mc)
// Find the first readable packet among all member sockets.
CUDTSocket* socketToRead = NULL;
CRcvBufferNew::PacketInfo infoToRead = {-1, false, time_point()};
CRcvBuffer::PacketInfo infoToRead = {-1, false, time_point()};
for (vector<CUDTSocket*>::const_iterator si = readySockets.begin(); si != readySockets.end(); ++si)
{
CUDTSocket* ps = *si;
@ -2215,7 +2216,7 @@ int CUDTGroup::recv(char* buf, int len, SRT_MSGCTRL& w_mc)
}
}
const CRcvBufferNew::PacketInfo info =
const CRcvBuffer::PacketInfo info =
ps->core().m_pRcvBuffer->getFirstReadablePacketInfo(steady_clock::now());
if (info.seqno == SRT_SEQNO_NONE)
{
@ -2292,13 +2293,14 @@ int CUDTGroup::recv(char* buf, int len, SRT_MSGCTRL& w_mc)
m_stats.recv.count(res);
updateAvgPayloadSize(res);
bool canReadFurther = false;
for (vector<CUDTSocket*>::const_iterator si = aliveMembers.begin(); si != aliveMembers.end(); ++si)
{
CUDTSocket* ps = *si;
ScopedLock lg(ps->core().m_RcvBufferLock);
if (m_RcvBaseSeqNo != SRT_SEQNO_NONE)
{
int cnt = ps->core().rcvDropTooLateUpTo(CSeqNo::incseq(m_RcvBaseSeqNo));
const int cnt = ps->core().rcvDropTooLateUpTo(CSeqNo::incseq(m_RcvBaseSeqNo));
if (cnt > 0)
{
HLOGC(grlog.Debug,
@ -2306,596 +2308,22 @@ int CUDTGroup::recv(char* buf, int len, SRT_MSGCTRL& w_mc)
<< " packets after reading: m_RcvBaseSeqNo=" << m_RcvBaseSeqNo);
}
}
}
for (vector<CUDTSocket*>::const_iterator si = aliveMembers.begin(); si != aliveMembers.end(); ++si)
{
CUDTSocket* ps = *si;
if (!ps->core().isRcvBufferReady())
if (!ps->core().isRcvBufferReadyNoLock())
m_Global.m_EPoll.update_events(ps->m_SocketID, ps->core().m_sPollID, SRT_EPOLL_IN, false);
else
canReadFurther = true;
}
if (!canReadFurther)
m_Global.m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_IN, false);
return res;
}
LOGC(grlog.Error, log << "grp/recv: UNEXPECTED RUN PATH, ABANDONING.");
m_Global.m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_IN, false);
throw CUDTException(MJ_AGAIN, MN_RDAVAIL, 0);
}
#else
// The "app reader" version of the reading function.
// This reads the packets from every socket treating them as independent
// and prepared to work with the application. Then packets are sorted out
// by getting the sequence number.
int CUDTGroup::recv(char* buf, int len, SRT_MSGCTRL& w_mc)
{
typedef map<SRTSOCKET, ReadPos>::iterator pit_t;
// Later iteration over it might be less efficient than
// by vector, but we'll also often try to check a single id
// if it was ever seen broken, so that it's skipped.
set<CUDTSocket*> broken;
size_t output_size = 0;
// First, acquire GlobControlLock to make sure all member sockets still exist
enterCS(m_Global.m_GlobControlLock);
ScopedLock guard(m_GroupLock);
if (m_bClosing)
{
// The group could be set closing in the meantime, but if
// this is only about to be set by another thread, this thread
// must fist wait for being able to acquire this lock.
// The group will not be deleted now because it is added usage counter
// by this call, but will be released once it exits.
leaveCS(m_Global.m_GlobControlLock);
throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0);
}
// Now, still under lock, check if all sockets still can be dispatched
send_CheckValidSockets();
leaveCS(m_Global.m_GlobControlLock);
if (m_bClosing)
throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0);
for (;;)
{
if (!m_bOpened || !m_bConnected)
{
LOGC(grlog.Error,
log << boolalpha << "group/recv: ERROR opened=" << m_bOpened << " connected=" << m_bConnected);
throw CUDTException(MJ_CONNECTION, MN_NOCONN, 0);
}
// Check first the ahead packets if you have any to deliver.
if (m_RcvBaseSeqNo != SRT_SEQNO_NONE && !m_Positions.empty())
{
// This function also updates the group sequence pointer.
ReadPos* pos = checkPacketAhead();
if (pos)
{
if (size_t(len) < pos->packet.size())
throw CUDTException(MJ_NOTSUP, MN_XSIZE, 0);
HLOGC(grlog.Debug,
log << "group/recv: delivering AHEAD packet %" << pos->mctrl.pktseq << " #" << pos->mctrl.msgno
<< ": " << BufferStamp(&pos->packet[0], pos->packet.size()));
memcpy(buf, &pos->packet[0], pos->packet.size());
fillGroupData((w_mc), pos->mctrl);
m_RcvBaseSeqNo = pos->mctrl.pktseq;
len = pos->packet.size();
pos->packet.clear();
// Update stats as per delivery
m_stats.recv.count(len);
updateAvgPayloadSize(len);
// We predict to have only one packet ahead, others are pending to be reported by tsbpd.
// This will be "re-enabled" if the later check puts any new packet into ahead.
m_Global.m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_IN, false);
return len;
}
}
// LINK QUALIFICATION NAMES:
//
// HORSE: Correct link, which delivers the very next sequence.
// Not necessarily this link is currently active.
//
// KANGAROO: Got some packets dropped and the sequence number
// of the packet jumps over the very next sequence and delivers
// an ahead packet.
//
// ELEPHANT: Is not ready to read, while others are, or reading
// up to the current latest delivery sequence number does not
// reach this sequence and the link becomes non-readable earlier.
// The above condition has ruled out one kangaroo and turned it
// into a horse.
// Below there's a loop that will try to extract packets. Kangaroos
// will be among the polled ones because skipping them risks that
// the elephants will take over the reading. Links already known as
// elephants will be also polled in an attempt to revitalize the
// connection that experienced just a short living choking.
//
// After polling we attempt to read from every link that reported
// read-readiness and read at most up to the sequence equal to the
// current delivery sequence.
// Links that deliver a packet below that sequence will be retried
// until they deliver no more packets or deliver the packet of
// expected sequence. Links that don't have a record in m_Positions
// and report readiness will be always read, at least to know what
// sequence they currently stand on.
//
// Links that are already known as kangaroos will be polled, but
// no reading attempt will be done. If after the reading series
// it will turn out that we have no more horses, the slowest kangaroo
// will be "upgraded to a horse" (the ahead link with a sequence
// closest to the current delivery sequence will get its sequence
// set as current delivered and its recorded ahead packet returned
// as the read packet).
// If we find at least one horse, the packet read from that link
// will be delivered. All other link will be just ensured update
// up to this sequence number, or at worst all available packets
// will be read. In this case all kangaroos remain kangaroos,
// until the current delivery sequence m_RcvBaseSeqNo will be lifted
// to the sequence recorded for these links in m_Positions,
// during the next time ahead check, after which they will become
// horses.
const size_t size = m_Group.size();
// Prepare first the list of sockets to be added as connect-pending
// and as read-ready, then unlock the group, and then add them to epoll.
vector<CUDTSocket*> aliveMembers;
recv_CollectAliveAndBroken(aliveMembers, broken);
const vector<CUDTSocket*> ready_sockets = recv_WaitForReadReady(aliveMembers, broken);
// m_GlobControlLock lifted, m_GroupLock still locked.
// Now we can safely do this scoped way.
if (!m_bSynRecving && ready_sockets.empty())
{
HLOGC(grlog.Debug,
log << "group/rcv $" << m_GroupID << ": Not available AT THIS TIME, NOT READ-READY now.");
m_Global.m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_IN, false);
throw CUDTException(MJ_AGAIN, MN_RDAVAIL, 0);
}
// Ok, now we need to have some extra qualifications:
// 1. If a socket has no registry yet, we read anyway, just
// to notify the current position. We read ONLY ONE PACKET this time,
// we'll worry later about adjusting it to the current group sequence
// position.
// 2. If a socket is already position ahead, DO NOT read from it, even
// if it is ready.
// The state of things whether we were able to extract the very next
// sequence will be simply defined by the fact that `output` is nonempty.
int32_t next_seq = m_RcvBaseSeqNo;
if (m_bClosing)
{
HLOGC(gslog.Debug, log << "grp/sendBroadcast: GROUP CLOSED, ABANDONING");
throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0);
}
//
// NOTE: Although m_GlobControlLock is lifted here so potentially sockets
// colected in ready_sockets could be closed at any time, all of them are member
// sockets of this group. Therefore the first socket attempted to be closed will
// have to remove the socket from the group, and this will require lock on GroupLock,
// which is still applied here. So this will have to wait for this function to finish
// (or block on swait, in which case the lock is lifted) anyway.
for (vector<CUDTSocket*>::const_iterator si = ready_sockets.begin(); si != ready_sockets.end(); ++si)
{
CUDTSocket* ps = *si;
SRTSOCKET id = ps->m_SocketID;
ReadPos* p = NULL;
pit_t pe = m_Positions.find(id);
if (pe != m_Positions.end())
{
p = &pe->second;
// Possible results of comparison:
// x < 0: the sequence is in the past, the socket should be adjusted FIRST
// x = 0: the socket should be ready to get the exactly next packet
// x = 1: the case is already handled by GroupCheckPacketAhead.
// x > 1: AHEAD. DO NOT READ.
const int seqdiff = CSeqNo::seqcmp(p->mctrl.pktseq, m_RcvBaseSeqNo);
if (seqdiff > 1)
{
HLOGC(grlog.Debug,
log << "group/recv: EPOLL: @" << id << " %" << p->mctrl.pktseq << " AHEAD %" << m_RcvBaseSeqNo
<< ", not reading.");
continue;
}
}
else
{
// The position is not known, so get the position on which
// the socket is currently standing.
pair<pit_t, bool> ee = m_Positions.insert(make_pair(id, ReadPos(ps->core().m_iRcvLastSkipAck)));
p = &(ee.first->second);
HLOGC(grlog.Debug,
log << "group/recv: EPOLL: @" << id << " %" << p->mctrl.pktseq << " NEW SOCKET INSERTED");
}
// Read from this socket stubbornly, until:
// - reading is no longer possible (AGAIN)
// - the sequence difference is >= 1
for (;;)
{
SRT_MSGCTRL mctrl = srt_msgctrl_default;
// Read the data into the user's buffer. This is an optimistic
// prediction that we'll read the right data. This will be overwritten
// by "more correct data" if found more appropriate later. But we have to
// copy these data anyway anywhere, even if they need to fall on the floor later.
int stat;
char extrabuf[SRT_LIVE_MAX_PLSIZE];
char* msgbuf = NULL;
if (output_size)
{
// We already have the target data in `buf`. Now reading extra data potentially redundant (to be ignored)
// or AHEAD (to be buffered internally by the group)
msgbuf = extrabuf;
stat = ps->core().receiveMessage((extrabuf), SRT_LIVE_MAX_PLSIZE, (mctrl), CUDTUnited::ERH_RETURN);
HLOGC(grlog.Debug,
log << "group/recv: @" << id << " EXTRACTED EXTRA data with %" << mctrl.pktseq
<< " #" << mctrl.msgno << ": " << (stat <= 0 ? "(NOTHING)" : BufferStamp(extrabuf, stat))
<< (CSeqNo::seqcmp(mctrl.pktseq, m_RcvBaseSeqNo) > 1 ? " - TO STORE" : " - TO IGNORE"));
}
else
{
msgbuf = buf;
stat = ps->core().receiveMessage((buf), len, (mctrl), CUDTUnited::ERH_RETURN);
HLOGC(grlog.Debug,
log << "group/recv: @" << id << " EXTRACTED data with %" << mctrl.pktseq << " #"
<< mctrl.msgno << ": " << (stat <= 0 ? "(NOTHING)" : BufferStamp(buf, stat)));
}
if (stat == 0)
{
HLOGC(grlog.Debug, log << "group/recv @" << id << ": SPURIOUS epoll, ignoring");
// This is returned in case of "again". In case of errors, we have SRT_ERROR.
// Do not treat this as spurious, just stop reading.
break;
}
if (stat == SRT_ERROR)
{
HLOGC(grlog.Debug, log << "group/recv: @" << id << ": " << srt_getlasterror_str());
broken.insert(ps);
break;
}
// NOTE: checks against m_RcvBaseSeqNo and decisions based on it
// must NOT be done if m_RcvBaseSeqNo is SRT_SEQNO_NONE, which
// means that we are about to deliver the very first packet and we
// take its sequence number as a good deal.
// The order must be:
// - check discrepancy
// - record the sequence
// - check ordering.
// The second one must be done always, but failed discrepancy
// check should exclude the socket from any further checks.
// That's why the common check for m_RcvBaseSeqNo != SRT_SEQNO_NONE can't
// embrace everything below.
// We need to first qualify the sequence, just for a case
if (m_RcvBaseSeqNo != SRT_SEQNO_NONE && !isValidSeqno(m_RcvBaseSeqNo, mctrl.pktseq))
{
// This error should be returned if the link turns out
// to be the only one, or set to the group data.
// err = SRT_ESECFAIL;
LOGC(grlog.Error,
log << "group/recv: @" << id << ": SEQUENCE DISCREPANCY: base=%" << m_RcvBaseSeqNo
<< " vs pkt=%" << mctrl.pktseq << ", setting ESECFAIL");
broken.insert(ps);
break;
}
// Rewrite it to the state for a case when next reading
// would not succeed. Do not insert the buffer here because
// this is only required when the sequence is ahead; for that
// it will be fixed later.
p->mctrl.pktseq = mctrl.pktseq;
if (m_RcvBaseSeqNo != SRT_SEQNO_NONE)
{
// Now we can safely check it.
const int seqdiff = CSeqNo::seqcmp(mctrl.pktseq, m_RcvBaseSeqNo);
if (seqdiff <= 0)
{
HLOGC(grlog.Debug,
log << "group/recv: @" << id << " %" << mctrl.pktseq << " #" << mctrl.msgno
<< " BEHIND base=%" << m_RcvBaseSeqNo << " - discarding");
// The sequence is recorded, the packet has to be discarded.
m_stats.recvDiscard.count(stat);
continue;
}
// Now we have only two possibilities:
// seqdiff == 1: The very next sequence, we want to read and return the packet.
// seqdiff > 1: The packet is ahead - record the ahead packet, but continue with the others.
if (seqdiff > 1)
{
HLOGC(grlog.Debug,
log << "@" << id << " %" << mctrl.pktseq << " #" << mctrl.msgno << " AHEAD base=%"
<< m_RcvBaseSeqNo);
p->packet.assign(msgbuf, msgbuf + stat);
p->mctrl = mctrl;
break; // Don't read from that socket anymore.
}
}
// We have seqdiff = 1, or we simply have the very first packet
// which's sequence is taken as a good deal. Update the sequence
// and record output.
if (output_size)
{
HLOGC(grlog.Debug,
log << "group/recv: @" << id << " %" << mctrl.pktseq << " #" << mctrl.msgno << " REDUNDANT");
break;
}
HLOGC(grlog.Debug,
log << "group/recv: @" << id << " %" << mctrl.pktseq << " #" << mctrl.msgno << " DELIVERING");
output_size = stat;
fillGroupData((w_mc), mctrl);
// Update stats as per delivery
m_stats.recv.count(output_size);
updateAvgPayloadSize(output_size);
// Record, but do not update yet, until all sockets are handled.
next_seq = mctrl.pktseq;
break;
}
}
#if ENABLE_HEAVY_LOGGING
if (!broken.empty())
{
std::ostringstream brks;
for (set<CUDTSocket*>::iterator b = broken.begin(); b != broken.end(); ++b)
brks << "@" << (*b)->m_SocketID << " ";
LOGC(grlog.Debug, log << "group/recv: REMOVING BROKEN: " << brks.str());
}
#endif
vector<SRTSOCKET> brokenid;
// Now remove all broken sockets from aheads, if any.
// Even if they have already delivered a packet.
for (set<CUDTSocket*>::iterator di = broken.begin(); di != broken.end(); ++di)
{
CUDTSocket* ps = *di;
m_Positions.erase(ps->m_SocketID);
//ps->setBrokenClosed();
}
// Force closing
{
InvertedLock ung (m_GroupLock);
for (set<CUDTSocket*>::iterator b = broken.begin(); b != broken.end(); ++b)
{
CUDT::uglobal().close(*b);
}
}
if (broken.size() >= size) // This > is for sanity check
{
// All broken
HLOGC(grlog.Debug, log << "group/recv: All sockets broken");
m_Global.m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_ERR, true);
throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0);
}
// May be required to be re-read.
broken.clear();
if (output_size)
{
// We have extracted something, meaning that we have the sequence shift.
// Update it now and don't do anything else with the sockets.
// Sanity check
if (next_seq == SRT_SEQNO_NONE)
{
LOGP(grlog.Error, "IPE: next_seq not set after output extracted!");
// This should never happen, but the only way to keep the code
// safe an recoverable is to use the incremented sequence. By
// leaving the sequence as is there's a risk of hangup.
// Not doing it in case of SRT_SEQNO_NONE as it would make a valid %0.
if (m_RcvBaseSeqNo != SRT_SEQNO_NONE)
m_RcvBaseSeqNo = CSeqNo::incseq(m_RcvBaseSeqNo);
}
else
{
m_RcvBaseSeqNo = next_seq;
}
const ReadPos* pos = checkPacketAhead();
if (!pos)
{
// Don't clear the read-readinsess state if you have a packet ahead because
// if you have, the next read call will return it.
m_Global.m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_IN, false);
}
HLOGC(grlog.Debug,
log << "group/recv: successfully extracted packet size=" << output_size << " - returning");
return output_size;
}
HLOGC(grlog.Debug, log << "group/recv: NOT extracted anything - checking for a need to kick kangaroos");
// Check if we have any sockets left :D
// Here we surely don't have any more HORSES,
// only ELEPHANTS and KANGAROOS. Qualify them and
// attempt to at least take advantage of KANGAROOS.
// In this position all links are either:
// - updated to the current position
// - updated to the newest possible possition available
// - not yet ready for extraction (not present in the group)
// If we haven't extracted the very next sequence position,
// it means that we might only have the ahead packets read,
// that is, the next sequence has been dropped by all links.
if (!m_Positions.empty())
{
// This might notify both lingering links, which didn't
// deliver the required sequence yet, and links that have
// the sequence ahead. Review them, and if you find at
// least one packet behind, just wait for it to be ready.
// Use again the waiting function because we don't want
// the general waiting procedure to skip others.
set<SRTSOCKET> elephants;
// const because it's `typename decltype(m_Positions)::value_type`
pair<const SRTSOCKET, ReadPos>* slowest_kangaroo = 0;
for (pit_t rp = m_Positions.begin(); rp != m_Positions.end(); ++rp)
{
// NOTE that m_RcvBaseSeqNo in this place wasn't updated
// because we haven't successfully extracted anything.
int seqdiff = CSeqNo::seqcmp(rp->second.mctrl.pktseq, m_RcvBaseSeqNo);
if (seqdiff < 0)
{
elephants.insert(rp->first);
}
// If seqdiff == 0, we have a socket ON TRACK.
else if (seqdiff > 0)
{
// If there's already a slowest_kangaroo, seqdiff decides if this one is slower.
// Otherwise it is always slower by having no competition.
seqdiff = slowest_kangaroo
? CSeqNo::seqcmp(slowest_kangaroo->second.mctrl.pktseq, rp->second.mctrl.pktseq)
: 1;
if (seqdiff > 0)
{
slowest_kangaroo = &*rp;
}
}
}
// Note that if no "slowest_kangaroo" was found, it means
// that we don't have kangaroos.
if (slowest_kangaroo)
{
// We have a slowest kangaroo. Elephants must be ignored.
// Best case, they will get revived, worst case they will be
// soon broken.
//
// As we already have the packet delivered by the slowest
// kangaroo, we can simply return it.
// Check how many were skipped and add them to the stats
const int32_t jump = (CSeqNo(slowest_kangaroo->second.mctrl.pktseq) - CSeqNo(m_RcvBaseSeqNo)) - 1;
if (jump > 0)
{
m_stats.recvDrop.count(stats::BytesPackets(jump * static_cast<uint64_t>(avgRcvPacketSize()), jump));
LOGC(grlog.Warn,
log << "@" << m_GroupID << " GROUP RCV-DROPPED " << jump << " packet(s): seqno %"
<< m_RcvBaseSeqNo << " to %" << slowest_kangaroo->second.mctrl.pktseq);
}
m_RcvBaseSeqNo = slowest_kangaroo->second.mctrl.pktseq;
vector<char>& pkt = slowest_kangaroo->second.packet;
if (size_t(len) < pkt.size())
throw CUDTException(MJ_NOTSUP, MN_XSIZE, 0);
HLOGC(grlog.Debug,
log << "@" << slowest_kangaroo->first << " KANGAROO->HORSE %"
<< slowest_kangaroo->second.mctrl.pktseq << " #" << slowest_kangaroo->second.mctrl.msgno
<< ": " << BufferStamp(&pkt[0], pkt.size()));
memcpy(buf, &pkt[0], pkt.size());
fillGroupData((w_mc), slowest_kangaroo->second.mctrl);
len = pkt.size();
pkt.clear();
// Update stats as per delivery
m_stats.recv.count(len);
updateAvgPayloadSize(len);
// It is unlikely to have a packet ahead because usually having one packet jumped-ahead
// clears the possibility of having aheads at all.
// XXX Research if this is possible at all; if it isn't, then don't waste time on
// looking for it.
const ReadPos* pos = checkPacketAhead();
if (!pos)
{
// Don't clear the read-readinsess state if you have a packet ahead because
// if you have, the next read call will return it.
m_Global.m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_IN, false);
}
return len;
}
HLOGC(grlog.Debug,
log << "group/recv: "
<< (elephants.empty() ? "NO LINKS REPORTED ANY FRESHER PACKET." : "ALL LINKS ELEPHANTS.")
<< " Re-polling.");
}
else
{
HLOGC(grlog.Debug, log << "group/recv: POSITIONS EMPTY - Re-polling.");
}
}
}
#endif
// [[using locked(m_GroupLock)]]
CUDTGroup::ReadPos* CUDTGroup::checkPacketAhead()
{
typedef map<SRTSOCKET, ReadPos>::iterator pit_t;
ReadPos* out = 0;
// This map no longer maps only ahead links.
// Here are all links, and whether ahead, it's defined by the sequence.
for (pit_t i = m_Positions.begin(); i != m_Positions.end(); ++i)
{
// i->first: socket ID
// i->second: ReadPos { sequence, packet }
// We are not interested with the socket ID because we
// aren't going to read from it - we have the packet already.
ReadPos& a = i->second;
const int seqdiff = CSeqNo::seqcmp(a.mctrl.pktseq, m_RcvBaseSeqNo);
if (seqdiff == 1)
{
// The very next packet. Return it.
HLOGC(grlog.Debug,
log << "group/recv: Base %" << m_RcvBaseSeqNo << " ahead delivery POSSIBLE %" << a.mctrl.pktseq
<< " #" << a.mctrl.msgno << " from @" << i->first << ")");
out = &a;
}
else if (seqdiff < 1 && !a.packet.empty())
{
HLOGC(grlog.Debug,
log << "group/recv: @" << i->first << " dropping collected ahead %" << a.mctrl.pktseq << "#"
<< a.mctrl.msgno << " with base %" << m_RcvBaseSeqNo);
a.packet.clear();
}
// In case when it's >1, keep it in ahead
}
return out;
}
const char* CUDTGroup::StateStr(CUDTGroup::GroupState st)
{
@ -3684,7 +3112,7 @@ void CUDTGroup::sendBackup_CheckUnstableSockets(SendBackupCtx& w_sendBackupCtx,
<< " is qualified as unstable, but does not have the 'unstable since' timestamp. Still marking for closure.");
}
const int unstable_for_ms = count_milliseconds(currtime - sock.m_tsUnstableSince);
const int unstable_for_ms = (int)count_milliseconds(currtime - sock.m_tsUnstableSince);
if (unstable_for_ms < sock.peerIdleTimeout_ms())
continue;
@ -4399,7 +3827,7 @@ int CUDTGroup::sendBackupRexmit(CUDT& core, SRT_MSGCTRL& w_mc)
{
// NOTE: an exception from here will interrupt the loop
// and will be caught in the upper level.
stat = core.sendmsg2(i->data, i->size, (i->mc));
stat = core.sendmsg2(i->data, (int)i->size, (i->mc));
if (stat == -1)
{
// Stop sending if one sending ended up with error
@ -4529,7 +3957,7 @@ void CUDTGroup::updateLatestRcv(CUDTSocket* s)
HLOGC(grlog.Debug,
log << "updateLatestRcv: BACKUP group, updating from active link @" << s->m_SocketID << " with %"
<< s->core().m_iRcvLastSkipAck);
<< s->core().m_iRcvLastAck);
CUDT* source = &s->core();
vector<CUDT*> targets;