1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

HLS: Rebuild m3u8 to make ts with id, for stat.

This commit is contained in:
winlin 2022-08-29 09:07:34 +08:00
parent bc569d91a0
commit c1df280211
5 changed files with 161 additions and 76 deletions

View file

@ -41,6 +41,16 @@ using namespace std;
#define SRS_CONTEXT_IN_HLS "hls_ctx"
SrsM3u8CtxInfo::SrsM3u8CtxInfo()
{
req = NULL;
}
SrsM3u8CtxInfo::~SrsM3u8CtxInfo()
{
srs_freep(req);
}
SrsHlsStream::SrsHlsStream()
{
_srs_hybrid->timer5s()->subscribe(this);
@ -50,42 +60,40 @@ SrsHlsStream::~SrsHlsStream()
{
_srs_hybrid->timer5s()->unsubscribe(this);
std::map<std::string, SrsM3u8CtxInfo>::iterator it;
std::map<std::string, SrsM3u8CtxInfo*>::iterator it;
for (it = map_ctx_info_.begin(); it != map_ctx_info_.end(); ++it) {
srs_freep(it->second.req);
SrsM3u8CtxInfo* info = it->second;
srs_freep(info);
}
map_ctx_info_.clear();
}
srs_error_t SrsHlsStream::serve_m3u8_ctx(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, bool* served)
srs_error_t SrsHlsStream::serve_m3u8_ctx(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, string fullpath, SrsRequest* req)
{
srs_error_t err = srs_success;
string ctx = r->query_get(SRS_CONTEXT_IN_HLS);
// Always make the ctx alive now.
alive(ctx, req);
// Already exists context, response with rebuilt m3u8 content.
if (!ctx.empty() && ctx_is_exist(ctx)) {
return serve_exists_session(w, r, fullpath);
}
// Create a m3u8 in memory, contains the session id(ctx).
return serve_new_session(w, r, req);
}
srs_error_t SrsHlsStream::serve_new_session(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, SrsRequest* req)
{
srs_error_t err = srs_success;
SrsHttpMessage *hr = dynamic_cast<SrsHttpMessage *>(r);
srs_assert(hr);
SrsRequest* req = hr->to_request(hr->host())->as_http();
SrsAutoFree(SrsRequest, req);
// discovery vhost, resolve the vhost from config
SrsConfDirective* parsed_vhost = _srs_config->get_vhost(req->vhost);
if (parsed_vhost) {
req->vhost = parsed_vhost->arg0();
}
// If HLS stream is disabled, use SrsHttpFileServer to serve HLS, which is normal file server.
if (!_srs_config->get_hls_ctx_enabled(req->vhost)) {
return err;
}
// Serve as HLS stream, create a HLS session to serve it.
string ctx = hr->query_get(SRS_CONTEXT_IN_HLS);
if (!ctx.empty() && ctx_is_exist(ctx)) {
alive(ctx, NULL);
return err;
}
// Create a m3u8 in memory, contains the session id(ctx).
string ctx;
if (ctx.empty()) {
// make sure unique
do {
@ -123,16 +131,58 @@ srs_error_t SrsHlsStream::serve_m3u8_ctx(ISrsHttpResponseWriter* w, ISrsHttpMess
return srs_error_wrap(err, "final request");
}
alive(ctx, req->copy());
// update the statistic when source disconveried.
SrsStatistic* stat = SrsStatistic::instance();
if ((err = stat->on_client(ctx, req, NULL, SrsHlsPlay)) != srs_success) {
return srs_error_wrap(err, "stat on client");
}
// The request has been served by HLS streaming handler.
*served = true;
return err;
}
srs_error_t SrsHlsStream::serve_exists_session(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, std::string fullpath)
{
srs_error_t err = srs_success;
// Read m3u8 content.
SrsFileReader fs;
if ((err = fs.open(fullpath)) != srs_success) {
return srs_error_wrap(err, "open %s", fullpath.c_str());
}
string content;
if ((err = srs_ioutil_read_all(&fs, content)) != srs_success) {
return srs_error_wrap(err, "read %s", fullpath.c_str());
}
// Rebuild the m3u8 content, make .ts with hls_ctx.
size_t pos_ts = content.find(".ts");
static string QUERY_PREFIX = string(".ts?") + string(SRS_CONTEXT_IN_HLS) + string("=");
if (pos_ts != string::npos) {
string ctx = r->query_get(SRS_CONTEXT_IN_HLS);
string query = QUERY_PREFIX + ctx;
size_t pos_query = content.find(".ts?");
if (pos_query != string::npos) {
query += "&";
content = srs_string_replace(content, ".ts?", query);
} else {
content = srs_string_replace(content, ".ts", query);
}
}
// Response with rebuilt content.
w->header()->set_content_type("application/vnd.apple.mpegurl");
w->header()->set_content_length(content.length());
w->write_header(SRS_CONSTS_HTTP_OK);
if (!content.empty()) {
w->write((char*)content.data(), content.length());
}
if ((err = w->final_request()) != srs_success) {
return srs_error_wrap(err, "final request");
}
return err;
}
@ -144,15 +194,20 @@ bool SrsHlsStream::ctx_is_exist(std::string ctx)
void SrsHlsStream::alive(std::string ctx, SrsRequest* req)
{
std::map<std::string, SrsM3u8CtxInfo>::iterator it;
if ((it = map_ctx_info_.find(ctx)) != map_ctx_info_.end()) {
it->second.request_time = srs_get_system_time();
} else {
SrsM3u8CtxInfo info;
info.req = req;
info.request_time = srs_get_system_time();
std::map<std::string, SrsM3u8CtxInfo*>::iterator it = map_ctx_info_.find(ctx);
// Create new context.
if (it == map_ctx_info_.end()) {
SrsM3u8CtxInfo *info = new SrsM3u8CtxInfo();
info->req = req->copy();
info->request_time = srs_get_system_time();
map_ctx_info_.insert(make_pair(ctx, info));
return;
}
// Update alive time of context.
SrsM3u8CtxInfo* info = it->second;
info->request_time = srs_get_system_time();
}
srs_error_t SrsHlsStream::http_hooks_on_play(SrsRequest* req)
@ -222,21 +277,23 @@ srs_error_t SrsHlsStream::on_timer(srs_utime_t interval)
{
srs_error_t err = srs_success;
std::map<std::string, SrsM3u8CtxInfo>::iterator it;
std::map<std::string, SrsM3u8CtxInfo*>::iterator it;
for (it = map_ctx_info_.begin(); it != map_ctx_info_.end(); ++it) {
string ctx = it->first;
SrsRequest* req = it->second.req;
srs_utime_t hls_window = _srs_config->get_hls_window(req->vhost);
if (it->second.request_time + (2 * hls_window) < srs_get_system_time()) {
SrsM3u8CtxInfo* info = it->second;
srs_utime_t hls_window = _srs_config->get_hls_window(info->req->vhost);
if (info->request_time + (2 * hls_window) < srs_get_system_time()) {
SrsContextRestore(_srs_context->get_id());
_srs_context->set_id(SrsContextId().set_value(ctx));
http_hooks_on_stop(req);
srs_freep(req);
http_hooks_on_stop(info->req);
SrsStatistic* stat = SrsStatistic::instance();
stat->on_disconnect(ctx);
map_ctx_info_.erase(it);
srs_freep(info);
break;
}
@ -388,21 +445,28 @@ srs_error_t SrsVodStream::serve_m3u8_ctx(ISrsHttpResponseWriter * w, ISrsHttpMes
{
srs_error_t err = srs_success;
// Try to serve by HLS streaming.
bool served = false;
if ((err = hls_.serve_m3u8_ctx(w, r, &served)) != srs_success) {
return srs_error_wrap(err, "hls stream");
}
// Done if already served.
if (served) {
return err;
SrsHttpMessage *hr = dynamic_cast<SrsHttpMessage *>(r);
srs_assert(hr);
SrsRequest *req = hr->to_request(hr->host())->as_http();
SrsAutoFree(SrsRequest, req);
// discovery vhost, resolve the vhost from config
SrsConfDirective *parsed_vhost = _srs_config->get_vhost(req->vhost);
if (parsed_vhost) {
req->vhost = parsed_vhost->arg0();
}
// If HLS stream is disabled, use SrsHttpFileServer to serve HLS, which is normal file server.
if (!_srs_config->get_hls_ctx_enabled(req->vhost)) {
// Serve by default HLS handler.
return SrsHttpFileServer::serve_m3u8_ctx(w, r, fullpath);
}
// Try to serve by HLS streaming.
return hls_.serve_m3u8_ctx(w, r, fullpath, req);
}
SrsHttpStaticServer::SrsHttpStaticServer(SrsServer* svr)
{
server = svr;

View file

@ -15,6 +15,8 @@ struct SrsM3u8CtxInfo
{
srs_utime_t request_time;
SrsRequest* req;
SrsM3u8CtxInfo();
virtual ~SrsM3u8CtxInfo();
};
// Server HLS streaming.
@ -22,17 +24,19 @@ class SrsHlsStream : public ISrsFastTimer
{
private:
// The period of validity of the ctx
std::map<std::string, SrsM3u8CtxInfo> map_ctx_info_;
std::map<std::string, SrsM3u8CtxInfo*> map_ctx_info_;
public:
SrsHlsStream();
virtual ~SrsHlsStream();
public:
virtual srs_error_t serve_m3u8_ctx(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, bool* served);
virtual srs_error_t serve_m3u8_ctx(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, std::string fullpath, SrsRequest* req);
private:
virtual bool ctx_is_exist(std::string ctx);
virtual void alive(std::string ctx, SrsRequest* req);
virtual srs_error_t http_hooks_on_play(SrsRequest* req);
virtual void http_hooks_on_stop(SrsRequest* req);
srs_error_t serve_new_session(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, SrsRequest *req);
srs_error_t serve_exists_session(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, std::string fullpath);
bool ctx_is_exist(std::string ctx);
void alive(std::string ctx, SrsRequest* req);
srs_error_t http_hooks_on_play(SrsRequest* req);
void http_hooks_on_stop(SrsRequest* req);
// interface ISrsFastTimer
private:
srs_error_t on_timer(srs_utime_t interval);

View file

@ -574,25 +574,7 @@ std::string SrsHttpMessage::parse_rest_id(string pattern)
srs_error_t SrsHttpMessage::body_read_all(string& body)
{
srs_error_t err = srs_success;
// cache to read.
char* buf = new char[SRS_HTTP_READ_CACHE_BYTES];
SrsAutoFreeA(char, buf);
// whatever, read util EOF.
while (!_body->eof()) {
ssize_t nb_read = 0;
if ((err = _body->read(buf, SRS_HTTP_READ_CACHE_BYTES, &nb_read)) != srs_success) {
return srs_error_wrap(err, "read body");
}
if (nb_read > 0) {
body.append(buf, nb_read);
}
}
return err;
return srs_ioutil_read_all(_body, body);
}
ISrsHttpResponseReader* SrsHttpMessage::body_reader()

View file

@ -41,6 +41,7 @@ using namespace std;
#include <srs_kernel_log.hpp>
#include <srs_kernel_utility.hpp>
#include <srs_protocol_http_stack.hpp>
#include <srs_core_autofree.hpp>
void srs_discovery_tc_url(string tcUrl, string& schema, string& host, string& vhost, string& app, string& stream, int& port, string& param)
{
@ -900,3 +901,33 @@ string srs_get_system_hostname()
return _srs_system_hostname;
}
srs_error_t srs_ioutil_read_all(ISrsReader* in, std::string& content)
{
srs_error_t err = srs_success;
// Cache to read, it might cause coroutine switch, so we use local cache here.
char* buf = new char[SRS_HTTP_READ_CACHE_BYTES];
SrsAutoFreeA(char, buf);
// Whatever, read util EOF.
while (true) {
ssize_t nb_read = 0;
if ((err = in->read(buf, SRS_HTTP_READ_CACHE_BYTES, &nb_read)) != srs_success) {
int code = srs_error_code(err);
if (code == ERROR_SYSTEM_FILE_EOF || code == ERROR_HTTP_RESPONSE_EOF || code == ERROR_HTTP_REQUEST_EOF
|| code == ERROR_HTTP_STREAM_EOF
) {
srs_freep(err);
return err;
}
return srs_error_wrap(err, "read body");
}
if (nb_read > 0) {
content.append(buf, nb_read);
}
}
return err;
}

View file

@ -32,6 +32,7 @@ class SrsMessageHeader;
class SrsSharedPtrMessage;
class SrsCommonMessage;
class ISrsProtocolReadWriter;
class ISrsReader;
/**
* parse the tcUrl, output the schema, host, vhost, app and port.
@ -183,5 +184,8 @@ extern std::string srs_get_original_ip(ISrsHttpMessage* r);
// Get hostname
extern std::string srs_get_system_hostname(void);
// Read all content util EOF.
extern srs_error_t srs_ioutil_read_all(ISrsReader* in, std::string& content);
#endif