diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index 0314170f7..627855bcb 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -1406,14 +1406,11 @@ ISrsRtcConnectionHijacker::~ISrsRtcConnectionHijacker() SrsRtcConnection::SrsRtcConnection(SrsRtcServer* s, SrsContextId context_id) { req = NULL; - is_publisher_ = false; cid = context_id; stat_ = new SrsRtcConnectionStatistic(); timer_ = new SrsHourGlass(this, 1000 * SRS_UTIME_MILLISECONDS); hijacker_ = NULL; - publisher_ = NULL; - player_ = NULL; sendonly_skt = NULL; server_ = s; transport_ = new SrsSecurityTransport(this); @@ -1431,11 +1428,10 @@ SrsRtcConnection::SrsRtcConnection(SrsRtcServer* s, SrsContextId context_id) SrsRtcConnection::~SrsRtcConnection() { srs_freep(timer_); - srs_freep(player_); - srs_freep(publisher_); srs_freep(transport_); srs_freep(req); srs_freep(stat_); + srs_freep(pp_address_change); // Note that we should never delete the sendonly_skt, // it's just point to the object in peer_addresses_. @@ -1445,7 +1441,21 @@ SrsRtcConnection::~SrsRtcConnection() srs_freep(addr); } - srs_freep(pp_address_change); + // Cleanup publishers. + for(map::iterator it = publishers_.begin(); it != publishers_.end(); ++it) { + SrsRtcPublishStream* publisher = it->second; + srs_freep(publisher); + } + publishers_.clear(); + publishers_ssrc_map_.clear(); + + // Cleanup players. + for(map::iterator it = players_.begin(); it != players_.end(); ++it) { + SrsRtcPlayStream* player = it->second; + srs_freep(player); + } + players_.clear(); + players_ssrc_map_.clear(); } SrsSdp* SrsRtcConnection::get_local_sdp() @@ -1615,12 +1625,11 @@ srs_error_t SrsRtcConnection::add_player2(SrsRequest* req, SrsSdp& local_sdp) return err; } -srs_error_t SrsRtcConnection::initialize(SrsRequest* r, bool is_publisher, bool dtls, bool srtp, string username) +srs_error_t SrsRtcConnection::initialize(SrsRequest* r, bool dtls, bool srtp, string username) { srs_error_t err = srs_success; username_ = username; - is_publisher_ = is_publisher; req = r->copy(); if (!srtp) { @@ -1725,12 +1734,88 @@ srs_error_t SrsRtcConnection::dispatch_rtcp(SrsRtcpCommon* rtcp) { srs_error_t err = srs_success; - if (player_) { - return player_->on_rtcp(rtcp); - } + // TODO: FIXME: Refine the logic code, simple it. + map::iterator it_pub; + map::iterator it_player; + SrsRtcPlayStream* player = NULL; + SrsRtcPublishStream* publisher = NULL; - if (publisher_) { - return publisher_->on_rtcp(rtcp); + if(SrsRtcpType_sr == rtcp->type()) { + it_pub = publishers_ssrc_map_.find(rtcp->get_ssrc()); + if(publishers_ssrc_map_.end() == it_pub) { + srs_warn("SR: ssrc %d is unknown", rtcp->get_ssrc()); + return err; + } + publisher = it_pub->second; + if(srs_success != (err = publisher->on_rtcp(rtcp))) { + return srs_error_wrap(err, "handle sr"); + } + } else if(SrsRtcpType_rr == rtcp->type()) { + SrsRtcpRR* rr = dynamic_cast(rtcp); + srs_assert(NULL != rr); + if (rr->get_rb_ssrc() == 0) { //for native client + return err; + } + it_player = players_ssrc_map_.find(rr->get_rb_ssrc()); + if(players_ssrc_map_.end() == it_player) { + srs_warn("RR: ssrc %d is unknown", rr->get_rb_ssrc()); + return err; + } + player = it_player->second; + if(srs_success != (err = player->on_rtcp(rtcp))) { + return srs_error_wrap(err, "handle rr"); + } + } else if(SrsRtcpType_rtpfb == rtcp->type()) { + if(1 == rtcp->get_rc()) { + //nack + SrsRtcpNack* nack = dynamic_cast(rtcp); + srs_assert(NULL != nack); + it_player = players_ssrc_map_.find(nack->get_media_ssrc()); + if(players_ssrc_map_.end() == it_player) { + srs_warn("ftpfb: ssrc %d is unknown", nack->get_media_ssrc()); + return err; + } + player = it_player->second; + if(srs_success != (err = player->on_rtcp(rtcp))) { + return srs_error_wrap(err, "handle nack"); + } + } else if(15 == rtcp->get_rc()) { + // twcc + if(srs_success != (err = on_rtcp_feedback(rtcp->data(), rtcp->size()))) { + return srs_error_wrap(err, "handle twcc feedback"); + } + } + } else if(SrsRtcpType_psfb == rtcp->type()) { + SrsRtcpPsfbCommon* psfb = dynamic_cast(rtcp); + srs_assert(psfb != NULL); + it_player = players_ssrc_map_.find(psfb->get_media_ssrc()); + if(players_ssrc_map_.end() == it_player) { + srs_warn("psfb: ssrc %d is unknown", psfb->get_media_ssrc()); + return err; + } + player = it_player->second; + if(srs_success != (err = player->on_rtcp(rtcp))) { + return srs_error_wrap(err, "handle nack"); + } + } else { + // try to find player to assign rtcp + it_player = players_ssrc_map_.find(rtcp->get_ssrc()); + if(players_ssrc_map_.end() != it_player) { + player = it_player->second; + if(srs_success != (err = player->on_rtcp(rtcp))) { + return srs_error_wrap(err, "handle common rtcp"); + } + return err; + } + + // try to find publisher to assign rtcp + it_pub = publishers_ssrc_map_.find(rtcp->get_ssrc()); + if(publishers_ssrc_map_.end() != it_pub) { + publisher = it_pub->second; + if(srs_success != (err = publisher->on_rtcp(rtcp))) { + return srs_error_wrap(err, "handle sr"); + } + } } return err; @@ -1748,27 +1833,58 @@ void SrsRtcConnection::set_hijacker(ISrsRtcConnectionHijacker* h) srs_error_t SrsRtcConnection::on_rtp(char* data, int nb_data) { - if (publisher_ == NULL) { + srs_error_t err = srs_success; + + if (publishers_.size() == 0) { return srs_error_new(ERROR_RTC_RTCP, "no publisher"); } - //TODO: FIXME: add unprotect_rtcp. - return publisher_->on_rtp(data, nb_data); + SrsRtpHeader header; + if (true) { + SrsBuffer* buffer = new SrsBuffer(data, nb_data); + SrsAutoFree(SrsBuffer, buffer); + + if(srs_success != (err = header.decode(buffer))) { + return srs_error_wrap(err, "decode rtp header"); + } + } + + map::iterator it = publishers_ssrc_map_.find(header.get_ssrc()); + if(it == publishers_ssrc_map_.end()) { + return srs_error_new(ERROR_RTC_NO_PUBLISHER, "no publisher for ssrc:%u", header.get_ssrc()); + } + + SrsRtcPublishStream* publisher = it->second; + return publisher->on_rtp(data, nb_data); } srs_error_t SrsRtcConnection::on_connection_established() { srs_error_t err = srs_success; - srs_trace("RTC: session %s, to=%dms connection established", (is_publisher_? "Publisher":"Subscriber"), + srs_trace("RTC: session pub=%u, sub=%u, to=%dms connection established", publishers_.size(), players_.size(), srsu2msi(session_timeout)); - if (is_publisher_) { - if ((err = start_publish()) != srs_success) { + // start all publisher + for(map::iterator it = publishers_.begin(); it != publishers_.end(); ++it) { + string url = it->first; + SrsRtcPublishStream* publisher = it->second; + + srs_trace("RTC: Publisher url=%s established", url.c_str()); + + if ((err = publisher->start()) != srs_success) { return srs_error_wrap(err, "start publish"); } - } else { - if ((err = start_play()) != srs_success) { + } + + // start all player + for(map::iterator it = players_.begin(); it != players_.end(); ++it) { + string url = it->first; + SrsRtcPlayStream* player = it->second; + + srs_trace("RTC: Subscriber url=%s established", url.c_str()); + + if ((err = player->start()) != srs_success) { return srs_error_wrap(err, "start play"); } } @@ -1782,22 +1898,33 @@ srs_error_t SrsRtcConnection::on_connection_established() return err; } -srs_error_t SrsRtcConnection::start_play() +srs_error_t SrsRtcConnection::start_play(string stream_uri) { srs_error_t err = srs_success; - if ((err = player_->start()) != srs_success) { + map::iterator it = players_.find(stream_uri); + if(it == players_.end()) { + return srs_error_new(ERROR_RTC_NO_PLAYER, "not subscribe %s", stream_uri.c_str()); + } + + SrsRtcPlayStream* player = it->second; + if ((err = player->start()) != srs_success) { return srs_error_wrap(err, "start"); } return err; } -srs_error_t SrsRtcConnection::start_publish() +srs_error_t SrsRtcConnection::start_publish(std::string stream_uri) { srs_error_t err = srs_success; - if ((err = publisher_->start()) != srs_success) { + map::iterator it = publishers_.find(stream_uri); + if(it == publishers_.end()) { + return srs_error_new(ERROR_RTC_NO_PUBLISHER, "no %s publisher", stream_uri.c_str()); + } + + if ((err = it->second->start()) != srs_success) { return srs_error_wrap(err, "start"); } @@ -2044,8 +2171,9 @@ srs_error_t SrsRtcConnection::send_rtcp_fb_pli(uint32_t ssrc) void SrsRtcConnection::simulate_nack_drop(int nn) { - if (publisher_) { - publisher_->simulate_nack_drop(nn); + for(map::iterator it = publishers_.begin(); it != publishers_.end(); ++it) { + SrsRtcPublishStream* publisher = it->second; + publisher->simulate_nack_drop(nn); } nn_simulate_player_nack_drop = nn; @@ -2118,15 +2246,28 @@ srs_error_t SrsRtcConnection::do_send_packets(const std::vector& return err; } -void SrsRtcConnection::set_all_tracks_status(bool status) +void SrsRtcConnection::set_all_tracks_status(std::string stream_uri, bool is_publish, bool status) { - if (player_) { - player_->set_all_tracks_status(status); + // For publishers. + if (is_publish) { + map::iterator it = publishers_.find(stream_uri); + if (publishers_.end() == it) { + return; + } + + SrsRtcPublishStream* publisher = it->second; + publisher->set_all_tracks_status(status); + return; } - if (publisher_) { - publisher_->set_all_tracks_status(status); + // For players. + map::iterator it = players_.find(stream_uri); + if (players_.end() == it) { + return; } + + SrsRtcPlayStream* player = it->second; + player->set_all_tracks_status(status); } #ifdef SRS_OSX @@ -2813,14 +2954,43 @@ srs_error_t SrsRtcConnection::create_player(SrsRequest* req, std::mapget_stream_url())) { return err; } - player_ = new SrsRtcPlayStream(this, _srs_context->get_id()); - if ((err = player_->initialize(req, sub_relations)) != srs_success) { + SrsRtcPlayStream* player = new SrsRtcPlayStream(this, _srs_context->get_id()); + if ((err = player->initialize(req, sub_relations)) != srs_success) { + srs_freep(player); return srs_error_wrap(err, "SrsRtcPlayStream init"); } + players_.insert(make_pair(req->get_stream_url(), player)); + + // make map between ssrc and player for fastly searching + for(map::iterator it = sub_relations.begin(); it != sub_relations.end(); ++it) { + SrsRtcTrackDescription* track_desc = it->second; + if(players_ssrc_map_.end() != players_ssrc_map_.find(track_desc->ssrc_)) { + return srs_error_new(ERROR_RTC_DUPLICATED_SSRC, "duplicate ssrc %d, track id: %s", + track_desc->ssrc_, track_desc->id_.c_str()); + } + players_ssrc_map_[track_desc->ssrc_] = player; + + if(0 != track_desc->fec_ssrc_) { + if(players_ssrc_map_.end() != players_ssrc_map_.find(track_desc->fec_ssrc_)) { + return srs_error_new(ERROR_RTC_DUPLICATED_SSRC, "duplicate fec ssrc %d, track id: %s", + track_desc->fec_ssrc_, track_desc->id_.c_str()); + } + players_ssrc_map_[track_desc->fec_ssrc_] = player; + } + + if(0 != track_desc->rtx_ssrc_) { + if(players_ssrc_map_.end() != players_ssrc_map_.find(track_desc->rtx_ssrc_)) { + return srs_error_new(ERROR_RTC_DUPLICATED_SSRC, "duplicate rtx ssrc %d, track id: %s", + track_desc->rtx_ssrc_, track_desc->id_.c_str()); + } + players_ssrc_map_[track_desc->rtx_ssrc_] = player; + } + } // TODO: FIXME: Support reload. // The TWCC ID is the ext-map ID in local SDP, and we set to enable GCC. @@ -2849,14 +3019,68 @@ srs_error_t SrsRtcConnection::create_publisher(SrsRequest* req, SrsRtcStreamDesc return srs_error_new(ERROR_RTC_STREAM_DESC, "rtc publisher init"); } - if (publisher_) { + // Ignore if exists. + if(publishers_.end() != publishers_.find(req->get_stream_url())) { return err; } - publisher_ = new SrsRtcPublishStream(this); - if ((err = publisher_->initialize(req, stream_desc)) != srs_success) { + SrsRtcPublishStream* publisher = new SrsRtcPublishStream(this); + if ((err = publisher->initialize(req, stream_desc)) != srs_success) { + srs_freep(publisher); return srs_error_wrap(err, "rtc publisher init"); } + publishers_[req->get_stream_url()] = publisher; + + if(NULL != stream_desc->audio_track_desc_) { + if(publishers_ssrc_map_.end() != publishers_ssrc_map_.find(stream_desc->audio_track_desc_->ssrc_)) { + return srs_error_new(ERROR_RTC_DUPLICATED_SSRC, " duplicate ssrc %d, track id: %s", + stream_desc->audio_track_desc_->ssrc_, stream_desc->audio_track_desc_->id_.c_str()); + } + publishers_ssrc_map_[stream_desc->audio_track_desc_->ssrc_] = publisher; + + if(0 != stream_desc->audio_track_desc_->fec_ssrc_ + && stream_desc->audio_track_desc_->ssrc_ != stream_desc->audio_track_desc_->fec_ssrc_) { + if(publishers_ssrc_map_.end() != publishers_ssrc_map_.find(stream_desc->audio_track_desc_->fec_ssrc_)) { + return srs_error_new(ERROR_RTC_DUPLICATED_SSRC, " duplicate fec ssrc %d, track id: %s", + stream_desc->audio_track_desc_->fec_ssrc_, stream_desc->audio_track_desc_->id_.c_str()); + } + publishers_ssrc_map_[stream_desc->audio_track_desc_->fec_ssrc_] = publisher; + } + + if(0 != stream_desc->audio_track_desc_->rtx_ssrc_ + && stream_desc->audio_track_desc_->ssrc_ != stream_desc->audio_track_desc_->rtx_ssrc_) { + if(publishers_ssrc_map_.end() != publishers_ssrc_map_.find(stream_desc->audio_track_desc_->rtx_ssrc_)) { + return srs_error_new(ERROR_RTC_DUPLICATED_SSRC, " duplicate rtx ssrc %d, track id: %s", + stream_desc->audio_track_desc_->rtx_ssrc_, stream_desc->audio_track_desc_->id_.c_str()); + } + publishers_ssrc_map_[stream_desc->audio_track_desc_->rtx_ssrc_] = publisher; + } + } + + for(int i = 0; i < stream_desc->video_track_descs_.size(); ++i) { + SrsRtcTrackDescription* track_desc = stream_desc->video_track_descs_.at(i); + if(publishers_ssrc_map_.end() != publishers_ssrc_map_.find(track_desc->ssrc_)) { + return srs_error_new(ERROR_RTC_DUPLICATED_SSRC, " duplicate ssrc %d, track id: %s", + track_desc->ssrc_, track_desc->id_.c_str()); + } + publishers_ssrc_map_[track_desc->ssrc_] = publisher; + + if(0 != track_desc->fec_ssrc_ && track_desc->ssrc_ != track_desc->fec_ssrc_) { + if(publishers_ssrc_map_.end() != publishers_ssrc_map_.find(track_desc->fec_ssrc_)) { + return srs_error_new(ERROR_RTC_DUPLICATED_SSRC, " duplicate fec ssrc %d, track id: %s", + track_desc->fec_ssrc_, track_desc->id_.c_str()); + } + publishers_ssrc_map_[track_desc->fec_ssrc_] = publisher; + } + + if(0 != track_desc->rtx_ssrc_ && track_desc->rtx_ssrc_ != track_desc->fec_ssrc_) { + if(publishers_ssrc_map_.end() != publishers_ssrc_map_.find(track_desc->rtx_ssrc_)) { + return srs_error_new(ERROR_RTC_DUPLICATED_SSRC, " duplicate rtx ssrc %d, track id: %s", + track_desc->rtx_ssrc_, track_desc->id_.c_str()); + } + publishers_ssrc_map_[track_desc->rtx_ssrc_] = publisher; + } + } return err; } diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index fdfdf50bc..f091d1854 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -378,10 +378,16 @@ private: SrsRtcServer* server_; SrsRtcConnectionStateType state_; ISrsRtcTransport* transport_; - SrsRtcPlayStream* player_; - SrsRtcPublishStream* publisher_; - bool is_publisher_; SrsHourGlass* timer_; +private: + // key: stream id + std::map players_; + //key: player track's ssrc + std::map players_ssrc_map_; + // key: stream id + std::map publishers_; + // key: publisher track's ssrc + std::map publishers_ssrc_map_; private: // The local:remote username, such as m5x0n128:jvOm where local name is m5x0n128. std::string username_; @@ -433,7 +439,7 @@ public: srs_error_t add_player2(SrsRequest* request, SrsSdp& local_sdp); public: // Before initialize, user must set the local SDP, which is used to inititlize DTLS. - srs_error_t initialize(SrsRequest* r, bool is_publisher, bool dtls, bool srtp, std::string username); + srs_error_t initialize(SrsRequest* r, bool dtls, bool srtp, std::string username); // The peer address may change, we can identify that by STUN messages. srs_error_t on_stun(SrsUdpMuxSocket* skt, SrsStunPacket* r); srs_error_t on_dtls(char* data, int nb_data); @@ -446,8 +452,8 @@ public: void set_hijacker(ISrsRtcConnectionHijacker* h); public: srs_error_t on_connection_established(); - srs_error_t start_play(); - srs_error_t start_publish(); + srs_error_t start_play(std::string stream_uri); + srs_error_t start_publish(std::string stream_uri); bool is_stun_timeout(); void update_sendonly_socket(SrsUdpMuxSocket* skt); // interface ISrsHourGlass @@ -466,7 +472,7 @@ public: void simulate_player_drop_packet(SrsRtpHeader* h, int nn_bytes); srs_error_t do_send_packets(const std::vector& pkts, SrsRtcPlayStreamStatistic& info); // Directly set the status of play track, generally for init to set the default value. - void set_all_tracks_status(bool status); + void set_all_tracks_status(std::string stream_uri, bool is_publish, bool status); private: srs_error_t on_binding_request(SrsStunPacket* r); // publish media capabilitiy negotiate diff --git a/trunk/src/app/srs_app_rtc_server.cpp b/trunk/src/app/srs_app_rtc_server.cpp index 621c5fee3..1dcc6c43d 100644 --- a/trunk/src/app/srs_app_rtc_server.cpp +++ b/trunk/src/app/srs_app_rtc_server.cpp @@ -453,7 +453,7 @@ srs_error_t SrsRtcServer::do_create_session( } // All tracks default as inactive, so we must enable them. - session->set_all_tracks_status(true); + session->set_all_tracks_status(req->get_stream_url(), publish, true); std::string local_pwd = srs_random_str(32); std::string local_ufrag = ""; @@ -504,7 +504,7 @@ srs_error_t SrsRtcServer::do_create_session( session->set_state(WAITING_STUN); // Before session initialize, we must setup the local SDP. - if ((err = session->initialize(req, publish, dtls, srtp, username)) != srs_success) { + if ((err = session->initialize(req, dtls, srtp, username)) != srs_success) { return srs_error_wrap(err, "init"); } @@ -567,7 +567,7 @@ srs_error_t SrsRtcServer::setup_session2(SrsRtcConnection* session, SrsRequest* // TODO: FIXME: Collision detect. string username = session->get_local_sdp()->get_ice_ufrag() + ":" + remote_sdp.get_ice_ufrag(); - if ((err = session->initialize(req, false, true, true, username)) != srs_success) { + if ((err = session->initialize(req, true, true, username)) != srs_success) { return srs_error_wrap(err, "init"); } diff --git a/trunk/src/kernel/srs_kernel_error.hpp b/trunk/src/kernel/srs_kernel_error.hpp index e77923ae1..d4e1c9e3d 100644 --- a/trunk/src/kernel/srs_kernel_error.hpp +++ b/trunk/src/kernel/srs_kernel_error.hpp @@ -357,6 +357,9 @@ #define ERROR_RTC_STREAM_DESC 5026 #define ERROR_RTC_TRACK_CODEC 5027 #define ERROR_RTC_NO_PLAYER 5028 +#define ERROR_RTC_NO_PUBLISHER 5029 +#define ERROR_RTC_DUPLICATED_SSRC 5030 +#define ERROR_RTC_NO_TRACK 5031 /////////////////////////////////////////////////////// // GB28181 API error.