diff --git a/trunk/src/app/srs_app_http_static.cpp b/trunk/src/app/srs_app_http_static.cpp index 1733ca298..aae20d891 100644 --- a/trunk/src/app/srs_app_http_static.cpp +++ b/trunk/src/app/srs_app_http_static.cpp @@ -41,6 +41,16 @@ using namespace std; #define SRS_CONTEXT_IN_HLS "hls_ctx" +SrsM3u8CtxInfo::SrsM3u8CtxInfo() +{ + req = NULL; +} + +SrsM3u8CtxInfo::~SrsM3u8CtxInfo() +{ + srs_freep(req); +} + SrsHlsStream::SrsHlsStream() { _srs_hybrid->timer5s()->subscribe(this); @@ -50,42 +60,40 @@ SrsHlsStream::~SrsHlsStream() { _srs_hybrid->timer5s()->unsubscribe(this); - std::map::iterator it; + std::map::iterator it; for (it = map_ctx_info_.begin(); it != map_ctx_info_.end(); ++it) { - srs_freep(it->second.req); + SrsM3u8CtxInfo* info = it->second; + srs_freep(info); } map_ctx_info_.clear(); } -srs_error_t SrsHlsStream::serve_m3u8_ctx(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, bool* served) +srs_error_t SrsHlsStream::serve_m3u8_ctx(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, string fullpath, SrsRequest* req) { srs_error_t err = srs_success; - SrsHttpMessage* hr = dynamic_cast(r); - srs_assert(hr); + string ctx = r->query_get(SRS_CONTEXT_IN_HLS); - SrsRequest* req = hr->to_request(hr->host())->as_http(); - SrsAutoFree(SrsRequest, req); + // Always make the ctx alive now. + alive(ctx, req); - // discovery vhost, resolve the vhost from config - SrsConfDirective* parsed_vhost = _srs_config->get_vhost(req->vhost); - if (parsed_vhost) { - req->vhost = parsed_vhost->arg0(); - } - - // If HLS stream is disabled, use SrsHttpFileServer to serve HLS, which is normal file server. - if (!_srs_config->get_hls_ctx_enabled(req->vhost)) { - return err; - } - - // Serve as HLS stream, create a HLS session to serve it. - string ctx = hr->query_get(SRS_CONTEXT_IN_HLS); + // Already exists context, response with rebuilt m3u8 content. if (!ctx.empty() && ctx_is_exist(ctx)) { - alive(ctx, NULL); - return err; + return serve_exists_session(w, r, fullpath); } // Create a m3u8 in memory, contains the session id(ctx). + return serve_new_session(w, r, req); +} + +srs_error_t SrsHlsStream::serve_new_session(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, SrsRequest* req) +{ + srs_error_t err = srs_success; + + SrsHttpMessage *hr = dynamic_cast(r); + srs_assert(hr); + + string ctx; if (ctx.empty()) { // make sure unique do { @@ -123,16 +131,58 @@ srs_error_t SrsHlsStream::serve_m3u8_ctx(ISrsHttpResponseWriter* w, ISrsHttpMess return srs_error_wrap(err, "final request"); } - alive(ctx, req->copy()); - // update the statistic when source disconveried. SrsStatistic* stat = SrsStatistic::instance(); if ((err = stat->on_client(ctx, req, NULL, SrsHlsPlay)) != srs_success) { return srs_error_wrap(err, "stat on client"); } - // The request has been served by HLS streaming handler. - *served = true; + return err; +} + +srs_error_t SrsHlsStream::serve_exists_session(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, std::string fullpath) +{ + srs_error_t err = srs_success; + + // Read m3u8 content. + SrsFileReader fs; + if ((err = fs.open(fullpath)) != srs_success) { + return srs_error_wrap(err, "open %s", fullpath.c_str()); + } + + string content; + if ((err = srs_ioutil_read_all(&fs, content)) != srs_success) { + return srs_error_wrap(err, "read %s", fullpath.c_str()); + } + + // Rebuild the m3u8 content, make .ts with hls_ctx. + size_t pos_ts = content.find(".ts"); + static string QUERY_PREFIX = string(".ts?") + string(SRS_CONTEXT_IN_HLS) + string("="); + + if (pos_ts != string::npos) { + string ctx = r->query_get(SRS_CONTEXT_IN_HLS); + string query = QUERY_PREFIX + ctx; + + size_t pos_query = content.find(".ts?"); + if (pos_query != string::npos) { + query += "&"; + content = srs_string_replace(content, ".ts?", query); + } else { + content = srs_string_replace(content, ".ts", query); + } + } + + // Response with rebuilt content. + w->header()->set_content_type("application/vnd.apple.mpegurl"); + w->header()->set_content_length(content.length()); + w->write_header(SRS_CONSTS_HTTP_OK); + if (!content.empty()) { + w->write((char*)content.data(), content.length()); + } + + if ((err = w->final_request()) != srs_success) { + return srs_error_wrap(err, "final request"); + } return err; } @@ -144,15 +194,20 @@ bool SrsHlsStream::ctx_is_exist(std::string ctx) void SrsHlsStream::alive(std::string ctx, SrsRequest* req) { - std::map::iterator it; - if ((it = map_ctx_info_.find(ctx)) != map_ctx_info_.end()) { - it->second.request_time = srs_get_system_time(); - } else { - SrsM3u8CtxInfo info; - info.req = req; - info.request_time = srs_get_system_time(); + std::map::iterator it = map_ctx_info_.find(ctx); + + // Create new context. + if (it == map_ctx_info_.end()) { + SrsM3u8CtxInfo *info = new SrsM3u8CtxInfo(); + info->req = req->copy(); + info->request_time = srs_get_system_time(); map_ctx_info_.insert(make_pair(ctx, info)); + return; } + + // Update alive time of context. + SrsM3u8CtxInfo* info = it->second; + info->request_time = srs_get_system_time(); } srs_error_t SrsHlsStream::http_hooks_on_play(SrsRequest* req) @@ -222,21 +277,23 @@ srs_error_t SrsHlsStream::on_timer(srs_utime_t interval) { srs_error_t err = srs_success; - std::map::iterator it; + std::map::iterator it; for (it = map_ctx_info_.begin(); it != map_ctx_info_.end(); ++it) { string ctx = it->first; - SrsRequest* req = it->second.req; - srs_utime_t hls_window = _srs_config->get_hls_window(req->vhost); - if (it->second.request_time + (2 * hls_window) < srs_get_system_time()) { + SrsM3u8CtxInfo* info = it->second; + + srs_utime_t hls_window = _srs_config->get_hls_window(info->req->vhost); + if (info->request_time + (2 * hls_window) < srs_get_system_time()) { SrsContextRestore(_srs_context->get_id()); _srs_context->set_id(SrsContextId().set_value(ctx)); - http_hooks_on_stop(req); - srs_freep(req); + http_hooks_on_stop(info->req); SrsStatistic* stat = SrsStatistic::instance(); stat->on_disconnect(ctx); + map_ctx_info_.erase(it); + srs_freep(info); break; } @@ -388,19 +445,26 @@ srs_error_t SrsVodStream::serve_m3u8_ctx(ISrsHttpResponseWriter * w, ISrsHttpMes { srs_error_t err = srs_success; + SrsHttpMessage *hr = dynamic_cast(r); + srs_assert(hr); + + SrsRequest *req = hr->to_request(hr->host())->as_http(); + SrsAutoFree(SrsRequest, req); + + // discovery vhost, resolve the vhost from config + SrsConfDirective *parsed_vhost = _srs_config->get_vhost(req->vhost); + if (parsed_vhost) { + req->vhost = parsed_vhost->arg0(); + } + + // If HLS stream is disabled, use SrsHttpFileServer to serve HLS, which is normal file server. + if (!_srs_config->get_hls_ctx_enabled(req->vhost)) { + // Serve by default HLS handler. + return SrsHttpFileServer::serve_m3u8_ctx(w, r, fullpath); + } + // Try to serve by HLS streaming. - bool served = false; - if ((err = hls_.serve_m3u8_ctx(w, r, &served)) != srs_success) { - return srs_error_wrap(err, "hls stream"); - } - - // Done if already served. - if (served) { - return err; - } - - // Serve by default HLS handler. - return SrsHttpFileServer::serve_m3u8_ctx(w, r, fullpath); + return hls_.serve_m3u8_ctx(w, r, fullpath, req); } SrsHttpStaticServer::SrsHttpStaticServer(SrsServer* svr) diff --git a/trunk/src/app/srs_app_http_static.hpp b/trunk/src/app/srs_app_http_static.hpp index 9476348be..fc9f62147 100644 --- a/trunk/src/app/srs_app_http_static.hpp +++ b/trunk/src/app/srs_app_http_static.hpp @@ -15,6 +15,8 @@ struct SrsM3u8CtxInfo { srs_utime_t request_time; SrsRequest* req; + SrsM3u8CtxInfo(); + virtual ~SrsM3u8CtxInfo(); }; // Server HLS streaming. @@ -22,17 +24,19 @@ class SrsHlsStream : public ISrsFastTimer { private: // The period of validity of the ctx - std::map map_ctx_info_; + std::map map_ctx_info_; public: SrsHlsStream(); virtual ~SrsHlsStream(); public: - virtual srs_error_t serve_m3u8_ctx(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, bool* served); + virtual srs_error_t serve_m3u8_ctx(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, std::string fullpath, SrsRequest* req); private: - virtual bool ctx_is_exist(std::string ctx); - virtual void alive(std::string ctx, SrsRequest* req); - virtual srs_error_t http_hooks_on_play(SrsRequest* req); - virtual void http_hooks_on_stop(SrsRequest* req); + srs_error_t serve_new_session(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, SrsRequest *req); + srs_error_t serve_exists_session(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, std::string fullpath); + bool ctx_is_exist(std::string ctx); + void alive(std::string ctx, SrsRequest* req); + srs_error_t http_hooks_on_play(SrsRequest* req); + void http_hooks_on_stop(SrsRequest* req); // interface ISrsFastTimer private: srs_error_t on_timer(srs_utime_t interval); diff --git a/trunk/src/protocol/srs_protocol_http_conn.cpp b/trunk/src/protocol/srs_protocol_http_conn.cpp index d160b1799..7b151857a 100644 --- a/trunk/src/protocol/srs_protocol_http_conn.cpp +++ b/trunk/src/protocol/srs_protocol_http_conn.cpp @@ -574,25 +574,7 @@ std::string SrsHttpMessage::parse_rest_id(string pattern) srs_error_t SrsHttpMessage::body_read_all(string& body) { - srs_error_t err = srs_success; - - // cache to read. - char* buf = new char[SRS_HTTP_READ_CACHE_BYTES]; - SrsAutoFreeA(char, buf); - - // whatever, read util EOF. - while (!_body->eof()) { - ssize_t nb_read = 0; - if ((err = _body->read(buf, SRS_HTTP_READ_CACHE_BYTES, &nb_read)) != srs_success) { - return srs_error_wrap(err, "read body"); - } - - if (nb_read > 0) { - body.append(buf, nb_read); - } - } - - return err; + return srs_ioutil_read_all(_body, body); } ISrsHttpResponseReader* SrsHttpMessage::body_reader() diff --git a/trunk/src/protocol/srs_protocol_utility.cpp b/trunk/src/protocol/srs_protocol_utility.cpp index 43b254132..31a282ab1 100644 --- a/trunk/src/protocol/srs_protocol_utility.cpp +++ b/trunk/src/protocol/srs_protocol_utility.cpp @@ -41,6 +41,7 @@ using namespace std; #include #include #include +#include void srs_discovery_tc_url(string tcUrl, string& schema, string& host, string& vhost, string& app, string& stream, int& port, string& param) { @@ -900,3 +901,33 @@ string srs_get_system_hostname() return _srs_system_hostname; } +srs_error_t srs_ioutil_read_all(ISrsReader* in, std::string& content) +{ + srs_error_t err = srs_success; + + // Cache to read, it might cause coroutine switch, so we use local cache here. + char* buf = new char[SRS_HTTP_READ_CACHE_BYTES]; + SrsAutoFreeA(char, buf); + + // Whatever, read util EOF. + while (true) { + ssize_t nb_read = 0; + if ((err = in->read(buf, SRS_HTTP_READ_CACHE_BYTES, &nb_read)) != srs_success) { + int code = srs_error_code(err); + if (code == ERROR_SYSTEM_FILE_EOF || code == ERROR_HTTP_RESPONSE_EOF || code == ERROR_HTTP_REQUEST_EOF + || code == ERROR_HTTP_STREAM_EOF + ) { + srs_freep(err); + return err; + } + return srs_error_wrap(err, "read body"); + } + + if (nb_read > 0) { + content.append(buf, nb_read); + } + } + + return err; +} + diff --git a/trunk/src/protocol/srs_protocol_utility.hpp b/trunk/src/protocol/srs_protocol_utility.hpp index 057b5d5b8..8b834b864 100644 --- a/trunk/src/protocol/srs_protocol_utility.hpp +++ b/trunk/src/protocol/srs_protocol_utility.hpp @@ -32,6 +32,7 @@ class SrsMessageHeader; class SrsSharedPtrMessage; class SrsCommonMessage; class ISrsProtocolReadWriter; +class ISrsReader; /** * parse the tcUrl, output the schema, host, vhost, app and port. @@ -183,5 +184,8 @@ extern std::string srs_get_original_ip(ISrsHttpMessage* r); // Get hostname extern std::string srs_get_system_hostname(void); +// Read all content util EOF. +extern srs_error_t srs_ioutil_read_all(ISrsReader* in, std::string& content); + #endif