From 8fde0366fb13b0d04c0e8fa6bfc0600028ed91c0 Mon Sep 17 00:00:00 2001 From: MarkCao Date: Mon, 6 Mar 2023 09:09:27 +0800 Subject: [PATCH] Kickoff publisher when stream is idle, which means no players. v6.0.31, v5.0.144 (#3105) For some use scenario, the publisher is invited when player want to view the stream: 1. Publisher connect to system, but does not publish any stream to SRS yet. 2. Player connect to system and start to request the stream. 3. System notifies publisher to publish stream to SRS. 4. Player play the stream from SRS. Please notice that `system` means your business system, not SRS. This is what we called `on-demand-live-streaming`, so when the last player stop to view the stream, what happends? 1. System needs to notify publisher to stop publish. 2. Or, SRS disconnect the publisher when idle(the last player stops playing). This PR is for the solution 2, so that the cleanup is very simple, your system does not need to notify publisher to stop publish, because SRS has already disconnected the publihser. --------- Co-authored-by: winlin Co-authored-by: chundonglinlin --- trunk/conf/full.conf | 6 ++++ trunk/conf/rtmp.kickoff.conf | 13 +++++++ trunk/doc/CHANGELOG.md | 4 ++- trunk/doc/Features.md | 1 + trunk/src/app/srs_app_config.cpp | 39 ++++++++++++++++++++- trunk/src/app/srs_app_config.hpp | 3 ++ trunk/src/app/srs_app_gb28181.cpp | 2 +- trunk/src/app/srs_app_rtmp_conn.cpp | 6 ++++ trunk/src/app/srs_app_source.cpp | 49 ++++++++++++++++++++++----- trunk/src/app/srs_app_source.hpp | 13 ++++--- trunk/src/core/srs_core_version5.hpp | 2 +- trunk/src/core/srs_core_version6.hpp | 2 +- trunk/src/kernel/srs_kernel_error.hpp | 1 + trunk/src/utest/srs_utest_config.cpp | 3 ++ 14 files changed, 125 insertions(+), 19 deletions(-) create mode 100644 trunk/conf/rtmp.kickoff.conf diff --git a/trunk/conf/full.conf b/trunk/conf/full.conf index 9504a0a3d..e3cd0f39c 100644 --- a/trunk/conf/full.conf +++ b/trunk/conf/full.conf @@ -1341,6 +1341,12 @@ vhost publish.srs.com { # Overwrite by env SRS_VHOST_PUBLISH_TRY_ANNEXB_FIRST for all vhosts. # default: on try_annexb_first on; + # The timeout in seconds to disconnect publisher when idle, which means no players. + # Note that 0 means no timeout or this feature is disabled. + # Note that this feature conflicts with forward, because it disconnect the publisher stream. + # Overwrite by env SRS_VHOST_PUBLISH_KICKOFF_FOR_IDLE for all vhosts. + # default: 0 + kickoff_for_idle 0; } } diff --git a/trunk/conf/rtmp.kickoff.conf b/trunk/conf/rtmp.kickoff.conf new file mode 100644 index 000000000..7ab0db693 --- /dev/null +++ b/trunk/conf/rtmp.kickoff.conf @@ -0,0 +1,13 @@ +# the config for srs to delivery RTMP with kicking off publish as no one watching. +# @see https://github.com/ossrs/srs/wiki/v1_CN_SampleRTMP +# @see full.conf for detail config. + +listen 1935; +max_connections 1000; +daemon off; +srs_log_tank console; +vhost __defaultVhost__ { + publish { + kickoff_for_idle 60000; + } +} \ No newline at end of file diff --git a/trunk/doc/CHANGELOG.md b/trunk/doc/CHANGELOG.md index 774e64824..018395bd4 100644 --- a/trunk/doc/CHANGELOG.md +++ b/trunk/doc/CHANGELOG.md @@ -8,6 +8,7 @@ The changelog for SRS. ## SRS 6.0 Changelog +* v6.0, 2023-03-04, Merge [#3105](https://github.com/ossrs/srs/pull/3105): Kickoff publisher when stream is idle, which means no players. v6.0.31 (#3105) * v6.0, 2023-02-25, Merge [#3438](https://github.com/ossrs/srs/pull/3438): Forward add question mark to the end. v6.0.30 (#3438) * v6.0, 2023-02-25, Merge [#3416](https://github.com/ossrs/srs/pull/3416): GB: Support HEVC for regression test and load tool for GB. v6.0.29 (#3416) * v6.0, 2023-02-25, Merge [#3424](https://github.com/ossrs/srs/pull/3424): API: Add service_id for http_hooks, which identify the process. v6.0.28 (#3424) @@ -44,7 +45,8 @@ The changelog for SRS. ## SRS 5.0 Changelog -* v5.0, 2023-02-25, Merge [#3424](https://github.com/ossrs/srs/pull/3424): API: Add service_id for http_hooks, which identify the process. v5.0.142 (#3424) +* v5.0, 2023-03-04, Merge [#3105](https://github.com/ossrs/srs/pull/3105): Kickoff publisher when stream is idle, which means no players. v5.0.144 (#3105) +* v5.0, 2023-02-25, Merge [#3424](https://github.com/ossrs/srs/pull/3424): API: Add service_id for http_hooks, which identify the process. v5.0.143 (#3424) * v5.0, 2023-02-22, Compatible with legacy RTMP URL. v5.0.142 * v5.0, 2023-02-12, Merge [#3409](https://github.com/ossrs/srs/pull/3409): SRT: Reduce latency to 200ms of srt2rtc.conf. v5.0.141 (#3409) * v5.0, 2023-02-08, Merge [#3391](https://github.com/ossrs/srs/pull/3391): Config: Error when both HLS and HTTP-TS enabled. v5.0.140 (#3391) diff --git a/trunk/doc/Features.md b/trunk/doc/Features.md index 9c6b0310c..fb890fd0f 100644 --- a/trunk/doc/Features.md +++ b/trunk/doc/Features.md @@ -47,6 +47,7 @@ The features of SRS. - [x] Live: Support origin cluster, please read [#464](https://github.com/ossrs/srs/issues/464), [RTMP 302](https://github.com/ossrs/srs/issues/92). v3.0.0+ - [x] Live: Support NGINX HLS Cluster, see [CN](https://ossrs.net/lts/zh-cn/docs/v4/doc/sample-hls-cluster) or [EN](https://ossrs.io/lts/en-us/docs/v4/doc/sample-hls-cluster). v5.0.28+ - [x] Live: SRT: Support PUSH SRT by IP and optional port, see [#3198](https://github.com/ossrs/srs/issues/3198). v5.0.76+ +- [x] Live: Kickoff publisher when stream is idle, which means no players. v5.0.144+ - [x] Live: [Experimental] Support SRT server, read [#1147](https://github.com/ossrs/srs/issues/1147). v4.0.143+ - [x] Live: [Experimental] Support Coroutine Native SRT over ST, [#3010](https://github.com/ossrs/srs/pull/3010). v5.0.30+ - [x] Live: [Experimental] Support MPEG-DASH, Dynamic Adaptive Streaming over HTTP, read [#299](https://github.com/ossrs/srs/issues/299). v5.0.96+ diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index 0ee7c2f5a..27c2000b0 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -2559,7 +2559,7 @@ srs_error_t SrsConfig::check_normal_config() for (int j = 0; j < (int)conf->directives.size(); j++) { string m = conf->at(j)->name; if (m != "mr" && m != "mr_latency" && m != "firstpkt_timeout" && m != "normal_timeout" - && m != "parse_sps" && m != "try_annexb_first") { + && m != "parse_sps" && m != "try_annexb_first" && m != "kickoff_for_idle") { return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal vhost.publish.%s of %s", m.c_str(), vhost->arg0().c_str()); } } @@ -2727,6 +2727,14 @@ srs_error_t SrsConfig::check_normal_config() } } + // Check forward dnd kickoff for publsher idle. + for (int n = 0; n < (int)vhosts.size(); n++) { + SrsConfDirective* vhost = vhosts[n]; + if (get_forward_enabled(vhost) && get_publish_kickoff_for_idle(vhost)) { + return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "vhost.forward conflicts with vhost.publish.kickoff_for_idle"); + } + } + // check ingest id unique. for (int i = 0; i < (int)vhosts.size(); i++) { SrsConfDirective* vhost = vhosts[i]; @@ -5340,6 +5348,35 @@ srs_utime_t SrsConfig::get_publish_normal_timeout(string vhost) return (srs_utime_t)(::atoi(conf->arg0().c_str()) * SRS_UTIME_MILLISECONDS); } +srs_utime_t SrsConfig::get_publish_kickoff_for_idle(std::string vhost) +{ + return get_publish_kickoff_for_idle(get_vhost(vhost)); +} + +srs_utime_t SrsConfig::get_publish_kickoff_for_idle(SrsConfDirective* vhost) +{ + SRS_OVERWRITE_BY_ENV_FLOAT_SECONDS("srs.vhost.publish.kickoff_for_idle"); // SRS_VHOST_PUBLISH_KICKOFF_FOR_IDLE + + static srs_utime_t DEFAULT = 0 * SRS_UTIME_SECONDS; + + SrsConfDirective* conf = vhost; + if (!conf) { + return DEFAULT; + } + + conf = conf->get("publish"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("kickoff_for_idle"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + + return (srs_utime_t)(::atof(conf->arg0().c_str()) * SRS_UTIME_SECONDS); +} + int SrsConfig::get_global_chunk_size() { SRS_OVERWRITE_BY_ENV_INT("srs.vhost.chunk_size"); // SRS_VHOST_CHUNK_SIZE diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index 4cbd4b910..f0028b357 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -636,6 +636,9 @@ public: virtual srs_utime_t get_publish_1stpkt_timeout(std::string vhost); // The normal packet timeout in srs_utime_t for encoder. virtual srs_utime_t get_publish_normal_timeout(std::string vhost); + // The kickoff timeout in srs_utime_t for publisher. + virtual srs_utime_t get_publish_kickoff_for_idle(std::string vhost); + virtual srs_utime_t get_publish_kickoff_for_idle(SrsConfDirective* vhost); private: // Get the global chunk size. virtual int get_global_chunk_size(); diff --git a/trunk/src/app/srs_app_gb28181.cpp b/trunk/src/app/srs_app_gb28181.cpp index e65293259..71180f661 100644 --- a/trunk/src/app/srs_app_gb28181.cpp +++ b/trunk/src/app/srs_app_gb28181.cpp @@ -255,7 +255,7 @@ srs_error_t SrsLazyGbSession::cycle() // It maybe success with message. if (srs_error_code(err) == ERROR_SUCCESS) { - srs_trace("client finished%s.", srs_error_summary(err).c_str()); + srs_trace("client finished %s.", srs_error_summary(err).c_str()); srs_freep(err); return err; } diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index 32b2bd5b4..6411548a7 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -985,6 +985,7 @@ srs_error_t SrsRtmpConn::do_publishing(SrsLiveSource* source, SrsPublishRecvThre // initialize the publish timeout. publish_1stpkt_timeout = _srs_config->get_publish_1stpkt_timeout(req->vhost); publish_normal_timeout = _srs_config->get_publish_normal_timeout(req->vhost); + srs_utime_t publish_kickoff_for_idle = _srs_config->get_publish_kickoff_for_idle(req->vhost); // set the sock options. set_sock_options(); @@ -1008,6 +1009,11 @@ srs_error_t SrsRtmpConn::do_publishing(SrsLiveSource* source, SrsPublishRecvThre return srs_error_wrap(err, "rtmp: thread quit"); } + // Kick off the publisher when idle for a period of timeout. + if (source->publisher_is_idle_for(publish_kickoff_for_idle)) { + return srs_error_new(ERROR_KICKOFF_FOR_IDLE, "kicked for idle, url=%s, timeout=%ds", req->tcUrl.c_str(), srsu2si(publish_kickoff_for_idle)); + } + pprint->elapse(); // cond wait for timeout. diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index 705d328fb..302ef907e 100755 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -1877,7 +1877,7 @@ srs_error_t SrsLiveSourceManager::notify(int event, srs_utime_t interval, srs_ut // @see https://github.com/ossrs/srs/issues/714 #if 0 // When source expired, remove it. - if (source->expired()) { + if (source->stream_is_dead()) { int cid = source->source_id(); if (cid == -1 && source->pre_source_id() > 0) { cid = source->pre_source_id(); @@ -1926,7 +1926,8 @@ SrsLiveSource::SrsLiveSource() mix_queue = new SrsMixQueue(); _can_publish = true; - die_at = 0; + stream_die_at_ = 0; + publisher_idle_at_ = 0; handler = NULL; bridge_ = NULL; @@ -1983,10 +1984,10 @@ srs_error_t SrsLiveSource::cycle() return srs_success; } -bool SrsLiveSource::expired() +bool SrsLiveSource::stream_is_dead() { // unknown state? - if (die_at == 0) { + if (stream_die_at_ == 0) { return false; } @@ -2001,13 +2002,26 @@ bool SrsLiveSource::expired() } srs_utime_t now = srs_get_system_time(); - if (now > die_at + SRS_SOURCE_CLEANUP) { + if (now > stream_die_at_ + SRS_SOURCE_CLEANUP) { return true; } return false; } +bool SrsLiveSource::publisher_is_idle_for(srs_utime_t timeout) +{ + if (!publisher_idle_at_ || !timeout) { + return false; + } + + srs_utime_t now = srs_get_system_time(); + if (now > publisher_idle_at_ + timeout) { + return true; + } + return false; +} + srs_error_t SrsLiveSource::initialize(SrsRequest* r, ISrsLiveSourceHandler* h) { srs_error_t err = srs_success; @@ -2634,6 +2648,11 @@ srs_error_t SrsLiveSource::on_publish() SrsStatistic* stat = SrsStatistic::instance(); stat->on_stream_publish(req, _source_id.c_str()); + + // When no players, the publisher is idle now. + if (consumers.empty()) { + publisher_idle_at_ = srs_get_system_time(); + } return err; } @@ -2680,7 +2699,7 @@ void SrsLiveSource::on_unpublish() // no consumer, stream is die. if (consumers.empty()) { - die_at = srs_get_system_time(); + stream_die_at_ = srs_get_system_time(); } } @@ -2690,7 +2709,11 @@ srs_error_t SrsLiveSource::create_consumer(SrsLiveConsumer*& consumer) consumer = new SrsLiveConsumer(this); consumers.push_back(consumer); - + + // There should be one consumer, so reset the timeout. + stream_die_at_ = 0; + publisher_idle_at_ = 0; + // for edge, when play edge stream, check the state if (_srs_config->get_vhost_is_edge(req->vhost)) { // notice edge to start for the first client. @@ -2752,10 +2775,18 @@ void SrsLiveSource::on_consumer_destroy(SrsLiveConsumer* consumer) if (it != consumers.end()) { it = consumers.erase(it); } - + if (consumers.empty()) { play_edge->on_all_client_stop(); - die_at = srs_get_system_time(); + + // For edge server, the stream die when the last player quit, because the edge stream is created by player + // activities, so it should die when all players quit. + if (_srs_config->get_vhost_is_edge(req->vhost)) { + stream_die_at_ = srs_get_system_time(); + } + + // When no players, the publisher is idle now. + publisher_idle_at_ = srs_get_system_time(); } } diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index b8025a7bb..ae4d6da83 100644 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -533,17 +533,20 @@ private: private: // Whether source is avaiable for publishing. bool _can_publish; - // The last die time, when all consumers quit and no publisher, - // We will remove the source when source die. - srs_utime_t die_at; + // The last die time, while die means neither publishers nor players. + srs_utime_t stream_die_at_; + // The last idle time, while idle means no players. + srs_utime_t publisher_idle_at_; public: SrsLiveSource(); virtual ~SrsLiveSource(); public: virtual void dispose(); virtual srs_error_t cycle(); - // Remove source when expired. - virtual bool expired(); + // Whether stream is dead, which is no publisher or player. + virtual bool stream_is_dead(); + // Whether publisher is idle for a period of timeout. + bool publisher_is_idle_for(srs_utime_t timeout); public: // Initialize the hls with handlers. virtual srs_error_t initialize(SrsRequest* r, ISrsLiveSourceHandler* h); diff --git a/trunk/src/core/srs_core_version5.hpp b/trunk/src/core/srs_core_version5.hpp index 9814f77e1..46ff71091 100644 --- a/trunk/src/core/srs_core_version5.hpp +++ b/trunk/src/core/srs_core_version5.hpp @@ -9,6 +9,6 @@ #define VERSION_MAJOR 5 #define VERSION_MINOR 0 -#define VERSION_REVISION 143 +#define VERSION_REVISION 144 #endif diff --git a/trunk/src/core/srs_core_version6.hpp b/trunk/src/core/srs_core_version6.hpp index cb9ab9559..8439f0c02 100644 --- a/trunk/src/core/srs_core_version6.hpp +++ b/trunk/src/core/srs_core_version6.hpp @@ -9,6 +9,6 @@ #define VERSION_MAJOR 6 #define VERSION_MINOR 0 -#define VERSION_REVISION 30 +#define VERSION_REVISION 31 #endif diff --git a/trunk/src/kernel/srs_kernel_error.hpp b/trunk/src/kernel/srs_kernel_error.hpp index f095d9533..d338dfbbe 100644 --- a/trunk/src/kernel/srs_kernel_error.hpp +++ b/trunk/src/kernel/srs_kernel_error.hpp @@ -167,6 +167,7 @@ XX(ERROR_RTMP_MESSAGE_CREATE , 2053, "MessageCreate", "Failed to create shared pointer message") \ XX(ERROR_RTMP_PROXY_EXCEED , 2054, "RtmpProxy", "Failed to decode message of RTMP proxy") \ XX(ERROR_RTMP_CREATE_STREAM_DEPTH , 2055, "RtmpIdentify", "Failed to identify RTMP client") \ + XX(ERROR_KICKOFF_FOR_IDLE , 2056, "KickoffForIdle", "Kickoff for publisher is idle") \ XX(ERROR_CONTROL_REDIRECT , 2997, "RtmpRedirect", "RTMP 302 redirection") \ XX(ERROR_CONTROL_RTMP_CLOSE , 2998, "RtmpClose", "RTMP connection is closed") \ XX(ERROR_CONTROL_REPUBLISH , 2999, "RtmpRepublish", "RTMP stream is republished") diff --git a/trunk/src/utest/srs_utest_config.cpp b/trunk/src/utest/srs_utest_config.cpp index 9ea532676..925c010d9 100644 --- a/trunk/src/utest/srs_utest_config.cpp +++ b/trunk/src/utest/srs_utest_config.cpp @@ -4461,6 +4461,9 @@ VOID TEST(ConfigEnvTest, CheckEnvValuesVhostPublish) SrsSetEnvConfig(try_annexb_first, "SRS_VHOST_PUBLISH_TRY_ANNEXB_FIRST", "off"); EXPECT_FALSE(conf.try_annexb_first("__defaultVhost__")); + + SrsSetEnvConfig(kickoff_for_idle, "SRS_VHOST_PUBLISH_KICKOFF_FOR_IDLE", "30"); + EXPECT_EQ(30 * SRS_UTIME_SECONDS, conf.get_publish_kickoff_for_idle("__defaultVhost__")); } }