From 7dbc95e085e4eb0263e21a35c7c0e4d63ced1129 Mon Sep 17 00:00:00 2001 From: winlin Date: Fri, 10 Apr 2015 10:21:30 +0800 Subject: [PATCH 1/2] refine the hls deviation for floor algorithm. --- README.md | 1 + trunk/src/app/srs_app_hls.cpp | 62 +++++++++++++++-------------------- trunk/src/app/srs_app_hls.hpp | 7 ++-- 3 files changed, 30 insertions(+), 40 deletions(-) diff --git a/README.md b/README.md index 91d4f1f28..dc3aa2832 100755 --- a/README.md +++ b/README.md @@ -562,6 +562,7 @@ Supported operating systems and hardware: ### SRS 2.0 history +* v2.0, 2015-04-10, refine the hls deviation for floor algorithm. * v2.0, 2015-04-08, for [#375](https://github.com/winlinvip/simple-rtmp-server/issues/375), fix hls bug, keep cc continous between ts files. 2.0.159. * v2.0, 2015-04-04, for [#304](https://github.com/winlinvip/simple-rtmp-server/issues/304), rewrite annexb mux for ts, refer to apple sample. 2.0.157. * v2.0, 2015-04-03, enhanced avc decode, parse the sps get width+height. 2.0.156. diff --git a/trunk/src/app/srs_app_hls.cpp b/trunk/src/app/srs_app_hls.cpp index 559966e79..24bd5d843 100644 --- a/trunk/src/app/srs_app_hls.cpp +++ b/trunk/src/app/srs_app_hls.cpp @@ -59,10 +59,10 @@ using namespace std; // drop the segment when duration of ts too small. #define SRS_AUTO_HLS_SEGMENT_MIN_DURATION_MS 100 -// startup piece, the first piece, fragment percent to reap. -#define SRS_HLS_FLOOR_STARTUP_PERCENT 0.1 // fragment plus the deviation percent. #define SRS_HLS_FLOOR_REAP_PERCENT 0.2 +// reset the piece id when deviation overflow this. +#define SRS_JUMP_WHEN_PIECE_DEVIATION 10 ISrsHlsHandler::ISrsHlsHandler() { @@ -224,7 +224,7 @@ SrsHlsMuxer::SrsHlsMuxer() handler = NULL; hls_fragment = hls_window = 0; hls_aof_ratio = 1.0; - hls_fragment_deviation = 0; + deviation_ts = 0; hls_cleanup = true; previous_floor_ts = 0; accept_floor_ts = 0; @@ -269,26 +269,14 @@ double SrsHlsMuxer::duration() return current? current->duration:0; } -double SrsHlsMuxer::deviation() +int SrsHlsMuxer::deviation() { // no floor, no deviation. if (!hls_ts_floor) { return 0; } - return hls_fragment_deviation; -} - -int SrsHlsMuxer::absolute_deviation() -{ - // no floor, no deviation. - if (!hls_ts_floor) { - return 0; - } - - // accept the floor ts for the first piece. - int64_t floor_ts = (int64_t)(srs_get_system_time_ms() / (1000 * hls_fragment)); - return (int)(accept_floor_ts - (floor_ts - 1)); + return deviation_ts; } int SrsHlsMuxer::initialize(ISrsHlsHandler* h) @@ -323,9 +311,7 @@ int SrsHlsMuxer::update_config(SrsRequest* r, string entry_prefix, previous_floor_ts = 0; accept_floor_ts = 0; hls_window = window; - // for the first time, we set to -N% of fragment, - // that is, the first piece always smaller. - hls_fragment_deviation = -1 * (fragment * SRS_HLS_FLOOR_STARTUP_PERCENT); + deviation_ts = 0; // generate the m3u8 dir and path. m3u8 = path + "/" + m3u8_file; @@ -412,24 +398,33 @@ int SrsHlsMuxer::segment_open(int64_t segment_start_dts) ts_file = srs_path_build_stream(ts_file, req->vhost, req->app, req->stream); if (hls_ts_floor) { // accept the floor ts for the first piece. - int64_t floor_ts = (int64_t)(srs_get_system_time_ms() / (1000 * hls_fragment)); + int64_t current_floor_ts = (int64_t)(srs_get_system_time_ms() / (1000 * hls_fragment)); if (!accept_floor_ts) { - accept_floor_ts = floor_ts - 1; + accept_floor_ts = current_floor_ts - 1; } else { accept_floor_ts++; } + // jump when deviation more than 10p + if (accept_floor_ts - current_floor_ts > SRS_JUMP_WHEN_PIECE_DEVIATION) { + srs_warn("hls: jmp for ts deviation, current=%"PRId64", accept=%"PRId64, current_floor_ts, accept_floor_ts); + accept_floor_ts = current_floor_ts - 1; + } + + // when reap ts, adjust the deviation. + deviation_ts = (int)(accept_floor_ts - current_floor_ts); + // we always ensure the piece is increase one by one. std::stringstream ts_floor; ts_floor << accept_floor_ts; ts_file = srs_string_replace(ts_file, "[timestamp]", ts_floor.str()); // dup/jmp detect for ts in floor mode. - if (previous_floor_ts && previous_floor_ts != floor_ts - 1) { - srs_warn("hls: dup or jmp for floor ts, previous=%"PRId64", current=%"PRId64", ts=%s, deviation=%.2f", - previous_floor_ts, floor_ts, ts_file.c_str(), hls_fragment_deviation); + if (previous_floor_ts && previous_floor_ts != current_floor_ts - 1) { + srs_warn("hls: dup or jmp for floor ts, previous=%"PRId64", current=%"PRId64", accept=%"PRId64", deviation=%d", + previous_floor_ts, current_floor_ts, accept_floor_ts, deviation_ts); } - previous_floor_ts = floor_ts; + previous_floor_ts = current_floor_ts; } ts_file = srs_path_build_timestamp(ts_file); if (true) { @@ -497,7 +492,7 @@ bool SrsHlsMuxer::is_segment_overflow() srs_assert(current); // use N% deviation, to smoother. - double deviation = hls_ts_floor? SRS_HLS_FLOOR_REAP_PERCENT * hls_fragment_deviation : 0.0; + double deviation = hls_ts_floor? SRS_HLS_FLOOR_REAP_PERCENT * deviation_ts * hls_fragment : 0.0; return current->duration >= hls_fragment + deviation; } @@ -594,19 +589,14 @@ int SrsHlsMuxer::segment_close(string log_desc) if (current->duration * 1000 >= SRS_AUTO_HLS_SEGMENT_MIN_DURATION_MS) { segments.push_back(current); - // when reap ts, adjust the deviation. - if (hls_ts_floor) { - hls_fragment_deviation += (double)(hls_fragment - current->duration); - } - // use async to call the http hooks, for it will cause thread switch. if ((ret = async->call(new SrsDvrAsyncCallOnHls(req, current->full_path, current->sequence_no, current->duration))) != ERROR_SUCCESS) { return ret; } - srs_info("%s reap ts segment, sequence_no=%d, uri=%s, duration=%.2f, start=%"PRId64", deviation=%.2f", + srs_info("%s reap ts segment, sequence_no=%d, uri=%s, duration=%.2f, start=%"PRId64, log_desc.c_str(), current->sequence_no, current->uri.c_str(), current->duration, - current->segment_start_dts, hls_fragment_deviation); + current->segment_start_dts); // notify handler for update ts. srs_assert(current->writer); @@ -1222,9 +1212,9 @@ void SrsHls::hls_show_mux_log() // the run time is not equals to stream time, // @see: https://github.com/winlinvip/simple-rtmp-server/issues/81#issuecomment-48100994 // it's ok. - srs_trace("-> "SRS_CONSTS_LOG_HLS" time=%"PRId64", stream dts=%"PRId64"(%"PRId64"ms), sno=%d, ts=%s, dur=%.2f, dva=%.2fs/%dp", + srs_trace("-> "SRS_CONSTS_LOG_HLS" time=%"PRId64", stream dts=%"PRId64"(%"PRId64"ms), sno=%d, ts=%s, dur=%.2f, dva=%dp", pprint->age(), stream_dts, stream_dts / 90, muxer->sequence_no(), muxer->ts_url().c_str(), - muxer->duration(), muxer->deviation(), muxer->absolute_deviation()); + muxer->duration(), muxer->deviation()); } } diff --git a/trunk/src/app/srs_app_hls.hpp b/trunk/src/app/srs_app_hls.hpp index 485368699..d405c283f 100644 --- a/trunk/src/app/srs_app_hls.hpp +++ b/trunk/src/app/srs_app_hls.hpp @@ -199,9 +199,9 @@ private: private: // whether use floor algorithm for timestamp. bool hls_ts_floor; - // the deviation in seconds to adjust the fragment to be more + // the deviation in piece to adjust the fragment to be more // bigger or smaller. - double hls_fragment_deviation; + int deviation_ts; // the previous reap floor timestamp, // used to detect the dup or jmp or ts. int64_t accept_floor_ts; @@ -242,8 +242,7 @@ public: virtual int sequence_no(); virtual std::string ts_url(); virtual double duration(); - virtual double deviation(); - virtual int absolute_deviation(); + virtual int deviation(); public: /** * initialize the hls muxer. From e3c6e52547988e81f15d3857a9c8bb5db3d3b379 Mon Sep 17 00:00:00 2001 From: winlin Date: Fri, 10 Apr 2015 12:01:45 +0800 Subject: [PATCH 2/2] enhanced on_hls_notify, support HTTP GET when reap ts. --- README.md | 1 + trunk/conf/full.conf | 17 +++++++ trunk/research/api-server/server.py | 18 +++++++- trunk/src/app/srs_app_config.cpp | 13 +++++- trunk/src/app/srs_app_config.hpp | 5 +++ trunk/src/app/srs_app_hls.cpp | 67 ++++++++++++++++++++++++---- trunk/src/app/srs_app_hls.hpp | 18 +++++++- trunk/src/app/srs_app_http_hooks.cpp | 50 +++++++++++++++++++++ trunk/src/app/srs_app_http_hooks.hpp | 7 +++ 9 files changed, 183 insertions(+), 13 deletions(-) diff --git a/README.md b/README.md index dc3aa2832..1b7c0a7c2 100755 --- a/README.md +++ b/README.md @@ -562,6 +562,7 @@ Supported operating systems and hardware: ### SRS 2.0 history +* v2.0, 2015-04-10, enhanced on_hls_notify, support HTTP GET when reap ts. * v2.0, 2015-04-10, refine the hls deviation for floor algorithm. * v2.0, 2015-04-08, for [#375](https://github.com/winlinvip/simple-rtmp-server/issues/375), fix hls bug, keep cc continous between ts files. 2.0.159. * v2.0, 2015-04-04, for [#304](https://github.com/winlinvip/simple-rtmp-server/issues/304), rewrite annexb mux for ts, refer to apple sample. 2.0.157. diff --git a/trunk/conf/full.conf b/trunk/conf/full.conf index 491250669..217a7f95c 100644 --- a/trunk/conf/full.conf +++ b/trunk/conf/full.conf @@ -618,6 +618,15 @@ vhost with-hls.srs.com { # for the hls http callback, @see http_hooks.on_hls of vhost hooks.callback.srs.com # @read https://github.com/winlinvip/simple-rtmp-server/wiki/v2_CN_DeliveryHLS#http-callback # @read https://github.com/winlinvip/simple-rtmp-server/wiki/v2_EN_DeliveryHLS#http-callback + + # on_hls_notify, never config in here, should config in http_hooks. + # we support the variables to generate the notify url: + # [app], replace with the app. + # [stream], replace with the stream. + # [ts_url], replace with the ts url. + # for the hls http callback, @see http_hooks.on_hls_notify of vhost hooks.callback.srs.com + # @read https://github.com/winlinvip/simple-rtmp-server/wiki/v2_CN_DeliveryHLS#on-hls-notify + # @read https://github.com/winlinvip/simple-rtmp-server/wiki/v2_EN_DeliveryHLS#on-hls-notify } } # the vhost with hls disabled. @@ -768,6 +777,14 @@ vhost hooks.callback.srs.com { # an int value specifies the error code(0 corresponding to success): # 0 on_hls http://127.0.0.1:8085/api/v1/hls http://localhost:8085/api/v1/hls; + # when srs reap a ts file of hls, call this hook, + # used to push file to cdn network, by get the ts file from cdn network. + # so we use HTTP GET and use the variable following: + # [app], replace with the app. + # [stream], replace with the stream. + # [ts_url], replace with the ts url. + # ignore any return data of server. + on_hls_notify http://127.0.0.1:8085/api/v1/hls/[app]/[stream][ts_url]; } } diff --git a/trunk/research/api-server/server.py b/trunk/research/api-server/server.py index 047718f86..57baeff4b 100755 --- a/trunk/research/api-server/server.py +++ b/trunk/research/api-server/server.py @@ -314,10 +314,24 @@ handle the hls requests: hls stream. class RESTHls(object): exposed = True - def GET(self): + ''' + for SRS hook: on_hls_notify + on_hls_notify: + when srs reap a ts file of hls, call this hook, + used to push file to cdn network, by get the ts file from cdn network. + so we use HTTP GET and use the variable following: + [app], replace with the app. + [stream], replace with the stream. + [ts_url], replace with the ts url. + ignore any return data of server. + ''' + def GET(self, *args, **kwargs): enable_crossdomain() - hls = {} + hls = { + "args": args, + "kwargs": kwargs + } return json.dumps(hls) ''' diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index f9ef7390f..5b9563fb8 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -1499,7 +1499,7 @@ int SrsConfig::check_config() string m = conf->at(j)->name.c_str(); if (m != "enabled" && m != "on_connect" && m != "on_close" && m != "on_publish" && m != "on_unpublish" && m != "on_play" && m != "on_stop" - && m != "on_dvr" && m != "on_hls" + && m != "on_dvr" && m != "on_hls" && m != "on_hls_notify" ) { ret = ERROR_SYSTEM_CONFIG_INVALID; srs_error("unsupported vhost http_hooks directive %s, ret=%d", m.c_str(), ret); @@ -2429,6 +2429,17 @@ SrsConfDirective* SrsConfig::get_vhost_on_hls(string vhost) return conf->get("on_hls"); } +SrsConfDirective* SrsConfig::get_vhost_on_hls_notify(string vhost) +{ + SrsConfDirective* conf = get_vhost_http_hooks(vhost); + + if (!conf) { + return NULL; + } + + return conf->get("on_hls_notify"); +} + bool SrsConfig::get_bw_check_enabled(string vhost) { SrsConfDirective* conf = get_vhost(vhost); diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index 1761f92b7..d31889bba 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -646,6 +646,11 @@ public: * @return the on_hls callback directive, the args is the url to callback. */ virtual SrsConfDirective* get_vhost_on_hls(std::string vhost); + /** + * get the on_hls_notify callbacks of vhost. + * @return the on_hls_notify callback directive, the args is the url to callback. + */ + virtual SrsConfDirective* get_vhost_on_hls_notify(std::string vhost); // bwct(bandwidth check tool) section public: /** diff --git a/trunk/src/app/srs_app_hls.cpp b/trunk/src/app/srs_app_hls.cpp index 24bd5d843..240ada91c 100644 --- a/trunk/src/app/srs_app_hls.cpp +++ b/trunk/src/app/srs_app_hls.cpp @@ -213,9 +213,49 @@ int SrsDvrAsyncCallOnHls::call() string SrsDvrAsyncCallOnHls::to_string() { - std::stringstream ss; - ss << "vhost=" << req->vhost << ", file=" << path; - return ss.str(); + return "on_hls: " + path; +} + +SrsDvrAsyncCallOnHlsNotify::SrsDvrAsyncCallOnHlsNotify(SrsRequest* r, string u) +{ + req = r; + ts_url = u; +} + +SrsDvrAsyncCallOnHlsNotify::~SrsDvrAsyncCallOnHlsNotify() +{ +} + +int SrsDvrAsyncCallOnHlsNotify::call() +{ + int ret = ERROR_SUCCESS; + +#ifdef SRS_AUTO_HTTP_CALLBACK + // http callback for on_hls_notify in config. + if (_srs_config->get_vhost_http_hooks_enabled(req->vhost)) { + // HTTP: on_hls + SrsConfDirective* on_hls = _srs_config->get_vhost_on_hls_notify(req->vhost); + if (!on_hls) { + srs_info("ignore the empty http callback: on_hls_notify"); + return ret; + } + + for (int i = 0; i < (int)on_hls->args.size(); i++) { + std::string url = on_hls->args.at(i); + if ((ret = SrsHttpHooks::on_hls_notify(url, req, ts_url)) != ERROR_SUCCESS) { + srs_error("hook client on_hls_notify failed. url=%s, ret=%d", url.c_str(), ret); + return ret; + } + } + } +#endif + + return ret; +} + +string SrsDvrAsyncCallOnHlsNotify::to_string() +{ + return "on_hls_notify: " + ts_url; } SrsHlsMuxer::SrsHlsMuxer() @@ -414,19 +454,23 @@ int SrsHlsMuxer::segment_open(int64_t segment_start_dts) // when reap ts, adjust the deviation. deviation_ts = (int)(accept_floor_ts - current_floor_ts); - // we always ensure the piece is increase one by one. - std::stringstream ts_floor; - ts_floor << accept_floor_ts; - ts_file = srs_string_replace(ts_file, "[timestamp]", ts_floor.str()); - // dup/jmp detect for ts in floor mode. if (previous_floor_ts && previous_floor_ts != current_floor_ts - 1) { srs_warn("hls: dup or jmp for floor ts, previous=%"PRId64", current=%"PRId64", accept=%"PRId64", deviation=%d", previous_floor_ts, current_floor_ts, accept_floor_ts, deviation_ts); } previous_floor_ts = current_floor_ts; + + // we always ensure the piece is increase one by one. + std::stringstream ts_floor; + ts_floor << accept_floor_ts; + ts_file = srs_string_replace(ts_file, "[timestamp]", ts_floor.str()); + + // TODO: FIMXE: we must use the accept ts floor time to generate the hour variable. + ts_file = srs_path_build_timestamp(ts_file); + } else { + ts_file = srs_path_build_timestamp(ts_file); } - ts_file = srs_path_build_timestamp(ts_file); if (true) { std::stringstream ss; ss << current->sequence_no; @@ -593,6 +637,11 @@ int SrsHlsMuxer::segment_close(string log_desc) if ((ret = async->call(new SrsDvrAsyncCallOnHls(req, current->full_path, current->sequence_no, current->duration))) != ERROR_SUCCESS) { return ret; } + + // use async to call the http hooks, for it will cause thread switch. + if ((ret = async->call(new SrsDvrAsyncCallOnHlsNotify(req, current->uri))) != ERROR_SUCCESS) { + return ret; + } srs_info("%s reap ts segment, sequence_no=%d, uri=%s, duration=%.2f, start=%"PRId64, log_desc.c_str(), current->sequence_no, current->uri.c_str(), current->duration, diff --git a/trunk/src/app/srs_app_hls.hpp b/trunk/src/app/srs_app_hls.hpp index d405c283f..2ef8861c6 100644 --- a/trunk/src/app/srs_app_hls.hpp +++ b/trunk/src/app/srs_app_hls.hpp @@ -157,7 +157,7 @@ public: }; /** - * the dvr async call. + * the hls async call: on_hls */ class SrsDvrAsyncCallOnHls : public ISrsDvrAsyncCall { @@ -174,6 +174,22 @@ public: virtual std::string to_string(); }; +/** + * the hls async call: on_hls_notify + */ +class SrsDvrAsyncCallOnHlsNotify : public ISrsDvrAsyncCall +{ +private: + std::string ts_url; + SrsRequest* req; +public: + SrsDvrAsyncCallOnHlsNotify(SrsRequest* r, std::string u); + virtual ~SrsDvrAsyncCallOnHlsNotify(); +public: + virtual int call(); + virtual std::string to_string(); +}; + /** * muxer the HLS stream(m3u8 and ts files). * generally, the m3u8 muxer only provides methods to open/close segments, diff --git a/trunk/src/app/srs_app_http_hooks.cpp b/trunk/src/app/srs_app_http_hooks.cpp index 6c717a29e..e4bb70461 100644 --- a/trunk/src/app/srs_app_http_hooks.cpp +++ b/trunk/src/app/srs_app_http_hooks.cpp @@ -37,6 +37,7 @@ using namespace std; #include #include #include +#include #define SRS_HTTP_RESPONSE_OK SRS_XSTR(ERROR_SUCCESS) @@ -325,6 +326,55 @@ int SrsHttpHooks::on_hls(string url, SrsRequest* req, string file, int sn, doubl return ret; } +int SrsHttpHooks::on_hls_notify(std::string url, SrsRequest* req, std::string ts_url) +{ + int ret = ERROR_SUCCESS; + + int client_id = _srs_context->get_id(); + std::string cwd = _srs_config->cwd(); + + if (srs_string_starts_with(ts_url, "http://") || srs_string_starts_with(ts_url, "https://")) { + url = ts_url; + } + + url = srs_string_replace(url, "[app]", req->app); + url = srs_string_replace(url, "[stream]", req->stream); + url = srs_string_replace(url, "[ts_url]", ts_url); + + SrsHttpUri uri; + if ((ret = uri.initialize(url)) != ERROR_SUCCESS) { + srs_error("http: post failed. url=%s, ret=%d", url.c_str(), ret); + return ret; + } + + SrsHttpClient http; + if ((ret = http.initialize(uri.get_host(), uri.get_port())) != ERROR_SUCCESS) { + return ret; + } + + SrsHttpMessage* msg = NULL; + if ((ret = http.get(uri.get_path(), "", &msg)) != ERROR_SUCCESS) { + return ret; + } + SrsAutoFree(SrsHttpMessage, msg); + + ISrsHttpResponseReader* br = msg->body_reader(); + while (!br->eof()) { + std::string data; + if ((ret = br->read(data)) != ERROR_SUCCESS) { + break; + } + } + + srs_trace("http hook on_hls_notify success. client_id=%d, url=%s, code=%d, ret=%d", + client_id, url.c_str(), msg->status_code(), ret); + + // ignore any error for on_hls_notify. + ret = ERROR_SUCCESS; + + return ret; +} + int SrsHttpHooks::do_post(std::string url, std::string req, int& code, string& res) { int ret = ERROR_SUCCESS; diff --git a/trunk/src/app/srs_app_http_hooks.hpp b/trunk/src/app/srs_app_http_hooks.hpp index 70a870f0a..1a63bdce0 100644 --- a/trunk/src/app/srs_app_http_hooks.hpp +++ b/trunk/src/app/srs_app_http_hooks.hpp @@ -105,6 +105,13 @@ public: * @param duration the segment duration in seconds. */ static int on_hls(std::string url, SrsRequest* req, std::string file, int sn, double duration); + /** + * when hls reap segment, callback. + * @param url the api server url, to process the event. + * ignore if empty. + * @param ts_url the ts uri, used to replace the variable [ts_url] in url. + */ + static int on_hls_notify(std::string url, SrsRequest* req, std::string ts_url); private: static int do_post(std::string url, std::string req, int& code, std::string& res); };