diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index 3e8f686f8..0cf4e9bd4 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -1334,7 +1334,6 @@ SrsRtcServer::SrsRtcServer() { timer = new SrsHourGlass(this, 1 * SRS_UTIME_SECONDS); - mmstfd = NULL; waiting_msgs = false; cond = srs_cond_new(); trd = new SrsDummyCoroutine(); @@ -1415,6 +1414,9 @@ srs_error_t SrsRtcServer::listen_udp() return srs_error_wrap(err, "listen %s:%d", ip.c_str(), port); } + // We will use all FDs to sendmmsg. + stfds.push_back(listener->stfd()); + srs_trace("rtc listen at udp://%s:%d, fd=%d", ip.c_str(), port, listener->fd()); listeners.push_back(listener); } @@ -1642,8 +1644,6 @@ mmsghdr* SrsRtcServer::fetch() void SrsRtcServer::sendmmsg(srs_netfd_t stfd, mmsghdr* /*hdr*/) { - mmstfd = stfd; - if (waiting_msgs) { waiting_msgs = false; srs_cond_signal(cond); @@ -1670,12 +1670,15 @@ srs_error_t SrsRtcServer::cycle() uint64_t nn_msgs = 0; uint64_t nn_msgs_last = 0; - uint64_t nn_msgs_max = 0; - uint64_t nn_loop = 0; - uint64_t nn_wait = 0; + int nn_msgs_max = 0; + int nn_loop = 0; + int nn_wait = 0; srs_utime_t time_last = srs_get_system_time(); SrsStatistic* stat = SrsStatistic::instance(); + // We use FDs to send out messages, by round-trip algorithm. + uint32_t fd_index = 0; + SrsPithyPrint* pprint = SrsPithyPrint::create_rtc_send(); SrsAutoFree(SrsPithyPrint, pprint); @@ -1698,12 +1701,12 @@ srs_error_t SrsRtcServer::cycle() cache.swap(hotspot); cache_pos = 0; - mmsghdr* p = &hotspot[0]; - mmsghdr* end = p + pos; - srs_netfd_t stfd = mmstfd; + srs_netfd_t stfd = NULL; + mmsghdr* p = &hotspot[0]; mmsghdr* end = p + pos; for (; p < end; p += max_sendmmsg) { int vlen = (int)(end - p); vlen = srs_min(max_sendmmsg, vlen); + stfd = stfds.at((fd_index++) % stfds.size()); int r0 = srs_sendmmsg(stfd, p, (unsigned int)vlen, 0, SRS_UTIME_NO_TIMEOUT); if (r0 != vlen) { @@ -1719,6 +1722,7 @@ srs_error_t SrsRtcServer::cycle() pprint->elapse(); if (pprint->can_print()) { + // TODO: FIXME: Extract a PPS calculator. int pps_average = 0; int pps_last = 0; if (true) { if (srs_get_system_time() > srs_get_system_startup_time()) { @@ -1736,7 +1740,7 @@ srs_error_t SrsRtcServer::cycle() pps_unit = "(k)"; pps_last /= 10000; pps_average /= 10000; } - srs_trace("-> RTC #%d SEND %d, pps %d/%d%s, schedule %" PRId64 "/%" PRId64 "/%" PRId64 ", sessions %d by sendmmsg %d", + srs_trace("-> RTC #%d SEND %d, pps %d/%d%s, schedule %d/%d/%d, sessions %d by sendmmsg %d", srs_netfd_fileno(stfd), pos, pps_average, pps_last, pps_unit.c_str(), nn_loop, nn_wait, nn_msgs_max, (int)map_username_session.size(), max_sendmmsg); nn_msgs_last = nn_msgs; time_last = srs_get_system_time(); nn_loop = nn_wait = nn_msgs_max = 0; diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index e63f2109b..9eb518018 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -240,7 +240,7 @@ private: srs_cond_t cond; bool waiting_msgs; // TODO: FIXME: Support multiple stfd. - srs_netfd_t mmstfd; + std::vector stfds; // Hotspot msgs, we are working on it. // @remark We will wait util all messages are ready. std::vector hotspot;