1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

SRT: Extract ISrsSrtPoller to hide SRT_EPOLL_EVENT

This commit is contained in:
winlin 2022-06-07 20:40:31 +08:00
parent 94cc50d146
commit d39ec3cf45
5 changed files with 47 additions and 20 deletions

View file

@ -359,7 +359,7 @@ srs_error_t SrsSrtEventLoop::initialize()
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
srt_poller_ = new SrsSrtPoller(); srt_poller_ = srs_srt_poller_new();
if ((err = srt_poller_->initialize()) != srs_success) { if ((err = srt_poller_->initialize()) != srs_success) {
return srs_error_wrap(err, "srt poller initialize"); return srs_error_wrap(err, "srt poller initialize");

View file

@ -111,7 +111,7 @@ public:
SrsSrtEventLoop(); SrsSrtEventLoop();
virtual ~SrsSrtEventLoop(); virtual ~SrsSrtEventLoop();
public: public:
SrsSrtPoller* poller() { return srt_poller_; } ISrsSrtPoller* poller() { return srt_poller_; }
public: public:
srs_error_t initialize(); srs_error_t initialize();
srs_error_t start(); srs_error_t start();
@ -119,7 +119,7 @@ public:
public: public:
virtual srs_error_t cycle(); virtual srs_error_t cycle();
private: private:
SrsSrtPoller* srt_poller_; ISrsSrtPoller* srt_poller_;
SrsCoroutine* trd_; SrsCoroutine* trd_;
}; };

View file

@ -439,6 +439,24 @@ srs_error_t SrsSrtStat::fetch(SRTSOCKET srt_fd, bool clear)
return err; return err;
} }
class SrsSrtPoller : public ISrsSrtPoller
{
public:
SrsSrtPoller();
virtual ~SrsSrtPoller();
public:
srs_error_t initialize();
srs_error_t add_socket(SrsSrtSocket* srt_skt);
srs_error_t mod_socket(SrsSrtSocket* srt_skt);
srs_error_t del_socket(SrsSrtSocket* srt_skt);
srs_error_t wait(int timeout_ms, int* pn_fds);
private:
// Find SrsSrtSocket* context by SRTSOCKET.
std::map<SRTSOCKET, SrsSrtSocket*> fd_sockets_;
int srt_epoller_fd_;
std::vector<SRT_EPOLL_EVENT> events_;
};
SrsSrtPoller::SrsSrtPoller() SrsSrtPoller::SrsSrtPoller()
{ {
srt_epoller_fd_ = -1; srt_epoller_fd_ = -1;
@ -556,7 +574,20 @@ srs_error_t SrsSrtPoller::mod_socket(SrsSrtSocket* srt_skt)
return err; return err;
} }
SrsSrtSocket::SrsSrtSocket(SrsSrtPoller* srt_poller, SRTSOCKET srt_fd) ISrsSrtPoller::ISrsSrtPoller()
{
}
ISrsSrtPoller::~ISrsSrtPoller()
{
}
ISrsSrtPoller* srs_srt_poller_new()
{
return new SrsSrtPoller();
}
SrsSrtSocket::SrsSrtSocket(ISrsSrtPoller* srt_poller, SRTSOCKET srt_fd)
{ {
srt_poller_ = srt_poller; srt_poller_ = srt_poller;
srt_fd_ = srt_fd; srt_fd_ = srt_fd;

View file

@ -86,31 +86,27 @@ public:
}; };
// Srt poller, subscribe/unsubscribed events and wait them fired. // Srt poller, subscribe/unsubscribed events and wait them fired.
class SrsSrtPoller class ISrsSrtPoller
{ {
public: public:
SrsSrtPoller(); ISrsSrtPoller();
virtual ~SrsSrtPoller(); virtual ~ISrsSrtPoller();
public: public:
srs_error_t initialize(); virtual srs_error_t initialize() = 0;
srs_error_t add_socket(SrsSrtSocket* srt_skt); virtual srs_error_t add_socket(SrsSrtSocket* srt_skt) = 0;
srs_error_t mod_socket(SrsSrtSocket* srt_skt); virtual srs_error_t mod_socket(SrsSrtSocket* srt_skt) = 0;
srs_error_t del_socket(SrsSrtSocket* srt_skt); virtual srs_error_t del_socket(SrsSrtSocket* srt_skt) = 0;
// Wait for the fds in its epoll to be fired in specified timeout_ms, where the pn_fds is the number of active fds. // Wait for the fds in its epoll to be fired in specified timeout_ms, where the pn_fds is the number of active fds.
// Note that for ST, please always use timeout_ms(0) and switch coroutine by yourself. // Note that for ST, please always use timeout_ms(0) and switch coroutine by yourself.
srs_error_t wait(int timeout_ms, int* pn_fds); virtual srs_error_t wait(int timeout_ms, int* pn_fds) = 0;
private:
// Find SrsSrtSocket* context by SRTSOCKET.
std::map<SRTSOCKET, SrsSrtSocket*> fd_sockets_;
int srt_epoller_fd_;
std::vector<SRT_EPOLL_EVENT> events_;
}; };
ISrsSrtPoller* srs_srt_poller_new();
// Srt ST socket, wrap SRT io and make it adapt to ST-thread. // Srt ST socket, wrap SRT io and make it adapt to ST-thread.
class SrsSrtSocket class SrsSrtSocket
{ {
public: public:
SrsSrtSocket(SrsSrtPoller* srt_poller, SRTSOCKET srt_fd); SrsSrtSocket(ISrsSrtPoller* srt_poller, SRTSOCKET srt_fd);
virtual ~SrsSrtSocket(); virtual ~SrsSrtSocket();
public: // IO API public: // IO API
srs_error_t connect(const std::string& ip, int port); srs_error_t connect(const std::string& ip, int port);
@ -171,7 +167,7 @@ private:
// Event of this socket subscribed. // Event of this socket subscribed.
int events_; int events_;
// Srt poller which this socket attach to. // Srt poller which this socket attach to.
SrsSrtPoller* srt_poller_; ISrsSrtPoller* srt_poller_;
}; };
#endif #endif

View file

@ -23,7 +23,7 @@ VOID TEST(ServiceSrtPoller, SrtPollOperateSocket)
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
SrsSrtPoller* srt_poller = new SrsSrtPoller(); ISrsSrtPoller* srt_poller = srs_srt_poller_new();
HELPER_EXPECT_SUCCESS(srt_poller->initialize()); HELPER_EXPECT_SUCCESS(srt_poller->initialize());
SRTSOCKET srt_fd = SRT_INVALID_SOCK; SRTSOCKET srt_fd = SRT_INVALID_SOCK;