1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

SRT: Upgrade libsrt from 1.4.1 to 1.5.1. v6.0.12 (#3362)

Co-authored-by: winlin <winlin@vip.126.com>
This commit is contained in:
john 2023-01-04 19:56:33 +08:00 committed by GitHub
parent 7a56208f2f
commit fe086dfc31
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
143 changed files with 38185 additions and 15108 deletions

View file

@ -50,37 +50,35 @@ modified by
Haivision Systems Inc.
*****************************************************************************/
#ifdef LINUX
#include <sys/epoll.h>
#include <unistd.h>
#endif
#if __APPLE__
#include "TargetConditionals.h"
#endif
#if defined(BSD) || defined(OSX) || (TARGET_OS_IOS == 1) || (TARGET_OS_TV == 1)
#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>
#include <unistd.h>
#endif
#if defined(__ANDROID__) || defined(ANDROID)
#include <sys/select.h>
#endif
#define SRT_IMPORT_EVENT
#include "platform_sys.h"
#include <algorithm>
#include <cerrno>
#include <cstring>
#include <iterator>
#if defined(__FreeBSD_kernel__)
#include <sys/event.h>
#endif
#include "common.h"
#include "epoll.h"
#include "logging.h"
#include "udt.h"
using namespace std;
using namespace srt::sync;
#if ENABLE_HEAVY_LOGGING
namespace srt {
static ostream& PrintEpollEvent(ostream& os, int events, int et_events = 0);
}
#endif
namespace srt_logging
{
extern Logger mglog;
extern Logger eilog, ealog;
}
using namespace srt_logging;
@ -89,20 +87,21 @@ using namespace srt_logging;
#define IF_DIRNAME(tested, flag, name) (tested & flag ? name : "")
#endif
CEPoll::CEPoll():
srt::CEPoll::CEPoll():
m_iIDSeed(0)
{
CGuard::createMutex(m_EPollLock);
// Exception -> CUDTUnited ctor.
setupMutex(m_EPollLock, "EPoll");
}
CEPoll::~CEPoll()
srt::CEPoll::~CEPoll()
{
CGuard::releaseMutex(m_EPollLock);
releaseMutex(m_EPollLock);
}
int CEPoll::create()
int srt::CEPoll::create(CEPollDesc** pout)
{
CGuard pg(m_EPollLock);
ScopedLock pg(m_EPollLock);
if (++ m_iIDSeed >= 0x7FFFFFFF)
m_iIDSeed = 0;
@ -114,7 +113,31 @@ int CEPoll::create()
int localid = 0;
#ifdef LINUX
localid = epoll_create(1024);
// NOTE: epoll_create1() and EPOLL_CLOEXEC were introduced in GLIBC-2.9.
// So earlier versions of GLIBC, must use epoll_create() and set
// FD_CLOEXEC on the file descriptor returned by it after the fact.
#if defined(EPOLL_CLOEXEC)
int flags = 0;
#if ENABLE_SOCK_CLOEXEC
flags |= EPOLL_CLOEXEC;
#endif
localid = epoll_create1(flags);
#else
localid = epoll_create(1);
#if ENABLE_SOCK_CLOEXEC
if (localid != -1)
{
int fdFlags = fcntl(localid, F_GETFD);
if (fdFlags != -1)
{
fdFlags |= FD_CLOEXEC;
fcntl(localid, F_SETFD, fdFlags);
}
}
#endif
#endif
/* Possible reasons of -1 error:
EMFILE: The per-user limit on the number of epoll instances imposed by /proc/sys/fs/epoll/max_user_instances was encountered.
ENFILE: The system limit on the total number of open files has been reached.
@ -122,25 +145,82 @@ ENOMEM: There was insufficient memory to create the kernel object.
*/
if (localid < 0)
throw CUDTException(MJ_SETUP, MN_NONE, errno);
#elif defined(BSD) || defined(OSX) || (TARGET_OS_IOS == 1) || (TARGET_OS_TV == 1)
#elif defined(BSD) || TARGET_OS_MAC
localid = kqueue();
if (localid < 0)
throw CUDTException(MJ_SETUP, MN_NONE, errno);
#else
// on Solaris, use /dev/poll
// TODO: Solaris, use port_getn()
// https://docs.oracle.com/cd/E86824_01/html/E54766/port-get-3c.html
// on Windows, select
#endif
pair<map<int, CEPollDesc>::iterator, bool> res = m_mPolls.insert(make_pair(m_iIDSeed, CEPollDesc(m_iIDSeed, localid)));
if (!res.second) // Insertion failed (no memory?)
throw CUDTException(MJ_SETUP, MN_NONE);
if (pout)
*pout = &res.first->second;
return m_iIDSeed;
}
int CEPoll::add_ssock(const int eid, const SYSSOCKET& s, const int* events)
int srt::CEPoll::clear_usocks(int eid)
{
CGuard pg(m_EPollLock);
// This should remove all SRT sockets from given eid.
ScopedLock pg (m_EPollLock);
map<int, CEPollDesc>::iterator p = m_mPolls.find(eid);
if (p == m_mPolls.end())
throw CUDTException(MJ_NOTSUP, MN_EIDINVAL);
CEPollDesc& d = p->second;
d.clearAll();
return 0;
}
void srt::CEPoll::clear_ready_usocks(CEPollDesc& d, int direction)
{
if ((direction & ~SRT_EPOLL_EVENTTYPES) != 0)
{
// This is internal function, so simply report an IPE on incorrect usage.
LOGC(eilog.Error, log << "CEPoll::clear_ready_usocks: IPE, event flags exceed event types: " << direction);
return;
}
ScopedLock pg (m_EPollLock);
vector<SRTSOCKET> cleared;
CEPollDesc::enotice_t::iterator i = d.enotice_begin();
while (i != d.enotice_end())
{
IF_HEAVY_LOGGING(SRTSOCKET subsock = i->fd);
SRTSOCKET rs = d.clearEventSub(i++, direction);
// This function returns:
// - a valid socket - if there are no other subscription after 'direction' was cleared
// - SRT_INVALID_SOCK otherwise
// Valid sockets should be collected as sockets that no longer
// have a subscribed event should be deleted from subscriptions.
if (rs != SRT_INVALID_SOCK)
{
HLOGC(eilog.Debug, log << "CEPoll::clear_ready_usocks: @" << rs << " got all subscription cleared");
cleared.push_back(rs);
}
else
{
HLOGC(eilog.Debug, log << "CEPoll::clear_ready_usocks: @" << subsock << " is still subscribed");
}
}
for (size_t i = 0; i < cleared.size(); ++i)
d.removeSubscription(cleared[i]);
}
int srt::CEPoll::add_ssock(const int eid, const SYSSOCKET& s, const int* events)
{
ScopedLock pg(m_EPollLock);
map<int, CEPollDesc>::iterator p = m_mPolls.find(eid);
if (p == m_mPolls.end())
@ -155,18 +235,18 @@ int CEPoll::add_ssock(const int eid, const SYSSOCKET& s, const int* events)
else
{
ev.events = 0;
if (*events & UDT_EPOLL_IN)
if (*events & SRT_EPOLL_IN)
ev.events |= EPOLLIN;
if (*events & UDT_EPOLL_OUT)
if (*events & SRT_EPOLL_OUT)
ev.events |= EPOLLOUT;
if (*events & UDT_EPOLL_ERR)
if (*events & SRT_EPOLL_ERR)
ev.events |= EPOLLERR;
}
ev.data.fd = s;
if (::epoll_ctl(p->second.m_iLocalID, EPOLL_CTL_ADD, s, &ev) < 0)
throw CUDTException();
#elif defined(BSD) || defined(OSX) || (TARGET_OS_IOS == 1) || (TARGET_OS_TV == 1)
#elif defined(BSD) || TARGET_OS_MAC
struct kevent ke[2];
int num = 0;
@ -177,11 +257,11 @@ int CEPoll::add_ssock(const int eid, const SYSSOCKET& s, const int* events)
}
else
{
if (*events & UDT_EPOLL_IN)
if (*events & SRT_EPOLL_IN)
{
EV_SET(&ke[num++], s, EVFILT_READ, EV_ADD, 0, 0, NULL);
}
if (*events & UDT_EPOLL_OUT)
if (*events & SRT_EPOLL_OUT)
{
EV_SET(&ke[num++], s, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
}
@ -190,6 +270,10 @@ int CEPoll::add_ssock(const int eid, const SYSSOCKET& s, const int* events)
throw CUDTException();
#else
// fake use 'events' to prevent warning. Remove when implemented.
(void)events;
(void)s;
#ifdef _MSC_VER
// Microsoft Visual Studio doesn't support the #warning directive - nonstandard anyway.
// Use #pragma message with the same text.
@ -206,9 +290,9 @@ int CEPoll::add_ssock(const int eid, const SYSSOCKET& s, const int* events)
return 0;
}
int CEPoll::remove_ssock(const int eid, const SYSSOCKET& s)
int srt::CEPoll::remove_ssock(const int eid, const SYSSOCKET& s)
{
CGuard pg(m_EPollLock);
ScopedLock pg(m_EPollLock);
map<int, CEPollDesc>::iterator p = m_mPolls.find(eid);
if (p == m_mPolls.end())
@ -218,7 +302,7 @@ int CEPoll::remove_ssock(const int eid, const SYSSOCKET& s)
epoll_event ev; // ev is ignored, for compatibility with old Linux kernel only.
if (::epoll_ctl(p->second.m_iLocalID, EPOLL_CTL_DEL, s, &ev) < 0)
throw CUDTException();
#elif defined(BSD) || defined(OSX) || (TARGET_OS_IOS == 1) || (TARGET_OS_TV == 1)
#elif defined(BSD) || TARGET_OS_MAC
struct kevent ke;
//
@ -237,9 +321,10 @@ int CEPoll::remove_ssock(const int eid, const SYSSOCKET& s)
}
// Need this to atomically modify polled events (ex: remove write/keep read)
int CEPoll::update_usock(const int eid, const SRTSOCKET& u, const int* events)
int srt::CEPoll::update_usock(const int eid, const SRTSOCKET& u, const int* events)
{
CGuard pg(m_EPollLock);
ScopedLock pg(m_EPollLock);
IF_HEAVY_LOGGING(ostringstream evd);
map<int, CEPollDesc>::iterator p = m_mPolls.find(eid);
if (p == m_mPolls.end())
@ -250,9 +335,12 @@ int CEPoll::update_usock(const int eid, const SRTSOCKET& u, const int* events)
int32_t evts = events ? *events : uint32_t(SRT_EPOLL_IN | SRT_EPOLL_OUT | SRT_EPOLL_ERR);
bool edgeTriggered = evts & SRT_EPOLL_ET;
evts &= ~SRT_EPOLL_ET;
// et_evts = all events, if SRT_EPOLL_ET, or only those that are always ET otherwise.
int32_t et_evts = edgeTriggered ? evts : evts & SRT_EPOLL_ETONLY;
if (evts)
{
pair<CEPollDesc::ewatch_t::iterator, bool> iter_new = d.addWatch(u, evts, edgeTriggered);
pair<CEPollDesc::ewatch_t::iterator, bool> iter_new = d.addWatch(u, evts, et_evts);
CEPollDesc::Wait& wait = iter_new.first->second;
if (!iter_new.second)
{
@ -260,6 +348,7 @@ int CEPoll::update_usock(const int eid, const SRTSOCKET& u, const int* events)
// parameter, but others are probably unchanged. Change them
// forcefully and take out notices that are no longer valid.
const int removable = wait.watch & ~evts;
IF_HEAVY_LOGGING(PrintEpollEvent(evd, evts & (~wait.watch)));
// Check if there are any events that would be removed.
// If there are no removed events watched (for example, when
@ -272,11 +361,16 @@ int CEPoll::update_usock(const int eid, const SRTSOCKET& u, const int* events)
// Update the watch configuration, including edge
wait.watch = evts;
if (edgeTriggered)
wait.edge = evts;
wait.edge = et_evts;
// Now it should look exactly like newly added
// and the state is also updated
HLOGC(ealog.Debug, log << "srt_epoll_update_usock: UPDATED E" << eid << " for @" << u << " +" << evd.str());
}
else
{
IF_HEAVY_LOGGING(PrintEpollEvent(evd, evts));
HLOGC(ealog.Debug, log << "srt_epoll_update_usock: ADDED E" << eid << " for @" << u << " " << evd.str());
}
const int newstate = wait.watch & wait.state;
@ -287,20 +381,21 @@ int CEPoll::update_usock(const int eid, const SRTSOCKET& u, const int* events)
}
else if (edgeTriggered)
{
// Specified only SRT_EPOLL_ET flag, but no event flag. Error.
LOGC(ealog.Error, log << "srt_epoll_update_usock: Specified only SRT_EPOLL_ET flag, but no event flag. Error.");
throw CUDTException(MJ_NOTSUP, MN_INVAL);
}
else
{
// Update with no events means to remove subscription
HLOGC(ealog.Debug, log << "srt_epoll_update_usock: REMOVED E" << eid << " socket @" << u);
d.removeSubscription(u);
}
return 0;
}
int CEPoll::update_ssock(const int eid, const SYSSOCKET& s, const int* events)
int srt::CEPoll::update_ssock(const int eid, const SYSSOCKET& s, const int* events)
{
CGuard pg(m_EPollLock);
ScopedLock pg(m_EPollLock);
map<int, CEPollDesc>::iterator p = m_mPolls.find(eid);
if (p == m_mPolls.end())
@ -315,18 +410,18 @@ int CEPoll::update_ssock(const int eid, const SYSSOCKET& s, const int* events)
else
{
ev.events = 0;
if (*events & UDT_EPOLL_IN)
if (*events & SRT_EPOLL_IN)
ev.events |= EPOLLIN;
if (*events & UDT_EPOLL_OUT)
if (*events & SRT_EPOLL_OUT)
ev.events |= EPOLLOUT;
if (*events & UDT_EPOLL_ERR)
if (*events & SRT_EPOLL_ERR)
ev.events |= EPOLLERR;
}
ev.data.fd = s;
if (::epoll_ctl(p->second.m_iLocalID, EPOLL_CTL_MOD, s, &ev) < 0)
throw CUDTException();
#elif defined(BSD) || defined(OSX) || (TARGET_OS_IOS == 1) || (TARGET_OS_TV == 1)
#elif defined(BSD) || TARGET_OS_MAC
struct kevent ke[2];
int num = 0;
@ -345,17 +440,23 @@ int CEPoll::update_ssock(const int eid, const SYSSOCKET& s, const int* events)
}
else
{
if (*events & UDT_EPOLL_IN)
if (*events & SRT_EPOLL_IN)
{
EV_SET(&ke[num++], s, EVFILT_READ, EV_ADD, 0, 0, NULL);
}
if (*events & UDT_EPOLL_OUT)
if (*events & SRT_EPOLL_OUT)
{
EV_SET(&ke[num++], s, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
}
}
if (kevent(p->second.m_iLocalID, ke, num, NULL, 0, NULL) < 0)
throw CUDTException();
#else
// fake use 'events' to prevent warning. Remove when implemented.
(void)events;
(void)s;
#endif
// Assuming add is used if not inserted
// p->second.m_sLocals.insert(s);
@ -363,9 +464,9 @@ int CEPoll::update_ssock(const int eid, const SYSSOCKET& s, const int* events)
return 0;
}
int CEPoll::setflags(const int eid, int32_t flags)
int srt::CEPoll::setflags(const int eid, int32_t flags)
{
CGuard pg(m_EPollLock);
ScopedLock pg(m_EPollLock);
map<int, CEPollDesc>::iterator p = m_mPolls.find(eid);
if (p == m_mPolls.end())
throw CUDTException(MJ_NOTSUP, MN_EIDINVAL);
@ -388,7 +489,7 @@ int CEPoll::setflags(const int eid, int32_t flags)
return oflags;
}
int CEPoll::uwait(const int eid, SRT_EPOLL_EVENT* fdsSet, int fdsSize, int64_t msTimeOut)
int srt::CEPoll::uwait(const int eid, SRT_EPOLL_EVENT* fdsSet, int fdsSize, int64_t msTimeOut)
{
// It is allowed to call this function witn fdsSize == 0
// and therefore also NULL fdsSet. This will then only report
@ -396,12 +497,12 @@ int CEPoll::uwait(const int eid, SRT_EPOLL_EVENT* fdsSet, int fdsSize, int64_t m
if (fdsSize < 0 || (fdsSize > 0 && !fdsSet))
throw CUDTException(MJ_NOTSUP, MN_INVAL);
int64_t entertime = CTimer::getTime();
steady_clock::time_point entertime = steady_clock::now();
while (true)
{
{
CGuard pg(m_EPollLock);
ScopedLock pg(m_EPollLock);
map<int, CEPollDesc>::iterator p = m_mPolls.find(eid);
if (p == m_mPolls.end())
throw CUDTException(MJ_NOTSUP, MN_EIDINVAL);
@ -410,7 +511,7 @@ int CEPoll::uwait(const int eid, SRT_EPOLL_EVENT* fdsSet, int fdsSize, int64_t m
if (!ed.flags(SRT_EPOLL_ENABLE_EMPTY) && ed.watch_empty())
{
// Empty EID is not allowed, report error.
throw CUDTException(MJ_NOTSUP, MN_INVAL);
throw CUDTException(MJ_NOTSUP, MN_EEMPTY);
}
if (ed.flags(SRT_EPOLL_ENABLE_OUTPUTCHECK) && (fdsSet == NULL || fdsSize == 0))
@ -444,16 +545,16 @@ int CEPoll::uwait(const int eid, SRT_EPOLL_EVENT* fdsSet, int fdsSize, int64_t m
return total;
}
if ((msTimeOut >= 0) && (int64_t(CTimer::getTime() - entertime) >= msTimeOut * int64_t(1000)))
if ((msTimeOut >= 0) && (count_microseconds(srt::sync::steady_clock::now() - entertime) >= msTimeOut * int64_t(1000)))
break; // official wait does: throw CUDTException(MJ_AGAIN, MN_XMTIMEOUT, 0);
CTimer::waitForEvent();
CGlobEvent::waitForEvent();
}
return 0;
}
int CEPoll::wait(const int eid, set<SRTSOCKET>* readfds, set<SRTSOCKET>* writefds, int64_t msTimeOut, set<SYSSOCKET>* lrfds, set<SYSSOCKET>* lwfds)
int srt::CEPoll::wait(const int eid, set<SRTSOCKET>* readfds, set<SRTSOCKET>* writefds, int64_t msTimeOut, set<SYSSOCKET>* lrfds, set<SYSSOCKET>* lwfds)
{
// if all fields is NULL and waiting time is infinite, then this would be a deadlock
if (!readfds && !writefds && !lrfds && !lwfds && (msTimeOut < 0))
@ -467,18 +568,16 @@ int CEPoll::wait(const int eid, set<SRTSOCKET>* readfds, set<SRTSOCKET>* writefd
int total = 0;
int64_t entertime = CTimer::getTime();
HLOGC(mglog.Debug, log << "CEPoll::wait: START for eid=" << eid);
srt::sync::steady_clock::time_point entertime = srt::sync::steady_clock::now();
while (true)
{
{
CGuard epollock(m_EPollLock);
ScopedLock epollock(m_EPollLock);
map<int, CEPollDesc>::iterator p = m_mPolls.find(eid);
if (p == m_mPolls.end())
{
LOGC(ealog.Error, log << "EID:" << eid << " INVALID.");
throw CUDTException(MJ_NOTSUP, MN_EIDINVAL);
}
@ -487,7 +586,9 @@ int CEPoll::wait(const int eid, set<SRTSOCKET>* readfds, set<SRTSOCKET>* writefd
if (!ed.flags(SRT_EPOLL_ENABLE_EMPTY) && ed.watch_empty() && ed.m_sLocals.empty())
{
// Empty EID is not allowed, report error.
throw CUDTException(MJ_NOTSUP, MN_INVAL);
//throw CUDTException(MJ_NOTSUP, MN_INVAL);
LOGC(ealog.Error, log << "EID:" << eid << " no sockets to check, this would deadlock");
throw CUDTException(MJ_NOTSUP, MN_EEMPTY, 0);
}
if (ed.flags(SRT_EPOLL_ENABLE_OUTPUTCHECK))
@ -507,13 +608,13 @@ int CEPoll::wait(const int eid, set<SRTSOCKET>* readfds, set<SRTSOCKET>* writefd
{
++it_next;
IF_HEAVY_LOGGING(++total_noticed);
if (readfds && ((it->events & UDT_EPOLL_IN) || (it->events & UDT_EPOLL_ERR)))
if (readfds && ((it->events & SRT_EPOLL_IN) || (it->events & SRT_EPOLL_ERR)))
{
if (readfds->insert(it->fd).second)
++total;
}
if (writefds && ((it->events & UDT_EPOLL_OUT) || (it->events & UDT_EPOLL_ERR)))
if (writefds && ((it->events & SRT_EPOLL_OUT) || (it->events & SRT_EPOLL_ERR)))
{
if (writefds->insert(it->fd).second)
++total;
@ -530,13 +631,14 @@ int CEPoll::wait(const int eid, set<SRTSOCKET>* readfds, set<SRTSOCKET>* writefd
}
}
HLOGC(mglog.Debug, log << "CEPoll::wait: REPORTED " << total << "/" << total_noticed
HLOGC(ealog.Debug, log << "CEPoll::wait: REPORTED " << total << "/" << total_noticed
<< debug_sockets.str());
if (lrfds || lwfds)
if ((lrfds || lwfds) && !ed.m_sLocals.empty())
{
#ifdef LINUX
const int max_events = ed.m_sLocals.size();
SRT_ASSERT(max_events > 0);
epoll_event ev[max_events];
int nfds = ::epoll_wait(ed.m_iLocalID, ev, max_events, 0);
@ -554,11 +656,12 @@ int CEPoll::wait(const int eid, set<SRTSOCKET>* readfds, set<SRTSOCKET>* writefd
++ total;
}
}
HLOGC(mglog.Debug, log << "CEPoll::wait: LINUX: picking up " << (total - prev_total) << " ready fds.");
HLOGC(ealog.Debug, log << "CEPoll::wait: LINUX: picking up " << (total - prev_total) << " ready fds.");
#elif defined(BSD) || defined(OSX) || (TARGET_OS_IOS == 1) || (TARGET_OS_TV == 1)
#elif defined(BSD) || TARGET_OS_MAC
struct timespec tmout = {0, 0};
const int max_events = ed.m_sLocals.size();
SRT_ASSERT(max_events > 0);
struct kevent ke[max_events];
int nfds = kevent(ed.m_iLocalID, NULL, 0, ke, max_events, &tmout);
@ -578,7 +681,7 @@ int CEPoll::wait(const int eid, set<SRTSOCKET>* readfds, set<SRTSOCKET>* writefd
}
}
HLOGC(mglog.Debug, log << "CEPoll::wait: Darwin/BSD: picking up " << (total - prev_total) << " ready fds.");
HLOGC(ealog.Debug, log << "CEPoll::wait: Darwin/BSD: picking up " << (total - prev_total) << " ready fds.");
#else
//currently "select" is used for all non-Linux platforms.
@ -623,34 +726,127 @@ int CEPoll::wait(const int eid, set<SRTSOCKET>* readfds, set<SRTSOCKET>* writefd
}
}
HLOGC(mglog.Debug, log << "CEPoll::wait: select(otherSYS): picking up " << (total - prev_total) << " ready fds.");
HLOGC(ealog.Debug, log << "CEPoll::wait: select(otherSYS): picking up " << (total - prev_total) << " ready fds.");
#endif
}
} // END-LOCK: m_EPollLock
HLOGC(mglog.Debug, log << "CEPoll::wait: Total of " << total << " READY SOCKETS");
HLOGC(ealog.Debug, log << "CEPoll::wait: Total of " << total << " READY SOCKETS");
if (total > 0)
return total;
if ((msTimeOut >= 0) && (int64_t(CTimer::getTime() - entertime) >= msTimeOut * int64_t(1000)))
if ((msTimeOut >= 0) && (count_microseconds(srt::sync::steady_clock::now() - entertime) >= msTimeOut * int64_t(1000)))
{
HLOGP(mglog.Debug, "... not waiting longer - timeout");
HLOGC(ealog.Debug, log << "EID:" << eid << ": TIMEOUT.");
throw CUDTException(MJ_AGAIN, MN_XMTIMEOUT, 0);
}
CTimer::EWait wt ATR_UNUSED = CTimer::waitForEvent();
HLOGC(mglog.Debug, log << "CEPoll::wait: EVENT WAITING: "
<< (wt == CTimer::WT_TIMEOUT ? "CHECKPOINT" : wt == CTimer::WT_EVENT ? "TRIGGERED" : "ERROR"));
const bool wait_signaled SRT_ATR_UNUSED = CGlobEvent::waitForEvent();
HLOGC(ealog.Debug, log << "CEPoll::wait: EVENT WAITING: "
<< (wait_signaled ? "TRIGGERED" : "CHECKPOINT"));
}
return 0;
}
int CEPoll::release(const int eid)
int srt::CEPoll::swait(CEPollDesc& d, map<SRTSOCKET, int>& st, int64_t msTimeOut, bool report_by_exception)
{
CGuard pg(m_EPollLock);
{
ScopedLock lg (m_EPollLock);
if (!d.flags(SRT_EPOLL_ENABLE_EMPTY) && d.watch_empty() && msTimeOut < 0)
{
// no socket is being monitored, this may be a deadlock
LOGC(ealog.Error, log << "EID:" << d.m_iID << " no sockets to check, this would deadlock");
if (report_by_exception)
throw CUDTException(MJ_NOTSUP, MN_EEMPTY, 0);
return -1;
}
}
st.clear();
steady_clock::time_point entertime = steady_clock::now();
while (true)
{
{
// Not extracting separately because this function is
// for internal use only and we state that the eid could
// not be deleted or changed the target CEPollDesc in the
// meantime.
// Here we only prevent the pollset be updated simultaneously
// with unstable reading.
ScopedLock lg (m_EPollLock);
if (!d.flags(SRT_EPOLL_ENABLE_EMPTY) && d.watch_empty())
{
// Empty EID is not allowed, report error.
throw CUDTException(MJ_NOTSUP, MN_EEMPTY);
}
if (!d.m_sLocals.empty())
{
// XXX Add error log
// uwait should not be used with EIDs subscribed to system sockets
throw CUDTException(MJ_NOTSUP, MN_INVAL);
}
bool empty = d.enotice_empty();
if (!empty || msTimeOut == 0)
{
IF_HEAVY_LOGGING(ostringstream singles);
// If msTimeOut == 0, it means that we need the information
// immediately, we don't want to wait. Therefore in this case
// report also when none is ready.
int total = 0; // This is a list, so count it during iteration
CEPollDesc::enotice_t::iterator i = d.enotice_begin();
while (i != d.enotice_end())
{
++total;
st[i->fd] = i->events;
IF_HEAVY_LOGGING(singles << "@" << i->fd << ":");
IF_HEAVY_LOGGING(PrintEpollEvent(singles, i->events, i->parent->edgeOnly()));
const bool edged SRT_ATR_UNUSED = d.checkEdge(i++); // NOTE: potentially deletes `i`
IF_HEAVY_LOGGING(singles << (edged ? "<^> " : " "));
}
// Logging into 'singles' because it notifies as to whether
// the edge-triggered event has been cleared
HLOGC(ealog.Debug, log << "E" << d.m_iID << " rdy=" << total << ": "
<< singles.str()
<< " TRACKED: " << d.DisplayEpollWatch());
return total;
}
// Don't report any updates because this check happens
// extremely often.
}
if ((msTimeOut >= 0) && ((steady_clock::now() - entertime) >= microseconds_from(msTimeOut * int64_t(1000))))
{
HLOGC(ealog.Debug, log << "EID:" << d.m_iID << ": TIMEOUT.");
if (report_by_exception)
throw CUDTException(MJ_AGAIN, MN_XMTIMEOUT, 0);
return 0; // meaning "none is ready"
}
CGlobEvent::waitForEvent();
}
return 0;
}
bool srt::CEPoll::empty(const CEPollDesc& d) const
{
ScopedLock lg (m_EPollLock);
return d.watch_empty();
}
int srt::CEPoll::release(const int eid)
{
ScopedLock pg(m_EPollLock);
map<int, CEPollDesc>::iterator i = m_mPolls.find(eid);
if (i == m_mPolls.end())
@ -659,7 +855,7 @@ int CEPoll::release(const int eid)
#ifdef LINUX
// release local/system epoll descriptor
::close(i->second.m_iLocalID);
#elif defined(BSD) || defined(OSX) || (TARGET_OS_IOS == 1) || (TARGET_OS_TV == 1)
#elif defined(BSD) || TARGET_OS_MAC
::close(i->second.m_iLocalID);
#endif
@ -669,16 +865,29 @@ int CEPoll::release(const int eid)
}
int CEPoll::update_events(const SRTSOCKET& uid, std::set<int>& eids, const int events, const bool enable)
int srt::CEPoll::update_events(const SRTSOCKET& uid, std::set<int>& eids, const int events, const bool enable)
{
// As event flags no longer contain only event types, check now.
if ((events & ~SRT_EPOLL_EVENTTYPES) != 0)
{
LOGC(eilog.Fatal, log << "epoll/update: IPE: 'events' parameter shall not contain special flags!");
return -1; // still, ignored.
}
int nupdated = 0;
vector<int> lost;
CGuard pg(m_EPollLock);
IF_HEAVY_LOGGING(ostringstream debug);
IF_HEAVY_LOGGING(debug << "epoll/update: @" << uid << " " << (enable ? "+" : "-"));
IF_HEAVY_LOGGING(PrintEpollEvent(debug, events));
ScopedLock pg (m_EPollLock);
for (set<int>::iterator i = eids.begin(); i != eids.end(); ++ i)
{
map<int, CEPollDesc>::iterator p = m_mPolls.find(*i);
if (p == m_mPolls.end())
{
HLOGC(eilog.Note, log << "epoll/update: E" << *i << " was deleted in the meantime");
// EID invalid, though still present in the socket's subscriber list
// (dangling in the socket). Postpone to fix the subscruption and continue.
lost.push_back(*i);
@ -692,9 +901,12 @@ int CEPoll::update_events(const SRTSOCKET& uid, std::set<int>& eids, const int e
if (!pwait)
{
// As this is mapped in the socket's data, it should be impossible.
LOGC(eilog.Error, log << "epoll/update: IPE: update struck E"
<< (*i) << " which is NOT SUBSCRIBED to @" << uid);
continue;
}
IF_HEAVY_LOGGING(string tracking = " TRACKING: " + ed.DisplayEpollWatch());
// compute new states
// New state to be set into the permanent state
@ -704,13 +916,21 @@ int CEPoll::update_events(const SRTSOCKET& uid, std::set<int>& eids, const int e
// compute states changes!
int changes = pwait->state ^ newstate; // oldState XOR newState
if (!changes)
{
HLOGC(eilog.Debug, log << debug.str() << ": E" << (*i)
<< tracking << " NOT updated: no changes");
continue; // no changes!
}
// assign new state
pwait->state = newstate;
// filter change relating what is watching
changes &= pwait->watch;
if (!changes)
{
HLOGC(eilog.Debug, log << debug.str() << ": E" << (*i)
<< tracking << " NOT updated: not subscribed");
continue; // no change watching
}
// set events changes!
// This function will update the notice object associated with
@ -718,10 +938,75 @@ int CEPoll::update_events(const SRTSOCKET& uid, std::set<int>& eids, const int e
// - if enable, it will set event flags, possibly in a new notice object
// - if !enable, it will clear event flags, possibly remove notice if resulted in 0
ed.updateEventNotice(*pwait, uid, events, enable);
++nupdated;
HLOGC(eilog.Debug, log << debug.str() << ": E" << (*i)
<< " TRACKING: " << ed.DisplayEpollWatch());
}
for (vector<int>::iterator i = lost.begin(); i != lost.end(); ++ i)
eids.erase(*i);
return 0;
return nupdated;
}
// Debug use only.
#if ENABLE_HEAVY_LOGGING
namespace srt
{
static ostream& PrintEpollEvent(ostream& os, int events, int et_events)
{
static pair<int, const char*> const namemap [] = {
make_pair(SRT_EPOLL_IN, "R"),
make_pair(SRT_EPOLL_OUT, "W"),
make_pair(SRT_EPOLL_ERR, "E"),
make_pair(SRT_EPOLL_UPDATE, "U")
};
int N = Size(namemap);
for (int i = 0; i < N; ++i)
{
if (events & namemap[i].first)
{
os << "[";
if (et_events & namemap[i].first)
os << "^";
os << namemap[i].second << "]";
}
}
return os;
}
string DisplayEpollResults(const std::map<SRTSOCKET, int>& sockset)
{
typedef map<SRTSOCKET, int> fmap_t;
ostringstream os;
for (fmap_t::const_iterator i = sockset.begin(); i != sockset.end(); ++i)
{
os << "@" << i->first << ":";
PrintEpollEvent(os, i->second);
os << " ";
}
return os.str();
}
string CEPollDesc::DisplayEpollWatch()
{
ostringstream os;
for (ewatch_t::const_iterator i = m_USockWatchState.begin(); i != m_USockWatchState.end(); ++i)
{
os << "@" << i->first << ":";
PrintEpollEvent(os, i->second.watch, i->second.edge);
os << " ";
}
return os.str();
}
} // namespace srt
#endif