1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

enhanced on_hls_notify, support HTTP GET when reap ts.

This commit is contained in:
winlin 2015-04-10 12:01:45 +08:00
parent 7dbc95e085
commit e3c6e52547
9 changed files with 183 additions and 13 deletions

View file

@ -1499,7 +1499,7 @@ int SrsConfig::check_config()
string m = conf->at(j)->name.c_str();
if (m != "enabled" && m != "on_connect" && m != "on_close" && m != "on_publish"
&& m != "on_unpublish" && m != "on_play" && m != "on_stop"
&& m != "on_dvr" && m != "on_hls"
&& m != "on_dvr" && m != "on_hls" && m != "on_hls_notify"
) {
ret = ERROR_SYSTEM_CONFIG_INVALID;
srs_error("unsupported vhost http_hooks directive %s, ret=%d", m.c_str(), ret);
@ -2429,6 +2429,17 @@ SrsConfDirective* SrsConfig::get_vhost_on_hls(string vhost)
return conf->get("on_hls");
}
SrsConfDirective* SrsConfig::get_vhost_on_hls_notify(string vhost)
{
SrsConfDirective* conf = get_vhost_http_hooks(vhost);
if (!conf) {
return NULL;
}
return conf->get("on_hls_notify");
}
bool SrsConfig::get_bw_check_enabled(string vhost)
{
SrsConfDirective* conf = get_vhost(vhost);

View file

@ -646,6 +646,11 @@ public:
* @return the on_hls callback directive, the args is the url to callback.
*/
virtual SrsConfDirective* get_vhost_on_hls(std::string vhost);
/**
* get the on_hls_notify callbacks of vhost.
* @return the on_hls_notify callback directive, the args is the url to callback.
*/
virtual SrsConfDirective* get_vhost_on_hls_notify(std::string vhost);
// bwct(bandwidth check tool) section
public:
/**

View file

@ -213,9 +213,49 @@ int SrsDvrAsyncCallOnHls::call()
string SrsDvrAsyncCallOnHls::to_string()
{
std::stringstream ss;
ss << "vhost=" << req->vhost << ", file=" << path;
return ss.str();
return "on_hls: " + path;
}
SrsDvrAsyncCallOnHlsNotify::SrsDvrAsyncCallOnHlsNotify(SrsRequest* r, string u)
{
req = r;
ts_url = u;
}
SrsDvrAsyncCallOnHlsNotify::~SrsDvrAsyncCallOnHlsNotify()
{
}
int SrsDvrAsyncCallOnHlsNotify::call()
{
int ret = ERROR_SUCCESS;
#ifdef SRS_AUTO_HTTP_CALLBACK
// http callback for on_hls_notify in config.
if (_srs_config->get_vhost_http_hooks_enabled(req->vhost)) {
// HTTP: on_hls
SrsConfDirective* on_hls = _srs_config->get_vhost_on_hls_notify(req->vhost);
if (!on_hls) {
srs_info("ignore the empty http callback: on_hls_notify");
return ret;
}
for (int i = 0; i < (int)on_hls->args.size(); i++) {
std::string url = on_hls->args.at(i);
if ((ret = SrsHttpHooks::on_hls_notify(url, req, ts_url)) != ERROR_SUCCESS) {
srs_error("hook client on_hls_notify failed. url=%s, ret=%d", url.c_str(), ret);
return ret;
}
}
}
#endif
return ret;
}
string SrsDvrAsyncCallOnHlsNotify::to_string()
{
return "on_hls_notify: " + ts_url;
}
SrsHlsMuxer::SrsHlsMuxer()
@ -414,19 +454,23 @@ int SrsHlsMuxer::segment_open(int64_t segment_start_dts)
// when reap ts, adjust the deviation.
deviation_ts = (int)(accept_floor_ts - current_floor_ts);
// we always ensure the piece is increase one by one.
std::stringstream ts_floor;
ts_floor << accept_floor_ts;
ts_file = srs_string_replace(ts_file, "[timestamp]", ts_floor.str());
// dup/jmp detect for ts in floor mode.
if (previous_floor_ts && previous_floor_ts != current_floor_ts - 1) {
srs_warn("hls: dup or jmp for floor ts, previous=%"PRId64", current=%"PRId64", accept=%"PRId64", deviation=%d",
previous_floor_ts, current_floor_ts, accept_floor_ts, deviation_ts);
}
previous_floor_ts = current_floor_ts;
// we always ensure the piece is increase one by one.
std::stringstream ts_floor;
ts_floor << accept_floor_ts;
ts_file = srs_string_replace(ts_file, "[timestamp]", ts_floor.str());
// TODO: FIMXE: we must use the accept ts floor time to generate the hour variable.
ts_file = srs_path_build_timestamp(ts_file);
} else {
ts_file = srs_path_build_timestamp(ts_file);
}
ts_file = srs_path_build_timestamp(ts_file);
if (true) {
std::stringstream ss;
ss << current->sequence_no;
@ -593,6 +637,11 @@ int SrsHlsMuxer::segment_close(string log_desc)
if ((ret = async->call(new SrsDvrAsyncCallOnHls(req, current->full_path, current->sequence_no, current->duration))) != ERROR_SUCCESS) {
return ret;
}
// use async to call the http hooks, for it will cause thread switch.
if ((ret = async->call(new SrsDvrAsyncCallOnHlsNotify(req, current->uri))) != ERROR_SUCCESS) {
return ret;
}
srs_info("%s reap ts segment, sequence_no=%d, uri=%s, duration=%.2f, start=%"PRId64,
log_desc.c_str(), current->sequence_no, current->uri.c_str(), current->duration,

View file

@ -157,7 +157,7 @@ public:
};
/**
* the dvr async call.
* the hls async call: on_hls
*/
class SrsDvrAsyncCallOnHls : public ISrsDvrAsyncCall
{
@ -174,6 +174,22 @@ public:
virtual std::string to_string();
};
/**
* the hls async call: on_hls_notify
*/
class SrsDvrAsyncCallOnHlsNotify : public ISrsDvrAsyncCall
{
private:
std::string ts_url;
SrsRequest* req;
public:
SrsDvrAsyncCallOnHlsNotify(SrsRequest* r, std::string u);
virtual ~SrsDvrAsyncCallOnHlsNotify();
public:
virtual int call();
virtual std::string to_string();
};
/**
* muxer the HLS stream(m3u8 and ts files).
* generally, the m3u8 muxer only provides methods to open/close segments,

View file

@ -37,6 +37,7 @@ using namespace std;
#include <srs_app_http_client.hpp>
#include <srs_core_autofree.hpp>
#include <srs_app_config.hpp>
#include <srs_kernel_utility.hpp>
#define SRS_HTTP_RESPONSE_OK SRS_XSTR(ERROR_SUCCESS)
@ -325,6 +326,55 @@ int SrsHttpHooks::on_hls(string url, SrsRequest* req, string file, int sn, doubl
return ret;
}
int SrsHttpHooks::on_hls_notify(std::string url, SrsRequest* req, std::string ts_url)
{
int ret = ERROR_SUCCESS;
int client_id = _srs_context->get_id();
std::string cwd = _srs_config->cwd();
if (srs_string_starts_with(ts_url, "http://") || srs_string_starts_with(ts_url, "https://")) {
url = ts_url;
}
url = srs_string_replace(url, "[app]", req->app);
url = srs_string_replace(url, "[stream]", req->stream);
url = srs_string_replace(url, "[ts_url]", ts_url);
SrsHttpUri uri;
if ((ret = uri.initialize(url)) != ERROR_SUCCESS) {
srs_error("http: post failed. url=%s, ret=%d", url.c_str(), ret);
return ret;
}
SrsHttpClient http;
if ((ret = http.initialize(uri.get_host(), uri.get_port())) != ERROR_SUCCESS) {
return ret;
}
SrsHttpMessage* msg = NULL;
if ((ret = http.get(uri.get_path(), "", &msg)) != ERROR_SUCCESS) {
return ret;
}
SrsAutoFree(SrsHttpMessage, msg);
ISrsHttpResponseReader* br = msg->body_reader();
while (!br->eof()) {
std::string data;
if ((ret = br->read(data)) != ERROR_SUCCESS) {
break;
}
}
srs_trace("http hook on_hls_notify success. client_id=%d, url=%s, code=%d, ret=%d",
client_id, url.c_str(), msg->status_code(), ret);
// ignore any error for on_hls_notify.
ret = ERROR_SUCCESS;
return ret;
}
int SrsHttpHooks::do_post(std::string url, std::string req, int& code, string& res)
{
int ret = ERROR_SUCCESS;

View file

@ -105,6 +105,13 @@ public:
* @param duration the segment duration in seconds.
*/
static int on_hls(std::string url, SrsRequest* req, std::string file, int sn, double duration);
/**
* when hls reap segment, callback.
* @param url the api server url, to process the event.
* ignore if empty.
* @param ts_url the ts uri, used to replace the variable [ts_url] in url.
*/
static int on_hls_notify(std::string url, SrsRequest* req, std::string ts_url);
private:
static int do_post(std::string url, std::string req, int& code, std::string& res);
};