diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index e8963a213..f21fd247b 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -606,21 +606,9 @@ void SrsRtcSenderThread::send_and_free_messages(SrsSharedPtrMessage** msgs, int srs_freep(msg); } - - if (!mhdrs.empty()) { - mmsghdr* msgvec = &mhdrs[0]; - unsigned int vlen = (unsigned int)mhdrs.size(); - int r0 = srs_sendmmsg(udp_mux_skt->stfd(), msgvec, vlen, 0, SRS_UTIME_NO_TIMEOUT); - if (r0 != (int)vlen) { - srs_warn("sendmsg %d msgs, %d done", vlen, r0); - } - } - for (int i = 0; i < (int)mhdrs.size(); i++) { - msghdr* hdr = &mhdrs[i].msg_hdr; - for (int i = 0; i < (int)hdr->msg_iovlen; i++) { - iovec* iov = hdr->msg_iov + i; - delete (char*)iov->iov_base; - } + if ((err = rtc_session->rtc_server->send_and_free_messages(udp_mux_skt->stfd(), mhdrs)) != srs_success) { + srs_warn("sendmsg %d msgs, err %s", mhdrs.size(), srs_error_summary(err).c_str()); + srs_freep(err); } } @@ -1072,12 +1060,21 @@ SrsRtcServer::SrsRtcServer() { listener = NULL; timer = new SrsHourGlass(this, 1 * SRS_UTIME_SECONDS); + + mmstfd = NULL; + waiting_msgs = false; + cond = srs_cond_new(); + trd = new SrsDummyCoroutine(); } SrsRtcServer::~SrsRtcServer() { srs_freep(listener); srs_freep(timer); + + srs_freep(trd); + srs_cond_destroy(cond); + clear(); } srs_error_t SrsRtcServer::initialize() @@ -1092,6 +1089,12 @@ srs_error_t SrsRtcServer::initialize() return srs_error_wrap(err, "start timer"); } + srs_freep(trd); + trd = new SrsSTCoroutine("udp", this); + if ((err = trd->start()) != srs_success) { + return srs_error_wrap(err, "start coroutine"); + } + return err; } @@ -1304,6 +1307,80 @@ srs_error_t SrsRtcServer::notify(int type, srs_utime_t interval, srs_utime_t tic return srs_success; } +srs_error_t SrsRtcServer::send_and_free_messages(srs_netfd_t stfd, const vector& msgs) +{ + srs_error_t err = srs_success; + + mmstfd = stfd; + mmhdrs.insert(mmhdrs.end(), msgs.begin(), msgs.end()); + + if (waiting_msgs) { + waiting_msgs = false; + srs_cond_signal(cond); + } + + return err; +} + +void SrsRtcServer::clear() +{ + for (int i = 0; i < (int)mmhdrs.size(); i++) { + msghdr* hdr = &mmhdrs[i].msg_hdr; + for (int i = 0; i < (int)hdr->msg_iovlen; i++) { + iovec* iov = hdr->msg_iov + i; + delete (char*)iov->iov_base; + } + } + + mmhdrs.clear(); +} + +srs_error_t SrsRtcServer::cycle() +{ + srs_error_t err = srs_success; + + // TODO: FIXME: Use pithy print. + uint32_t cnt = 1; + + while (true) { + if ((err = trd->pull()) != srs_success) { + return err; + } + + // TODO: FIXME: Use cond trigger. + if (mmhdrs.empty()) { + waiting_msgs = true; + srs_cond_wait(cond); + } + + vector mhdrs = mmhdrs; + mmhdrs.clear(); + + // TODO: FIXME: Use pithy print. + if ((cnt++ % 1000) == 0) { + srs_trace("SEND %d msgs by sendmmsg", mhdrs.size()); + } + + if (!mhdrs.empty()) { + mmsghdr* msgvec = &mhdrs[0]; + unsigned int vlen = (unsigned int)mhdrs.size(); + int r0 = srs_sendmmsg(mmstfd, msgvec, vlen, 0, SRS_UTIME_NO_TIMEOUT); + if (r0 != (int)vlen) { + srs_warn("sendmsg %d msgs, %d done", vlen, r0); + } + } + for (int i = 0; i < (int)mhdrs.size(); i++) { + msghdr* hdr = &mhdrs[i].msg_hdr; + for (int i = 0; i < (int)hdr->msg_iovlen; i++) { + iovec* iov = hdr->msg_iov + i; + delete (char*)iov->iov_base; + } + } + } + + return err; +} + RtcServerAdapter::RtcServerAdapter() { rtc = new SrsRtcServer(); diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index 49c054174..8746653d1 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -36,6 +36,7 @@ #include #include #include +#include #include #include @@ -205,11 +206,18 @@ private: srs_error_t on_rtcp_receiver_report(char* buf, int nb_buf, SrsUdpMuxSocket* udp_mux_skt); }; -class SrsRtcServer : virtual public ISrsUdpMuxHandler, virtual public ISrsHourGlass +class SrsRtcServer : virtual public ISrsUdpMuxHandler, virtual public ISrsHourGlass, virtual public ISrsCoroutineHandler { private: SrsUdpMuxListener* listener; SrsHourGlass* timer; +private: + SrsCoroutine* trd; + srs_cond_t cond; + bool waiting_msgs; + // TODO: FIXME: Support multiple stfd. + srs_netfd_t mmstfd; + std::vector mmhdrs; private: std::map map_username_session; // key: username(local_ufrag + ":" + remote_ufrag) std::map map_id_session; // key: peerip(ip + ":" + port) @@ -238,6 +246,11 @@ private: // interface ISrsHourGlass public: virtual srs_error_t notify(int type, srs_utime_t interval, srs_utime_t tick); +// Internal only. +public: + srs_error_t send_and_free_messages(srs_netfd_t stfd, const std::vector& msgs); + void clear(); + virtual srs_error_t cycle(); }; // The RTC server adapter.