1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

Merge branch '2.0release' into develop

This commit is contained in:
winlin 2015-04-10 12:01:52 +08:00
commit 17c1423f4a
9 changed files with 210 additions and 50 deletions

View file

@ -1499,7 +1499,7 @@ int SrsConfig::check_config()
string m = conf->at(j)->name.c_str();
if (m != "enabled" && m != "on_connect" && m != "on_close" && m != "on_publish"
&& m != "on_unpublish" && m != "on_play" && m != "on_stop"
&& m != "on_dvr" && m != "on_hls"
&& m != "on_dvr" && m != "on_hls" && m != "on_hls_notify"
) {
ret = ERROR_SYSTEM_CONFIG_INVALID;
srs_error("unsupported vhost http_hooks directive %s, ret=%d", m.c_str(), ret);
@ -2429,6 +2429,17 @@ SrsConfDirective* SrsConfig::get_vhost_on_hls(string vhost)
return conf->get("on_hls");
}
SrsConfDirective* SrsConfig::get_vhost_on_hls_notify(string vhost)
{
SrsConfDirective* conf = get_vhost_http_hooks(vhost);
if (!conf) {
return NULL;
}
return conf->get("on_hls_notify");
}
bool SrsConfig::get_bw_check_enabled(string vhost)
{
SrsConfDirective* conf = get_vhost(vhost);

View file

@ -646,6 +646,11 @@ public:
* @return the on_hls callback directive, the args is the url to callback.
*/
virtual SrsConfDirective* get_vhost_on_hls(std::string vhost);
/**
* get the on_hls_notify callbacks of vhost.
* @return the on_hls_notify callback directive, the args is the url to callback.
*/
virtual SrsConfDirective* get_vhost_on_hls_notify(std::string vhost);
// bwct(bandwidth check tool) section
public:
/**

View file

@ -59,10 +59,10 @@ using namespace std;
// drop the segment when duration of ts too small.
#define SRS_AUTO_HLS_SEGMENT_MIN_DURATION_MS 100
// startup piece, the first piece, fragment percent to reap.
#define SRS_HLS_FLOOR_STARTUP_PERCENT 0.1
// fragment plus the deviation percent.
#define SRS_HLS_FLOOR_REAP_PERCENT 0.2
// reset the piece id when deviation overflow this.
#define SRS_JUMP_WHEN_PIECE_DEVIATION 10
ISrsHlsHandler::ISrsHlsHandler()
{
@ -213,9 +213,49 @@ int SrsDvrAsyncCallOnHls::call()
string SrsDvrAsyncCallOnHls::to_string()
{
std::stringstream ss;
ss << "vhost=" << req->vhost << ", file=" << path;
return ss.str();
return "on_hls: " + path;
}
SrsDvrAsyncCallOnHlsNotify::SrsDvrAsyncCallOnHlsNotify(SrsRequest* r, string u)
{
req = r;
ts_url = u;
}
SrsDvrAsyncCallOnHlsNotify::~SrsDvrAsyncCallOnHlsNotify()
{
}
int SrsDvrAsyncCallOnHlsNotify::call()
{
int ret = ERROR_SUCCESS;
#ifdef SRS_AUTO_HTTP_CALLBACK
// http callback for on_hls_notify in config.
if (_srs_config->get_vhost_http_hooks_enabled(req->vhost)) {
// HTTP: on_hls
SrsConfDirective* on_hls = _srs_config->get_vhost_on_hls_notify(req->vhost);
if (!on_hls) {
srs_info("ignore the empty http callback: on_hls_notify");
return ret;
}
for (int i = 0; i < (int)on_hls->args.size(); i++) {
std::string url = on_hls->args.at(i);
if ((ret = SrsHttpHooks::on_hls_notify(url, req, ts_url)) != ERROR_SUCCESS) {
srs_error("hook client on_hls_notify failed. url=%s, ret=%d", url.c_str(), ret);
return ret;
}
}
}
#endif
return ret;
}
string SrsDvrAsyncCallOnHlsNotify::to_string()
{
return "on_hls_notify: " + ts_url;
}
SrsHlsMuxer::SrsHlsMuxer()
@ -224,7 +264,7 @@ SrsHlsMuxer::SrsHlsMuxer()
handler = NULL;
hls_fragment = hls_window = 0;
hls_aof_ratio = 1.0;
hls_fragment_deviation = 0;
deviation_ts = 0;
hls_cleanup = true;
previous_floor_ts = 0;
accept_floor_ts = 0;
@ -269,26 +309,14 @@ double SrsHlsMuxer::duration()
return current? current->duration:0;
}
double SrsHlsMuxer::deviation()
int SrsHlsMuxer::deviation()
{
// no floor, no deviation.
if (!hls_ts_floor) {
return 0;
}
return hls_fragment_deviation;
}
int SrsHlsMuxer::absolute_deviation()
{
// no floor, no deviation.
if (!hls_ts_floor) {
return 0;
}
// accept the floor ts for the first piece.
int64_t floor_ts = (int64_t)(srs_get_system_time_ms() / (1000 * hls_fragment));
return (int)(accept_floor_ts - (floor_ts - 1));
return deviation_ts;
}
int SrsHlsMuxer::initialize(ISrsHlsHandler* h)
@ -323,9 +351,7 @@ int SrsHlsMuxer::update_config(SrsRequest* r, string entry_prefix,
previous_floor_ts = 0;
accept_floor_ts = 0;
hls_window = window;
// for the first time, we set to -N% of fragment,
// that is, the first piece always smaller.
hls_fragment_deviation = -1 * (fragment * SRS_HLS_FLOOR_STARTUP_PERCENT);
deviation_ts = 0;
// generate the m3u8 dir and path.
m3u8 = path + "/" + m3u8_file;
@ -412,26 +438,39 @@ int SrsHlsMuxer::segment_open(int64_t segment_start_dts)
ts_file = srs_path_build_stream(ts_file, req->vhost, req->app, req->stream);
if (hls_ts_floor) {
// accept the floor ts for the first piece.
int64_t floor_ts = (int64_t)(srs_get_system_time_ms() / (1000 * hls_fragment));
int64_t current_floor_ts = (int64_t)(srs_get_system_time_ms() / (1000 * hls_fragment));
if (!accept_floor_ts) {
accept_floor_ts = floor_ts - 1;
accept_floor_ts = current_floor_ts - 1;
} else {
accept_floor_ts++;
}
// jump when deviation more than 10p
if (accept_floor_ts - current_floor_ts > SRS_JUMP_WHEN_PIECE_DEVIATION) {
srs_warn("hls: jmp for ts deviation, current=%"PRId64", accept=%"PRId64, current_floor_ts, accept_floor_ts);
accept_floor_ts = current_floor_ts - 1;
}
// when reap ts, adjust the deviation.
deviation_ts = (int)(accept_floor_ts - current_floor_ts);
// dup/jmp detect for ts in floor mode.
if (previous_floor_ts && previous_floor_ts != current_floor_ts - 1) {
srs_warn("hls: dup or jmp for floor ts, previous=%"PRId64", current=%"PRId64", accept=%"PRId64", deviation=%d",
previous_floor_ts, current_floor_ts, accept_floor_ts, deviation_ts);
}
previous_floor_ts = current_floor_ts;
// we always ensure the piece is increase one by one.
std::stringstream ts_floor;
ts_floor << accept_floor_ts;
ts_file = srs_string_replace(ts_file, "[timestamp]", ts_floor.str());
// dup/jmp detect for ts in floor mode.
if (previous_floor_ts && previous_floor_ts != floor_ts - 1) {
srs_warn("hls: dup or jmp for floor ts, previous=%"PRId64", current=%"PRId64", ts=%s, deviation=%.2f",
previous_floor_ts, floor_ts, ts_file.c_str(), hls_fragment_deviation);
}
previous_floor_ts = floor_ts;
// TODO: FIMXE: we must use the accept ts floor time to generate the hour variable.
ts_file = srs_path_build_timestamp(ts_file);
} else {
ts_file = srs_path_build_timestamp(ts_file);
}
ts_file = srs_path_build_timestamp(ts_file);
if (true) {
std::stringstream ss;
ss << current->sequence_no;
@ -497,7 +536,7 @@ bool SrsHlsMuxer::is_segment_overflow()
srs_assert(current);
// use N% deviation, to smoother.
double deviation = hls_ts_floor? SRS_HLS_FLOOR_REAP_PERCENT * hls_fragment_deviation : 0.0;
double deviation = hls_ts_floor? SRS_HLS_FLOOR_REAP_PERCENT * deviation_ts * hls_fragment : 0.0;
return current->duration >= hls_fragment + deviation;
}
@ -594,19 +633,19 @@ int SrsHlsMuxer::segment_close(string log_desc)
if (current->duration * 1000 >= SRS_AUTO_HLS_SEGMENT_MIN_DURATION_MS) {
segments.push_back(current);
// when reap ts, adjust the deviation.
if (hls_ts_floor) {
hls_fragment_deviation += (double)(hls_fragment - current->duration);
}
// use async to call the http hooks, for it will cause thread switch.
if ((ret = async->call(new SrsDvrAsyncCallOnHls(req, current->full_path, current->sequence_no, current->duration))) != ERROR_SUCCESS) {
return ret;
}
// use async to call the http hooks, for it will cause thread switch.
if ((ret = async->call(new SrsDvrAsyncCallOnHlsNotify(req, current->uri))) != ERROR_SUCCESS) {
return ret;
}
srs_info("%s reap ts segment, sequence_no=%d, uri=%s, duration=%.2f, start=%"PRId64", deviation=%.2f",
srs_info("%s reap ts segment, sequence_no=%d, uri=%s, duration=%.2f, start=%"PRId64,
log_desc.c_str(), current->sequence_no, current->uri.c_str(), current->duration,
current->segment_start_dts, hls_fragment_deviation);
current->segment_start_dts);
// notify handler for update ts.
srs_assert(current->writer);
@ -1222,9 +1261,9 @@ void SrsHls::hls_show_mux_log()
// the run time is not equals to stream time,
// @see: https://github.com/winlinvip/simple-rtmp-server/issues/81#issuecomment-48100994
// it's ok.
srs_trace("-> "SRS_CONSTS_LOG_HLS" time=%"PRId64", stream dts=%"PRId64"(%"PRId64"ms), sno=%d, ts=%s, dur=%.2f, dva=%.2fs/%dp",
srs_trace("-> "SRS_CONSTS_LOG_HLS" time=%"PRId64", stream dts=%"PRId64"(%"PRId64"ms), sno=%d, ts=%s, dur=%.2f, dva=%dp",
pprint->age(), stream_dts, stream_dts / 90, muxer->sequence_no(), muxer->ts_url().c_str(),
muxer->duration(), muxer->deviation(), muxer->absolute_deviation());
muxer->duration(), muxer->deviation());
}
}

View file

@ -157,7 +157,7 @@ public:
};
/**
* the dvr async call.
* the hls async call: on_hls
*/
class SrsDvrAsyncCallOnHls : public ISrsDvrAsyncCall
{
@ -174,6 +174,22 @@ public:
virtual std::string to_string();
};
/**
* the hls async call: on_hls_notify
*/
class SrsDvrAsyncCallOnHlsNotify : public ISrsDvrAsyncCall
{
private:
std::string ts_url;
SrsRequest* req;
public:
SrsDvrAsyncCallOnHlsNotify(SrsRequest* r, std::string u);
virtual ~SrsDvrAsyncCallOnHlsNotify();
public:
virtual int call();
virtual std::string to_string();
};
/**
* muxer the HLS stream(m3u8 and ts files).
* generally, the m3u8 muxer only provides methods to open/close segments,
@ -199,9 +215,9 @@ private:
private:
// whether use floor algorithm for timestamp.
bool hls_ts_floor;
// the deviation in seconds to adjust the fragment to be more
// the deviation in piece to adjust the fragment to be more
// bigger or smaller.
double hls_fragment_deviation;
int deviation_ts;
// the previous reap floor timestamp,
// used to detect the dup or jmp or ts.
int64_t accept_floor_ts;
@ -242,8 +258,7 @@ public:
virtual int sequence_no();
virtual std::string ts_url();
virtual double duration();
virtual double deviation();
virtual int absolute_deviation();
virtual int deviation();
public:
/**
* initialize the hls muxer.

View file

@ -37,6 +37,7 @@ using namespace std;
#include <srs_app_http_client.hpp>
#include <srs_core_autofree.hpp>
#include <srs_app_config.hpp>
#include <srs_kernel_utility.hpp>
#define SRS_HTTP_RESPONSE_OK SRS_XSTR(ERROR_SUCCESS)
@ -325,6 +326,55 @@ int SrsHttpHooks::on_hls(string url, SrsRequest* req, string file, int sn, doubl
return ret;
}
int SrsHttpHooks::on_hls_notify(std::string url, SrsRequest* req, std::string ts_url)
{
int ret = ERROR_SUCCESS;
int client_id = _srs_context->get_id();
std::string cwd = _srs_config->cwd();
if (srs_string_starts_with(ts_url, "http://") || srs_string_starts_with(ts_url, "https://")) {
url = ts_url;
}
url = srs_string_replace(url, "[app]", req->app);
url = srs_string_replace(url, "[stream]", req->stream);
url = srs_string_replace(url, "[ts_url]", ts_url);
SrsHttpUri uri;
if ((ret = uri.initialize(url)) != ERROR_SUCCESS) {
srs_error("http: post failed. url=%s, ret=%d", url.c_str(), ret);
return ret;
}
SrsHttpClient http;
if ((ret = http.initialize(uri.get_host(), uri.get_port())) != ERROR_SUCCESS) {
return ret;
}
SrsHttpMessage* msg = NULL;
if ((ret = http.get(uri.get_path(), "", &msg)) != ERROR_SUCCESS) {
return ret;
}
SrsAutoFree(SrsHttpMessage, msg);
ISrsHttpResponseReader* br = msg->body_reader();
while (!br->eof()) {
std::string data;
if ((ret = br->read(data)) != ERROR_SUCCESS) {
break;
}
}
srs_trace("http hook on_hls_notify success. client_id=%d, url=%s, code=%d, ret=%d",
client_id, url.c_str(), msg->status_code(), ret);
// ignore any error for on_hls_notify.
ret = ERROR_SUCCESS;
return ret;
}
int SrsHttpHooks::do_post(std::string url, std::string req, int& code, string& res)
{
int ret = ERROR_SUCCESS;

View file

@ -105,6 +105,13 @@ public:
* @param duration the segment duration in seconds.
*/
static int on_hls(std::string url, SrsRequest* req, std::string file, int sn, double duration);
/**
* when hls reap segment, callback.
* @param url the api server url, to process the event.
* ignore if empty.
* @param ts_url the ts uri, used to replace the variable [ts_url] in url.
*/
static int on_hls_notify(std::string url, SrsRequest* req, std::string ts_url);
private:
static int do_post(std::string url, std::string req, int& code, std::string& res);
};