diff --git a/trunk/src/app/srs_app_http_api.cpp b/trunk/src/app/srs_app_http_api.cpp index 76b877742..d5a32658c 100644 --- a/trunk/src/app/srs_app_http_api.cpp +++ b/trunk/src/app/srs_app_http_api.cpp @@ -801,7 +801,6 @@ srs_error_t SrsGoApiSdp::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* string req_json; r->body_read_all(req_json); - srs_trace("req_json=%s", req_json.c_str()); SrsJsonAny* json = SrsJsonAny::loads(req_json); SrsJsonObject* req_obj = json->to_object(); @@ -818,18 +817,17 @@ srs_error_t SrsGoApiSdp::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* string app = app_obj->to_str(); string stream_name = stream_name_obj->to_str(); - srs_trace("remote_sdp_str=%s", remote_sdp_str.c_str()); - srs_trace("app=%s, stream=%s", app.c_str(), stream_name.c_str()); - SrsSdp remote_sdp; err = remote_sdp.decode(remote_sdp_str); if (err != srs_success) { return srs_api_response_code(w, r, SRS_CONSTS_HTTP_BadRequest); } + SrsRequest request; + request.app = app; + request.stream = stream_name; SrsSdp local_sdp; - SrsRtcSession* rtc_session = rtc_server->create_rtc_session(remote_sdp, local_sdp); - rtc_session->set_app_stream(app, stream_name); + SrsRtcSession* rtc_session = rtc_server->create_rtc_session(request, remote_sdp, local_sdp); string local_sdp_str = ""; err = local_sdp.encode(local_sdp_str); @@ -841,7 +839,7 @@ srs_error_t SrsGoApiSdp::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* SrsAutoFree(SrsJsonObject, obj); obj->set("code", SrsJsonAny::integer(ERROR_SUCCESS)); - obj->set("server", SrsJsonAny::integer(stat->server_id())); + obj->set("server", SrsJsonAny::integer(SrsStatistic::instance()->server_id())); // TODO: add candidates in response json? diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index e49a4c556..62ed7bf03 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -51,24 +51,24 @@ using namespace std; #include #include -static bool is_stun(const char* data, const int size) +static bool is_stun(const uint8_t* data, const int size) { return data != NULL && size > 0 && (data[0] == 0 || data[0] == 1); } -static bool is_dtls(const char* data, size_t len) +static bool is_dtls(const uint8_t* data, size_t len) { return (len >= 13 && (data[0] > 19 && data[0] < 64)); } -static bool is_rtp_or_rtcp(const char* data, size_t len) +static bool is_rtp_or_rtcp(const uint8_t* data, size_t len) { return (len >= 12 && (data[0] & 0xC0) == 0x80); } -static bool is_rtcp(const char* data, size_t len) +static bool is_rtcp(const uint8_t* data, size_t len) { - return (len >=12) && (data[0] & 0x80) && (data[1] >= 200 && data[1] <= 209); + return (len >= 12) && (data[0] & 0x80) && (data[1] >= 200 && data[1] <= 209); } static string gen_random_str(int len) @@ -141,7 +141,7 @@ srs_error_t SrsSdp::decode(const string& sdp_str) string line; istringstream is(sdp_str); while (getline(is, line)) { - srs_trace("line=%s", line.c_str()); + srs_verbose("line=%s", line.c_str()); if (line.size() < 2 || line[1] != '=') { return srs_error_wrap(err, "invalid sdp line=%s", line.c_str()); @@ -267,7 +267,7 @@ srs_error_t SrsSdp::parse_attr(const string& line) } } - srs_trace("sdp attribute key=%s, val=%s", key.c_str(), val.c_str()); + srs_verbose("sdp attribute key=%s, val=%s", key.c_str(), val.c_str()); if (key == "ice-ufrag") { ice_ufrag = val; @@ -345,7 +345,6 @@ srs_error_t SrsDtlsSession::handshake(SrsUdpMuxSocket* udp_mux_skt) } if (out_bio_len) { - srs_trace("send dtls handshake data"); udp_mux_skt->sendto(out_bio_data, out_bio_len, 0); } @@ -405,7 +404,7 @@ srs_error_t SrsDtlsSession::on_dtls_application_data(const char* buf, const int void SrsDtlsSession::send_client_hello(SrsUdpMuxSocket* udp_mux_skt) { if (dtls == NULL) { - srs_trace("send client hello"); + srs_verbose("send client hello"); dtls = SSL_new(SrsDtls::instance()->get_dtls_ctx()); SSL_set_connect_state(dtls); @@ -631,21 +630,17 @@ srs_error_t SrsRtcSenderThread::cycle() srs_error_t err = srs_success; SrsSource* source = NULL; - SrsRequest req; - req.app = rtc_session->app; - req.stream = rtc_session->stream; - if (_srs_sources->fetch_or_create(&req, rtc_session->server, &source) != srs_success) { - srs_error("rtc fetch source failed"); + if (_srs_sources->fetch_or_create(&rtc_session->request, rtc_session->server, &source) != srs_success) { return srs_error_wrap(err, "rtc fetch source failed"); } - srs_trace("rtc fetch source success, app=%s, stream=%s", rtc_session->app.c_str(), rtc_session->stream.c_str()); + srs_trace("source url=%s, source_id=%d[%d]", + rtc_session->request.get_stream_url().c_str(), source->source_id(), source->source_id()); SrsConsumer* consumer = NULL; if (source->create_consumer(NULL, consumer) != srs_success) { - srs_trace("rtc create consumer, app=%s, stream=%s", rtc_session->app.c_str(), rtc_session->stream.c_str()); - return srs_error_wrap(err, "rtc create consumer, app=%s, stream=%s", rtc_session->app.c_str(), rtc_session->stream.c_str()); + return srs_error_wrap(err, "rtc create consumer, source url=%s", rtc_session->request.get_stream_url().c_str()); } SrsAutoFree(SrsConsumer, consumer); @@ -698,7 +693,7 @@ void SrsRtcSenderThread::send_and_free_messages(SrsSharedPtrMessage** msgs, int } } -SrsRtcSession::SrsRtcSession(SrsServer* svr, SrsRtcServer* rtc_svr, const string& un) +SrsRtcSession::SrsRtcSession(SrsServer* svr, SrsRtcServer* rtc_svr, const SrsRequest& req, const std::string& un) { server = svr; rtc_server = rtc_svr; @@ -709,6 +704,9 @@ SrsRtcSession::SrsRtcSession(SrsServer* svr, SrsRtcServer* rtc_svr, const string username = un; last_stun_time = srs_get_system_time(); + + request = req; + source = NULL; } SrsRtcSession::~SrsRtcSession() @@ -736,6 +734,13 @@ srs_error_t SrsRtcSession::on_stun(SrsUdpMuxSocket* udp_mux_skt, SrsStunPacket* return err; } +void SrsRtcSession::check_source() +{ + if (source == NULL) { + _srs_sources->fetch_or_create(&request, server, &source); + } +} + srs_error_t SrsRtcSession::on_binding_request(SrsUdpMuxSocket* udp_mux_skt, SrsStunPacket* stun_req) { srs_error_t err = srs_success; @@ -772,6 +777,210 @@ srs_error_t SrsRtcSession::on_binding_request(SrsUdpMuxSocket* udp_mux_skt, SrsS return err; } +srs_error_t SrsRtcSession::on_rtcp_feedback(char* buf, int nb_buf, SrsUdpMuxSocket* udp_mux_skt) +{ + srs_error_t err = srs_success; + + if (nb_buf < 12) { + return srs_error_wrap(err, "invalid rtp feedback packet, nb_buf=%d", nb_buf); + } + + SrsBuffer* stream = new SrsBuffer(buf, nb_buf); + SrsAutoFree(SrsBuffer, stream); + + // @see: https://tools.ietf.org/html/rfc4585#section-6.1 + /* + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + |V=2|P| FMT | PT | length | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | SSRC of packet sender | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | SSRC of media source | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + : Feedback Control Information (FCI) : + : : + */ + uint8_t first = stream->read_1bytes(); + uint8_t version = first & 0xC0; + uint8_t padding = first & 0x20; + uint8_t fmt = first & 0x1F; + + uint8_t payload_type = stream->read_1bytes(); + uint16_t length = stream->read_2bytes(); + uint32_t ssrc_of_sender = stream->read_4bytes(); + uint32_t ssrc_of_media_source = stream->read_4bytes(); + + /* + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | PID | BLP | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + */ + + uint16_t pid = stream->read_2bytes(); + int blp = stream->read_2bytes(); + + srs_verbose("pid=%u, blp=%d", pid, blp); + + check_source(); + if (! source) { + return srs_error_wrap(err, "can not found source"); + } + + vector resend_pkts; + SrsRtpSharedPacket* pkt = source->find_rtp_packet(pid); + if (pkt) { + resend_pkts.push_back(pkt); + } + + uint16_t mask = 0x01; + for (int i = 0; i < 16 && blp; ++i, mask <<= 1) { + if (! (blp & mask)) { + continue; + } + + uint32_t loss_seq = pid + i; + + SrsRtpSharedPacket* pkt = source->find_rtp_packet(loss_seq); + if (! pkt) { + continue; + } + + resend_pkts.push_back(pkt); + } + + for (int i = 0; i < resend_pkts.size(); ++i) { + if (dtls_session) { + char protected_buf[kRtpPacketSize]; + int nb_protected_buf = resend_pkts[i]->size; + + srs_verbose("resend pkt sequence=%u", resend_pkts[i]->sequence); + + dtls_session->protect_rtp(protected_buf, resend_pkts[i]->payload, nb_protected_buf); + udp_mux_skt->sendto(protected_buf, nb_protected_buf, 0); + } + } + + return err; +} + +srs_error_t SrsRtcSession::on_rtcp_ps_feedback(char* buf, int nb_buf, SrsUdpMuxSocket* udp_mux_skt) +{ + srs_error_t err = srs_success; + + if (nb_buf < 12) { + return srs_error_wrap(err, "invalid rtp feedback packet, nb_buf=%d", nb_buf); + } + + SrsBuffer* stream = new SrsBuffer(buf, nb_buf); + SrsAutoFree(SrsBuffer, stream); + + uint8_t first = stream->read_1bytes(); + uint8_t version = first & 0xC0; + uint8_t padding = first & 0x20; + uint8_t fmt = first & 0x1F; + + uint8_t payload_type = stream->read_1bytes(); + uint16_t length = stream->read_2bytes(); + uint32_t ssrc_of_sender = stream->read_4bytes(); + uint32_t ssrc_of_media_source = stream->read_4bytes(); + + switch (fmt) { + case kPLI: { + srs_verbose("pli"); + break; + } + case kSLI: { + srs_verbose("sli"); + break; + } + case kRPSI: { + srs_verbose("rpsi"); + break; + } + case kAFB: { + srs_verbose("afb"); + break; + } + default: { + return srs_error_wrap(err, "unknown payload specific feedback=%u", fmt); + } + } + + return err; +} + +srs_error_t SrsRtcSession::on_rtcp_receiver_report(char* buf, int nb_buf, SrsUdpMuxSocket* udp_mux_skt) +{ + srs_error_t err = srs_success; + + if (nb_buf < 8) { + return srs_error_wrap(err, "invalid rtp receiver report packet, nb_buf=%d", nb_buf); + } + + SrsBuffer* stream = new SrsBuffer(buf, nb_buf); + SrsAutoFree(SrsBuffer, stream); + + // @see: https://tools.ietf.org/html/rfc3550#section-6.4.2 + /* + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +header |V=2|P| RC | PT=RR=201 | length | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | SSRC of packet sender | + +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ +report | SSRC_1 (SSRC of first source) | +block +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + 1 | fraction lost | cumulative number of packets lost | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | extended highest sequence number received | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | interarrival jitter | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | last SR (LSR) | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | delay since last SR (DLSR) | + +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ +report | SSRC_2 (SSRC of second source) | +block +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + 2 : ... : + +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ + | profile-specific extensions | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + */ + uint8_t first = stream->read_1bytes(); + uint8_t version = first & 0xC0; + uint8_t padding = first & 0x20; + uint8_t rc = first & 0x1F; + + uint8_t payload_type = stream->read_1bytes(); + uint16_t length = stream->read_2bytes(); + uint32_t ssrc_of_sender = stream->read_4bytes(); + + if (((length + 1) * 4) != (rc * 24 + 8)) { + return srs_error_wrap(err, "invalid rtcp receiver packet, length=%u, rc=%u", length, rc); + } + + for (int i = 0; i < rc; ++i) { + uint32_t ssrc = stream->read_4bytes(); + uint8_t fraction_lost = stream->read_1bytes(); + uint32_t cumulative_number_of_packets_lost = stream->read_3bytes(); + uint32_t highest_seq = stream->read_4bytes(); + uint32_t jitter = stream->read_4bytes(); + uint32_t lst = stream->read_4bytes(); + uint32_t dlsr = stream->read_4bytes(); + + srs_verbose("ssrc=%u, fraction_lost=%u, cumulative_number_of_packets_lost=%u, highest_seq=%u, jitter=%u, lst=%u, dlst=%u", + ssrc, fraction_lost, cumulative_number_of_packets_lost, highest_seq, jitter, lst, dlsr); + } + + return err; +} + srs_error_t SrsRtcSession::send_client_hello(SrsUdpMuxSocket* udp_mux_skt) { if (dtls_session == NULL) { @@ -855,6 +1064,7 @@ srs_error_t SrsRtcSession::on_rtp(SrsUdpMuxSocket* udp_mux_skt) srs_error_t SrsRtcSession::on_rtcp(SrsUdpMuxSocket* udp_mux_skt) { srs_error_t err = srs_success; + if (dtls_session == NULL) { return srs_error_wrap(err, "recv unexpect rtcp packet before dtls done"); } @@ -865,19 +1075,57 @@ srs_error_t SrsRtcSession::on_rtcp(SrsUdpMuxSocket* udp_mux_skt) return srs_error_wrap(err, "rtcp unprotect failed"); } - // FIXME: use SrsRtpPacket - SrsBuffer* stream = new SrsBuffer(unprotected_buf, nb_unprotected_buf); - SrsAutoFree(SrsBuffer, stream); - uint8_t first = stream->read_1bytes(); - uint8_t payload_type = stream->read_1bytes(); + char* ph = unprotected_buf; + int nb_left = nb_unprotected_buf; + while (nb_left) { + uint8_t payload_type = ph[1]; + uint16_t length_4bytes = (((uint16_t)ph[2]) << 8) | ph[3]; - if (payload_type == kSR) { - } else if (payload_type == kRR) { - } else if (kSDES) { - } else if (kBye) { - } else if (kApp) { - } else { - return srs_error_wrap(err, "unknown rtcp type=%u", payload_type); + int length = (length_4bytes + 1) * 4; + + if (length > nb_unprotected_buf) { + return srs_error_wrap(err, "invalid rtcp packet, length=%u", length); + } + + srs_verbose("on rtcp, payload_type=%u", payload_type); + + switch (payload_type) { + case kSR: { + break; + } + case kRR: { + err = on_rtcp_receiver_report(ph, length, udp_mux_skt); + break; + } + case kSDES: { + break; + } + case kBye: { + break; + } + case kApp: { + break; + } + case kRtpFb: { + err = on_rtcp_feedback(ph, length, udp_mux_skt); + break; + } + case kPsFb: { + err = on_rtcp_ps_feedback(ph, length, udp_mux_skt); + break; + } + default:{ + return srs_error_wrap(err, "unknown rtcp type=%u", payload_type); + break; + } + } + + if (err != srs_success) { + return err; + } + + ph += length; + nb_left -= length; } return err; @@ -910,18 +1158,18 @@ srs_error_t SrsRtcServer::on_udp_packet(SrsUdpMuxSocket* udp_mux_skt) { srs_error_t err = srs_success; - if (is_stun(udp_mux_skt->data(), udp_mux_skt->size())) { + if (is_stun(reinterpret_cast(udp_mux_skt->data()), udp_mux_skt->size())) { return on_stun(udp_mux_skt); - } else if (is_dtls(udp_mux_skt->data(), udp_mux_skt->size())) { + } else if (is_dtls(reinterpret_cast(udp_mux_skt->data()), udp_mux_skt->size())) { return on_dtls(udp_mux_skt); - } else if (is_rtp_or_rtcp(udp_mux_skt->data(), udp_mux_skt->size())) { + } else if (is_rtp_or_rtcp(reinterpret_cast(udp_mux_skt->data()), udp_mux_skt->size())) { return on_rtp_or_rtcp(udp_mux_skt); } return srs_error_wrap(err, "unknown udp packet type"); } -SrsRtcSession* SrsRtcServer::create_rtc_session(const SrsSdp& remote_sdp, SrsSdp& local_sdp) +SrsRtcSession* SrsRtcServer::create_rtc_session(const SrsRequest& req, const SrsSdp& remote_sdp, SrsSdp& local_sdp) { std::string local_pwd = gen_random_str(32); std::string local_ufrag = ""; @@ -934,7 +1182,7 @@ SrsRtcSession* SrsRtcServer::create_rtc_session(const SrsSdp& remote_sdp, SrsSdp break; } - SrsRtcSession* session = new SrsRtcSession(server, this, username); + SrsRtcSession* session = new SrsRtcSession(server, this, req, username); map_username_session.insert(make_pair(username, session)); local_sdp.set_ice_ufrag(local_ufrag); @@ -962,7 +1210,7 @@ srs_error_t SrsRtcServer::on_stun(SrsUdpMuxSocket* udp_mux_skt) { srs_error_t err = srs_success; - srs_trace("recv stun packet from %s", udp_mux_skt->get_peer_id().c_str()); + srs_verbose("recv stun packet from %s", udp_mux_skt->get_peer_id().c_str()); SrsStunPacket stun_req; if (stun_req.decode(udp_mux_skt->data(), udp_mux_skt->size()) != srs_success) { @@ -981,7 +1229,6 @@ srs_error_t SrsRtcServer::on_stun(SrsUdpMuxSocket* udp_mux_skt) srs_error_t SrsRtcServer::on_dtls(SrsUdpMuxSocket* udp_mux_skt) { srs_error_t err = srs_success; - srs_trace("on dtls"); SrsRtcSession* rtc_session = find_rtc_session_by_peer_id(udp_mux_skt->get_peer_id()); @@ -1004,7 +1251,7 @@ srs_error_t SrsRtcServer::on_rtp_or_rtcp(SrsUdpMuxSocket* udp_mux_skt) return srs_error_wrap(err, "can not find rtc session by peer_id=%s", udp_mux_skt->get_peer_id().c_str()); } - if (is_rtcp(udp_mux_skt->data(), udp_mux_skt->size())) { + if (is_rtcp(reinterpret_cast(udp_mux_skt->data()), udp_mux_skt->size())) { rtc_session->on_rtcp(udp_mux_skt); } else { rtc_session->on_rtp(udp_mux_skt); diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index c1b786af4..d0db07e36 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -28,6 +28,7 @@ #include #include #include +#include #include #include @@ -43,12 +44,23 @@ class SrsStunPacket; class SrsRtcServer; class SrsRtcSession; class SrsSharedPtrMessage; +class SrsSource; -const uint8_t kSR = 200; -const uint8_t kRR = 201; +const uint8_t kSR = 200; +const uint8_t kRR = 201; const uint8_t kSDES = 202; -const uint8_t kBye = 203; -const uint8_t kApp = 204; +const uint8_t kBye = 203; +const uint8_t kApp = 204; + +// @see: https://tools.ietf.org/html/rfc4585#section-6.1 +const uint8_t kRtpFb = 205; +const uint8_t kPsFb = 206; + +// @see: https://tools.ietf.org/html/rfc4585#section-6.3 +const uint8_t kPLI = 1; +const uint8_t kSLI = 2; +const uint8_t kRPSI = 3; +const uint8_t kAFB = 15; const srs_utime_t kSrsRtcSessionStunTimeoutUs = 10*1000*1000LL; @@ -182,10 +194,10 @@ private: std::string peer_id; srs_utime_t last_stun_time; public: - std::string app; - std::string stream; + SrsRequest request; + SrsSource* source; public: - SrsRtcSession(SrsServer* svr, SrsRtcServer* rtc_svr, const std::string& un); + SrsRtcSession(SrsServer* svr, SrsRtcServer* rtc_svr, const SrsRequest& req, const std::string& un); virtual ~SrsRtcSession(); public: SrsSdp* get_local_sdp() { return &local_sdp; } @@ -199,8 +211,6 @@ public: std::string id() const { return peer_id + "_" + username; } - void set_app_stream(const std::string& a, const std::string& s) { app = a; stream = s; } - std::string get_peer_id() const { return peer_id; } void set_peer_id(const std::string& id) { peer_id = id; } public: @@ -214,10 +224,14 @@ public: srs_error_t start_play(SrsUdpMuxSocket* udp_mux_skt); public: bool is_stun_timeout() { return last_stun_time + kSrsRtcSessionStunTimeoutUs < srs_get_system_time(); } +private: + void check_source(); private: srs_error_t on_binding_request(SrsUdpMuxSocket* udp_mux_skt, SrsStunPacket* stun_req); private: - srs_error_t do_playing(SrsConsumer* consumer, SrsUdpMuxSocket* udp_mux_skt); + srs_error_t on_rtcp_feedback(char* buf, int nb_buf, SrsUdpMuxSocket* udp_mux_skt); + srs_error_t on_rtcp_ps_feedback(char* buf, int nb_buf, SrsUdpMuxSocket* udp_mux_skt); + srs_error_t on_rtcp_receiver_report(char* buf, int nb_buf, SrsUdpMuxSocket* udp_mux_skt); }; // XXX: is there any other timer thread? @@ -257,7 +271,7 @@ public: virtual srs_error_t on_udp_packet(SrsUdpMuxSocket* udp_mux_skt); - SrsRtcSession* create_rtc_session(const SrsSdp& remote_sdp, SrsSdp& local_sdp); + SrsRtcSession* create_rtc_session(const SrsRequest& req, const SrsSdp& remote_sdp, SrsSdp& local_sdp); bool insert_into_id_sessions(const std::string& peer_id, SrsRtcSession* rtc_session); void check_and_clean_timeout_session(); private: diff --git a/trunk/src/app/srs_app_rtp.cpp b/trunk/src/app/srs_app_rtp.cpp index 729251fc4..0101e30c5 100644 --- a/trunk/src/app/srs_app_rtp.cpp +++ b/trunk/src/app/srs_app_rtp.cpp @@ -78,7 +78,7 @@ srs_error_t SrsRtpMuxer::frame_to_packet(SrsSharedPtrMessage* shared_frame, SrsF uint8_t nal_type = header & kNalTypeMask; // ignore SEI nal - if (nal_type == 0x06) { + if (nal_type == 0x06 || nal_type == 0x09) { continue; } diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index 5f1de1149..16e4110bc 100755 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -31,6 +31,7 @@ using namespace std; #include #include #include +#include #include #include #include @@ -326,6 +327,7 @@ srs_error_t SrsMessageQueue::dump_packets(int max_count, SrsSharedPtrMessage** p return err; } + srs_error_t SrsMessageQueue::dump_packets(SrsConsumer* consumer, bool atc, SrsRtmpJitterAlgorithm ag) { srs_error_t err = srs_success; @@ -815,6 +817,55 @@ SrsSharedPtrMessage* SrsMixQueue::pop() return msg; } +SrsRtpPacketQueue::SrsRtpPacketQueue() +{ +} + +SrsRtpPacketQueue::~SrsRtpPacketQueue() +{ + clear(); +} + +void SrsRtpPacketQueue::clear() +{ + map::iterator iter = pkt_queue.begin(); + while (iter != pkt_queue.end()) { + srs_freep(iter->second); + pkt_queue.erase(iter++); + } +} + +void SrsRtpPacketQueue::push(std::vector& pkts) +{ + for (int i = 0; i < pkts.size(); ++i) { + insert(pkts[i]->sequence, pkts[i]); + } +} + +void SrsRtpPacketQueue::insert(const uint16_t& sequence, SrsRtpSharedPacket* pkt) +{ + pkt_queue.insert(make_pair(sequence, pkt->copy())); + if (pkt_queue.size() >= 3000) { + srs_freep(pkt_queue.begin()->second); + pkt_queue.erase(pkt_queue.begin()); + } +} + +SrsRtpSharedPacket* SrsRtpPacketQueue::find(const uint16_t& sequence) +{ + if (pkt_queue.empty()) { + return NULL; + } + + SrsRtpSharedPacket* pkt = NULL; + map::iterator iter = pkt_queue.find(sequence); + if (iter != pkt_queue.end()) { + pkt = iter->second->copy(); + } + + return pkt; +} + SrsOriginHub::SrsOriginHub() { source = NULL; @@ -1076,6 +1127,8 @@ srs_error_t SrsOriginHub::on_video(SrsSharedPtrMessage* shared_video, bool is_se srs_error_reset(err); rtp->on_unpublish(); } + + source->rtp_queue->push(msg->rtp_packets); if ((err = hls->on_video(msg, format)) != srs_success) { // apply the error strategy for hls. @@ -1696,7 +1749,6 @@ srs_error_t SrsSourceManager::fetch_or_create(SrsRequest* r, ISrsSourceHandler* SrsSource* source = NULL; if ((source = fetch(r)) != NULL) { - srs_trace("found source"); *pps = source; return err; } @@ -1816,6 +1868,7 @@ SrsSource::SrsSource() jitter_algorithm = SrsRtmpJitterAlgorithmOFF; mix_correct = false; mix_queue = new SrsMixQueue(); + rtp_queue = new SrsRtpPacketQueue(); _can_publish = true; _pre_source_id = _source_id = -1; @@ -1845,6 +1898,7 @@ SrsSource::~SrsSource() srs_freep(hub); srs_freep(meta); srs_freep(mix_queue); + srs_freep(rtp_queue); srs_freep(play_edge); srs_freep(publish_edge); @@ -2593,3 +2647,7 @@ string SrsSource::get_curr_origin() return play_edge->get_curr_origin(); } +SrsRtpSharedPacket* SrsSource::find_rtp_packet(const uint16_t& seq) +{ + return rtp_queue->find(seq); +} diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index 674ffc99e..4f5d93669 100644 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -62,6 +62,7 @@ class SrsBuffer; #ifdef SRS_AUTO_HDS class SrsHds; #endif +class SrsRtpSharedPacket; // The time jitter algorithm: // 1. full, to ensure stream start at zero, and ensure stream monotonically increasing. @@ -324,6 +325,28 @@ public: virtual SrsSharedPtrMessage* pop(); }; +class SrsRtpPacketQueue +{ +private: + struct SeqComp + { + bool operator()(const uint16_t& l, const uint16_t& r) const + { + return ((int16_t)(r - l)) > 0; + } + }; +private: + std::map pkt_queue; +public: + SrsRtpPacketQueue(); + virtual ~SrsRtpPacketQueue(); +public: + void clear(); + void push(std::vector& pkts); + void insert(const uint16_t& sequence, SrsRtpSharedPacket* pkt); + SrsRtpSharedPacket* find(const uint16_t& sequence); +}; + // The hub for origin is a collection of utilities for origin only, // For example, DVR, HLS, Forward and Transcode are only available for origin, // they are meanless for edge server. @@ -499,6 +522,8 @@ private: bool mix_correct; // The mix queue to implements the mix correct algorithm. SrsMixQueue* mix_queue; + // rtp packet queue + SrsRtpPacketQueue* rtp_queue; // For play, whether enabled atc. // The atc(use absolute time and donot adjust time), // directly use msg time and donot adjust if atc is true, @@ -587,6 +612,9 @@ public: virtual void on_edge_proxy_unpublish(); public: virtual std::string get_curr_origin(); +public: + // Find rtp packet by sequence + SrsRtpSharedPacket* find_rtp_packet(const uint16_t& seq); }; #endif diff --git a/trunk/src/protocol/srs_stun_stack.cpp b/trunk/src/protocol/srs_stun_stack.cpp index 146bb23f7..ae0f43dff 100644 --- a/trunk/src/protocol/srs_stun_stack.cpp +++ b/trunk/src/protocol/srs_stun_stack.cpp @@ -83,9 +83,6 @@ srs_error_t SrsStunPacket::decode(const char* buf, const int nb_buf) string magic_cookie = stream->read_string(4); transcation_id = stream->read_string(12); - srs_trace("message_type=%u, message_len=%u, magic_cookie=%s, transcation_id=%s", - message_type, message_len, magic_cookie.c_str(), transcation_id.c_str()); - if (nb_buf != 20 + message_len) { return srs_error_wrap(err, "invalid stun packet, message_len=%d, nb_buf=%d", message_len, nb_buf); } @@ -111,7 +108,7 @@ srs_error_t SrsStunPacket::decode(const char* buf, const int nb_buf) if (p != string::npos) { local_ufrag = val.substr(0, p); remote_ufrag = val.substr(p + 1); - srs_trace("stun packet local_ufrag=%s, remote_ufrag=%s", local_ufrag.c_str(), remote_ufrag.c_str()); + srs_verbose("stun packet local_ufrag=%s, remote_ufrag=%s", local_ufrag.c_str(), remote_ufrag.c_str()); } break; }