From fb6c2fdf806f4045ad691acd02d40e3840d04bed Mon Sep 17 00:00:00 2001 From: winlin Date: Wed, 23 Sep 2020 19:29:19 +0800 Subject: [PATCH] RTC: Use event notify for stream source --- trunk/src/app/srs_app_rtc_conn.cpp | 12 ---------- trunk/src/app/srs_app_rtc_conn.hpp | 4 ---- trunk/src/app/srs_app_rtc_source.cpp | 34 +++++++++++++++++++++++++++- trunk/src/app/srs_app_rtc_source.hpp | 18 ++++++++++++++- 4 files changed, 50 insertions(+), 18 deletions(-) diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index 634530e53..bc379927b 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -1431,11 +1431,6 @@ srs_error_t SrsRtcPublishStream::do_request_keyframe(uint32_t ssrc, SrsContextId return err; } -void SrsRtcPublishStream::on_consumers_finished() -{ - session_->on_consumers_finished(req->get_stream_url()); -} - srs_error_t SrsRtcPublishStream::notify(int type, srs_utime_t interval, srs_utime_t tick) { srs_error_t err = srs_success; @@ -2067,13 +2062,6 @@ srs_error_t SrsRtcConnection::on_rtcp_feedback_remb(SrsRtcpPsfbCommon *rtcp) return srs_success; } -void SrsRtcConnection::on_consumers_finished(std::string url) -{ - if (hijacker_) { - hijacker_->on_consumers_finished(url); - } -} - void SrsRtcConnection::set_hijacker(ISrsRtcConnectionHijacker* h) { hijacker_ = h; diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index 761a1a42b..65509b7d0 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -370,7 +370,6 @@ private: public: void request_keyframe(uint32_t ssrc); virtual srs_error_t do_request_keyframe(uint32_t ssrc, SrsContextId cid); - void on_consumers_finished(); // interface ISrsHourGlass public: virtual srs_error_t notify(int type, srs_utime_t interval, srs_utime_t tick); @@ -412,8 +411,6 @@ public: virtual ~ISrsRtcConnectionHijacker(); public: virtual srs_error_t on_dtls_done() = 0; - // Notify when all consumers of publisher(specified by url) is finished. - virtual void on_consumers_finished(std::string url) = 0; }; // A RTC Peer Connection, SDP level object. @@ -516,7 +513,6 @@ public: srs_error_t on_rtcp_feedback_twcc(char* buf, int nb_buf); srs_error_t on_rtcp_feedback_remb(SrsRtcpPsfbCommon *rtcp); public: - void on_consumers_finished(std::string url); void set_hijacker(ISrsRtcConnectionHijacker* h); public: srs_error_t on_connection_established(); diff --git a/trunk/src/app/srs_app_rtc_source.cpp b/trunk/src/app/srs_app_rtc_source.cpp index 9c8f87728..6beb54efc 100644 --- a/trunk/src/app/srs_app_rtc_source.cpp +++ b/trunk/src/app/srs_app_rtc_source.cpp @@ -290,6 +290,14 @@ ISrsRtcPublishStream::~ISrsRtcPublishStream() { } +ISrsRtcStreamEventHandler::ISrsRtcStreamEventHandler() +{ +} + +ISrsRtcStreamEventHandler::~ISrsRtcStreamEventHandler() +{ +} + SrsRtcStream::SrsRtcStream() { is_created_ = false; @@ -411,7 +419,10 @@ void SrsRtcStream::on_consumer_destroy(SrsRtcConsumer* consumer) // When all consumers finished, notify publisher to handle it. if (publish_stream_ && consumers.empty()) { - publish_stream_->on_consumers_finished(); + for (size_t i = 0; i < event_handlers_.size(); i++) { + ISrsRtcStreamEventHandler* h = event_handlers_.at(i); + h->on_consumers_finished(); + } } } @@ -463,9 +474,30 @@ void SrsRtcStream::on_unpublish() _source_id = SrsContextId(); + for (size_t i = 0; i < event_handlers_.size(); i++) { + ISrsRtcStreamEventHandler* h = event_handlers_.at(i); + h->on_unpublish(); + } + // TODO: FIXME: Handle by statistic. } +void SrsRtcStream::subscribe(ISrsRtcStreamEventHandler* h) +{ + if (std::find(event_handlers_.begin(), event_handlers_.end(), h) == event_handlers_.end()) { + event_handlers_.push_back(h); + } +} + +void SrsRtcStream::unsubscribe(ISrsRtcStreamEventHandler* h) +{ + std::vector::iterator it; + it = std::find(event_handlers_.begin(), event_handlers_.end(), h); + if (it != event_handlers_.end()) { + event_handlers_.erase(it); + } +} + ISrsRtcPublishStream* SrsRtcStream::publish_stream() { return publish_stream_; diff --git a/trunk/src/app/srs_app_rtc_source.hpp b/trunk/src/app/srs_app_rtc_source.hpp index ddcfa0d40..7a2f126c5 100644 --- a/trunk/src/app/srs_app_rtc_source.hpp +++ b/trunk/src/app/srs_app_rtc_source.hpp @@ -132,7 +132,17 @@ public: public: // Request keyframe(PLI) from publisher, for fresh consumer. virtual void request_keyframe(uint32_t ssrc) = 0; - // Notify publisher that all consumers is finished. +}; + +class ISrsRtcStreamEventHandler +{ +public: + ISrsRtcStreamEventHandler(); + virtual ~ISrsRtcStreamEventHandler(); +public: + // stream unpublish, sync API. + virtual void on_unpublish() = 0; + // no player subscribe this stream, sync API virtual void on_consumers_finished() = 0; }; @@ -160,6 +170,8 @@ private: bool is_created_; // Whether stream is delivering data, that is, DTLS is done. bool is_delivering_packets_; + // Notify stream event to event handler + std::vector event_handlers_; public: SrsRtcStream(); virtual ~SrsRtcStream(); @@ -193,6 +205,10 @@ public: virtual srs_error_t on_publish(); // When stop publish stream. virtual void on_unpublish(); +public: + // For event handler + void subscribe(ISrsRtcStreamEventHandler* h); + void unsubscribe(ISrsRtcStreamEventHandler* h); public: // Get and set the publisher, passed to consumer to process requests such as PLI. ISrsRtcPublishStream* publish_stream();