1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

For #307, use sendmmsg in rtc server

This commit is contained in:
winlin 2020-04-05 16:53:08 +08:00
parent 737dcdd2c8
commit 133e3ce075
2 changed files with 106 additions and 16 deletions

View file

@ -606,21 +606,9 @@ void SrsRtcSenderThread::send_and_free_messages(SrsSharedPtrMessage** msgs, int
srs_freep(msg); srs_freep(msg);
} }
if ((err = rtc_session->rtc_server->send_and_free_messages(udp_mux_skt->stfd(), mhdrs)) != srs_success) {
if (!mhdrs.empty()) { srs_warn("sendmsg %d msgs, err %s", mhdrs.size(), srs_error_summary(err).c_str());
mmsghdr* msgvec = &mhdrs[0]; srs_freep(err);
unsigned int vlen = (unsigned int)mhdrs.size();
int r0 = srs_sendmmsg(udp_mux_skt->stfd(), msgvec, vlen, 0, SRS_UTIME_NO_TIMEOUT);
if (r0 != (int)vlen) {
srs_warn("sendmsg %d msgs, %d done", vlen, r0);
}
}
for (int i = 0; i < (int)mhdrs.size(); i++) {
msghdr* hdr = &mhdrs[i].msg_hdr;
for (int i = 0; i < (int)hdr->msg_iovlen; i++) {
iovec* iov = hdr->msg_iov + i;
delete (char*)iov->iov_base;
}
} }
} }
@ -1072,12 +1060,21 @@ SrsRtcServer::SrsRtcServer()
{ {
listener = NULL; listener = NULL;
timer = new SrsHourGlass(this, 1 * SRS_UTIME_SECONDS); timer = new SrsHourGlass(this, 1 * SRS_UTIME_SECONDS);
mmstfd = NULL;
waiting_msgs = false;
cond = srs_cond_new();
trd = new SrsDummyCoroutine();
} }
SrsRtcServer::~SrsRtcServer() SrsRtcServer::~SrsRtcServer()
{ {
srs_freep(listener); srs_freep(listener);
srs_freep(timer); srs_freep(timer);
srs_freep(trd);
srs_cond_destroy(cond);
clear();
} }
srs_error_t SrsRtcServer::initialize() srs_error_t SrsRtcServer::initialize()
@ -1092,6 +1089,12 @@ srs_error_t SrsRtcServer::initialize()
return srs_error_wrap(err, "start timer"); return srs_error_wrap(err, "start timer");
} }
srs_freep(trd);
trd = new SrsSTCoroutine("udp", this);
if ((err = trd->start()) != srs_success) {
return srs_error_wrap(err, "start coroutine");
}
return err; return err;
} }
@ -1304,6 +1307,80 @@ srs_error_t SrsRtcServer::notify(int type, srs_utime_t interval, srs_utime_t tic
return srs_success; return srs_success;
} }
srs_error_t SrsRtcServer::send_and_free_messages(srs_netfd_t stfd, const vector<mmsghdr>& msgs)
{
srs_error_t err = srs_success;
mmstfd = stfd;
mmhdrs.insert(mmhdrs.end(), msgs.begin(), msgs.end());
if (waiting_msgs) {
waiting_msgs = false;
srs_cond_signal(cond);
}
return err;
}
void SrsRtcServer::clear()
{
for (int i = 0; i < (int)mmhdrs.size(); i++) {
msghdr* hdr = &mmhdrs[i].msg_hdr;
for (int i = 0; i < (int)hdr->msg_iovlen; i++) {
iovec* iov = hdr->msg_iov + i;
delete (char*)iov->iov_base;
}
}
mmhdrs.clear();
}
srs_error_t SrsRtcServer::cycle()
{
srs_error_t err = srs_success;
// TODO: FIXME: Use pithy print.
uint32_t cnt = 1;
while (true) {
if ((err = trd->pull()) != srs_success) {
return err;
}
// TODO: FIXME: Use cond trigger.
if (mmhdrs.empty()) {
waiting_msgs = true;
srs_cond_wait(cond);
}
vector<mmsghdr> mhdrs = mmhdrs;
mmhdrs.clear();
// TODO: FIXME: Use pithy print.
if ((cnt++ % 1000) == 0) {
srs_trace("SEND %d msgs by sendmmsg", mhdrs.size());
}
if (!mhdrs.empty()) {
mmsghdr* msgvec = &mhdrs[0];
unsigned int vlen = (unsigned int)mhdrs.size();
int r0 = srs_sendmmsg(mmstfd, msgvec, vlen, 0, SRS_UTIME_NO_TIMEOUT);
if (r0 != (int)vlen) {
srs_warn("sendmsg %d msgs, %d done", vlen, r0);
}
}
for (int i = 0; i < (int)mhdrs.size(); i++) {
msghdr* hdr = &mhdrs[i].msg_hdr;
for (int i = 0; i < (int)hdr->msg_iovlen; i++) {
iovec* iov = hdr->msg_iov + i;
delete (char*)iov->iov_base;
}
}
}
return err;
}
RtcServerAdapter::RtcServerAdapter() RtcServerAdapter::RtcServerAdapter()
{ {
rtc = new SrsRtcServer(); rtc = new SrsRtcServer();

View file

@ -36,6 +36,7 @@
#include <string> #include <string>
#include <map> #include <map>
#include <vector> #include <vector>
#include <sys/socket.h>
#include <openssl/ssl.h> #include <openssl/ssl.h>
#include <srtp2/srtp.h> #include <srtp2/srtp.h>
@ -205,11 +206,18 @@ private:
srs_error_t on_rtcp_receiver_report(char* buf, int nb_buf, SrsUdpMuxSocket* udp_mux_skt); srs_error_t on_rtcp_receiver_report(char* buf, int nb_buf, SrsUdpMuxSocket* udp_mux_skt);
}; };
class SrsRtcServer : virtual public ISrsUdpMuxHandler, virtual public ISrsHourGlass class SrsRtcServer : virtual public ISrsUdpMuxHandler, virtual public ISrsHourGlass, virtual public ISrsCoroutineHandler
{ {
private: private:
SrsUdpMuxListener* listener; SrsUdpMuxListener* listener;
SrsHourGlass* timer; SrsHourGlass* timer;
private:
SrsCoroutine* trd;
srs_cond_t cond;
bool waiting_msgs;
// TODO: FIXME: Support multiple stfd.
srs_netfd_t mmstfd;
std::vector<mmsghdr> mmhdrs;
private: private:
std::map<std::string, SrsRtcSession*> map_username_session; // key: username(local_ufrag + ":" + remote_ufrag) std::map<std::string, SrsRtcSession*> map_username_session; // key: username(local_ufrag + ":" + remote_ufrag)
std::map<std::string, SrsRtcSession*> map_id_session; // key: peerip(ip + ":" + port) std::map<std::string, SrsRtcSession*> map_id_session; // key: peerip(ip + ":" + port)
@ -238,6 +246,11 @@ private:
// interface ISrsHourGlass // interface ISrsHourGlass
public: public:
virtual srs_error_t notify(int type, srs_utime_t interval, srs_utime_t tick); virtual srs_error_t notify(int type, srs_utime_t interval, srs_utime_t tick);
// Internal only.
public:
srs_error_t send_and_free_messages(srs_netfd_t stfd, const std::vector<mmsghdr>& msgs);
void clear();
virtual srs_error_t cycle();
}; };
// The RTC server adapter. // The RTC server adapter.