From 3ae510b843c6b54c2e4a5d78c04eecfb036c9819 Mon Sep 17 00:00:00 2001 From: HuyaJohn Date: Mon, 9 Mar 2020 04:46:27 -0700 Subject: [PATCH] rtp dispatch done, but video can not play in chrome --- trunk/configure | 2 +- trunk/src/app/srs_app_http_api.cpp | 3 +- trunk/src/app/srs_app_rtc_conn.cpp | 214 +++++++++++++++++++++++++--- trunk/src/app/srs_app_rtc_conn.hpp | 52 ++++++- trunk/src/app/srs_app_rtp.cpp | 62 ++++++++ trunk/src/app/srs_app_server.cpp | 2 +- trunk/src/app/srs_app_source.cpp | 3 + trunk/src/kernel/srs_kernel_flv.cpp | 17 +++ trunk/src/kernel/srs_kernel_flv.hpp | 2 + 9 files changed, 325 insertions(+), 32 deletions(-) diff --git a/trunk/configure b/trunk/configure index c939a96c4..495400bb5 100755 --- a/trunk/configure +++ b/trunk/configure @@ -149,7 +149,7 @@ END LibSTRoot="${SRS_OBJS_DIR}/st"; LibSTfile="${LibSTRoot}/libst.a" if [[ $SRS_SHARED_ST == YES ]]; then LibSTfile="-lst"; fi # srtp -LibSrtpRoot="${SRS_OBJS_DIR}/srtp2"; LibSrtpFile="${LibSrtpRoot}/lib/libsrtp2.a" +LibSrtpRoot="${SRS_OBJS_DIR}/srtp2/include"; LibSrtpFile="${SRS_OBJS_DIR}/srtp2/lib/libsrtp2.a" if [[ $SRS_SHARED_SRTP == YES ]]; then LibSrtpFile="-lsrtp2"; fi # openssl-1.1.0e, for the RTMP complex handshake. LibSSLRoot="";LibSSLfile="" diff --git a/trunk/src/app/srs_app_http_api.cpp b/trunk/src/app/srs_app_http_api.cpp index bb892c57c..fd418bf86 100644 --- a/trunk/src/app/srs_app_http_api.cpp +++ b/trunk/src/app/srs_app_http_api.cpp @@ -835,7 +835,8 @@ srs_error_t SrsGoApiSdp::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* } SrsSdp local_sdp; - rtc_server->create_rtc_session(remote_sdp, local_sdp); + SrsRtcSession* rtc_session = rtc_server->create_rtc_session(remote_sdp, local_sdp); + rtc_session->set_app_stream(app, stream_name); string local_sdp_str = ""; err = local_sdp.encode(local_sdp_str); diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index 97f640f33..b543c3cae 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -38,8 +38,12 @@ using namespace std; #include #include #include +#include +#include #include #include +#include +#include #include static bool is_stun(const char* data, const int size) @@ -73,6 +77,29 @@ static string gen_random_str(int len) const int SRTP_MASTER_KEY_KEY_LEN = 16; const int SRTP_MASTER_KEY_SALT_LEN = 14; +static string dump_string_hex(const char* buf, const int nb_buf, const int& max_len); +static string dump_string_hex(const std::string& str, const int& max_len = 128) +{ + return dump_string_hex(str.c_str(), str.size(), max_len); +} + +static string dump_string_hex(const char* buf, const int nb_buf, const int& max_len = 128) +{ + char tmp_buf[1024*16]; + int len = 0; + + for (int i = 0; i < nb_buf && i < max_len; ++i) { + int nb = snprintf(tmp_buf + len, sizeof(tmp_buf) - len - 1, "%02X ", (uint8_t)buf[i]); + if (nb <= 0) + break; + + len += nb; + } + tmp_buf[len] = '\0'; + + return string(tmp_buf, len); +} + SrsCandidate::SrsCandidate() { } @@ -206,7 +233,7 @@ srs_error_t SrsSdp::encode(string& sdp_str) "a=ssrc:3233846890 msid:6VrfBKXrwK a0\\r\\n" "a=ssrc:3233846890 mslabel:6VrfBKXrwK\\r\\n" "a=ssrc:3233846890 label:6VrfBKXrwKa0\\r\\n" - "m=video 9 UDP/TLS/RTP/SAVPF 96 98 102\\r\\n" + "m=video 9 UDP/TLS/RTP/SAVPF 102\\r\\n" "c=IN IP4 0.0.0.0\\r\\n" + candidate_lines + "a=rtcp:9 IN IP4 0.0.0.0\\r\\n" @@ -221,18 +248,6 @@ srs_error_t SrsSdp::encode(string& sdp_str) "a=sendrecv\\r\\n" "a=mid:1\\r\\n" "a=rtcp-mux\\r\\n" - "a=rtpmap:96 VP8/90000\\r\\n" - "a=rtcp-fb:96 ccm fir\\r\\n" - "a=rtcp-fb:96 nack\\r\\n" - "a=rtcp-fb:96 nack pli\\r\\n" - "a=rtcp-fb:96 goog-remb\\r\\n" - "a=rtcp-fb:96 transport-cc\\r\\n" - "a=rtpmap:98 VP9/90000\\r\\n" - "a=rtcp-fb:98 ccm fir\\r\\n" - "a=rtcp-fb:98 nack\\r\\n" - "a=rtcp-fb:98 nack pli\\r\\n" - "a=rtcp-fb:98 goog-remb\\r\\n" - "a=rtcp-fb:98 transport-cc\\r\\n" "a=rtpmap:102 H264/90000\\r\\n" "a=rtcp-fb:102 goog-remb\\r\\n" "a=rtcp-fb:102 transport-cc\\r\\n" @@ -279,8 +294,10 @@ srs_error_t SrsSdp::parse_attr(const string& line) return err; } -SrsDtlsSession::SrsDtlsSession() +SrsDtlsSession::SrsDtlsSession(SrsRtcSession* s) { + rtc_session = s; + dtls = NULL; bio_in = NULL; bio_out = NULL; @@ -310,7 +327,7 @@ srs_error_t SrsDtlsSession::handshake(SrsUdpRemuxSocket* udp_remux_socket) int ssl_err = SSL_get_error(dtls, ret); switch(ssl_err) { case SSL_ERROR_NONE: { - err = on_dtls_handshake_done(); + err = on_dtls_handshake_done(udp_remux_socket); } break; @@ -362,12 +379,20 @@ srs_error_t SrsDtlsSession::on_dtls(SrsUdpRemuxSocket* udp_remux_socket) return err; } -srs_error_t SrsDtlsSession::on_dtls_handshake_done() +srs_error_t SrsDtlsSession::on_dtls_handshake_done(SrsUdpRemuxSocket* udp_remux_socket) { + srs_error_t err = srs_success; srs_trace("dtls handshake done"); handshake_done = true; - return srtp_init(); + if ((err = srtp_initialize()) != srs_success) { + srs_error("srtp init failed, err=%s", srs_error_desc(err).c_str()); + return srs_error_wrap(err, "srtp init failed"); + } + + rtc_session->on_connection_established(udp_remux_socket); + + return err; } srs_error_t SrsDtlsSession::on_dtls_application_data(const char* buf, const int nb_buf) @@ -394,7 +419,7 @@ void SrsDtlsSession::send_client_hello(SrsUdpRemuxSocket* udp_remux_socket) } } -srs_error_t SrsDtlsSession::srtp_init() +srs_error_t SrsDtlsSession::srtp_initialize() { srs_error_t err = srs_success; @@ -417,6 +442,12 @@ srs_error_t SrsDtlsSession::srtp_init() client_key = sClientMasterKey + sClientMasterSalt; server_key = sServerMasterKey + sServerMasterSalt; + srs_trace("client_key size=%d, server_key=%d", client_key.size(), server_key.size()); + + if (srtp_init() != 0) { + return srs_error_wrap(err, "srtp init failed"); + } + if (srtp_sender_side_init() != srs_success) { return srs_error_wrap(err, "srtp sender size init failed"); } @@ -492,11 +523,132 @@ srs_error_t SrsDtlsSession::srtp_receiver_side_init() return err; } -SrsRtcSession::SrsRtcSession(SrsRtcServer* svr) +srs_error_t SrsDtlsSession::srtp_sender_protect(char* protected_buf, const char* ori_buf, int& nb_protected_buf) { - rtc_server = svr; + srs_error_t err = srs_success; + + if (srtp_send) { + memcpy(protected_buf, ori_buf, nb_protected_buf); + if (srtp_protect(srtp_send, protected_buf, &nb_protected_buf) != 0) { + srs_error("srtp sender protect failed"); + return srs_error_wrap(err, "srtp sender protect failed"); + } + + return err; + } + + return srs_error_wrap(err, "srtp sender protect failed"); +} + +SrsRtcSenderThread::SrsRtcSenderThread(SrsRtcSession* s, SrsUdpRemuxSocket* u, int parent_cid) + : ukt(NULL) +{ + _parent_cid = parent_cid; + trd = new SrsDummyCoroutine(); + + rtc_session = s; + ukt = *u; +} + +SrsRtcSenderThread::~SrsRtcSenderThread() +{ + srs_freep(trd); +} + +int SrsRtcSenderThread::cid() +{ + return trd->cid(); +} + +srs_error_t SrsRtcSenderThread::start() +{ + srs_error_t err = srs_success; + + srs_freep(trd); + trd = new SrsSTCoroutine("recv", this, _parent_cid); + + if ((err = trd->start()) != srs_success) { + return srs_error_wrap(err, "recv thread"); + } + + return err; +} + +void SrsRtcSenderThread::stop() +{ + trd->stop(); +} + +void SrsRtcSenderThread::stop_loop() +{ + trd->interrupt(); +} + + +srs_error_t SrsRtcSenderThread::cycle() +{ + srs_error_t err = srs_success; + + SrsSource* source = NULL; + SrsRequest req; + req.app = rtc_session->app; + req.stream = rtc_session->stream; + + if (_srs_sources->fetch_or_create(&req, rtc_session->server, &source) != srs_success) { + srs_error("rtc fetch source failed"); + return srs_error_wrap(err, "rtc fetch source failed"); + } + + srs_trace("rtc fetch source success, app=%s, stream=%s", rtc_session->app.c_str(), rtc_session->stream.c_str()); + + SrsConsumer* consumer = NULL; + if (source->create_consumer(NULL, consumer) != srs_success) { + srs_trace("rtc create consumer, app=%s, stream=%s", rtc_session->app.c_str(), rtc_session->stream.c_str()); + return srs_error_wrap(err, "rtc create consumer, app=%s, stream=%s", rtc_session->app.c_str(), rtc_session->stream.c_str()); + } + + SrsAutoFree(SrsConsumer, consumer); + + while (true) { + SrsMessageArray msgs(SRS_PERF_MW_MSGS); + + int msg_count = 0; + if (consumer->dump_packets(&msgs, msg_count) != srs_success) { + srs_trace("rtc pop no rtp packets"); + continue; + } + + srs_trace("rtc pop %d rtp packets", msg_count); + + for (int i = 0; i < msg_count; i++) { + SrsSharedPtrMessage* msg = msgs.msgs[i]; + + for (int i = 0; i < msg->nb_rtp_fragments; ++i) { + srs_trace("rtp fragment size=%d, payload=%s", msg->rtp_fragments[i].size, + dump_string_hex(msg->rtp_fragments[i].bytes, msg->rtp_fragments[i].size, 128).c_str()); + + if (rtc_session->dtls_session) { + char rtp_send_protected_buf[1500]; + int rtp_send_protected_len = msg->rtp_fragments[i].size; + rtc_session->dtls_session->srtp_sender_protect(rtp_send_protected_buf, msg->rtp_fragments[i].bytes, rtp_send_protected_len); + ukt.sendto(rtp_send_protected_buf, rtp_send_protected_len, 0); + } + } + } + + srs_usleep(16000); + } +} + + +SrsRtcSession::SrsRtcSession(SrsServer* svr, SrsRtcServer* rtc_svr) +{ + server = svr; + rtc_server = rtc_svr; session_state = INIT; dtls_session = NULL; + + strd = NULL; } SrsRtcSession::~SrsRtcSession() @@ -557,19 +709,35 @@ srs_error_t SrsRtcSession::on_binding_request(SrsUdpRemuxSocket* udp_remux_socke srs_error_t SrsRtcSession::send_client_hello(SrsUdpRemuxSocket* udp_remux_socket) { if (dtls_session == NULL) { - dtls_session = new SrsDtlsSession(); + dtls_session = new SrsDtlsSession(this); } dtls_session->send_client_hello(udp_remux_socket); } +void SrsRtcSession::on_connection_established(SrsUdpRemuxSocket* udp_remux_socket) +{ + start_play(udp_remux_socket); +} + +srs_error_t SrsRtcSession::start_play(SrsUdpRemuxSocket* udp_remux_socket) +{ + srs_error_t err = srs_success; + + strd = new SrsRtcSenderThread(this, udp_remux_socket, _srs_context->get_id()); + strd->start(); + + return err; +} + srs_error_t SrsRtcSession::on_dtls(SrsUdpRemuxSocket* udp_remux_socket) { return dtls_session->on_dtls(udp_remux_socket); } -SrsRtcServer::SrsRtcServer() +SrsRtcServer::SrsRtcServer(SrsServer* svr) { + server = svr; } SrsRtcServer::~SrsRtcServer() @@ -600,7 +768,7 @@ srs_error_t SrsRtcServer::on_udp_packet(SrsUdpRemuxSocket* udp_remux_socket) SrsRtcSession* SrsRtcServer::create_rtc_session(const SrsSdp& remote_sdp, SrsSdp& local_sdp) { - SrsRtcSession* session = new SrsRtcSession(this); + SrsRtcSession* session = new SrsRtcSession(server, this); std::string local_pwd = gen_random_str(32); std::string local_ufrag = ""; diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index e715e8151..4f0606571 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -37,7 +37,10 @@ class SrsUdpRemuxSocket; class SrsServer; +class SrsConsumer; class SrsStunPacket; +class SrsRtcServer; +class SrsRtcSession; class SrsCandidate { @@ -95,6 +98,8 @@ enum SrsRtcSessionStateType class SrsDtlsSession { private: + SrsRtcSession* rtc_session; + SSL* dtls; BIO* bio_in; BIO* bio_out; @@ -108,34 +113,62 @@ private: bool handshake_done; public: - SrsDtlsSession(); + SrsDtlsSession(SrsRtcSession* s); virtual ~SrsDtlsSession(); srs_error_t on_dtls(SrsUdpRemuxSocket* udp_remux_socket); - srs_error_t on_dtls_handshake_done(); + srs_error_t on_dtls_handshake_done(SrsUdpRemuxSocket* udp_remux_socket); srs_error_t on_dtls_application_data(const char* data, const int len); void send_client_hello(SrsUdpRemuxSocket* udp_remux_socket); srs_error_t handshake(SrsUdpRemuxSocket* udp_remux_socket); + srs_error_t srtp_sender_protect(char* protected_buf, const char* ori_buf, int& nb_protected_buf); private: - srs_error_t srtp_init(); + srs_error_t srtp_initialize(); srs_error_t srtp_sender_side_init(); srs_error_t srtp_receiver_side_init(); }; -class SrsRtcServer; +class SrsRtcSenderThread : public ISrsCoroutineHandler +{ +protected: + SrsCoroutine* trd; + int _parent_cid; +private: + SrsRtcSession* rtc_session; + SrsUdpRemuxSocket ukt; +public: + // Constructor. + // @param tm The receive timeout in srs_utime_t. + SrsRtcSenderThread(SrsRtcSession* s, SrsUdpRemuxSocket* u, int parent_cid); + virtual ~SrsRtcSenderThread(); +public: + virtual int cid(); +public: + virtual srs_error_t start(); + virtual void stop(); + virtual void stop_loop(); +public: + virtual srs_error_t cycle(); +}; class SrsRtcSession { + friend class SrsRtcSenderThread; private: + SrsServer* server; SrsRtcServer* rtc_server; SrsSdp remote_sdp; SrsSdp local_sdp; SrsRtcSessionStateType session_state; SrsDtlsSession* dtls_session; + SrsRtcSenderThread* strd; public: - SrsRtcSession(SrsRtcServer* svr); + std::string app; + std::string stream; +public: + SrsRtcSession(SrsServer* svr, SrsRtcServer* rtc_svr); virtual ~SrsRtcSession(); public: SrsSdp* get_local_sdp() { return &local_sdp; } @@ -145,22 +178,29 @@ public: void set_local_sdp(const SrsSdp& sdp) { local_sdp = sdp; } void set_remote_sdp(const SrsSdp& sdp) { remote_sdp = sdp; } void set_session_state(SrsRtcSessionStateType state) { session_state = state; } + void set_app_stream(const std::string& a, const std::string& s) { app = a; stream = s; } public: srs_error_t on_stun(SrsUdpRemuxSocket* udp_remux_socket, SrsStunPacket* stun_req); srs_error_t on_dtls(SrsUdpRemuxSocket* udp_remux_socket); public: srs_error_t send_client_hello(SrsUdpRemuxSocket* udp_remux_socket); + void on_connection_established(SrsUdpRemuxSocket* udp_remux_socket); + srs_error_t start_play(SrsUdpRemuxSocket* udp_remux_socket); private: srs_error_t on_binding_request(SrsUdpRemuxSocket* udp_remux_socket, SrsStunPacket* stun_req); +private: + srs_error_t do_playing(SrsConsumer* consumer, SrsUdpRemuxSocket* udp_remux_socket); }; class SrsRtcServer : public ISrsUdpRemuxHandler { +private: + SrsServer* server; private: std::map map_username_session; // key: username(local_ufrag + ":" + remote_ufrag) std::map map_id_session; // key: peerip(ip + ":" + port) public: - SrsRtcServer(); + SrsRtcServer(SrsServer* svr); virtual ~SrsRtcServer(); public: virtual srs_error_t initialize(); diff --git a/trunk/src/app/srs_app_rtp.cpp b/trunk/src/app/srs_app_rtp.cpp index 52574fd22..e4d005cf5 100644 --- a/trunk/src/app/srs_app_rtp.cpp +++ b/trunk/src/app/srs_app_rtp.cpp @@ -86,9 +86,71 @@ srs_error_t SrsRtpMuxer::frame_to_packet(SrsSharedPtrMessage* shared_frame, SrsF { srs_error_t err = srs_success; + SrsSample* samples = new SrsSample[2000]; + int sample_index = 0; for (int i = 0; i < format->video->nb_samples; ++i) { SrsSample sample = format->video->samples[i]; + + srs_trace("nal size=%d, dump=%s", sample.size, dump_string_hex(sample.bytes, sample.size, 128).c_str()); + + static int max_packet_size = 900; + if (sample.size < max_packet_size) { + char* buf = new char[1460]; + SrsBuffer* stream = new SrsBuffer(buf, 1460); + SrsAutoFree(SrsBuffer, stream); + // write rtp header first + stream->write_1bytes(0x80); + stream->write_1bytes((1 << 7) | 102); + + stream->write_2bytes(sequence++); + stream->write_4bytes((int32_t)shared_frame->timestamp); + stream->write_4bytes((int32_t)3233846889); + stream->write_bytes(sample.bytes, sample.size); + + samples[sample_index].bytes = stream->data(); + samples[sample_index].size = stream->pos(); + ++sample_index; + } else { + int num_of_packet = (sample.size + max_packet_size) / max_packet_size; + char* p = sample.bytes + 1; + int left_bytes = sample.size - 1; + for (int n = 0; n < num_of_packet; ++n) { + char* buf = new char[1460]; + SrsBuffer* stream = new SrsBuffer(buf, 1460); + SrsAutoFree(SrsBuffer, stream); + // write rtp header first + stream->write_1bytes(0x80); + if (n == num_of_packet - 1) { + stream->write_1bytes((1 << 7) | 102); + } else { + stream->write_1bytes(102); + } + + stream->write_2bytes(sequence++); + stream->write_4bytes((int32_t)shared_frame->timestamp); + stream->write_4bytes((int32_t)3233846889); + + stream->write_1bytes(sample.bytes[0] & 0xE0 | 28); + if (n == 0) { + stream->write_1bytes(0x80 | sample.bytes[0] & 0x1F); + } else if (n == num_of_packet - 1) { + stream->write_1bytes(0x40 | sample.bytes[0] & 0x1F); + } else { + stream->write_1bytes(0x00 | sample.bytes[0] & 0x1F); + } + + int len = max_packet_size ? max_packet_size : left_bytes; + stream->write_bytes(p, len); + left_bytes -= len; + p += len; + + samples[sample_index].bytes = stream->data(); + samples[sample_index].size = stream->pos(); + ++sample_index; + } + } } + shared_frame->set_rtp_fragments(samples, sample_index); return err; } diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index d630ca35d..eee3a9784 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -527,7 +527,7 @@ SrsServer::SrsServer() // new these objects in initialize instead. http_api_mux = new SrsHttpServeMux(); http_server = new SrsHttpServer(this); - rtc_server = new SrsRtcServer(); + rtc_server = new SrsRtcServer(this); http_heartbeat = new SrsHttpHeartbeat(); ingester = new SrsIngester(); } diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index f30eb0163..1452bf93d 100755 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -1696,6 +1696,7 @@ srs_error_t SrsSourceManager::fetch_or_create(SrsRequest* r, ISrsSourceHandler* SrsSource* source = NULL; if ((source = fetch(r)) != NULL) { + srs_trace("found source"); *pps = source; return err; } @@ -1705,6 +1706,8 @@ srs_error_t SrsSourceManager::fetch_or_create(SrsRequest* r, ISrsSourceHandler* // should always not exists for create a source. srs_assert (pool.find(stream_url) == pool.end()); + + srs_trace("new source, stream_url=%s", stream_url.c_str()); source = new SrsSource(); if ((err = source->initialize(r, h)) != srs_success) { diff --git a/trunk/src/kernel/srs_kernel_flv.cpp b/trunk/src/kernel/srs_kernel_flv.cpp index 4dc8f68da..aa0d3a7ec 100644 --- a/trunk/src/kernel/srs_kernel_flv.cpp +++ b/trunk/src/kernel/srs_kernel_flv.cpp @@ -214,6 +214,14 @@ SrsSharedPtrMessage::SrsSharedPtrPayload::~SrsSharedPtrPayload() srs_memory_unwatch(payload); #endif srs_freepa(payload); + + for (int i = 0; i < nb_rtp_fragments; ++i) { + srs_freep(rtp_fragments[i].bytes); + } + + if (nb_rtp_fragments) { + srs_freepa(rtp_fragments); + } } SrsSharedPtrMessage::SrsSharedPtrMessage() : timestamp(0), stream_id(0), size(0), payload(NULL), rtp_fragments(NULL), nb_rtp_fragments(0) @@ -307,6 +315,15 @@ bool SrsSharedPtrMessage::check(int stream_id) return false; } +void SrsSharedPtrMessage::set_rtp_fragments(SrsSample* samples, int nb_samples) +{ + ptr->rtp_fragments = samples; + ptr->nb_rtp_fragments = nb_samples; + + rtp_fragments = samples; + nb_rtp_fragments = nb_samples; +} + bool SrsSharedPtrMessage::is_av() { return ptr->header.message_type == RTMP_MSG_AudioMessage diff --git a/trunk/src/kernel/srs_kernel_flv.hpp b/trunk/src/kernel/srs_kernel_flv.hpp index 30052b881..fe8fd9aa0 100644 --- a/trunk/src/kernel/srs_kernel_flv.hpp +++ b/trunk/src/kernel/srs_kernel_flv.hpp @@ -333,6 +333,8 @@ public: // check perfer cid and stream id. // @return whether stream id already set. virtual bool check(int stream_id); + + virtual void set_rtp_fragments(SrsSample* samples, int nb_samples); public: virtual bool is_av(); virtual bool is_audio();