From 6decdc7838e9f9a941e93611c206c55759fd3277 Mon Sep 17 00:00:00 2001 From: xiaozhihong Date: Fri, 13 Mar 2020 00:24:56 +0800 Subject: [PATCH] adjust code style, fix some bug, add rtc session timeout --- trunk/src/app/srs_app_dtls.cpp | 6 + trunk/src/app/srs_app_listener.cpp | 97 +++++- trunk/src/app/srs_app_listener.hpp | 30 +- trunk/src/app/srs_app_rtc_conn.cpp | 494 +++++++++++++++++---------- trunk/src/app/srs_app_rtc_conn.hpp | 111 ++++-- trunk/src/app/srs_app_rtp.cpp | 5 +- trunk/src/app/srs_app_server.cpp | 6 +- trunk/src/app/srs_app_server.hpp | 4 +- trunk/src/service/srs_service_st.cpp | 5 + trunk/src/service/srs_service_st.hpp | 1 + 10 files changed, 508 insertions(+), 251 deletions(-) diff --git a/trunk/src/app/srs_app_dtls.cpp b/trunk/src/app/srs_app_dtls.cpp index af09dcb2f..4a863fd92 100644 --- a/trunk/src/app/srs_app_dtls.cpp +++ b/trunk/src/app/srs_app_dtls.cpp @@ -29,6 +29,8 @@ using namespace std; #include +#include + SrsDtls* SrsDtls::_instance = NULL; SrsDtls::SrsDtls() @@ -50,6 +52,10 @@ SrsDtls* SrsDtls::instance() void SrsDtls::init() { + // srtp init first + srs_assert(srtp_init() == 0); + + // init dtls context EVP_PKEY* dtls_private_key = EVP_PKEY_new(); srs_assert(dtls_private_key); diff --git a/trunk/src/app/srs_app_listener.cpp b/trunk/src/app/srs_app_listener.cpp index c8ecec2c6..0ba7684cd 100755 --- a/trunk/src/app/srs_app_listener.cpp +++ b/trunk/src/app/srs_app_listener.cpp @@ -60,15 +60,15 @@ srs_error_t ISrsUdpHandler::on_stfd_change(srs_netfd_t /*fd*/) return srs_success; } -ISrsUdpRemuxHandler::ISrsUdpRemuxHandler() +ISrsUdpMuxHandler::ISrsUdpMuxHandler() { } -ISrsUdpRemuxHandler::~ISrsUdpRemuxHandler() +ISrsUdpMuxHandler::~ISrsUdpMuxHandler() { } -srs_error_t ISrsUdpRemuxHandler::on_stfd_change(srs_netfd_t /*fd*/) +srs_error_t ISrsUdpMuxHandler::on_stfd_change(srs_netfd_t /*fd*/) { return srs_success; } @@ -221,7 +221,7 @@ srs_error_t SrsTcpListener::cycle() return err; } -SrsUdpRemuxSocket::SrsUdpRemuxSocket(srs_netfd_t fd) +SrsUdpMuxSocket::SrsUdpMuxSocket(srs_netfd_t fd) { nb_buf = SRS_UDP_MAX_PACKET_SIZE; buf = new char[nb_buf]; @@ -232,12 +232,31 @@ SrsUdpRemuxSocket::SrsUdpRemuxSocket(srs_netfd_t fd) fromlen = 0; } -SrsUdpRemuxSocket::~SrsUdpRemuxSocket() +SrsUdpMuxSocket::~SrsUdpMuxSocket() { srs_freepa(buf); } -int SrsUdpRemuxSocket::recvfrom(srs_utime_t timeout) +SrsUdpMuxSocket::SrsUdpMuxSocket(const SrsUdpMuxSocket& rhs) +{ + operator=(rhs); +} + +SrsUdpMuxSocket& SrsUdpMuxSocket::operator=(const SrsUdpMuxSocket& rhs) +{ + buf = NULL; + nb_buf = 0; + nread = 0; + lfd = rhs.lfd; + from = rhs.from; + fromlen = rhs.fromlen; + peer_ip = rhs.peer_ip; + peer_port = rhs.peer_port; + + return *this; +} + +int SrsUdpMuxSocket::recvfrom(srs_utime_t timeout) { fromlen = sizeof(from); nread = srs_recvfrom(lfd, buf, nb_buf, (sockaddr*)&from, &fromlen, timeout); @@ -259,12 +278,23 @@ int SrsUdpRemuxSocket::recvfrom(srs_utime_t timeout) return nread; } -int SrsUdpRemuxSocket::sendto(void* data, int size, srs_utime_t timeout) +int SrsUdpMuxSocket::sendto(void* data, int size, srs_utime_t timeout) { return srs_sendto(lfd, data, size, (sockaddr*)&from, fromlen, timeout); } -std::string SrsUdpRemuxSocket::get_peer_id() +int SrsUdpMuxSocket::sendtov(struct iovec* iov, size_t iovlen, srs_utime_t timeout) +{ + struct msghdr udphdr = {0}; + udphdr.msg_name = &from; + udphdr.msg_namelen = fromlen; + udphdr.msg_iov = iov; + udphdr.msg_iovlen = iovlen; + + return srs_sendmsg(lfd, &udphdr, 0, timeout); +} + +std::string SrsUdpMuxSocket::get_peer_id() { char id_buf[1024]; int len = snprintf(id_buf, sizeof(id_buf), "%s:%d", peer_ip.c_str(), peer_port); @@ -272,7 +302,7 @@ std::string SrsUdpRemuxSocket::get_peer_id() return string(id_buf, len); } -SrsUdpRemuxListener::SrsUdpRemuxListener(ISrsUdpRemuxHandler* h, std::string i, int p) +SrsUdpMuxListener::SrsUdpMuxListener(ISrsUdpMuxHandler* h, std::string i, int p) { handler = h; ip = i; @@ -285,30 +315,32 @@ SrsUdpRemuxListener::SrsUdpRemuxListener(ISrsUdpRemuxHandler* h, std::string i, trd = new SrsDummyCoroutine(); } -SrsUdpRemuxListener::~SrsUdpRemuxListener() +SrsUdpMuxListener::~SrsUdpMuxListener() { srs_freep(trd); srs_close_stfd(lfd); srs_freepa(buf); } -int SrsUdpRemuxListener::fd() +int SrsUdpMuxListener::fd() { return srs_netfd_fileno(lfd); } -srs_netfd_t SrsUdpRemuxListener::stfd() +srs_netfd_t SrsUdpMuxListener::stfd() { return lfd; } -srs_error_t SrsUdpRemuxListener::listen() +srs_error_t SrsUdpMuxListener::listen() { srs_error_t err = srs_success; if ((err = srs_udp_listen(ip, port, &lfd)) != srs_success) { return srs_error_wrap(err, "listen %s:%d", ip.c_str(), port); } + + set_socket_buffer(); srs_freep(trd); trd = new SrsSTCoroutine("udp", this); @@ -319,7 +351,38 @@ srs_error_t SrsUdpRemuxListener::listen() return err; } -srs_error_t SrsUdpRemuxListener::cycle() +void SrsUdpMuxListener::set_socket_buffer() +{ + int sndbuf_size = 0; + socklen_t opt_len = sizeof(sndbuf_size); + getsockopt(fd(), SOL_SOCKET, SO_SNDBUF, (void*)&sndbuf_size, &opt_len); + srs_trace("default udp remux socket sndbuf=%d", sndbuf_size); + + sndbuf_size = 1024*1024*10; // 10M + if (setsockopt(fd(), SOL_SOCKET, SO_SNDBUF, (void*)&sndbuf_size, sizeof(sndbuf_size)) < 0) { + srs_warn("set sock opt SO_SNDBUFFORCE failed"); + } + + opt_len = sizeof(sndbuf_size); + getsockopt(fd(), SOL_SOCKET, SO_SNDBUF, (void*)&sndbuf_size, &opt_len); + srs_trace("udp remux socket sndbuf=%d", sndbuf_size); + + int rcvbuf_size = 0; + opt_len = sizeof(rcvbuf_size); + getsockopt(fd(), SOL_SOCKET, SO_RCVBUF, (void*)&rcvbuf_size, &opt_len); + srs_trace("default udp remux socket rcvbuf=%d", rcvbuf_size); + + rcvbuf_size = 1024*1024*10; // 10M + if (setsockopt(fd(), SOL_SOCKET, SO_RCVBUF, (void*)&rcvbuf_size, sizeof(rcvbuf_size)) < 0) { + srs_warn("set sock opt SO_RCVBUFFORCE failed"); + } + + opt_len = sizeof(rcvbuf_size); + getsockopt(fd(), SOL_SOCKET, SO_RCVBUF, (void*)&rcvbuf_size, &opt_len); + srs_trace("udp remux socket rcvbuf=%d", rcvbuf_size); +} + +srs_error_t SrsUdpMuxListener::cycle() { srs_error_t err = srs_success; @@ -328,15 +391,15 @@ srs_error_t SrsUdpRemuxListener::cycle() return srs_error_wrap(err, "udp listener"); } - SrsUdpRemuxSocket udp_remux_socket(lfd); + SrsUdpMuxSocket udp_mux_skt(lfd); - if (udp_remux_socket.recvfrom(SRS_UTIME_NO_TIMEOUT) <= 0) { + if (udp_mux_skt.recvfrom(SRS_UTIME_NO_TIMEOUT) <= 0) { srs_error("udp recv error"); // remux udp never return continue; } - if ((err = handler->on_udp_packet(&udp_remux_socket)) != srs_success) { + if ((err = handler->on_udp_packet(&udp_mux_skt)) != srs_success) { // remux udp never return srs_error("udp packet handler error:%s", srs_error_desc(err).c_str()); continue; diff --git a/trunk/src/app/srs_app_listener.hpp b/trunk/src/app/srs_app_listener.hpp index fc25551e8..605929902 100644 --- a/trunk/src/app/srs_app_listener.hpp +++ b/trunk/src/app/srs_app_listener.hpp @@ -35,7 +35,7 @@ struct sockaddr; -class SrsUdpRemuxSocket; +class SrsUdpMuxSocket; // The udp packet handler. class ISrsUdpHandler @@ -58,14 +58,14 @@ public: virtual srs_error_t on_udp_packet(const sockaddr* from, const int fromlen, char* buf, int nb_buf) = 0; }; -class ISrsUdpRemuxHandler +class ISrsUdpMuxHandler { public: - ISrsUdpRemuxHandler(); - virtual ~ISrsUdpRemuxHandler(); + ISrsUdpMuxHandler(); + virtual ~ISrsUdpMuxHandler(); public: virtual srs_error_t on_stfd_change(srs_netfd_t fd); - virtual srs_error_t on_udp_packet(SrsUdpRemuxSocket* udp_remux_socket) = 0; + virtual srs_error_t on_udp_packet(SrsUdpMuxSocket* udp_mux_skt) = 0; }; // The tcp connection handler. @@ -127,7 +127,7 @@ public: virtual srs_error_t cycle(); }; -class SrsUdpRemuxSocket +class SrsUdpMuxSocket { private: char* buf; @@ -139,11 +139,15 @@ private: std::string peer_ip; int peer_port; public: - SrsUdpRemuxSocket(srs_netfd_t fd); - virtual ~SrsUdpRemuxSocket(); + SrsUdpMuxSocket(srs_netfd_t fd); + virtual ~SrsUdpMuxSocket(); + + SrsUdpMuxSocket(const SrsUdpMuxSocket& rhs); + SrsUdpMuxSocket& operator=(const SrsUdpMuxSocket& rhs); int recvfrom(srs_utime_t timeout); int sendto(void* data, int size, srs_utime_t timeout); + int sendtov(struct iovec* iov, size_t iovlen, srs_utime_t timeout); char* data() { return buf; } int size() { return nread; } @@ -152,7 +156,7 @@ public: std::string get_peer_id(); }; -class SrsUdpRemuxListener : public ISrsCoroutineHandler +class SrsUdpMuxListener : public ISrsCoroutineHandler { protected: srs_netfd_t lfd; @@ -161,12 +165,12 @@ protected: char* buf; int nb_buf; protected: - ISrsUdpRemuxHandler* handler; + ISrsUdpMuxHandler* handler; std::string ip; int port; public: - SrsUdpRemuxListener(ISrsUdpRemuxHandler* h, std::string i, int p); - virtual ~SrsUdpRemuxListener(); + SrsUdpMuxListener(ISrsUdpMuxHandler* h, std::string i, int p); + virtual ~SrsUdpMuxListener(); public: virtual int fd(); virtual srs_netfd_t stfd(); @@ -175,6 +179,8 @@ public: // Interface ISrsReusableThreadHandler. public: virtual srs_error_t cycle(); +private: + void set_socket_buffer(); }; #endif diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index a92a2b5f8..07d0fc900 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -65,6 +65,11 @@ static bool is_rtp_or_rtcp(const char* data, size_t len) return (len >= 12 && (data[0] & 0xC0) == 0x80); } +static bool is_rtcp(const char* data, size_t len) +{ + return (len >=12) && (data[0] & 0x80) && (data[1] >= 200 && data[1] <= 209); +} + static string gen_random_str(int len) { static string random_table = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"; @@ -294,9 +299,22 @@ SrsDtlsSession::SrsDtlsSession(SrsRtcSession* s) SrsDtlsSession::~SrsDtlsSession() { + if (dtls) { + // this function will free bio_in and bio_out + SSL_free(dtls); + dtls = NULL; + } + + if (srtp_send) { + srtp_dealloc(srtp_send); + } + + if (srtp_recv) { + srtp_dealloc(srtp_recv); + } } -srs_error_t SrsDtlsSession::handshake(SrsUdpRemuxSocket* udp_remux_socket) +srs_error_t SrsDtlsSession::handshake(SrsUdpMuxSocket* udp_mux_skt) { srs_error_t err = srs_success; @@ -308,7 +326,7 @@ srs_error_t SrsDtlsSession::handshake(SrsUdpRemuxSocket* udp_remux_socket) int ssl_err = SSL_get_error(dtls, ret); switch(ssl_err) { case SSL_ERROR_NONE: { - err = on_dtls_handshake_done(udp_remux_socket); + err = on_dtls_handshake_done(udp_mux_skt); } break; @@ -327,25 +345,25 @@ srs_error_t SrsDtlsSession::handshake(SrsUdpRemuxSocket* udp_remux_socket) if (out_bio_len) { srs_trace("send dtls handshake data"); - udp_remux_socket->sendto(out_bio_data, out_bio_len, 0); + udp_mux_skt->sendto(out_bio_data, out_bio_len, 0); } return err; } -srs_error_t SrsDtlsSession::on_dtls(SrsUdpRemuxSocket* udp_remux_socket) +srs_error_t SrsDtlsSession::on_dtls(SrsUdpMuxSocket* udp_mux_skt) { srs_error_t err = srs_success; if (! handshake_done) { BIO_reset(bio_in); BIO_reset(bio_out); - BIO_write(bio_in, udp_remux_socket->data(), udp_remux_socket->size()); + BIO_write(bio_in, udp_mux_skt->data(), udp_mux_skt->size()); - handshake(udp_remux_socket); + handshake(udp_mux_skt); } else { BIO_reset(bio_in); BIO_reset(bio_out); - BIO_write(bio_in, udp_remux_socket->data(), udp_remux_socket->size()); + BIO_write(bio_in, udp_mux_skt->data(), udp_mux_skt->size()); while (BIO_ctrl_pending(bio_in) > 0) { char dtls_read_buf[8092]; @@ -360,7 +378,7 @@ srs_error_t SrsDtlsSession::on_dtls(SrsUdpRemuxSocket* udp_remux_socket) return err; } -srs_error_t SrsDtlsSession::on_dtls_handshake_done(SrsUdpRemuxSocket* udp_remux_socket) +srs_error_t SrsDtlsSession::on_dtls_handshake_done(SrsUdpMuxSocket* udp_mux_skt) { srs_error_t err = srs_success; srs_trace("dtls handshake done"); @@ -371,7 +389,7 @@ srs_error_t SrsDtlsSession::on_dtls_handshake_done(SrsUdpRemuxSocket* udp_remux_ return srs_error_wrap(err, "srtp init failed"); } - rtc_session->on_connection_established(udp_remux_socket); + rtc_session->on_connection_established(udp_mux_skt); return err; } @@ -383,7 +401,7 @@ srs_error_t SrsDtlsSession::on_dtls_application_data(const char* buf, const int return err; } -void SrsDtlsSession::send_client_hello(SrsUdpRemuxSocket* udp_remux_socket) +void SrsDtlsSession::send_client_hello(SrsUdpMuxSocket* udp_mux_skt) { if (dtls == NULL) { srs_trace("send client hello"); @@ -396,7 +414,7 @@ void SrsDtlsSession::send_client_hello(SrsUdpRemuxSocket* udp_remux_socket) SSL_set_bio(dtls, bio_in, bio_out); - handshake(udp_remux_socket); + handshake(udp_mux_skt); } } @@ -405,42 +423,36 @@ srs_error_t SrsDtlsSession::srtp_initialize() srs_error_t err = srs_success; unsigned char material[SRTP_MASTER_KEY_LEN * 2] = {0}; // client(SRTP_MASTER_KEY_KEY_LEN + SRTP_MASTER_KEY_SALT_LEN) + server - static string dtls_srtp_lable = "EXTRACTOR-dtls_srtp"; + static const string dtls_srtp_lable = "EXTRACTOR-dtls_srtp"; if (! SSL_export_keying_material(dtls, material, sizeof(material), dtls_srtp_lable.c_str(), dtls_srtp_lable.size(), NULL, 0, 0)) { return srs_error_wrap(err, "SSL_export_keying_material failed"); } size_t offset = 0; - std::string sClientMasterKey(reinterpret_cast(material), SRTP_MASTER_KEY_KEY_LEN); + std::string client_master_key(reinterpret_cast(material), SRTP_MASTER_KEY_KEY_LEN); offset += SRTP_MASTER_KEY_KEY_LEN; - std::string sServerMasterKey(reinterpret_cast(material + offset), SRTP_MASTER_KEY_KEY_LEN); + std::string server_master_key(reinterpret_cast(material + offset), SRTP_MASTER_KEY_KEY_LEN); offset += SRTP_MASTER_KEY_KEY_LEN; - std::string sClientMasterSalt(reinterpret_cast(material + offset), SRTP_MASTER_KEY_SALT_LEN); + std::string client_master_salt(reinterpret_cast(material + offset), SRTP_MASTER_KEY_SALT_LEN); offset += SRTP_MASTER_KEY_SALT_LEN; - std::string sServerMasterSalt(reinterpret_cast(material + offset), SRTP_MASTER_KEY_SALT_LEN); + std::string server_master_salt(reinterpret_cast(material + offset), SRTP_MASTER_KEY_SALT_LEN); - client_key = sClientMasterKey + sClientMasterSalt; - server_key = sServerMasterKey + sServerMasterSalt; + client_key = client_master_key + client_master_salt; + server_key = server_master_key + server_master_salt; - srs_trace("client_key size=%d, server_key=%d", client_key.size(), server_key.size()); - - if (srtp_init() != 0) { - return srs_error_wrap(err, "srtp init failed"); + if (srtp_send_init() != srs_success) { + return srs_error_wrap(err, "srtp send init failed"); } - if (srtp_sender_side_init() != srs_success) { - return srs_error_wrap(err, "srtp sender size init failed"); - } - - if (srtp_receiver_side_init() != srs_success) { - return srs_error_wrap(err, "srtp receiver size init failed"); + if (srtp_recv_init() != srs_success) { + return srs_error_wrap(err, "srtp recv init failed"); } return err; } -srs_error_t SrsDtlsSession::srtp_sender_side_init() +srs_error_t SrsDtlsSession::srtp_send_init() { srs_error_t err = srs_success; @@ -463,16 +475,16 @@ srs_error_t SrsDtlsSession::srtp_sender_side_init() policy.key = key; if (srtp_create(&srtp_send, &policy) != 0) { - delete [] key; + srs_freepa(key); return srs_error_wrap(err, "srtp_create failed"); } - delete [] key; + srs_freepa(key); return err; } -srs_error_t SrsDtlsSession::srtp_receiver_side_init() +srs_error_t SrsDtlsSession::srtp_recv_init() { srs_error_t err = srs_success; @@ -495,50 +507,80 @@ srs_error_t SrsDtlsSession::srtp_receiver_side_init() policy.key = key; if (srtp_create(&srtp_recv, &policy) != 0) { - delete [] key; + srs_freepa(key); return srs_error_wrap(err, "srtp_create failed"); } - delete [] key; + srs_freepa(key); return err; } -srs_error_t SrsDtlsSession::srtp_sender_protect(char* protected_buf, const char* ori_buf, int& nb_protected_buf) +srs_error_t SrsDtlsSession::protect_rtp(char* out_buf, const char* in_buf, int& nb_out_buf) { srs_error_t err = srs_success; if (srtp_send) { - memcpy(protected_buf, ori_buf, nb_protected_buf); - if (srtp_protect(srtp_send, protected_buf, &nb_protected_buf) != 0) { - srs_error("srtp sender protect failed"); - return srs_error_wrap(err, "srtp sender protect failed"); + memcpy(out_buf, in_buf, nb_out_buf); + if (srtp_protect(srtp_send, out_buf, &nb_out_buf) != 0) { + return srs_error_wrap(err, "rtp protect failed"); } return err; } - return srs_error_wrap(err, "srtp sender protect failed"); + return srs_error_wrap(err, "rtp protect failed"); } -srs_error_t SrsDtlsSession::srtp_receiver_unprotect(char* unprotected_buf, const char* ori_buf, int& nb_unprotected_buf) +srs_error_t SrsDtlsSession::unprotect_rtp(char* out_buf, const char* in_buf, int& nb_out_buf) { srs_error_t err = srs_success; - if (srtp_send) { - memcpy(unprotected_buf, ori_buf, nb_unprotected_buf); - if (srtp_unprotect(srtp_recv, unprotected_buf, &nb_unprotected_buf) != 0) { - srs_error("srtp receiver unprotect failed"); - return srs_error_wrap(err, "srtp receiver unprotect failed"); + if (srtp_recv) { + memcpy(out_buf, in_buf, nb_out_buf); + if (srtp_unprotect(srtp_recv, out_buf, &nb_out_buf) != 0) { + return srs_error_wrap(err, "rtp unprotect failed"); } return err; } - return srs_error_wrap(err, "srtp receiver unprotect failed"); + return srs_error_wrap(err, "rtp unprotect failed"); } -SrsRtcSenderThread::SrsRtcSenderThread(SrsRtcSession* s, SrsUdpRemuxSocket* u, int parent_cid) +srs_error_t SrsDtlsSession::protect_rtcp(char* out_buf, const char* in_buf, int& nb_out_buf) +{ + srs_error_t err = srs_success; + + if (srtp_send) { + memcpy(out_buf, in_buf, nb_out_buf); + if (srtp_protect_rtcp(srtp_send, out_buf, &nb_out_buf) != 0) { + return srs_error_wrap(err, "rtcp protect failed"); + } + + return err; + } + + return srs_error_wrap(err, "rtcp protect failed"); +} + +srs_error_t SrsDtlsSession::unprotect_rtcp(char* out_buf, const char* in_buf, int& nb_out_buf) +{ + srs_error_t err = srs_success; + + if (srtp_recv) { + memcpy(out_buf, in_buf, nb_out_buf); + if (srtp_unprotect_rtcp(srtp_recv, out_buf, &nb_out_buf) != 0) { + return srs_error_wrap(err, "rtcp unprotect failed"); + } + + return err; + } + + return srs_error_wrap(err, "rtcp unprotect failed"); +} + +SrsRtcSenderThread::SrsRtcSenderThread(SrsRtcSession* s, SrsUdpMuxSocket* u, int parent_cid) : ukt(NULL) { _parent_cid = parent_cid; @@ -563,10 +605,10 @@ srs_error_t SrsRtcSenderThread::start() srs_error_t err = srs_success; srs_freep(trd); - trd = new SrsSTCoroutine("recv", this, _parent_cid); + trd = new SrsSTCoroutine("rtc_sender", this, _parent_cid); if ((err = trd->start()) != srs_success) { - return srs_error_wrap(err, "recv thread"); + return srs_error_wrap(err, "rtc_sender"); } return err; @@ -608,69 +650,91 @@ srs_error_t SrsRtcSenderThread::cycle() SrsAutoFree(SrsConsumer, consumer); while (true) { + if ((err = trd->pull()) != srs_success) { + return srs_error_wrap(err, "rtc sender thread"); + } + SrsMessageArray msgs(SRS_PERF_MW_MSGS); +#ifdef SRS_PERF_QUEUE_COND_WAIT + consumer->wait(0, SRS_PERF_MW_SLEEP); +#endif + int msg_count = 0; if (consumer->dump_packets(&msgs, msg_count) != srs_success) { - srs_trace("rtc pop no rtp packets"); continue; } - srs_trace("rtc pop %d rtp packets", msg_count); - - for (int i = 0; i < msg_count; i++) { - SrsSharedPtrMessage* msg = msgs.msgs[i]; - - for (int i = 0; i < msg->nb_rtp_fragments; ++i) { - SrsBuffer stream(msg->rtp_fragments[i].bytes + 2, 2); - uint16_t seq = stream.read_2bytes(); - srs_trace("rtp fragment size=%d, seq=%u, payload=%s", msg->rtp_fragments[i].size, seq, - dump_string_hex(msg->rtp_fragments[i].bytes, msg->rtp_fragments[i].size, 1460).c_str()); - - if (rtc_session->dtls_session) { - char rtp_send_protected_buf[1500]; - int rtp_send_protected_len = msg->rtp_fragments[i].size; - rtc_session->dtls_session->srtp_sender_protect(rtp_send_protected_buf, msg->rtp_fragments[i].bytes, rtp_send_protected_len); - ukt.sendto(rtp_send_protected_buf, rtp_send_protected_len, 0); - } - } - - srs_freep(msg); + if (msg_count <= 0) { +#ifndef SRS_PERF_QUEUE_COND_WAIT + srs_usleep(mw_sleep); +#endif + // ignore when nothing got. + continue; } - srs_usleep(16000); + send_and_free_messages(msgs.msgs, msg_count, &ukt); } } +void SrsRtcSenderThread::send_and_free_messages(SrsSharedPtrMessage** msgs, int nb_msgs, SrsUdpMuxSocket* udp_mux_skt) +{ + for (int i = 0; i < nb_msgs; i++) { + SrsSharedPtrMessage* msg = msgs[i]; -SrsRtcSession::SrsRtcSession(SrsServer* svr, SrsRtcServer* rtc_svr) + for (int i = 0; i < msg->nb_rtp_fragments; ++i) { + if (rtc_session->dtls_session) { + char protected_buf[kRtpPacketSize]; + int nb_protected_buf = msg->rtp_fragments[i].size; + + rtc_session->dtls_session->protect_rtp(protected_buf, msg->rtp_fragments[i].bytes, nb_protected_buf); + udp_mux_skt->sendto(protected_buf, nb_protected_buf, 0); + } + } + + srs_freep(msg); + } +} + +SrsRtcSession::SrsRtcSession(SrsServer* svr, SrsRtcServer* rtc_svr, const string& un) { server = svr; rtc_server = rtc_svr; session_state = INIT; dtls_session = NULL; - strd = NULL; + + username = un; + + last_stun_time = srs_get_system_time(); } SrsRtcSession::~SrsRtcSession() { + srs_freep(dtls_session); + + if (strd) { + strd->stop(); + } + srs_freep(strd); } -srs_error_t SrsRtcSession::on_stun(SrsUdpRemuxSocket* udp_remux_socket, SrsStunPacket* stun_req) +srs_error_t SrsRtcSession::on_stun(SrsUdpMuxSocket* udp_mux_skt, SrsStunPacket* stun_req) { srs_error_t err = srs_success; if (stun_req->is_binding_request()) { - if (on_binding_request(udp_remux_socket, stun_req) != srs_success) { + if (on_binding_request(udp_mux_skt, stun_req) != srs_success) { return srs_error_wrap(err, "stun binding request failed"); } } + last_stun_time = srs_get_system_time(); + return err; } -srs_error_t SrsRtcSession::on_binding_request(SrsUdpRemuxSocket* udp_remux_socket, SrsStunPacket* stun_req) +srs_error_t SrsRtcSession::on_binding_request(SrsUdpMuxSocket* udp_mux_skt, SrsStunPacket* stun_req) { srs_error_t err = srs_success; @@ -684,77 +748,74 @@ srs_error_t SrsRtcSession::on_binding_request(SrsUdpRemuxSocket* udp_remux_socke stun_binding_response.set_remote_ufrag(stun_req->get_local_ufrag()); stun_binding_response.set_transcation_id(stun_req->get_transcation_id()); // FIXME: inet_addr is deprecated, IPV6 support - stun_binding_response.set_mapped_address(be32toh(inet_addr(udp_remux_socket->get_peer_ip().c_str()))); - stun_binding_response.set_mapped_port(udp_remux_socket->get_peer_port()); + stun_binding_response.set_mapped_address(be32toh(inet_addr(udp_mux_skt->get_peer_ip().c_str()))); + stun_binding_response.set_mapped_port(udp_mux_skt->get_peer_port()); if (stun_binding_response.encode(get_local_sdp()->get_ice_pwd(), stream) != srs_success) { return srs_error_wrap(err, "stun binding response encode failed"); } - if (udp_remux_socket->sendto(stream->data(), stream->pos(), 0) <= 0) { + if (udp_mux_skt->sendto(stream->data(), stream->pos(), 0) <= 0) { return srs_error_wrap(err, "stun binding response send failed"); } if (get_session_state() == WAITING_STUN) { set_session_state(DOING_DTLS_HANDSHAKE); - send_client_hello(udp_remux_socket); + send_client_hello(udp_mux_skt); - string peer_id = udp_remux_socket->get_peer_id(); + peer_id = udp_mux_skt->get_peer_id(); rtc_server->insert_into_id_sessions(peer_id, this); } - // TODO: dtls send client retry - return err; } -srs_error_t SrsRtcSession::send_client_hello(SrsUdpRemuxSocket* udp_remux_socket) +srs_error_t SrsRtcSession::send_client_hello(SrsUdpMuxSocket* udp_mux_skt) { if (dtls_session == NULL) { dtls_session = new SrsDtlsSession(this); } - dtls_session->send_client_hello(udp_remux_socket); + dtls_session->send_client_hello(udp_mux_skt); } -void SrsRtcSession::on_connection_established(SrsUdpRemuxSocket* udp_remux_socket) +void SrsRtcSession::on_connection_established(SrsUdpMuxSocket* udp_mux_skt) { - start_play(udp_remux_socket); + srs_trace("rtc session=%s, connection established", id().c_str()); + start_play(udp_mux_skt); } -srs_error_t SrsRtcSession::start_play(SrsUdpRemuxSocket* udp_remux_socket) +srs_error_t SrsRtcSession::start_play(SrsUdpMuxSocket* udp_mux_skt) { srs_error_t err = srs_success; - strd = new SrsRtcSenderThread(this, udp_remux_socket, _srs_context->get_id()); + srs_freep(strd); + strd = new SrsRtcSenderThread(this, udp_mux_skt, _srs_context->get_id()); strd->start(); return err; } -srs_error_t SrsRtcSession::on_dtls(SrsUdpRemuxSocket* udp_remux_socket) +srs_error_t SrsRtcSession::on_dtls(SrsUdpMuxSocket* udp_mux_skt) { - return dtls_session->on_dtls(udp_remux_socket); + return dtls_session->on_dtls(udp_mux_skt); } -srs_error_t SrsRtcSession::on_rtp_or_rtcp(SrsUdpRemuxSocket* udp_remux_socket) +srs_error_t SrsRtcSession::on_rtp(SrsUdpMuxSocket* udp_mux_skt) { srs_error_t err = srs_success; if (dtls_session == NULL) { - return srs_error_wrap(err, "recv unexpect rtp/rtcp packet before dtls done"); + return srs_error_wrap(err, "recv unexpect rtp packet before dtls done"); } - uint8_t payload_type = udp_remux_socket->data()[1] & 0x7F; - - char srtp_unprotect_buf[1460]; - int nb_srtp_unprotect_buf = udp_remux_socket->size(); - if (dtls_session->srtp_receiver_unprotect(srtp_unprotect_buf, udp_remux_socket->data(), nb_srtp_unprotect_buf) != srs_success) { - return srs_error_wrap(err, "srtp receiver unprotect failed, payload_type=%u", payload_type); + char unprotected_buf[1460]; + int nb_unprotected_buf = udp_mux_skt->size(); + if (dtls_session->unprotect_rtp(unprotected_buf, udp_mux_skt->data(), nb_unprotected_buf) != srs_success) { + return srs_error_wrap(err, "rtp unprotect failed"); } - //srs_trace("srtp unprotect success, %s", dump_string_hex(srtp_unprotect_buf, nb_srtp_unprotect_buf, nb_srtp_unprotect_buf).c_str()); - - SrsBuffer* stream = new SrsBuffer(srtp_unprotect_buf, nb_srtp_unprotect_buf); + // FIXME: use SrsRtpPacket + SrsBuffer* stream = new SrsBuffer(unprotected_buf, nb_unprotected_buf); SrsAutoFree(SrsBuffer, stream); uint8_t first = stream->read_1bytes(); uint8_t second = stream->read_1bytes(); @@ -769,7 +830,7 @@ srs_error_t SrsRtcSession::on_rtp_or_rtcp(SrsUdpRemuxSocket* udp_remux_socket) uint32_t timestamp = stream->read_4bytes(); uint32_t ssrc = stream->read_4bytes(); - srs_trace("sequence=%u, timestamp=%u, ssrc=%u, padding=%d, ext=%d, cc=%u, marker=%d, payload_type=%u", + srs_verbose("sequence=%u, timestamp=%u, ssrc=%u, padding=%d, ext=%d, cc=%u, marker=%d, payload_type=%u", sequence, timestamp, ssrc, padding, ext, cc, marker, payload_type); for (uint8_t i = 0; i < cc; ++i) { @@ -781,70 +842,40 @@ srs_error_t SrsRtcSession::on_rtp_or_rtcp(SrsUdpRemuxSocket* udp_remux_socket) uint16_t extern_profile = stream->read_2bytes(); uint16_t extern_length = stream->read_2bytes(); - srs_trace("extern_profile=%u, extern_length=%u", extern_profile, extern_length); + srs_verbose("extern_profile=%u, extern_length=%u", extern_profile, extern_length); stream->read_string(extern_length * 4); } - if (payload_type == 102) { - static uint32_t pre_seq = 0; - uint32_t seq = sequence; - - srs_assert(pre_seq == 0 || (pre_seq + 1 == seq)); + return err; +} - pre_seq = seq; - - static uint8_t start_code[4] = {0x00, 0x00, 0x00, 0x01}; - static int fd = -1; - if (fd < 0) { - fd = open("rtc.264", O_CREAT|O_TRUNC|O_RDWR, 0664); - } - - const uint8_t* p = (const uint8_t*)stream->data() + stream->pos(); - int len = stream->left(); - uint8_t header = p[0]; - uint8_t nal_type = header & kNalTypeMask; - - srs_trace("nal_type=%u, seq=%u, rtp payload, %s", nal_type, sequence, dump_string_hex(stream->data() + stream->pos(), stream->left(), stream->left()).c_str()); - - if (nal_type >=1 && nal_type <= 23) { - srs_trace("single nalu"); - write(fd, start_code, sizeof(start_code)); - write(fd, p, len); - } else if (nal_type == kFuA) { - srs_trace("FuA"); - if (p[1] & 0x80) { - uint8_t nal_type = ((p[0] & (~kNalTypeMask)) | (p[1] & kNalTypeMask)); - write(fd, start_code, sizeof(start_code)); - write(fd, &nal_type, 1); - write(fd, p + 2, len - 2); - } else { - write(fd, p + 2, len - 2); - } - } else if (nal_type == kStapA) { - srs_trace("StapA"); - int pos = 1; - while (pos < len) { - int nal_len = p[pos] << 8 | p[pos + 1]; - srs_trace("nal_len=%d", nal_len); - write(fd, start_code, sizeof(start_code)); - write(fd, p + pos + 2, nal_len); - pos += nal_len + 2; - } - srs_assert(pos == len); - } else { - srs_assert(false); - } +srs_error_t SrsRtcSession::on_rtcp(SrsUdpMuxSocket* udp_mux_skt) +{ + srs_error_t err = srs_success; + if (dtls_session == NULL) { + return srs_error_wrap(err, "recv unexpect rtcp packet before dtls done"); } - // XXX:send h264 back to client, for debug - if (payload_type == 102) { - char rtp_send_protected_buf[1500]; - int rtp_send_protected_len = nb_srtp_unprotect_buf; - SrsBuffer stream(srtp_unprotect_buf + 8, 4); - stream.write_4bytes(3233846889); - dtls_session->srtp_sender_protect(rtp_send_protected_buf, srtp_unprotect_buf, rtp_send_protected_len); - udp_remux_socket->sendto(rtp_send_protected_buf, rtp_send_protected_len, 0); + char unprotected_buf[1460]; + int nb_unprotected_buf = udp_mux_skt->size(); + if (dtls_session->unprotect_rtcp(unprotected_buf, udp_mux_skt->data(), nb_unprotected_buf) != srs_success) { + return srs_error_wrap(err, "rtcp unprotect failed"); + } + + // FIXME: use SrsRtpPacket + SrsBuffer* stream = new SrsBuffer(unprotected_buf, nb_unprotected_buf); + SrsAutoFree(SrsBuffer, stream); + uint8_t first = stream->read_1bytes(); + uint8_t payload_type = stream->read_1bytes(); + + if (payload_type == kSR) { + } else if (payload_type == kRR) { + } else if (kSDES) { + } else if (kBye) { + } else if (kApp) { + } else { + return srs_error_wrap(err, "unknown rtcp type=%u", payload_type); } return err; @@ -857,25 +888,32 @@ SrsRtcServer::SrsRtcServer(SrsServer* svr) SrsRtcServer::~SrsRtcServer() { + rttrd->stop(); + srs_freep(rttrd); } srs_error_t SrsRtcServer::initialize() { srs_error_t err = srs_success; + rttrd = new SrsRtcTimerThread(this, _srs_context->get_id()); + if (rttrd->start() != srs_success) { + return srs_error_wrap(err, "rtc timer thread init failed"); + } + return err; } -srs_error_t SrsRtcServer::on_udp_packet(SrsUdpRemuxSocket* udp_remux_socket) +srs_error_t SrsRtcServer::on_udp_packet(SrsUdpMuxSocket* udp_mux_skt) { srs_error_t err = srs_success; - if (is_stun(udp_remux_socket->data(), udp_remux_socket->size())) { - return on_stun(udp_remux_socket); - } else if (is_dtls(udp_remux_socket->data(), udp_remux_socket->size())) { - return on_dtls(udp_remux_socket); - } else if (is_rtp_or_rtcp(udp_remux_socket->data(), udp_remux_socket->size())) { - return on_rtp_or_rtcp(udp_remux_socket); + if (is_stun(udp_mux_skt->data(), udp_mux_skt->size())) { + return on_stun(udp_mux_skt); + } else if (is_dtls(udp_mux_skt->data(), udp_mux_skt->size())) { + return on_dtls(udp_mux_skt); + } else if (is_rtp_or_rtcp(udp_mux_skt->data(), udp_mux_skt->size())) { + return on_rtp_or_rtcp(udp_mux_skt); } return srs_error_wrap(err, "unknown udp packet type"); @@ -883,20 +921,20 @@ srs_error_t SrsRtcServer::on_udp_packet(SrsUdpRemuxSocket* udp_remux_socket) SrsRtcSession* SrsRtcServer::create_rtc_session(const SrsSdp& remote_sdp, SrsSdp& local_sdp) { - SrsRtcSession* session = new SrsRtcSession(server, this); - std::string local_pwd = gen_random_str(32); std::string local_ufrag = ""; + std::string username = ""; while (true) { local_ufrag = gen_random_str(8); - std::string username = local_ufrag + ":" + remote_sdp.get_ice_ufrag(); - bool ret = map_username_session.insert(make_pair(username, session)).second; - if (ret) { + username = local_ufrag + ":" + remote_sdp.get_ice_ufrag(); + if (! map_username_session.count(username)) break; - } } + SrsRtcSession* session = new SrsRtcSession(server, this, username); + map_username_session.insert(make_pair(username, session)); + local_sdp.set_ice_ufrag(local_ufrag); local_sdp.set_ice_pwd(local_pwd); @@ -918,14 +956,14 @@ SrsRtcSession* SrsRtcServer::find_rtc_session_by_peer_id(const string& peer_id) return iter->second; } -srs_error_t SrsRtcServer::on_stun(SrsUdpRemuxSocket* udp_remux_socket) +srs_error_t SrsRtcServer::on_stun(SrsUdpMuxSocket* udp_mux_skt) { srs_error_t err = srs_success; - srs_trace("recv stun packet from %s", udp_remux_socket->get_peer_id().c_str()); + srs_trace("recv stun packet from %s", udp_mux_skt->get_peer_id().c_str()); SrsStunPacket stun_req; - if (stun_req.decode(udp_remux_socket->data(), udp_remux_socket->size()) != srs_success) { + if (stun_req.decode(udp_mux_skt->data(), udp_mux_skt->size()) != srs_success) { return srs_error_wrap(err, "decode stun packet failed"); } @@ -935,37 +973,40 @@ srs_error_t SrsRtcServer::on_stun(SrsUdpRemuxSocket* udp_remux_socket) return srs_error_wrap(err, "can not find rtc_session, stun username=%s", username.c_str()); } - return rtc_session->on_stun(udp_remux_socket, &stun_req); + return rtc_session->on_stun(udp_mux_skt, &stun_req); } -srs_error_t SrsRtcServer::on_dtls(SrsUdpRemuxSocket* udp_remux_socket) +srs_error_t SrsRtcServer::on_dtls(SrsUdpMuxSocket* udp_mux_skt) { srs_error_t err = srs_success; srs_trace("on dtls"); - SrsRtcSession* rtc_session = find_rtc_session_by_peer_id(udp_remux_socket->get_peer_id()); + SrsRtcSession* rtc_session = find_rtc_session_by_peer_id(udp_mux_skt->get_peer_id()); if (rtc_session == NULL) { - return srs_error_wrap(err, "can not find rtc session by peer_id=%s", udp_remux_socket->get_peer_id().c_str()); + return srs_error_wrap(err, "can not find rtc session by peer_id=%s", udp_mux_skt->get_peer_id().c_str()); } - rtc_session->on_dtls(udp_remux_socket); + rtc_session->on_dtls(udp_mux_skt); return err; } -srs_error_t SrsRtcServer::on_rtp_or_rtcp(SrsUdpRemuxSocket* udp_remux_socket) +srs_error_t SrsRtcServer::on_rtp_or_rtcp(SrsUdpMuxSocket* udp_mux_skt) { srs_error_t err = srs_success; - srs_trace("on rtp/rtcp"); - SrsRtcSession* rtc_session = find_rtc_session_by_peer_id(udp_remux_socket->get_peer_id()); + SrsRtcSession* rtc_session = find_rtc_session_by_peer_id(udp_mux_skt->get_peer_id()); if (rtc_session == NULL) { - return srs_error_wrap(err, "can not find rtc session by peer_id=%s", udp_remux_socket->get_peer_id().c_str()); + return srs_error_wrap(err, "can not find rtc session by peer_id=%s", udp_mux_skt->get_peer_id().c_str()); } - rtc_session->on_rtp_or_rtcp(udp_remux_socket); + if (is_rtcp(udp_mux_skt->data(), udp_mux_skt->size())) { + rtc_session->on_rtcp(udp_mux_skt); + } else { + rtc_session->on_rtp(udp_mux_skt); + } return err; } @@ -984,3 +1025,82 @@ bool SrsRtcServer::insert_into_id_sessions(const string& peer_id, SrsRtcSession* { return map_id_session.insert(make_pair(peer_id, rtc_session)).second; } + +void SrsRtcServer::check_and_clean_timeout_session() +{ + map::iterator iter = map_username_session.begin(); + while (iter != map_username_session.end()) { + SrsRtcSession* session = iter->second; + if (session == NULL) { + map_username_session.erase(iter++); + continue; + } + + if (session->is_stun_timeout()) { + srs_trace("rtc session=%s, stun timeout", session->id().c_str()); + map_username_session.erase(iter++); + map_id_session.erase(session->get_peer_id()); + delete session; + continue; + } + + ++iter; + } +} + +SrsRtcTimerThread::SrsRtcTimerThread(SrsRtcServer* rtc_svr, int parent_cid) +{ + _parent_cid = parent_cid; + trd = new SrsDummyCoroutine(); + + rtc_server = rtc_svr; +} + +SrsRtcTimerThread::~SrsRtcTimerThread() +{ + srs_freep(trd); +} + +int SrsRtcTimerThread::cid() +{ + return trd->cid(); +} + +srs_error_t SrsRtcTimerThread::start() +{ + srs_error_t err = srs_success; + + srs_freep(trd); + trd = new SrsSTCoroutine("rtc_timer", this, _parent_cid); + + if ((err = trd->start()) != srs_success) { + return srs_error_wrap(err, "rtc_timer"); + } + + return err; +} + +void SrsRtcTimerThread::stop() +{ + trd->stop(); +} + +void SrsRtcTimerThread::stop_loop() +{ + trd->interrupt(); +} + +srs_error_t SrsRtcTimerThread::cycle() +{ + srs_error_t err = srs_success; + + while (true) { + if ((err = trd->pull()) != srs_success) { + srs_trace("rtc_timer cycle failed"); + return srs_error_wrap(err, "rtc timer thread"); + } + + srs_usleep(1*1000*1000LL); + rtc_server->check_and_clean_timeout_session(); + } +} diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index fab2afc4c..fc67ba431 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -27,6 +27,7 @@ #include #include #include +#include #include #include @@ -35,12 +36,21 @@ #include #include -class SrsUdpRemuxSocket; +class SrsUdpMuxSocket; class SrsServer; class SrsConsumer; class SrsStunPacket; class SrsRtcServer; class SrsRtcSession; +class SrsSharedPtrMessage; + +const uint8_t kSR = 200; +const uint8_t kRR = 201; +const uint8_t kSDES = 202; +const uint8_t kBye = 203; +const uint8_t kApp = 204; + +const srs_utime_t kSrsRtcSessionStunTimeoutUs = 10*1000*1000LL; class SrsCandidate { @@ -116,19 +126,22 @@ public: SrsDtlsSession(SrsRtcSession* s); virtual ~SrsDtlsSession(); - srs_error_t on_dtls(SrsUdpRemuxSocket* udp_remux_socket); - srs_error_t on_dtls_handshake_done(SrsUdpRemuxSocket* udp_remux_socket); + srs_error_t on_dtls(SrsUdpMuxSocket* udp_mux_skt); + srs_error_t on_dtls_handshake_done(SrsUdpMuxSocket* udp_mux_skt); srs_error_t on_dtls_application_data(const char* data, const int len); - void send_client_hello(SrsUdpRemuxSocket* udp_remux_socket); - srs_error_t handshake(SrsUdpRemuxSocket* udp_remux_socket); - srs_error_t srtp_sender_protect(char* protected_buf, const char* ori_buf, int& nb_protected_buf); - srs_error_t srtp_receiver_unprotect(char* unprotected_buf, const char* ori_buf, int& nb_unprotected_buf); - + void send_client_hello(SrsUdpMuxSocket* udp_mux_skt); +public: + srs_error_t protect_rtp(char* protected_buf, const char* ori_buf, int& nb_protected_buf); + srs_error_t unprotect_rtp(char* unprotected_buf, const char* ori_buf, int& nb_unprotected_buf); + srs_error_t protect_rtcp(char* protected_buf, const char* ori_buf, int& nb_protected_buf); + srs_error_t unprotect_rtcp(char* unprotected_buf, const char* ori_buf, int& nb_unprotected_buf); +private: + srs_error_t handshake(SrsUdpMuxSocket* udp_mux_skt); private: srs_error_t srtp_initialize(); - srs_error_t srtp_sender_side_init(); - srs_error_t srtp_receiver_side_init(); + srs_error_t srtp_send_init(); + srs_error_t srtp_recv_init(); }; class SrsRtcSenderThread : public ISrsCoroutineHandler @@ -138,11 +151,11 @@ protected: int _parent_cid; private: SrsRtcSession* rtc_session; - SrsUdpRemuxSocket ukt; + SrsUdpMuxSocket ukt; public: // Constructor. // @param tm The receive timeout in srs_utime_t. - SrsRtcSenderThread(SrsRtcSession* s, SrsUdpRemuxSocket* u, int parent_cid); + SrsRtcSenderThread(SrsRtcSession* s, SrsUdpMuxSocket* u, int parent_cid); virtual ~SrsRtcSenderThread(); public: virtual int cid(); @@ -152,6 +165,8 @@ public: virtual void stop_loop(); public: virtual srs_error_t cycle(); +private: + void send_and_free_messages(SrsSharedPtrMessage** msgs, int nb_msgs, SrsUdpMuxSocket* udp_mux_skt); }; class SrsRtcSession @@ -165,39 +180,76 @@ private: SrsRtcSessionStateType session_state; SrsDtlsSession* dtls_session; SrsRtcSenderThread* strd; + std::string username; + std::string peer_id; + srs_utime_t last_stun_time; public: std::string app; std::string stream; public: - SrsRtcSession(SrsServer* svr, SrsRtcServer* rtc_svr); + SrsRtcSession(SrsServer* svr, SrsRtcServer* rtc_svr, const std::string& un); virtual ~SrsRtcSession(); public: SrsSdp* get_local_sdp() { return &local_sdp; } - SrsSdp* get_remote_sdp() { return &remote_sdp; } - SrsRtcSessionStateType get_session_state() { return session_state; } - void set_local_sdp(const SrsSdp& sdp) { local_sdp = sdp; } + + SrsSdp* get_remote_sdp() { return &remote_sdp; } void set_remote_sdp(const SrsSdp& sdp) { remote_sdp = sdp; } + + SrsRtcSessionStateType get_session_state() { return session_state; } void set_session_state(SrsRtcSessionStateType state) { session_state = state; } + + std::string id() const { return peer_id + "_" + username; } + void set_app_stream(const std::string& a, const std::string& s) { app = a; stream = s; } + + std::string get_peer_id() const { return peer_id; } + void set_peer_id(const std::string& id) { peer_id = id; } public: - srs_error_t on_stun(SrsUdpRemuxSocket* udp_remux_socket, SrsStunPacket* stun_req); - srs_error_t on_dtls(SrsUdpRemuxSocket* udp_remux_socket); - srs_error_t on_rtp_or_rtcp(SrsUdpRemuxSocket* udp_remux_socket); + srs_error_t on_stun(SrsUdpMuxSocket* udp_mux_skt, SrsStunPacket* stun_req); + srs_error_t on_dtls(SrsUdpMuxSocket* udp_mux_skt); + srs_error_t on_rtp(SrsUdpMuxSocket* udp_mux_skt); + srs_error_t on_rtcp(SrsUdpMuxSocket* udp_mux_skt); public: - srs_error_t send_client_hello(SrsUdpRemuxSocket* udp_remux_socket); - void on_connection_established(SrsUdpRemuxSocket* udp_remux_socket); - srs_error_t start_play(SrsUdpRemuxSocket* udp_remux_socket); + srs_error_t send_client_hello(SrsUdpMuxSocket* udp_mux_skt); + void on_connection_established(SrsUdpMuxSocket* udp_mux_skt); + srs_error_t start_play(SrsUdpMuxSocket* udp_mux_skt); +public: + bool is_stun_timeout() { return last_stun_time + kSrsRtcSessionStunTimeoutUs < srs_get_system_time(); } private: - srs_error_t on_binding_request(SrsUdpRemuxSocket* udp_remux_socket, SrsStunPacket* stun_req); + srs_error_t on_binding_request(SrsUdpMuxSocket* udp_mux_skt, SrsStunPacket* stun_req); private: - srs_error_t do_playing(SrsConsumer* consumer, SrsUdpRemuxSocket* udp_remux_socket); + srs_error_t do_playing(SrsConsumer* consumer, SrsUdpMuxSocket* udp_mux_skt); }; -class SrsRtcServer : public ISrsUdpRemuxHandler +// XXX: is there any other timer thread? +class SrsRtcTimerThread : public ISrsCoroutineHandler +{ +protected: + SrsCoroutine* trd; + int _parent_cid; +private: + SrsRtcServer* rtc_server; +public: + // Constructor. + // @param tm The receive timeout in srs_utime_t. + SrsRtcTimerThread(SrsRtcServer* rtc_svr, int parent_cid); + virtual ~SrsRtcTimerThread(); +public: + virtual int cid(); +public: + virtual srs_error_t start(); + virtual void stop(); + virtual void stop_loop(); +public: + virtual srs_error_t cycle(); +}; + +class SrsRtcServer : public ISrsUdpMuxHandler { private: SrsServer* server; + SrsRtcTimerThread* rttrd; private: std::map map_username_session; // key: username(local_ufrag + ":" + remote_ufrag) std::map map_id_session; // key: peerip(ip + ":" + port) @@ -207,14 +259,15 @@ public: public: virtual srs_error_t initialize(); - virtual srs_error_t on_udp_packet(SrsUdpRemuxSocket* udp_remux_socket); + virtual srs_error_t on_udp_packet(SrsUdpMuxSocket* udp_mux_skt); SrsRtcSession* create_rtc_session(const SrsSdp& remote_sdp, SrsSdp& local_sdp); bool insert_into_id_sessions(const std::string& peer_id, SrsRtcSession* rtc_session); + void check_and_clean_timeout_session(); private: - srs_error_t on_stun(SrsUdpRemuxSocket* udp_remux_socket); - srs_error_t on_dtls(SrsUdpRemuxSocket* udp_remux_socket); - srs_error_t on_rtp_or_rtcp(SrsUdpRemuxSocket* udp_remux_socket); + srs_error_t on_stun(SrsUdpMuxSocket* udp_mux_skt); + srs_error_t on_dtls(SrsUdpMuxSocket* udp_mux_skt); + srs_error_t on_rtp_or_rtcp(SrsUdpMuxSocket* udp_mux_skt); private: SrsRtcSession* find_rtc_session_by_username(const std::string& ufrag); SrsRtcSession* find_rtc_session_by_peer_id(const std::string& peer_id); diff --git a/trunk/src/app/srs_app_rtp.cpp b/trunk/src/app/srs_app_rtp.cpp index dafdbd0b8..f3981babe 100644 --- a/trunk/src/app/srs_app_rtp.cpp +++ b/trunk/src/app/srs_app_rtp.cpp @@ -87,10 +87,12 @@ srs_error_t SrsRtpMuxer::frame_to_packet(SrsSharedPtrMessage* shared_frame, SrsF packet_fu_a(shared_frame, format, &sample, rtp_packet_vec); } +#if 0 srs_trace("nal size=%d, nal=%s", sample.size, dump_string_hex(sample.bytes, sample.size, sample.size).c_str()); for (int i = 0; i < shared_frame->nb_rtp_fragments; ++i) { srs_trace("rtp=%s", dump_string_hex(shared_frame->rtp_fragments[i].bytes, shared_frame->rtp_fragments[i].size, kRtpPacketSize).c_str()); } +#endif } SrsSample* rtp_samples = new SrsSample[rtp_packet_vec.size()]; @@ -134,7 +136,6 @@ srs_error_t SrsRtpMuxer::packet_fu_a(SrsSharedPtrMessage* shared_frame, SrsForma stream->write_1bytes(kH264PayloadType); } // sequence - srs_trace("sequence=%u", sequence); stream->write_2bytes(sequence++); // timestamp stream->write_4bytes(int32_t(shared_frame->timestamp * 90)); @@ -186,7 +187,6 @@ srs_error_t SrsRtpMuxer::packet_single_nalu(SrsSharedPtrMessage* shared_frame, S // marker payloadtype stream->write_1bytes(kMarker | kH264PayloadType); // sequenct - srs_trace("sequence=%u", sequence); stream->write_2bytes(sequence++); // timestamp stream->write_4bytes(int32_t(shared_frame->timestamp * 90)); @@ -220,7 +220,6 @@ srs_error_t SrsRtpMuxer::packet_stap_a(const string &sps, const string& pps, Srs // marker payloadtype stream->write_1bytes(kMarker | kH264PayloadType); // sequenct - srs_trace("sequence=%u", sequence); stream->write_2bytes(sequence++); // timestamp stream->write_4bytes(int32_t(shared_frame->timestamp * 90)); diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index eee3a9784..330fd8d10 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -360,7 +360,7 @@ srs_error_t SrsRtcListener::listen(std::string i, int p) port = p; srs_freep(listener); - listener = new SrsUdpRemuxListener(rtc, ip, port); + listener = new SrsUdpMuxListener(rtc, ip, port); if ((err = listener->listen()) != srs_success) { return srs_error_wrap(err, "listen %s:%d", ip.c_str(), port); @@ -649,6 +649,10 @@ srs_error_t SrsServer::initialize(ISrsServerCycle* ch) if ((err = http_server->initialize()) != srs_success) { return srs_error_wrap(err, "http server initialize"); } + + if ((err = rtc_server->initialize()) != srs_success) { + return srs_error_wrap(err, "rtc server initialize"); + } return err; } diff --git a/trunk/src/app/srs_app_server.hpp b/trunk/src/app/srs_app_server.hpp index 8fa17ff9d..cf677994d 100644 --- a/trunk/src/app/srs_app_server.hpp +++ b/trunk/src/app/srs_app_server.hpp @@ -162,8 +162,8 @@ public: class SrsRtcListener : public SrsListener { protected: - SrsUdpRemuxListener* listener; - ISrsUdpRemuxHandler* rtc; + SrsUdpMuxListener* listener; + ISrsUdpMuxHandler* rtc; public: SrsRtcListener(SrsServer* svr, SrsRtcServer* rtc_svr, SrsListenerType t); virtual ~SrsRtcListener(); diff --git a/trunk/src/service/srs_service_st.cpp b/trunk/src/service/srs_service_st.cpp index 555ff5a33..947150f61 100644 --- a/trunk/src/service/srs_service_st.cpp +++ b/trunk/src/service/srs_service_st.cpp @@ -402,6 +402,11 @@ int srs_sendto(srs_netfd_t stfd, void *buf, int len, const struct sockaddr * to, return st_sendto((st_netfd_t)stfd, buf, len, to, tolen, (st_utime_t)timeout); } +int srs_sendmsg(srs_netfd_t stfd, const struct msghdr *msg, int flags, srs_utime_t timeout) +{ + return st_sendmsg((st_netfd_t)stfd, msg, flags, (st_utime_t)timeout); +} + srs_netfd_t srs_accept(srs_netfd_t stfd, struct sockaddr *addr, int *addrlen, srs_utime_t timeout) { return (srs_netfd_t)st_accept((st_netfd_t)stfd, addr, addrlen, (st_utime_t)timeout); diff --git a/trunk/src/service/srs_service_st.hpp b/trunk/src/service/srs_service_st.hpp index 4894e7049..947950e85 100644 --- a/trunk/src/service/srs_service_st.hpp +++ b/trunk/src/service/srs_service_st.hpp @@ -89,6 +89,7 @@ extern srs_netfd_t srs_netfd_open(int osfd); extern int srs_recvfrom(srs_netfd_t stfd, void *buf, int len, struct sockaddr *from, int *fromlen, srs_utime_t timeout); extern int srs_sendto(srs_netfd_t stfd, void *buf, int len, const struct sockaddr *to, int tolen, srs_utime_t timeout); +extern int srs_sendmsg(srs_netfd_t stfd, const struct msghdr *msg, int flags, srs_utime_t timeout); extern srs_netfd_t srs_accept(srs_netfd_t stfd, struct sockaddr *addr, int *addrlen, srs_utime_t timeout);