diff --git a/trunk/auto/options.sh b/trunk/auto/options.sh index 43f0e701a..5b4728dc5 100755 --- a/trunk/auto/options.sh +++ b/trunk/auto/options.sh @@ -65,7 +65,7 @@ SRS_GCOV=NO # always enable the warn/error level. SRS_LOG_VERBOSE=NO SRS_LOG_INFO=NO -SRS_LOG_TRACE=NO +SRS_LOG_TRACE=YES # ################################################################ # experts @@ -437,11 +437,6 @@ if [ $help = yes ]; then fi function apply_detail_options() { - # always set the log level for all presets. - SRS_LOG_VERBOSE=NO - SRS_LOG_INFO=NO - SRS_LOG_TRACE=YES - # set default preset if not specifies if [[ $SRS_PURE_RTMP == NO && $SRS_FAST == NO && $SRS_DISABLE_ALL == NO && $SRS_ENABLE_ALL == NO && \ $SRS_DEV == NO && $SRS_FAST_DEV == NO && $SRS_DEMO == NO && $SRS_PI == NO && $SRS_CUBIE == NO && \ diff --git a/trunk/configure b/trunk/configure index 605fc2db8..48bf155fc 100755 --- a/trunk/configure +++ b/trunk/configure @@ -84,7 +84,7 @@ END # enable gdb debug GDBDebug=" -g -O0" # the warning level. -WarnLevel=" -Wall" +WarnLevel=" -Wall -Wno-deprecated-declarations" # the compile standard. CppStd="-ansi" if [[ $SRS_CXX11 == YES ]]; then diff --git a/trunk/src/app/srs_app_conn.cpp b/trunk/src/app/srs_app_conn.cpp index d180c7208..80c1e3a6c 100644 --- a/trunk/src/app/srs_app_conn.cpp +++ b/trunk/src/app/srs_app_conn.cpp @@ -201,7 +201,7 @@ srs_error_t SrsConnection::cycle() return srs_success; } -string SrsConnection::srs_id() +SrsContextId SrsConnection::srs_id() { return trd->cid(); } diff --git a/trunk/src/app/srs_app_conn.hpp b/trunk/src/app/srs_app_conn.hpp index f4e8371c9..551976c0a 100644 --- a/trunk/src/app/srs_app_conn.hpp +++ b/trunk/src/app/srs_app_conn.hpp @@ -93,7 +93,8 @@ public: virtual srs_error_t cycle(); public: // Get the srs id which identify the client. - virtual std::string srs_id(); + // TODO: FIXME: Rename to cid. + virtual SrsContextId srs_id(); // Get the remote ip of peer. virtual std::string remote_ip(); // Set connection to expired. diff --git a/trunk/src/app/srs_app_dvr.cpp b/trunk/src/app/srs_app_dvr.cpp index 3dae0d1cc..2665210a1 100644 --- a/trunk/src/app/srs_app_dvr.cpp +++ b/trunk/src/app/srs_app_dvr.cpp @@ -533,7 +533,7 @@ srs_error_t SrsDvrMp4Segmenter::close_encoder() return err; } -SrsDvrAsyncCallOnDvr::SrsDvrAsyncCallOnDvr(std::string c, SrsRequest* r, string p) +SrsDvrAsyncCallOnDvr::SrsDvrAsyncCallOnDvr(SrsContextId c, SrsRequest* r, string p) { cid = c; req = r->copy(); @@ -673,7 +673,7 @@ srs_error_t SrsDvrPlan::on_reap_segment() { srs_error_t err = srs_success; - std::string cid = _srs_context->get_id(); + SrsContextId cid = _srs_context->get_id(); SrsFragment* fragment = segment->current(); string fullpath = fragment->fullpath(); diff --git a/trunk/src/app/srs_app_dvr.hpp b/trunk/src/app/srs_app_dvr.hpp index dc1f35560..03b04e746 100644 --- a/trunk/src/app/srs_app_dvr.hpp +++ b/trunk/src/app/srs_app_dvr.hpp @@ -159,11 +159,11 @@ protected: class SrsDvrAsyncCallOnDvr : public ISrsAsyncCallTask { private: - std::string cid; + SrsContextId cid; std::string path; SrsRequest* req; public: - SrsDvrAsyncCallOnDvr(std::string c, SrsRequest* r, std::string p); + SrsDvrAsyncCallOnDvr(SrsContextId c, SrsRequest* r, std::string p); virtual ~SrsDvrAsyncCallOnDvr(); public: virtual srs_error_t call(); diff --git a/trunk/src/app/srs_app_hls.cpp b/trunk/src/app/srs_app_hls.cpp index 65fefd0c4..e5cdfbfc6 100644 --- a/trunk/src/app/srs_app_hls.cpp +++ b/trunk/src/app/srs_app_hls.cpp @@ -82,7 +82,7 @@ void SrsHlsSegment::config_cipher(unsigned char* key,unsigned char* iv) fw->config_cipher(key, iv); } -SrsDvrAsyncCallOnHls::SrsDvrAsyncCallOnHls(string c, SrsRequest* r, string p, string t, string m, string mu, int s, srs_utime_t d) +SrsDvrAsyncCallOnHls::SrsDvrAsyncCallOnHls(SrsContextId c, SrsRequest* r, string p, string t, string m, string mu, int s, srs_utime_t d) { req = r->copy(); cid = c; @@ -137,7 +137,7 @@ string SrsDvrAsyncCallOnHls::to_string() return "on_hls: " + path; } -SrsDvrAsyncCallOnHlsNotify::SrsDvrAsyncCallOnHlsNotify(string c, SrsRequest* r, string u) +SrsDvrAsyncCallOnHlsNotify::SrsDvrAsyncCallOnHlsNotify(SrsContextId c, SrsRequest* r, string u) { cid = c; req = r->copy(); diff --git a/trunk/src/app/srs_app_hls.hpp b/trunk/src/app/srs_app_hls.hpp index e62b5c70e..2d4481708 100644 --- a/trunk/src/app/srs_app_hls.hpp +++ b/trunk/src/app/srs_app_hls.hpp @@ -80,7 +80,7 @@ public: class SrsDvrAsyncCallOnHls : public ISrsAsyncCallTask { private: - std::string cid; + SrsContextId cid; std::string path; std::string ts_url; std::string m3u8; @@ -90,7 +90,7 @@ private: srs_utime_t duration; public: // TODO: FIXME: Use TBN 1000. - SrsDvrAsyncCallOnHls(std::string c, SrsRequest* r, std::string p, std::string t, std::string m, std::string mu, int s, srs_utime_t d); + SrsDvrAsyncCallOnHls(SrsContextId c, SrsRequest* r, std::string p, std::string t, std::string m, std::string mu, int s, srs_utime_t d); virtual ~SrsDvrAsyncCallOnHls(); public: virtual srs_error_t call(); @@ -101,11 +101,11 @@ public: class SrsDvrAsyncCallOnHlsNotify : public ISrsAsyncCallTask { private: - std::string cid; + SrsContextId cid; std::string ts_url; SrsRequest* req; public: - SrsDvrAsyncCallOnHlsNotify(std::string c, SrsRequest* r, std::string u); + SrsDvrAsyncCallOnHlsNotify(SrsContextId c, SrsRequest* r, std::string u); virtual ~SrsDvrAsyncCallOnHlsNotify(); public: virtual srs_error_t call(); diff --git a/trunk/src/app/srs_app_http_api.cpp b/trunk/src/app/srs_app_http_api.cpp index 4d7bc7b3c..194b3af97 100644 --- a/trunk/src/app/srs_app_http_api.cpp +++ b/trunk/src/app/srs_app_http_api.cpp @@ -811,10 +811,10 @@ srs_error_t SrsGoApiClients::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessa // path: {pattern}{client_id} // e.g. /api/v1/clients/100 pattern= /api/v1/clients/, client_id=100 - std::string cid = r->parse_rest_id(entry->pattern); + std::string client_id = r->parse_rest_id(entry->pattern); SrsStatisticClient* client = NULL; - if (cid != "" && (client = stat->find_client(cid)) == NULL) { + if (client_id != "" && (client = stat->find_client(client_id)) == NULL) { return srs_api_response_code(w, r, ERROR_RTMP_CLIENT_NOT_FOUND); } @@ -854,7 +854,7 @@ srs_error_t SrsGoApiClients::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessa } client->conn->expire(); - srs_warn("kickoff client id=%s ok", cid.c_str()); + srs_warn("kickoff client id=%s ok", client_id.c_str()); } else { return srs_go_http_error(w, SRS_CONSTS_HTTP_MethodNotAllowed); } diff --git a/trunk/src/app/srs_app_http_hooks.cpp b/trunk/src/app/srs_app_http_hooks.cpp index daffd531f..5fd1d8cc3 100644 --- a/trunk/src/app/srs_app_http_hooks.cpp +++ b/trunk/src/app/srs_app_http_hooks.cpp @@ -60,15 +60,13 @@ srs_error_t SrsHttpHooks::on_connect(string url, SrsRequest* req) { srs_error_t err = srs_success; - // TODO: FIXME: check client_id must be int? - std::string client_id = _srs_context->get_id(); + SrsContextId cid = _srs_context->get_id(); SrsJsonObject* obj = SrsJsonAny::object(); SrsAutoFree(SrsJsonObject, obj); obj->set("action", SrsJsonAny::str("on_connect")); - // obj->set("client_id", SrsJsonAny::integer(client_id)); - obj->set("client_id", SrsJsonAny::str(client_id.c_str())); + obj->set("client_id", SrsJsonAny::str(cid.c_str())); obj->set("ip", SrsJsonAny::str(req->ip.c_str())); obj->set("vhost", SrsJsonAny::str(req->vhost.c_str())); obj->set("app", SrsJsonAny::str(req->app.c_str())); @@ -82,11 +80,11 @@ srs_error_t SrsHttpHooks::on_connect(string url, SrsRequest* req) SrsHttpClient http; if ((err = do_post(&http, url, data, status_code, res)) != srs_success) { return srs_error_wrap(err, "http: on_connect failed, client_id=%s, url=%s, request=%s, response=%s, code=%d", - client_id.c_str(), url.c_str(), data.c_str(), res.c_str(), status_code); + cid.c_str(), url.c_str(), data.c_str(), res.c_str(), status_code); } srs_trace("http: on_connect ok, client_id=%s, url=%s, request=%s, response=%s", - client_id.c_str(), url.c_str(), data.c_str(), res.c_str()); + cid.c_str(), url.c_str(), data.c_str(), res.c_str()); return err; } @@ -95,13 +93,13 @@ void SrsHttpHooks::on_close(string url, SrsRequest* req, int64_t send_bytes, int { srs_error_t err = srs_success; - std::string client_id = _srs_context->get_id(); + SrsContextId cid = _srs_context->get_id(); SrsJsonObject* obj = SrsJsonAny::object(); SrsAutoFree(SrsJsonObject, obj); obj->set("action", SrsJsonAny::str("on_close")); - obj->set("client_id", SrsJsonAny::str(client_id.c_str())); + obj->set("client_id", SrsJsonAny::str(cid.c_str())); obj->set("ip", SrsJsonAny::str(req->ip.c_str())); obj->set("vhost", SrsJsonAny::str(req->vhost.c_str())); obj->set("app", SrsJsonAny::str(req->app.c_str())); @@ -117,12 +115,12 @@ void SrsHttpHooks::on_close(string url, SrsRequest* req, int64_t send_bytes, int int ret = srs_error_code(err); srs_freep(err); srs_warn("http: ignore on_close failed, client_id=%s, url=%s, request=%s, response=%s, code=%d, ret=%d", - client_id.c_str(), url.c_str(), data.c_str(), res.c_str(), status_code, ret); + cid.c_str(), url.c_str(), data.c_str(), res.c_str(), status_code, ret); return; } srs_trace("http: on_close ok, client_id=%s, url=%s, request=%s, response=%s", - client_id.c_str(), url.c_str(), data.c_str(), res.c_str()); + cid.c_str(), url.c_str(), data.c_str(), res.c_str()); return; } @@ -131,13 +129,13 @@ srs_error_t SrsHttpHooks::on_publish(string url, SrsRequest* req) { srs_error_t err = srs_success; - std::string client_id = _srs_context->get_id(); + SrsContextId cid = _srs_context->get_id(); SrsJsonObject* obj = SrsJsonAny::object(); SrsAutoFree(SrsJsonObject, obj); obj->set("action", SrsJsonAny::str("on_publish")); - obj->set("client_id", SrsJsonAny::str(client_id.c_str())); + obj->set("client_id", SrsJsonAny::str(cid.c_str())); obj->set("ip", SrsJsonAny::str(req->ip.c_str())); obj->set("vhost", SrsJsonAny::str(req->vhost.c_str())); obj->set("app", SrsJsonAny::str(req->app.c_str())); @@ -152,11 +150,11 @@ srs_error_t SrsHttpHooks::on_publish(string url, SrsRequest* req) SrsHttpClient http; if ((err = do_post(&http, url, data, status_code, res)) != srs_success) { return srs_error_wrap(err, "http: on_publish failed, client_id=%s, url=%s, request=%s, response=%s, code=%d", - client_id.c_str(), url.c_str(), data.c_str(), res.c_str(), status_code); + cid.c_str(), url.c_str(), data.c_str(), res.c_str(), status_code); } srs_trace("http: on_publish ok, client_id=%s, url=%s, request=%s, response=%s", - client_id.c_str(), url.c_str(), data.c_str(), res.c_str()); + cid.c_str(), url.c_str(), data.c_str(), res.c_str()); return err; } @@ -165,13 +163,13 @@ void SrsHttpHooks::on_unpublish(string url, SrsRequest* req) { srs_error_t err = srs_success; - std::string client_id = _srs_context->get_id(); + SrsContextId cid = _srs_context->get_id(); SrsJsonObject* obj = SrsJsonAny::object(); SrsAutoFree(SrsJsonObject, obj); obj->set("action", SrsJsonAny::str("on_unpublish")); - obj->set("client_id", SrsJsonAny::str(client_id.c_str())); + obj->set("client_id", SrsJsonAny::str(cid.c_str())); obj->set("ip", SrsJsonAny::str(req->ip.c_str())); obj->set("vhost", SrsJsonAny::str(req->vhost.c_str())); obj->set("app", SrsJsonAny::str(req->app.c_str())); @@ -187,12 +185,12 @@ void SrsHttpHooks::on_unpublish(string url, SrsRequest* req) int ret = srs_error_code(err); srs_freep(err); srs_warn("http: ignore on_unpublish failed, client_id=%s, url=%s, request=%s, response=%s, status=%d, ret=%d", - client_id.c_str(), url.c_str(), data.c_str(), res.c_str(), status_code, ret); + cid.c_str(), url.c_str(), data.c_str(), res.c_str(), status_code, ret); return; } srs_trace("http: on_unpublish ok, client_id=%s, url=%s, request=%s, response=%s", - client_id.c_str(), url.c_str(), data.c_str(), res.c_str()); + cid.c_str(), url.c_str(), data.c_str(), res.c_str()); return; } @@ -201,13 +199,13 @@ srs_error_t SrsHttpHooks::on_play(string url, SrsRequest* req) { srs_error_t err = srs_success; - std::string client_id = _srs_context->get_id(); + SrsContextId cid = _srs_context->get_id(); SrsJsonObject* obj = SrsJsonAny::object(); SrsAutoFree(SrsJsonObject, obj); obj->set("action", SrsJsonAny::str("on_play")); - obj->set("client_id", SrsJsonAny::str(client_id.c_str())); + obj->set("client_id", SrsJsonAny::str(cid.c_str())); obj->set("ip", SrsJsonAny::str(req->ip.c_str())); obj->set("vhost", SrsJsonAny::str(req->vhost.c_str())); obj->set("app", SrsJsonAny::str(req->app.c_str())); @@ -222,11 +220,11 @@ srs_error_t SrsHttpHooks::on_play(string url, SrsRequest* req) SrsHttpClient http; if ((err = do_post(&http, url, data, status_code, res)) != srs_success) { return srs_error_wrap(err, "http: on_play failed, client_id=%s, url=%s, request=%s, response=%s, status=%d", - client_id.c_str(), url.c_str(), data.c_str(), res.c_str(), status_code); + cid.c_str(), url.c_str(), data.c_str(), res.c_str(), status_code); } srs_trace("http: on_play ok, client_id=%s, url=%s, request=%s, response=%s", - client_id.c_str(), url.c_str(), data.c_str(), res.c_str()); + cid.c_str(), url.c_str(), data.c_str(), res.c_str()); return err; } @@ -235,13 +233,13 @@ void SrsHttpHooks::on_stop(string url, SrsRequest* req) { srs_error_t err = srs_success; - std::string client_id = _srs_context->get_id(); + SrsContextId cid = _srs_context->get_id(); SrsJsonObject* obj = SrsJsonAny::object(); SrsAutoFree(SrsJsonObject, obj); obj->set("action", SrsJsonAny::str("on_stop")); - obj->set("client_id", SrsJsonAny::str(client_id.c_str())); + obj->set("client_id", SrsJsonAny::str(cid.c_str())); obj->set("ip", SrsJsonAny::str(req->ip.c_str())); obj->set("vhost", SrsJsonAny::str(req->vhost.c_str())); obj->set("app", SrsJsonAny::str(req->app.c_str())); @@ -257,28 +255,28 @@ void SrsHttpHooks::on_stop(string url, SrsRequest* req) int ret = srs_error_code(err); srs_freep(err); srs_warn("http: ignore on_stop failed, client_id=%s, url=%s, request=%s, response=%s, code=%d, ret=%d", - client_id.c_str(), url.c_str(), data.c_str(), res.c_str(), status_code, ret); + cid.c_str(), url.c_str(), data.c_str(), res.c_str(), status_code, ret); return; } srs_trace("http: on_stop ok, client_id=%s, url=%s, request=%s, response=%s", - client_id.c_str(), url.c_str(), data.c_str(), res.c_str()); + cid.c_str(), url.c_str(), data.c_str(), res.c_str()); return; } -srs_error_t SrsHttpHooks::on_dvr(std::string cid, string url, SrsRequest* req, string file) +srs_error_t SrsHttpHooks::on_dvr(SrsContextId c, string url, SrsRequest* req, string file) { srs_error_t err = srs_success; - std::string client_id = cid; + SrsContextId cid = c; std::string cwd = _srs_config->cwd(); SrsJsonObject* obj = SrsJsonAny::object(); SrsAutoFree(SrsJsonObject, obj); obj->set("action", SrsJsonAny::str("on_dvr")); - obj->set("client_id", SrsJsonAny::str(client_id.c_str())); + obj->set("client_id", SrsJsonAny::str(cid.c_str())); obj->set("ip", SrsJsonAny::str(req->ip.c_str())); obj->set("vhost", SrsJsonAny::str(req->vhost.c_str())); obj->set("app", SrsJsonAny::str(req->app.c_str())); @@ -294,20 +292,20 @@ srs_error_t SrsHttpHooks::on_dvr(std::string cid, string url, SrsRequest* req, s SrsHttpClient http; if ((err = do_post(&http, url, data, status_code, res)) != srs_success) { return srs_error_wrap(err, "http post on_dvr uri failed, client_id=%s, url=%s, request=%s, response=%s, code=%d", - client_id.c_str(), url.c_str(), data.c_str(), res.c_str(), status_code); + cid.c_str(), url.c_str(), data.c_str(), res.c_str(), status_code); } srs_trace("http hook on_dvr success. client_id=%s, url=%s, request=%s, response=%s", - client_id.c_str(), url.c_str(), data.c_str(), res.c_str()); + cid.c_str(), url.c_str(), data.c_str(), res.c_str()); return err; } -srs_error_t SrsHttpHooks::on_hls(std::string cid, string url, SrsRequest* req, string file, string ts_url, string m3u8, string m3u8_url, int sn, srs_utime_t duration) +srs_error_t SrsHttpHooks::on_hls(SrsContextId c, string url, SrsRequest* req, string file, string ts_url, string m3u8, string m3u8_url, int sn, srs_utime_t duration) { srs_error_t err = srs_success; - std::string client_id = cid; + SrsContextId cid = c; std::string cwd = _srs_config->cwd(); // the ts_url is under the same dir of m3u8_url. @@ -320,7 +318,7 @@ srs_error_t SrsHttpHooks::on_hls(std::string cid, string url, SrsRequest* req, s SrsAutoFree(SrsJsonObject, obj); obj->set("action", SrsJsonAny::str("on_hls")); - obj->set("client_id", SrsJsonAny::str(client_id.c_str())); + obj->set("client_id", SrsJsonAny::str(cid.c_str())); obj->set("ip", SrsJsonAny::str(req->ip.c_str())); obj->set("vhost", SrsJsonAny::str(req->vhost.c_str())); obj->set("app", SrsJsonAny::str(req->app.c_str())); @@ -344,16 +342,16 @@ srs_error_t SrsHttpHooks::on_hls(std::string cid, string url, SrsRequest* req, s } srs_trace("http: on_hls ok, client_id=%s, url=%s, request=%s, response=%s", - client_id.c_str(), url.c_str(), data.c_str(), res.c_str()); + cid.c_str(), url.c_str(), data.c_str(), res.c_str()); return err; } -srs_error_t SrsHttpHooks::on_hls_notify(std::string cid, std::string url, SrsRequest* req, std::string ts_url, int nb_notify) +srs_error_t SrsHttpHooks::on_hls_notify(SrsContextId c, std::string url, SrsRequest* req, std::string ts_url, int nb_notify) { srs_error_t err = srs_success; - std::string client_id = cid; + SrsContextId cid = c; std::string cwd = _srs_config->cwd(); if (srs_string_is_http(ts_url)) { @@ -409,7 +407,7 @@ srs_error_t SrsHttpHooks::on_hls_notify(std::string cid, std::string url, SrsReq int spenttime = (int)(srsu2ms(srs_update_system_time()) - starttime); srs_trace("http hook on_hls_notify success. client_id=%s, url=%s, code=%d, spent=%dms, read=%dB, err=%s", - client_id.c_str(), url.c_str(), msg->status_code(), spenttime, nb_read, srs_error_desc(err).c_str()); + cid.c_str(), url.c_str(), msg->status_code(), spenttime, nb_read, srs_error_desc(err).c_str()); // ignore any error for on_hls_notify. srs_error_reset(err); diff --git a/trunk/src/app/srs_app_http_hooks.hpp b/trunk/src/app/srs_app_http_hooks.hpp index 361957c21..f9588f647 100644 --- a/trunk/src/app/srs_app_http_hooks.hpp +++ b/trunk/src/app/srs_app_http_hooks.hpp @@ -74,7 +74,7 @@ public: // ignore if empty. // @param file the file path, can be relative or absolute path. // @param cid the source connection cid, for the on_dvr is async call. - static srs_error_t on_dvr(std::string cid, std::string url, SrsRequest* req, std::string file); + static srs_error_t on_dvr(SrsContextId cid, std::string url, SrsRequest* req, std::string file); // When hls reap segment, callback. // @param url the api server url, to process the event. // ignore if empty. @@ -85,7 +85,7 @@ public: // @param sn the seq_no, the sequence number of ts in hls/m3u8. // @param duration the segment duration in srs_utime_t. // @param cid the source connection cid, for the on_dvr is async call. - static srs_error_t on_hls(std::string cid, std::string url, SrsRequest* req, std::string file, std::string ts_url, + static srs_error_t on_hls(SrsContextId cid, std::string url, SrsRequest* req, std::string file, std::string ts_url, std::string m3u8, std::string m3u8_url, int sn, srs_utime_t duration); // When hls reap segment, callback. // @param url the api server url, to process the event. @@ -93,7 +93,7 @@ public: // @param ts_url the ts uri, used to replace the variable [ts_url] in url. // @param nb_notify the max bytes to read from notify server. // @param cid the source connection cid, for the on_dvr is async call. - static srs_error_t on_hls_notify(std::string cid, std::string url, SrsRequest* req, std::string ts_url, int nb_notify); + static srs_error_t on_hls_notify(SrsContextId cid, std::string url, SrsRequest* req, std::string ts_url, int nb_notify); // Discover co-workers for origin cluster. static srs_error_t discover_co_workers(std::string url, std::string& host, int& port); private: diff --git a/trunk/src/app/srs_app_log.cpp b/trunk/src/app/srs_app_log.cpp index 045812f50..82e9e484b 100644 --- a/trunk/src/app/srs_app_log.cpp +++ b/trunk/src/app/srs_app_log.cpp @@ -94,7 +94,7 @@ void SrsFileLog::reopen() open_log_file(); } -void SrsFileLog::verbose(const char* tag, const char* context_id, const char* fmt, ...) +void SrsFileLog::verbose(const char* tag, SrsContextId context_id, const char* fmt, ...) { if (level > SrsLogLevelVerbose) { return; @@ -114,7 +114,7 @@ void SrsFileLog::verbose(const char* tag, const char* context_id, const char* fm write_log(fd, log_data, size, SrsLogLevelVerbose); } -void SrsFileLog::info(const char* tag, const char* context_id, const char* fmt, ...) +void SrsFileLog::info(const char* tag, SrsContextId context_id, const char* fmt, ...) { if (level > SrsLogLevelInfo) { return; @@ -134,7 +134,7 @@ void SrsFileLog::info(const char* tag, const char* context_id, const char* fmt, write_log(fd, log_data, size, SrsLogLevelInfo); } -void SrsFileLog::trace(const char* tag, const char* context_id, const char* fmt, ...) +void SrsFileLog::trace(const char* tag, SrsContextId context_id, const char* fmt, ...) { if (level > SrsLogLevelTrace) { return; @@ -154,7 +154,7 @@ void SrsFileLog::trace(const char* tag, const char* context_id, const char* fmt, write_log(fd, log_data, size, SrsLogLevelTrace); } -void SrsFileLog::warn(const char* tag, const char* context_id, const char* fmt, ...) +void SrsFileLog::warn(const char* tag, SrsContextId context_id, const char* fmt, ...) { if (level > SrsLogLevelWarn) { return; @@ -174,7 +174,7 @@ void SrsFileLog::warn(const char* tag, const char* context_id, const char* fmt, write_log(fd, log_data, size, SrsLogLevelWarn); } -void SrsFileLog::error(const char* tag, const char* context_id, const char* fmt, ...) +void SrsFileLog::error(const char* tag, SrsContextId context_id, const char* fmt, ...) { if (level > SrsLogLevelError) { return; diff --git a/trunk/src/app/srs_app_log.hpp b/trunk/src/app/srs_app_log.hpp index d05461c06..7f4cf5418 100644 --- a/trunk/src/app/srs_app_log.hpp +++ b/trunk/src/app/srs_app_log.hpp @@ -55,11 +55,11 @@ public: public: virtual srs_error_t initialize(); virtual void reopen(); - virtual void verbose(const char* tag, const char* context_id, const char* fmt, ...); - virtual void info(const char* tag, const char* context_id, const char* fmt, ...); - virtual void trace(const char* tag, const char* context_id, const char* fmt, ...); - virtual void warn(const char* tag, const char* context_id, const char* fmt, ...); - virtual void error(const char* tag, const char* context_id, const char* fmt, ...); + virtual void verbose(const char* tag, SrsContextId context_id, const char* fmt, ...); + virtual void info(const char* tag, SrsContextId context_id, const char* fmt, ...); + virtual void trace(const char* tag, SrsContextId context_id, const char* fmt, ...); + virtual void warn(const char* tag, SrsContextId context_id, const char* fmt, ...); + virtual void error(const char* tag, SrsContextId context_id, const char* fmt, ...); // Interface ISrsReloadHandler. public: virtual srs_error_t on_reload_utc_time(); diff --git a/trunk/src/app/srs_app_process.cpp b/trunk/src/app/srs_app_process.cpp index 61fb644a6..a36fc2b19 100644 --- a/trunk/src/app/srs_app_process.cpp +++ b/trunk/src/app/srs_app_process.cpp @@ -180,7 +180,7 @@ srs_error_t SrsProcess::start() srs_info("fork process: %s", cli.c_str()); // for log - std::string cid = _srs_context->get_id(); + SrsContextId cid = _srs_context->get_id(); int ppid = getpid(); // TODO: fork or vfork? diff --git a/trunk/src/app/srs_app_recv_thread.cpp b/trunk/src/app/srs_app_recv_thread.cpp index 63933cde4..78ee964f0 100644 --- a/trunk/src/app/srs_app_recv_thread.cpp +++ b/trunk/src/app/srs_app_recv_thread.cpp @@ -57,7 +57,7 @@ ISrsMessagePumper::~ISrsMessagePumper() { } -SrsRecvThread::SrsRecvThread(ISrsMessagePumper* p, SrsRtmpServer* r, srs_utime_t tm, std::string parent_cid) +SrsRecvThread::SrsRecvThread(ISrsMessagePumper* p, SrsRtmpServer* r, srs_utime_t tm, SrsContextId parent_cid) { rtmp = r; pumper = p; @@ -71,7 +71,7 @@ SrsRecvThread::~SrsRecvThread() srs_freep(trd); } -std::string SrsRecvThread::cid() +SrsContextId SrsRecvThread::cid() { return trd->cid(); } @@ -161,7 +161,7 @@ srs_error_t SrsRecvThread::do_cycle() return err; } -SrsQueueRecvThread::SrsQueueRecvThread(SrsConsumer* consumer, SrsRtmpServer* rtmp_sdk, srs_utime_t tm, std::string parent_cid) +SrsQueueRecvThread::SrsQueueRecvThread(SrsConsumer* consumer, SrsRtmpServer* rtmp_sdk, srs_utime_t tm, SrsContextId parent_cid) : trd(this, rtmp_sdk, tm, parent_cid) { _consumer = consumer; @@ -278,7 +278,7 @@ void SrsQueueRecvThread::on_stop() } SrsPublishRecvThread::SrsPublishRecvThread(SrsRtmpServer* rtmp_sdk, SrsRequest* _req, - int mr_sock_fd, srs_utime_t tm, SrsRtmpConn* conn, SrsSource* source, std::string parent_cid) + int mr_sock_fd, srs_utime_t tm, SrsRtmpConn* conn, SrsSource* source, SrsContextId parent_cid) : trd(this, rtmp_sdk, tm, parent_cid) { rtmp = rtmp_sdk; @@ -290,8 +290,7 @@ SrsPublishRecvThread::SrsPublishRecvThread(SrsRtmpServer* rtmp_sdk, SrsRequest* _nb_msgs = 0; video_frames = 0; error = srs_cond_new(); - ncid = cid = ""; - + req = _req; mr_fd = mr_sock_fd; @@ -346,7 +345,7 @@ void SrsPublishRecvThread::set_cid(std::string v) ncid = v; } -std::string SrsPublishRecvThread::get_cid() +SrsContextId SrsPublishRecvThread::get_cid() { return ncid; } @@ -374,7 +373,7 @@ srs_error_t SrsPublishRecvThread::consume(SrsCommonMessage* msg) srs_error_t err = srs_success; // when cid changed, change it. - if (ncid != cid) { + if (ncid.compare(cid)) { _srs_context->set_id(ncid); cid = ncid; } diff --git a/trunk/src/app/srs_app_recv_thread.hpp b/trunk/src/app/srs_app_recv_thread.hpp index 3ee4f53e4..1d46f58d3 100644 --- a/trunk/src/app/srs_app_recv_thread.hpp +++ b/trunk/src/app/srs_app_recv_thread.hpp @@ -81,16 +81,16 @@ protected: SrsCoroutine* trd; ISrsMessagePumper* pumper; SrsRtmpServer* rtmp; - std::string _parent_cid; + SrsContextId _parent_cid; // The recv timeout in srs_utime_t. srs_utime_t timeout; public: // Constructor. // @param tm The receive timeout in srs_utime_t. - SrsRecvThread(ISrsMessagePumper* p, SrsRtmpServer* r, srs_utime_t tm, std::string parent_cid); + SrsRecvThread(ISrsMessagePumper* p, SrsRtmpServer* r, srs_utime_t tm, SrsContextId parent_cid); virtual ~SrsRecvThread(); public: - virtual std::string cid(); + virtual SrsContextId cid(); public: virtual srs_error_t start(); virtual void stop(); @@ -117,7 +117,7 @@ private: SrsConsumer* _consumer; public: // TODO: FIXME: Refine timeout in time unit. - SrsQueueRecvThread(SrsConsumer* consumer, SrsRtmpServer* rtmp_sdk, srs_utime_t tm, std::string parent_cid); + SrsQueueRecvThread(SrsConsumer* consumer, SrsRtmpServer* rtmp_sdk, srs_utime_t tm, SrsContextId parent_cid); virtual ~SrsQueueRecvThread(); public: virtual srs_error_t start(); @@ -168,11 +168,11 @@ private: // @see https://github.com/ossrs/srs/issues/244 srs_cond_t error; // The merged context id. - std::string cid; - std::string ncid; + SrsContextId cid; + SrsContextId ncid; public: SrsPublishRecvThread(SrsRtmpServer* rtmp_sdk, SrsRequest* _req, - int mr_sock_fd, srs_utime_t tm, SrsRtmpConn* conn, SrsSource* source, std::string parent_cid); + int mr_sock_fd, srs_utime_t tm, SrsRtmpConn* conn, SrsSource* source, SrsContextId parent_cid); virtual ~SrsPublishRecvThread(); public: // Wait for error for some timeout. @@ -181,7 +181,7 @@ public: virtual uint64_t nb_video_frames(); virtual srs_error_t error_code(); virtual void set_cid(std::string v); - virtual std::string get_cid(); + virtual SrsContextId get_cid(); public: virtual srs_error_t start(); virtual void stop(); diff --git a/trunk/src/app/srs_app_rtc_api.cpp b/trunk/src/app/srs_app_rtc_api.cpp index 8a28712d3..6c8ec8692 100644 --- a/trunk/src/app/srs_app_rtc_api.cpp +++ b/trunk/src/app/srs_app_rtc_api.cpp @@ -183,7 +183,7 @@ srs_error_t SrsGoApiRtcPlay::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMe } // TODO: FIXME: When server enabled, but vhost disabled, should report error. - SrsRtcSession* session = NULL; + SrsRtcConnection* session = NULL; if ((err = server_->create_session(&request, remote_sdp, local_sdp, eip, false, &session)) != srs_success) { return srs_error_wrap(err, "create session"); } @@ -541,7 +541,7 @@ srs_error_t SrsGoApiRtcPublish::do_serve_http(ISrsHttpResponseWriter* w, ISrsHtt } // TODO: FIXME: When server enabled, but vhost disabled, should report error. - SrsRtcSession* session = NULL; + SrsRtcConnection* session = NULL; if ((err = server_->create_session(&request, remote_sdp, local_sdp, eip, true, &session)) != srs_success) { return srs_error_wrap(err, "create session"); } @@ -808,7 +808,7 @@ srs_error_t SrsGoApiRtcNACK::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMe return srs_error_new(ERROR_RTC_INVALID_PARAMS, "invalid drop=%s/%d", dropv.c_str(), drop); } - SrsRtcSession* session = server_->find_session_by_username(username); + SrsRtcConnection* session = server_->find_session_by_username(username); if (!session) { return srs_error_new(ERROR_RTC_NO_SESSION, "no session username=%s", username.c_str()); } diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index 07966a6a3..2495dac09 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -57,20 +57,6 @@ using namespace std; #include #include -// TODO: FIXME: Move to utility. -string gen_random_str(int len) -{ - static string random_table = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"; - - string ret; - ret.reserve(len); - for (int i = 0; i < len; ++i) { - ret.append(1, random_table[random() % random_table.size()]); - } - - return ret; -} - uint64_t SrsNtp::kMagicNtpFractionalUnit = 1ULL << 32; SrsNtp::SrsNtp() @@ -107,7 +93,7 @@ SrsNtp SrsNtp::to_time_ms(uint64_t ntp) } -SrsSecurityTransport::SrsSecurityTransport(SrsRtcSession* s) +SrsSecurityTransport::SrsSecurityTransport(SrsRtcConnection* s) { session_ = s; @@ -151,7 +137,7 @@ srs_error_t SrsSecurityTransport::write_dtls_data(void* data, int size) if (session_->blackhole && session_->blackhole_addr && session_->blackhole_stfd) { // Ignore any error for black-hole. - void* p = data; int len = size; SrsRtcSession* s = session_; + void* p = data; int len = size; SrsRtcConnection* s = session_; srs_sendto(s->blackhole_stfd, p, len, (sockaddr*)s->blackhole_addr, sizeof(sockaddr_in), SRS_UTIME_NO_TIMEOUT); } @@ -271,7 +257,7 @@ SrsRtcOutgoingInfo::~SrsRtcOutgoingInfo() { } -SrsRtcPlayer::SrsRtcPlayer(SrsRtcSession* s, string parent_cid) +SrsRtcPlayStream::SrsRtcPlayStream(SrsRtcConnection* s, SrsContextId parent_cid) { _parent_cid = parent_cid; trd = new SrsDummyCoroutine(); @@ -291,7 +277,7 @@ SrsRtcPlayer::SrsRtcPlayer(SrsRtcSession* s, string parent_cid) _srs_config->subscribe(this); } -SrsRtcPlayer::~SrsRtcPlayer() +SrsRtcPlayStream::~SrsRtcPlayStream() { _srs_config->unsubscribe(this); @@ -300,7 +286,7 @@ SrsRtcPlayer::~SrsRtcPlayer() srs_freep(video_queue_); } -srs_error_t SrsRtcPlayer::initialize(uint32_t vssrc, uint32_t assrc, uint16_t v_pt, uint16_t a_pt) +srs_error_t SrsRtcPlayStream::initialize(uint32_t vssrc, uint32_t assrc, uint16_t v_pt, uint16_t a_pt) { srs_error_t err = srs_success; @@ -324,7 +310,7 @@ srs_error_t SrsRtcPlayer::initialize(uint32_t vssrc, uint32_t assrc, uint16_t v_ return err; } -srs_error_t SrsRtcPlayer::on_reload_vhost_play(string vhost) +srs_error_t SrsRtcPlayStream::on_reload_vhost_play(string vhost) { SrsRequest* req = session_->req; @@ -340,17 +326,17 @@ srs_error_t SrsRtcPlayer::on_reload_vhost_play(string vhost) return srs_success; } -srs_error_t SrsRtcPlayer::on_reload_vhost_realtime(string vhost) +srs_error_t SrsRtcPlayStream::on_reload_vhost_realtime(string vhost) { return on_reload_vhost_play(vhost); } -std::string SrsRtcPlayer::cid() +SrsContextId SrsRtcPlayStream::cid() { return trd->cid(); } -srs_error_t SrsRtcPlayer::start() +srs_error_t SrsRtcPlayStream::start() { srs_error_t err = srs_success; @@ -364,21 +350,21 @@ srs_error_t SrsRtcPlayer::start() return err; } -void SrsRtcPlayer::stop() +void SrsRtcPlayStream::stop() { trd->stop(); } -void SrsRtcPlayer::stop_loop() +void SrsRtcPlayStream::stop_loop() { trd->interrupt(); } -srs_error_t SrsRtcPlayer::cycle() +srs_error_t SrsRtcPlayStream::cycle() { srs_error_t err = srs_success; - SrsRtcSource* source = NULL; + SrsRtcStream* source = NULL; SrsRequest* req = session_->req; if ((err = _srs_rtc_sources->fetch_or_create(req, &source)) != srs_success) { @@ -470,7 +456,7 @@ srs_error_t SrsRtcPlayer::cycle() } } -srs_error_t SrsRtcPlayer::send_packets(SrsRtcSource* source, const vector& pkts, SrsRtcOutgoingInfo& info) +srs_error_t SrsRtcPlayStream::send_packets(SrsRtcStream* source, const vector& pkts, SrsRtcOutgoingInfo& info) { srs_error_t err = srs_success; @@ -512,7 +498,7 @@ srs_error_t SrsRtcPlayer::send_packets(SrsRtcSource* source, const vector& pkts, SrsRtcOutgoingInfo& info) +srs_error_t SrsRtcPlayStream::do_send_packets(const std::vector& pkts, SrsRtcOutgoingInfo& info) { srs_error_t err = srs_success; @@ -596,7 +582,7 @@ srs_error_t SrsRtcPlayer::do_send_packets(const std::vector& pkt return err; } -void SrsRtcPlayer::nack_fetch(vector& pkts, uint32_t ssrc, uint16_t seq) +void SrsRtcPlayStream::nack_fetch(vector& pkts, uint32_t ssrc, uint16_t seq) { SrsRtpPacket2* pkt = NULL; @@ -611,12 +597,12 @@ void SrsRtcPlayer::nack_fetch(vector& pkts, uint32_t ssrc, uint1 } } -void SrsRtcPlayer::simulate_nack_drop(int nn) +void SrsRtcPlayStream::simulate_nack_drop(int nn) { nn_simulate_nack_drop = nn; } -void SrsRtcPlayer::simulate_drop_packet(SrsRtpHeader* h, int nn_bytes) +void SrsRtcPlayStream::simulate_drop_packet(SrsRtpHeader* h, int nn_bytes) { srs_warn("RTC NACK simulator #%d drop seq=%u, ssrc=%u/%s, ts=%u, %d bytes", nn_simulate_nack_drop, h->get_sequence(), h->get_ssrc(), (h->get_ssrc()==video_ssrc? "Video":"Audio"), h->get_timestamp(), @@ -625,7 +611,7 @@ void SrsRtcPlayer::simulate_drop_packet(SrsRtpHeader* h, int nn_bytes) nn_simulate_nack_drop--; } -srs_error_t SrsRtcPlayer::on_rtcp(char* data, int nb_data) +srs_error_t SrsRtcPlayStream::on_rtcp(char* data, int nb_data) { srs_error_t err = srs_success; @@ -690,21 +676,21 @@ srs_error_t SrsRtcPlayer::on_rtcp(char* data, int nb_data) return err; } -srs_error_t SrsRtcPlayer::on_rtcp_sr(char* buf, int nb_buf) +srs_error_t SrsRtcPlayStream::on_rtcp_sr(char* buf, int nb_buf) { srs_error_t err = srs_success; // TODO: FIXME: Implements it. return err; } -srs_error_t SrsRtcPlayer::on_rtcp_xr(char* buf, int nb_buf) +srs_error_t SrsRtcPlayStream::on_rtcp_xr(char* buf, int nb_buf) { srs_error_t err = srs_success; // TODO: FIXME: Implements it. return err; } -srs_error_t SrsRtcPlayer::on_rtcp_feedback(char* buf, int nb_buf) +srs_error_t SrsRtcPlayStream::on_rtcp_feedback(char* buf, int nb_buf) { srs_error_t err = srs_success; @@ -790,7 +776,7 @@ srs_error_t SrsRtcPlayer::on_rtcp_feedback(char* buf, int nb_buf) return err; } -srs_error_t SrsRtcPlayer::on_rtcp_ps_feedback(char* buf, int nb_buf) +srs_error_t SrsRtcPlayStream::on_rtcp_ps_feedback(char* buf, int nb_buf) { srs_error_t err = srs_success; @@ -813,7 +799,7 @@ srs_error_t SrsRtcPlayer::on_rtcp_ps_feedback(char* buf, int nb_buf) switch (fmt) { case kPLI: { - ISrsRtcPublisher* publisher = session_->source_->rtc_publisher(); + ISrsRtcPublishStream* publisher = session_->source_->publish_stream(); if (publisher) { publisher->request_keyframe(); srs_trace("RTC request PLI"); @@ -840,14 +826,14 @@ srs_error_t SrsRtcPlayer::on_rtcp_ps_feedback(char* buf, int nb_buf) return err; } -srs_error_t SrsRtcPlayer::on_rtcp_rr(char* data, int nb_data) +srs_error_t SrsRtcPlayStream::on_rtcp_rr(char* data, int nb_data) { srs_error_t err = srs_success; // TODO: FIXME: Implements it. return err; } -SrsRtcPublisher::SrsRtcPublisher(SrsRtcSession* session) +SrsRtcPublishStream::SrsRtcPublishStream(SrsRtcConnection* session) { report_timer = new SrsHourGlass(this, 200 * SRS_UTIME_MILLISECONDS); @@ -869,11 +855,11 @@ SrsRtcPublisher::SrsRtcPublisher(SrsRtcSession* session) twcc_fb_count_ = 0; } -SrsRtcPublisher::~SrsRtcPublisher() +SrsRtcPublishStream::~SrsRtcPublishStream() { // TODO: FIXME: Do unpublish when session timeout. if (source) { - source->set_rtc_publisher(NULL); + source->set_publish_stream(NULL); source->on_unpublish(); } @@ -884,7 +870,7 @@ SrsRtcPublisher::~SrsRtcPublisher() srs_freep(audio_queue_); } -srs_error_t SrsRtcPublisher::initialize(uint32_t vssrc, uint32_t assrc, int twcc_id, SrsRequest* r) +srs_error_t SrsRtcPublishStream::initialize(uint32_t vssrc, uint32_t assrc, int twcc_id, SrsRequest* r) { srs_error_t err = srs_success; @@ -923,7 +909,7 @@ srs_error_t SrsRtcPublisher::initialize(uint32_t vssrc, uint32_t assrc, int twcc return srs_error_wrap(err, "on publish"); } - source->set_rtc_publisher(this); + source->set_publish_stream(this); if (_srs_rtc_hijacker) { if ((err = _srs_rtc_hijacker->on_start_publish(session_, this, req)) != srs_success) { @@ -934,7 +920,7 @@ srs_error_t SrsRtcPublisher::initialize(uint32_t vssrc, uint32_t assrc, int twcc return err; } -void SrsRtcPublisher::check_send_nacks(SrsRtpNackForReceiver* nack, uint32_t ssrc) +void SrsRtcPublishStream::check_send_nacks(SrsRtpNackForReceiver* nack, uint32_t ssrc) { // If DTLS is not OK, drop all messages. if (!session_->transport_) { @@ -967,7 +953,7 @@ void SrsRtcPublisher::check_send_nacks(SrsRtpNackForReceiver* nack, uint32_t ssr if (session_->blackhole && session_->blackhole_addr && session_->blackhole_stfd) { // Ignore any error for black-hole. - void* p = stream.data(); int len = stream.pos(); SrsRtcSession* s = session_; + void* p = stream.data(); int len = stream.pos(); SrsRtcConnection* s = session_; srs_sendto(s->blackhole_stfd, p, len, (sockaddr*)s->blackhole_addr, sizeof(sockaddr_in), SRS_UTIME_NO_TIMEOUT); } @@ -984,7 +970,7 @@ void SrsRtcPublisher::check_send_nacks(SrsRtpNackForReceiver* nack, uint32_t ssr } } -srs_error_t SrsRtcPublisher::send_rtcp_rr(uint32_t ssrc, SrsRtpRingBuffer* rtp_queue) +srs_error_t SrsRtcPublishStream::send_rtcp_rr(uint32_t ssrc, SrsRtpRingBuffer* rtp_queue) { srs_error_t err = srs_success; @@ -1042,7 +1028,7 @@ srs_error_t SrsRtcPublisher::send_rtcp_rr(uint32_t ssrc, SrsRtpRingBuffer* rtp_q return err; } -srs_error_t SrsRtcPublisher::send_rtcp_xr_rrtr(uint32_t ssrc) +srs_error_t SrsRtcPublishStream::send_rtcp_xr_rrtr(uint32_t ssrc) { srs_error_t err = srs_success; @@ -1103,7 +1089,7 @@ srs_error_t SrsRtcPublisher::send_rtcp_xr_rrtr(uint32_t ssrc) return err; } -srs_error_t SrsRtcPublisher::send_rtcp_fb_pli(uint32_t ssrc) +srs_error_t SrsRtcPublishStream::send_rtcp_fb_pli(uint32_t ssrc) { srs_error_t err = srs_success; @@ -1124,7 +1110,7 @@ srs_error_t SrsRtcPublisher::send_rtcp_fb_pli(uint32_t ssrc) if (session_->blackhole && session_->blackhole_addr && session_->blackhole_stfd) { // Ignore any error for black-hole. - void* p = stream.data(); int len = stream.pos(); SrsRtcSession* s = session_; + void* p = stream.data(); int len = stream.pos(); SrsRtcConnection* s = session_; srs_sendto(s->blackhole_stfd, p, len, (sockaddr*)s->blackhole_addr, sizeof(sockaddr_in), SRS_UTIME_NO_TIMEOUT); } @@ -1140,12 +1126,12 @@ srs_error_t SrsRtcPublisher::send_rtcp_fb_pli(uint32_t ssrc) return err; } -srs_error_t SrsRtcPublisher::on_twcc(uint16_t sn) { +srs_error_t SrsRtcPublishStream::on_twcc(uint16_t sn) { srs_utime_t now = srs_get_system_time(); return rtcp_twcc_.recv_packet(sn, now); } -srs_error_t SrsRtcPublisher::on_rtp(char* data, int nb_data) +srs_error_t SrsRtcPublishStream::on_rtp(char* data, int nb_data) { srs_error_t err = srs_success; @@ -1201,7 +1187,7 @@ srs_error_t SrsRtcPublisher::on_rtp(char* data, int nb_data) if (session_->blackhole && session_->blackhole_addr && session_->blackhole_stfd) { // Ignore any error for black-hole. - void* p = unprotected_buf; int len = nb_unprotected_buf; SrsRtcSession* s = session_; + void* p = unprotected_buf; int len = nb_unprotected_buf; SrsRtcConnection* s = session_; srs_sendto(s->blackhole_stfd, p, len, (sockaddr*)s->blackhole_addr, sizeof(sockaddr_in), SRS_UTIME_NO_TIMEOUT); } @@ -1254,7 +1240,7 @@ srs_error_t SrsRtcPublisher::on_rtp(char* data, int nb_data) return err; } -void SrsRtcPublisher::on_before_decode_payload(SrsRtpPacket2* pkt, SrsBuffer* buf, ISrsRtpPayloader** ppayload) +void SrsRtcPublishStream::on_before_decode_payload(SrsRtpPacket2* pkt, SrsBuffer* buf, ISrsRtpPayloader** ppayload) { // No payload, ignore. if (buf->empty()) { @@ -1276,7 +1262,7 @@ void SrsRtcPublisher::on_before_decode_payload(SrsRtpPacket2* pkt, SrsBuffer* bu } } -srs_error_t SrsRtcPublisher::on_audio(SrsRtpPacket2* pkt) +srs_error_t SrsRtcPublishStream::on_audio(SrsRtpPacket2* pkt) { srs_error_t err = srs_success; @@ -1288,7 +1274,7 @@ srs_error_t SrsRtcPublisher::on_audio(SrsRtpPacket2* pkt) return err; } -srs_error_t SrsRtcPublisher::on_video(SrsRtpPacket2* pkt) +srs_error_t SrsRtcPublishStream::on_video(SrsRtpPacket2* pkt) { srs_error_t err = srs_success; @@ -1307,7 +1293,7 @@ srs_error_t SrsRtcPublisher::on_video(SrsRtpPacket2* pkt) return err; } -srs_error_t SrsRtcPublisher::on_nack(SrsRtpPacket2* pkt) +srs_error_t SrsRtcPublishStream::on_nack(SrsRtpPacket2* pkt) { srs_error_t err = srs_success; @@ -1352,7 +1338,7 @@ srs_error_t SrsRtcPublisher::on_nack(SrsRtpPacket2* pkt) return err; } -srs_error_t SrsRtcPublisher::send_periodic_twcc() +srs_error_t SrsRtcPublishStream::send_periodic_twcc() { srs_error_t err = srs_success; srs_utime_t now = srs_get_system_time(); @@ -1381,7 +1367,7 @@ srs_error_t SrsRtcPublisher::send_periodic_twcc() return err; } -srs_error_t SrsRtcPublisher::on_rtcp(char* data, int nb_data) +srs_error_t SrsRtcPublishStream::on_rtcp(char* data, int nb_data) { srs_error_t err = srs_success; @@ -1446,7 +1432,7 @@ srs_error_t SrsRtcPublisher::on_rtcp(char* data, int nb_data) return err; } -srs_error_t SrsRtcPublisher::on_rtcp_sr(char* buf, int nb_buf) +srs_error_t SrsRtcPublishStream::on_rtcp_sr(char* buf, int nb_buf) { srs_error_t err = srs_success; @@ -1537,7 +1523,7 @@ block +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ return err; } -srs_error_t SrsRtcPublisher::on_rtcp_xr(char* buf, int nb_buf) +srs_error_t SrsRtcPublishStream::on_rtcp_xr(char* buf, int nb_buf) { srs_error_t err = srs_success; @@ -1601,14 +1587,14 @@ srs_error_t SrsRtcPublisher::on_rtcp_xr(char* buf, int nb_buf) return err; } -srs_error_t SrsRtcPublisher::on_rtcp_feedback(char* buf, int nb_buf) +srs_error_t SrsRtcPublishStream::on_rtcp_feedback(char* buf, int nb_buf) { srs_error_t err = srs_success; // TODO: FIXME: Implements it. return err; } -srs_error_t SrsRtcPublisher::on_rtcp_ps_feedback(char* buf, int nb_buf) +srs_error_t SrsRtcPublishStream::on_rtcp_ps_feedback(char* buf, int nb_buf) { srs_error_t err = srs_success; @@ -1654,7 +1640,7 @@ srs_error_t SrsRtcPublisher::on_rtcp_ps_feedback(char* buf, int nb_buf) return err; } -srs_error_t SrsRtcPublisher::on_rtcp_rr(char* buf, int nb_buf) +srs_error_t SrsRtcPublishStream::on_rtcp_rr(char* buf, int nb_buf) { srs_error_t err = srs_success; @@ -1723,16 +1709,16 @@ block +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ return err; } -void SrsRtcPublisher::request_keyframe() +void SrsRtcPublishStream::request_keyframe() { - std::string scid = _srs_context->get_id(); - std::string pcid = session_->context_id(); + SrsContextId scid = _srs_context->get_id(); + SrsContextId pcid = session_->context_id(); srs_trace("RTC play=[%d][%s] request keyframe from publish=[%d][%s]", ::getpid(), scid.c_str(), ::getpid(), pcid.c_str()); request_keyframe_ = true; } -srs_error_t SrsRtcPublisher::notify(int type, srs_utime_t interval, srs_utime_t tick) +srs_error_t SrsRtcPublishStream::notify(int type, srs_utime_t interval, srs_utime_t tick) { srs_error_t err = srs_success; @@ -1750,12 +1736,12 @@ srs_error_t SrsRtcPublisher::notify(int type, srs_utime_t interval, srs_utime_t return err; } -void SrsRtcPublisher::simulate_nack_drop(int nn) +void SrsRtcPublishStream::simulate_nack_drop(int nn) { nn_simulate_nack_drop = nn; } -void SrsRtcPublisher::simulate_drop_packet(SrsRtpHeader* h, int nn_bytes) +void SrsRtcPublishStream::simulate_drop_packet(SrsRtpHeader* h, int nn_bytes) { srs_warn("RTC NACK simulator #%d drop seq=%u, ssrc=%u/%s, ts=%u, %d bytes", nn_simulate_nack_drop, h->get_sequence(), h->get_ssrc(), (h->get_ssrc()==video_ssrc? "Video":"Audio"), h->get_timestamp(), @@ -1764,10 +1750,9 @@ void SrsRtcPublisher::simulate_drop_packet(SrsRtpHeader* h, int nn_bytes) nn_simulate_nack_drop--; } -SrsRtcSession::SrsRtcSession(SrsRtcServer* s) +SrsRtcConnection::SrsRtcConnection(SrsRtcServer* s) { req = NULL; - cid = ""; is_publisher_ = false; encrypt = true; @@ -1788,7 +1773,7 @@ SrsRtcSession::SrsRtcSession(SrsRtcServer* s) blackhole_stfd = NULL; } -SrsRtcSession::~SrsRtcSession() +SrsRtcConnection::~SrsRtcConnection() { srs_freep(player_); srs_freep(publisher_); @@ -1799,73 +1784,73 @@ SrsRtcSession::~SrsRtcSession() srs_freep(sendonly_skt); } -SrsSdp* SrsRtcSession::get_local_sdp() +SrsSdp* SrsRtcConnection::get_local_sdp() { return &local_sdp; } -void SrsRtcSession::set_local_sdp(const SrsSdp& sdp) +void SrsRtcConnection::set_local_sdp(const SrsSdp& sdp) { local_sdp = sdp; } -SrsSdp* SrsRtcSession::get_remote_sdp() +SrsSdp* SrsRtcConnection::get_remote_sdp() { return &remote_sdp; } -void SrsRtcSession::set_remote_sdp(const SrsSdp& sdp) +void SrsRtcConnection::set_remote_sdp(const SrsSdp& sdp) { remote_sdp = sdp; } -SrsRtcSessionStateType SrsRtcSession::state() +SrsRtcConnectionStateType SrsRtcConnection::state() { return state_; } -void SrsRtcSession::set_state(SrsRtcSessionStateType state) +void SrsRtcConnection::set_state(SrsRtcConnectionStateType state) { state_ = state; } -string SrsRtcSession::id() +string SrsRtcConnection::id() { return peer_id_ + "/" + username_; } -string SrsRtcSession::peer_id() +string SrsRtcConnection::peer_id() { return peer_id_; } -void SrsRtcSession::set_peer_id(string v) +void SrsRtcConnection::set_peer_id(string v) { peer_id_ = v; } -string SrsRtcSession::username() +string SrsRtcConnection::username() { return username_; } -void SrsRtcSession::set_encrypt(bool v) +void SrsRtcConnection::set_encrypt(bool v) { encrypt = v; } -void SrsRtcSession::switch_to_context() +void SrsRtcConnection::switch_to_context() { _srs_context->set_id(cid); } -std::string SrsRtcSession::context_id() +SrsContextId SrsRtcConnection::context_id() { return cid; } -srs_error_t SrsRtcSession::initialize(SrsRtcSource* source, SrsRequest* r, bool is_publisher, string username, std::string context_id) +srs_error_t SrsRtcConnection::initialize(SrsRtcStream* source, SrsRequest* r, bool is_publisher, string username, SrsContextId context_id) { srs_error_t err = srs_success; @@ -1912,7 +1897,7 @@ srs_error_t SrsRtcSession::initialize(SrsRtcSource* source, SrsRequest* r, bool return err; } -srs_error_t SrsRtcSession::on_stun(SrsUdpMuxSocket* skt, SrsStunPacket* r) +srs_error_t SrsRtcConnection::on_stun(SrsUdpMuxSocket* skt, SrsStunPacket* r) { srs_error_t err = srs_success; @@ -1942,12 +1927,12 @@ srs_error_t SrsRtcSession::on_stun(SrsUdpMuxSocket* skt, SrsStunPacket* r) return err; } -srs_error_t SrsRtcSession::on_dtls(char* data, int nb_data) +srs_error_t SrsRtcConnection::on_dtls(char* data, int nb_data) { return transport_->on_dtls(data, nb_data); } -srs_error_t SrsRtcSession::on_rtcp(char* data, int nb_data) +srs_error_t SrsRtcConnection::on_rtcp(char* data, int nb_data) { srs_error_t err = srs_success; @@ -1978,7 +1963,7 @@ srs_error_t SrsRtcSession::on_rtcp(char* data, int nb_data) return err; } -srs_error_t SrsRtcSession::on_rtp(char* data, int nb_data) +srs_error_t SrsRtcConnection::on_rtp(char* data, int nb_data) { if (publisher_ == NULL) { return srs_error_new(ERROR_RTC_RTCP, "rtc publisher null"); @@ -1991,7 +1976,7 @@ srs_error_t SrsRtcSession::on_rtp(char* data, int nb_data) return publisher_->on_rtp(data, nb_data); } -srs_error_t SrsRtcSession::on_connection_established() +srs_error_t SrsRtcConnection::on_connection_established() { srs_error_t err = srs_success; @@ -2011,7 +1996,7 @@ srs_error_t SrsRtcSession::on_connection_established() return err; } -srs_error_t SrsRtcSession::start_play() +srs_error_t SrsRtcConnection::start_play() { srs_error_t err = srs_success; @@ -2020,7 +2005,7 @@ srs_error_t SrsRtcSession::start_play() if (player_) { return err; } - player_ = new SrsRtcPlayer(this, _srs_context->get_id()); + player_ = new SrsRtcPlayStream(this, _srs_context->get_id()); uint32_t video_ssrc = 0; uint32_t audio_ssrc = 0; @@ -2038,17 +2023,17 @@ srs_error_t SrsRtcSession::start_play() } if ((err = player_->initialize(video_ssrc, audio_ssrc, video_payload_type, audio_payload_type)) != srs_success) { - return srs_error_wrap(err, "SrsRtcPlayer init"); + return srs_error_wrap(err, "SrsRtcPlayStream init"); } if ((err = player_->start()) != srs_success) { - return srs_error_wrap(err, "start SrsRtcPlayer"); + return srs_error_wrap(err, "start SrsRtcPlayStream"); } return err; } -srs_error_t SrsRtcSession::start_publish() +srs_error_t SrsRtcConnection::start_publish() { srs_error_t err = srs_success; @@ -2057,7 +2042,7 @@ srs_error_t SrsRtcSession::start_publish() if (publisher_) { return err; } - publisher_ = new SrsRtcPublisher(this); + publisher_ = new SrsRtcPublishStream(this); // Request PLI for exists players? //publisher_->request_keyframe(); @@ -2100,12 +2085,12 @@ srs_error_t SrsRtcSession::start_publish() return err; } -bool SrsRtcSession::is_stun_timeout() +bool SrsRtcConnection::is_stun_timeout() { return last_stun_time + sessionStunTimeout < srs_get_system_time(); } -void SrsRtcSession::update_sendonly_socket(SrsUdpMuxSocket* skt) +void SrsRtcConnection::update_sendonly_socket(SrsUdpMuxSocket* skt) { if (sendonly_skt) { srs_trace("session %s address changed, update %s -> %s", @@ -2116,7 +2101,7 @@ void SrsRtcSession::update_sendonly_socket(SrsUdpMuxSocket* skt) sendonly_skt = skt->copy_sendonly(); } -void SrsRtcSession::simulate_nack_drop(int nn) +void SrsRtcConnection::simulate_nack_drop(int nn) { if (player_) { player_->simulate_nack_drop(nn); @@ -2134,7 +2119,7 @@ void SrsRtcSession::simulate_nack_drop(int nn) #define be32toh ntohl #endif -srs_error_t SrsRtcSession::on_binding_request(SrsStunPacket* r) +srs_error_t SrsRtcConnection::on_binding_request(SrsStunPacket* r) { srs_error_t err = srs_success; diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index b2c9208ad..e9a501bc7 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -48,9 +48,9 @@ class SrsUdpMuxSocket; class SrsConsumer; class SrsStunPacket; class SrsRtcServer; -class SrsRtcSession; +class SrsRtcConnection; class SrsSharedPtrMessage; -class SrsRtcSource; +class SrsRtcStream; class SrsRtpPacket2; class ISrsCodec; class SrsRtpNackForReceiver; @@ -75,9 +75,6 @@ const uint8_t kSLI = 2; const uint8_t kRPSI = 3; const uint8_t kAFB = 15; -// TODO: FIXME: Move to utility. -extern std::string gen_random_str(int len); - class SrsNtp { public: @@ -95,7 +92,7 @@ public: static uint64_t kMagicNtpFractionalUnit; }; -enum SrsRtcSessionStateType +enum SrsRtcConnectionStateType { // TODO: FIXME: Should prefixed by enum name. INIT = -1, @@ -109,12 +106,12 @@ enum SrsRtcSessionStateType class SrsSecurityTransport : public ISrsDtlsCallback { private: - SrsRtcSession* session_; + SrsRtcConnection* session_; SrsDtls* dtls_; SrsSRTP* srtp_; bool handshake_done; public: - SrsSecurityTransport(SrsRtcSession* s); + SrsSecurityTransport(SrsRtcConnection* s); virtual ~SrsSecurityTransport(); srs_error_t initialize(SrsSessionConfig* cfg); @@ -182,12 +179,13 @@ public: virtual ~SrsRtcOutgoingInfo(); }; -class SrsRtcPlayer : virtual public ISrsCoroutineHandler, virtual public ISrsReloadHandler +// A RTC play stream, client pull and play stream from SRS. +class SrsRtcPlayStream : virtual public ISrsCoroutineHandler, virtual public ISrsReloadHandler { protected: - std::string _parent_cid; + SrsContextId _parent_cid; SrsCoroutine* trd; - SrsRtcSession* session_; + SrsRtcConnection* session_; private: // TODO: FIXME: How to handle timestamp overflow? // Information for audio. @@ -208,8 +206,8 @@ private: // Whether enabled nack. bool nack_enabled_; public: - SrsRtcPlayer(SrsRtcSession* s, std::string parent_cid); - virtual ~SrsRtcPlayer(); + SrsRtcPlayStream(SrsRtcConnection* s, SrsContextId parent_cid); + virtual ~SrsRtcPlayStream(); public: srs_error_t initialize(uint32_t vssrc, uint32_t assrc, uint16_t v_pt, uint16_t a_pt); // interface ISrsReloadHandler @@ -217,7 +215,7 @@ public: virtual srs_error_t on_reload_vhost_play(std::string vhost); virtual srs_error_t on_reload_vhost_realtime(std::string vhost); public: - virtual std::string cid(); + virtual SrsContextId cid(); public: virtual srs_error_t start(); virtual void stop(); @@ -225,7 +223,7 @@ public: public: virtual srs_error_t cycle(); private: - srs_error_t send_packets(SrsRtcSource* source, const std::vector& pkts, SrsRtcOutgoingInfo& info); + srs_error_t send_packets(SrsRtcStream* source, const std::vector& pkts, SrsRtcOutgoingInfo& info); srs_error_t do_send_packets(const std::vector& pkts, SrsRtcOutgoingInfo& info); public: void nack_fetch(std::vector& pkts, uint32_t ssrc, uint16_t seq); @@ -242,13 +240,14 @@ private: srs_error_t on_rtcp_rr(char* data, int nb_data); }; -class SrsRtcPublisher : virtual public ISrsHourGlass, virtual public ISrsRtpPacketDecodeHandler, virtual public ISrsRtcPublisher +// A RTC publish stream, client push and publish stream to SRS. +class SrsRtcPublishStream : virtual public ISrsHourGlass, virtual public ISrsRtpPacketDecodeHandler, virtual public ISrsRtcPublishStream { private: SrsHourGlass* report_timer; uint64_t nn_audio_frames; private: - SrsRtcSession* session_; + SrsRtcConnection* session_; uint32_t video_ssrc; uint32_t audio_ssrc; uint16_t pt_to_drop_; @@ -262,7 +261,7 @@ private: SrsRtpNackForReceiver* audio_nack_; private: SrsRequest* req; - SrsRtcSource* source; + SrsRtcStream* source; // Simulators. int nn_simulate_nack_drop; private: @@ -275,8 +274,8 @@ private: SrsRtcpTWCC rtcp_twcc_; SrsRtpExtensionTypes extension_types_; public: - SrsRtcPublisher(SrsRtcSession* session); - virtual ~SrsRtcPublisher(); + SrsRtcPublishStream(SrsRtcConnection* session); + virtual ~SrsRtcPublishStream(); public: srs_error_t initialize(uint32_t vssrc, uint32_t assrc, int twcc_id, SrsRequest* req); private: @@ -313,19 +312,20 @@ private: srs_error_t on_twcc(uint16_t sn); }; -class SrsRtcSession +// A RTC Peer Connection, SDP level object. +class SrsRtcConnection { friend class SrsSecurityTransport; - friend class SrsRtcPlayer; - friend class SrsRtcPublisher; + friend class SrsRtcPlayStream; + friend class SrsRtcPublishStream; public: bool disposing_; private: SrsRtcServer* server_; - SrsRtcSessionStateType state_; + SrsRtcConnectionStateType state_; SrsSecurityTransport* transport_; - SrsRtcPlayer* player_; - SrsRtcPublisher* publisher_; + SrsRtcPlayStream* player_; + SrsRtcPublishStream* publisher_; bool is_publisher_; private: SrsUdpMuxSocket* sendonly_skt; @@ -337,14 +337,14 @@ private: srs_utime_t last_stun_time; private: // For each RTC session, we use a specified cid for debugging logs. - std::string cid; + SrsContextId cid; // For each RTC session, whether requires encrypt. // Read config value, rtc_server.encrypt, default to on. // Sepcifies by HTTP API, query encrypt, optional. // TODO: FIXME: Support reload. bool encrypt; SrsRequest* req; - SrsRtcSource* source_; + SrsRtcStream* source_; SrsSdp remote_sdp; SrsSdp local_sdp; public: @@ -357,25 +357,25 @@ private: sockaddr_in* blackhole_addr; srs_netfd_t blackhole_stfd; public: - SrsRtcSession(SrsRtcServer* s); - virtual ~SrsRtcSession(); + SrsRtcConnection(SrsRtcServer* s); + virtual ~SrsRtcConnection(); public: SrsSdp* get_local_sdp(); void set_local_sdp(const SrsSdp& sdp); SrsSdp* get_remote_sdp(); void set_remote_sdp(const SrsSdp& sdp); - SrsRtcSessionStateType state(); - void set_state(SrsRtcSessionStateType state); + SrsRtcConnectionStateType state(); + void set_state(SrsRtcConnectionStateType state); std::string id(); std::string peer_id(); void set_peer_id(std::string v); std::string username(); void set_encrypt(bool v); void switch_to_context(); - std::string context_id(); + SrsContextId context_id(); public: // Before initialize, user must set the local SDP, which is used to inititlize DTLS. - srs_error_t initialize(SrsRtcSource* source, SrsRequest* r, bool is_publisher, std::string username, std::string context_id); + srs_error_t initialize(SrsRtcStream* source, SrsRequest* r, bool is_publisher, std::string username, SrsContextId context_id); // The peer address may change, we can identify that by STUN messages. srs_error_t on_stun(SrsUdpMuxSocket* skt, SrsStunPacket* r); srs_error_t on_dtls(char* data, int nb_data); @@ -401,13 +401,13 @@ public: virtual ~ISrsRtcHijacker(); public: // When start publisher by RTC. - virtual srs_error_t on_start_publish(SrsRtcSession* session, SrsRtcPublisher* publisher, SrsRequest* req) = 0; + virtual srs_error_t on_start_publish(SrsRtcConnection* session, SrsRtcPublishStream* publisher, SrsRequest* req) = 0; // When got RTP plaintext packet. - virtual srs_error_t on_rtp_packet(SrsRtcSession* session, SrsRtcPublisher* publisher, SrsRequest* req, SrsRtpPacket2* pkt) = 0; + virtual srs_error_t on_rtp_packet(SrsRtcConnection* session, SrsRtcPublishStream* publisher, SrsRequest* req, SrsRtpPacket2* pkt) = 0; // When start player by RTC. - virtual srs_error_t on_start_play(SrsRtcSession* session, SrsRtcPlayer* player, SrsRequest* req) = 0; + virtual srs_error_t on_start_play(SrsRtcConnection* session, SrsRtcPlayStream* player, SrsRequest* req) = 0; // When start consuming for player for RTC. - virtual srs_error_t on_start_consume(SrsRtcSession* session, SrsRtcPlayer* player, SrsRequest* req, SrsRtcConsumer* consumer) = 0; + virtual srs_error_t on_start_consume(SrsRtcConnection* session, SrsRtcPlayStream* player, SrsRequest* req, SrsRtcConsumer* consumer) = 0; }; extern ISrsRtcHijacker* _srs_rtc_hijacker; diff --git a/trunk/src/app/srs_app_rtc_dtls.cpp b/trunk/src/app/srs_app_rtc_dtls.cpp index 838bcb9ce..80c7cc0ae 100644 --- a/trunk/src/app/srs_app_rtc_dtls.cpp +++ b/trunk/src/app/srs_app_rtc_dtls.cpp @@ -281,7 +281,6 @@ SSL_CTX* SrsDtls::build_dtls_ctx() #endif if (_srs_rtc_dtls_certificate->is_ecdsa()) { // By ECDSA, https://stackoverflow.com/a/6006898 - #if OPENSSL_VERSION_NUMBER >= 0x10002000L // v1.0.2 // For ECDSA, we could set the curves list. // @see https://www.openssl.org/docs/man1.0.2/man3/SSL_CTX_set1_curves_list.html diff --git a/trunk/src/app/srs_app_rtc_server.cpp b/trunk/src/app/srs_app_rtc_server.cpp index a44d82e7b..9a3e5880b 100644 --- a/trunk/src/app/srs_app_rtc_server.cpp +++ b/trunk/src/app/srs_app_rtc_server.cpp @@ -37,6 +37,7 @@ #include #include #include +#include #include #include @@ -154,9 +155,9 @@ SrsRtcServer::~SrsRtcServer() } if (true) { - std::vector::iterator it; + std::vector::iterator it; for (it = zombies_.begin(); it != zombies_.end(); ++it) { - SrsRtcSession* session = *it; + SrsRtcConnection* session = *it; srs_freep(session); } } @@ -221,7 +222,7 @@ srs_error_t SrsRtcServer::on_udp_packet(SrsUdpMuxSocket* skt) srs_error_t err = srs_success; char* data = skt->data(); int size = skt->size(); - SrsRtcSession* session = find_session_by_peer_id(skt->peer_id()); + SrsRtcConnection* session = find_session_by_peer_id(skt->peer_id()); if (session) { // Now, we got the RTC session to handle the packet, switch to its context @@ -296,11 +297,11 @@ srs_error_t SrsRtcServer::listen_api() srs_error_t SrsRtcServer::create_session( SrsRequest* req, const SrsSdp& remote_sdp, SrsSdp& local_sdp, const std::string& mock_eip, bool publish, - SrsRtcSession** psession + SrsRtcConnection** psession ) { srs_error_t err = srs_success; - SrsRtcSource* source = NULL; + SrsRtcStream* source = NULL; if ((err = _srs_rtc_sources->fetch_or_create(req, &source)) != srs_success) { return srs_error_wrap(err, "create source"); } @@ -310,12 +311,12 @@ srs_error_t SrsRtcServer::create_session( return srs_error_new(ERROR_RTC_SOURCE_BUSY, "stream %s busy", req->get_stream_url().c_str()); } - std::string local_pwd = gen_random_str(32); + std::string local_pwd = srs_random_str(32); std::string local_ufrag = ""; // TODO: FIXME: Rename for a better name, it's not an username. std::string username = ""; while (true) { - local_ufrag = gen_random_str(8); + local_ufrag = srs_random_str(8); username = local_ufrag + ":" + remote_sdp.get_ice_ufrag(); if (!map_username_session.count(username)) { @@ -338,13 +339,13 @@ srs_error_t SrsRtcServer::create_session( } } - SrsRtcSession* session = new SrsRtcSession(this); + SrsRtcConnection* session = new SrsRtcConnection(this); session->set_remote_sdp(remote_sdp); // We must setup the local SDP, then initialize the session object. session->set_local_sdp(local_sdp); session->set_state(WAITING_STUN); - std::string cid = _srs_context->get_id(); + SrsContextId cid = _srs_context->get_id(); // Before session initialize, we must setup the local SDP. if ((err = session->initialize(source, req, publish, username, cid)) != srs_success) { srs_freep(session); @@ -357,15 +358,15 @@ srs_error_t SrsRtcServer::create_session( return err; } -srs_error_t SrsRtcServer::create_session2(SrsSdp& local_sdp, SrsRtcSession** psession) +srs_error_t SrsRtcServer::create_session2(SrsSdp& local_sdp, SrsRtcConnection** psession) { srs_error_t err = srs_success; - std::string local_pwd = gen_random_str(32); + std::string local_pwd = srs_random_str(32); // TODO: FIXME: Collision detect. - std::string local_ufrag = gen_random_str(8); + std::string local_ufrag = srs_random_str(8); - SrsRtcSession* session = new SrsRtcSession(this); + SrsRtcConnection* session = new SrsRtcConnection(this); *psession = session; local_sdp.set_ice_ufrag(local_ufrag); @@ -385,7 +386,7 @@ srs_error_t SrsRtcServer::create_session2(SrsSdp& local_sdp, SrsRtcSession** pse return err; } -srs_error_t SrsRtcServer::setup_session2(SrsRtcSession* session, SrsRequest* req, const SrsSdp& remote_sdp) +srs_error_t SrsRtcServer::setup_session2(SrsRtcConnection* session, SrsRequest* req, const SrsSdp& remote_sdp) { srs_error_t err = srs_success; @@ -393,7 +394,7 @@ srs_error_t SrsRtcServer::setup_session2(SrsRtcSession* session, SrsRequest* req return err; } - SrsRtcSource* source = NULL; + SrsRtcStream* source = NULL; if ((err = _srs_rtc_sources->fetch_or_create(req, &source)) != srs_success) { return srs_error_wrap(err, "create source"); } @@ -401,7 +402,7 @@ srs_error_t SrsRtcServer::setup_session2(SrsRtcSession* session, SrsRequest* req // TODO: FIXME: Collision detect. string username = session->get_local_sdp()->get_ice_ufrag() + ":" + remote_sdp.get_ice_ufrag(); - std::string cid = _srs_context->get_id(); + SrsContextId cid = _srs_context->get_id(); if ((err = session->initialize(source, req, false, username, cid)) != srs_success) { return srs_error_wrap(err, "init"); } @@ -414,14 +415,14 @@ srs_error_t SrsRtcServer::setup_session2(SrsRtcSession* session, SrsRequest* req return err; } -void SrsRtcServer::destroy(SrsRtcSession* session) +void SrsRtcServer::destroy(SrsRtcConnection* session) { if (session->disposing_) { return; } session->disposing_ = true; - std::map::iterator it; + std::map::iterator it; if ((it = map_username_session.find(session->username())) != map_username_session.end()) { map_username_session.erase(it); @@ -434,16 +435,16 @@ void SrsRtcServer::destroy(SrsRtcSession* session) zombies_.push_back(session); } -bool SrsRtcServer::insert_into_id_sessions(const string& peer_id, SrsRtcSession* session) +bool SrsRtcServer::insert_into_id_sessions(const string& peer_id, SrsRtcConnection* session) { return map_id_session.insert(make_pair(peer_id, session)).second; } void SrsRtcServer::check_and_clean_timeout_session() { - map::iterator iter = map_username_session.begin(); + map::iterator iter = map_username_session.begin(); while (iter != map_username_session.end()) { - SrsRtcSession* session = iter->second; + SrsRtcConnection* session = iter->second; srs_assert(session); if (!session->is_stun_timeout()) { @@ -474,9 +475,9 @@ int SrsRtcServer::nn_sessions() return (int)map_username_session.size(); } -SrsRtcSession* SrsRtcServer::find_session_by_peer_id(const string& peer_id) +SrsRtcConnection* SrsRtcServer::find_session_by_peer_id(const string& peer_id) { - map::iterator iter = map_id_session.find(peer_id); + map::iterator iter = map_id_session.find(peer_id); if (iter == map_id_session.end()) { return NULL; } @@ -484,9 +485,9 @@ SrsRtcSession* SrsRtcServer::find_session_by_peer_id(const string& peer_id) return iter->second; } -SrsRtcSession* SrsRtcServer::find_session_by_username(const std::string& username) +SrsRtcConnection* SrsRtcServer::find_session_by_username(const std::string& username) { - map::iterator iter = map_username_session.find(username); + map::iterator iter = map_username_session.find(username); if (iter == map_username_session.end()) { return NULL; } @@ -506,12 +507,12 @@ srs_error_t SrsRtcServer::notify(int type, srs_utime_t interval, srs_utime_t tic return err; } - std::vector zombies; + std::vector zombies; zombies.swap(zombies_); - std::vector::iterator it; + std::vector::iterator it; for (it = zombies.begin(); it != zombies.end(); ++it) { - SrsRtcSession* session = *it; + SrsRtcConnection* session = *it; srs_freep(session); } diff --git a/trunk/src/app/srs_app_rtc_server.hpp b/trunk/src/app/srs_app_rtc_server.hpp index 164971710..1c0e97f94 100644 --- a/trunk/src/app/srs_app_rtc_server.hpp +++ b/trunk/src/app/srs_app_rtc_server.hpp @@ -36,7 +36,7 @@ class SrsRtcServer; class SrsHourGlass; -class SrsRtcSession; +class SrsRtcConnection; class SrsRequest; class SrsSdp; @@ -47,7 +47,7 @@ public: virtual ~ISrsRtcServerHandler(); public: // When server detect the timeout for session object. - virtual void on_timeout(SrsRtcSession* session) = 0; + virtual void on_timeout(SrsRtcConnection* session) = 0; }; class SrsRtcServer : virtual public ISrsUdpMuxHandler, virtual public ISrsHourGlass @@ -57,10 +57,10 @@ private: std::vector listeners; ISrsRtcServerHandler* handler; private: - std::map map_username_session; // key: username(local_ufrag + ":" + remote_ufrag) - std::map map_id_session; // key: peerip(ip + ":" + port) + std::map map_username_session; // key: username(local_ufrag + ":" + remote_ufrag) + std::map map_id_session; // key: peerip(ip + ":" + port) // The zombie sessions, we will free them. - std::vector zombies_; + std::vector zombies_; public: SrsRtcServer(); virtual ~SrsRtcServer(); @@ -78,20 +78,20 @@ public: // Peer start offering, we answer it. srs_error_t create_session( SrsRequest* req, const SrsSdp& remote_sdp, SrsSdp& local_sdp, const std::string& mock_eip, bool publish, - SrsRtcSession** psession + SrsRtcConnection** psession ); // We start offering, create_session2 to generate offer, setup_session2 to handle answer. - srs_error_t create_session2(SrsSdp& local_sdp, SrsRtcSession** psession); - srs_error_t setup_session2(SrsRtcSession* session, SrsRequest* req, const SrsSdp& remote_sdp); + srs_error_t create_session2(SrsSdp& local_sdp, SrsRtcConnection** psession); + srs_error_t setup_session2(SrsRtcConnection* session, SrsRequest* req, const SrsSdp& remote_sdp); // Destroy the session from server. - void destroy(SrsRtcSession* session); + void destroy(SrsRtcConnection* session); public: - bool insert_into_id_sessions(const std::string& peer_id, SrsRtcSession* session); + bool insert_into_id_sessions(const std::string& peer_id, SrsRtcConnection* session); void check_and_clean_timeout_session(); int nn_sessions(); - SrsRtcSession* find_session_by_username(const std::string& ufrag); + SrsRtcConnection* find_session_by_username(const std::string& ufrag); private: - SrsRtcSession* find_session_by_peer_id(const std::string& peer_id); + SrsRtcConnection* find_session_by_peer_id(const std::string& peer_id); // interface ISrsHourGlass public: virtual srs_error_t notify(int type, srs_utime_t interval, srs_utime_t tick); diff --git a/trunk/src/app/srs_app_rtc_source.cpp b/trunk/src/app/srs_app_rtc_source.cpp index 211f518a8..7550a2e71 100644 --- a/trunk/src/app/srs_app_rtc_source.cpp +++ b/trunk/src/app/srs_app_rtc_source.cpp @@ -90,7 +90,7 @@ srs_error_t aac_raw_append_adts_header(SrsSharedPtrMessage* shared_audio, SrsFor return err; } -SrsRtcConsumer::SrsRtcConsumer(SrsRtcSource* s) +SrsRtcConsumer::SrsRtcConsumer(SrsRtcStream* s) { source = s; should_update_source_id = false; @@ -140,7 +140,7 @@ srs_error_t SrsRtcConsumer::dump_packets(std::vector& pkts) srs_error_t err = srs_success; if (should_update_source_id) { - srs_trace("update source_id=%d[%d]", source->source_id().c_str(), source->source_id().c_str()); + srs_trace("update source_id=%s[%s]", source->source_id().c_str(), source->source_id().c_str()); should_update_source_id = false; } @@ -165,21 +165,21 @@ void SrsRtcConsumer::wait(int nb_msgs) srs_cond_wait(mw_wait); } -SrsRtcSourceManager::SrsRtcSourceManager() +SrsRtcStreamManager::SrsRtcStreamManager() { lock = NULL; } -SrsRtcSourceManager::~SrsRtcSourceManager() +SrsRtcStreamManager::~SrsRtcStreamManager() { srs_mutex_destroy(lock); } -srs_error_t SrsRtcSourceManager::fetch_or_create(SrsRequest* r, SrsRtcSource** pps) +srs_error_t SrsRtcStreamManager::fetch_or_create(SrsRequest* r, SrsRtcStream** pps) { srs_error_t err = srs_success; - // Lazy create lock, because ST is not ready in SrsRtcSourceManager constructor. + // Lazy create lock, because ST is not ready in SrsRtcStreamManager constructor. if (!lock) { lock = srs_mutex_new(); } @@ -188,7 +188,7 @@ srs_error_t SrsRtcSourceManager::fetch_or_create(SrsRequest* r, SrsRtcSource** p // @bug https://github.com/ossrs/srs/issues/1230 SrsLocker(lock); - SrsRtcSource* source = NULL; + SrsRtcStream* source = NULL; if ((source = fetch(r)) != NULL) { *pps = source; return err; @@ -202,7 +202,7 @@ srs_error_t SrsRtcSourceManager::fetch_or_create(SrsRequest* r, SrsRtcSource** p srs_trace("new source, stream_url=%s", stream_url.c_str()); - source = new SrsRtcSource(); + source = new SrsRtcStream(); if ((err = source->initialize(r)) != srs_success) { return srs_error_wrap(err, "init source %s", r->get_stream_url().c_str()); } @@ -214,9 +214,9 @@ srs_error_t SrsRtcSourceManager::fetch_or_create(SrsRequest* r, SrsRtcSource** p return err; } -SrsRtcSource* SrsRtcSourceManager::fetch(SrsRequest* r) +SrsRtcStream* SrsRtcStreamManager::fetch(SrsRequest* r) { - SrsRtcSource* source = NULL; + SrsRtcStream* source = NULL; string stream_url = r->get_stream_url(); if (pool.find(stream_url) == pool.end()) { @@ -233,21 +233,20 @@ SrsRtcSource* SrsRtcSourceManager::fetch(SrsRequest* r) return source; } -SrsRtcSourceManager* _srs_rtc_sources = new SrsRtcSourceManager(); +SrsRtcStreamManager* _srs_rtc_sources = new SrsRtcStreamManager(); -ISrsRtcPublisher::ISrsRtcPublisher() +ISrsRtcPublishStream::ISrsRtcPublishStream() { } -ISrsRtcPublisher::~ISrsRtcPublisher() +ISrsRtcPublishStream::~ISrsRtcPublishStream() { } -SrsRtcSource::SrsRtcSource() +SrsRtcStream::SrsRtcStream() { - _source_id = _pre_source_id = ""; _can_publish = true; - rtc_publisher_ = NULL; + publish_stream_ = NULL; req = NULL; #ifdef SRS_FFMPEG_FIT @@ -257,7 +256,7 @@ SrsRtcSource::SrsRtcSource() #endif } -SrsRtcSource::~SrsRtcSource() +SrsRtcStream::~SrsRtcStream() { // never free the consumers, // for all consumers are auto free. @@ -267,7 +266,7 @@ SrsRtcSource::~SrsRtcSource() srs_freep(bridger_); } -srs_error_t SrsRtcSource::initialize(SrsRequest* r) +srs_error_t SrsRtcStream::initialize(SrsRequest* r) { srs_error_t err = srs_success; @@ -283,22 +282,22 @@ srs_error_t SrsRtcSource::initialize(SrsRequest* r) return err; } -void SrsRtcSource::update_auth(SrsRequest* r) +void SrsRtcStream::update_auth(SrsRequest* r) { req->update_auth(r); } -srs_error_t SrsRtcSource::on_source_id_changed(std::string id) +srs_error_t SrsRtcStream::on_source_id_changed(SrsContextId id) { srs_error_t err = srs_success; - if (_source_id == id) { + if (!_source_id.compare(id)) { return err; } - if (_pre_source_id == "") { + if (_pre_source_id.empty()) { _pre_source_id = id; - } else if (_pre_source_id != _source_id) { + } else if (_pre_source_id.compare(_source_id)) { _pre_source_id = _source_id; } @@ -314,22 +313,22 @@ srs_error_t SrsRtcSource::on_source_id_changed(std::string id) return err; } -std::string SrsRtcSource::source_id() +SrsContextId SrsRtcStream::source_id() { return _source_id; } -std::string SrsRtcSource::pre_source_id() +SrsContextId SrsRtcStream::pre_source_id() { return _pre_source_id; } -ISrsSourceBridger* SrsRtcSource::bridger() +ISrsSourceBridger* SrsRtcStream::bridger() { return bridger_; } -srs_error_t SrsRtcSource::create_consumer(SrsRtcConsumer*& consumer) +srs_error_t SrsRtcStream::create_consumer(SrsRtcConsumer*& consumer) { srs_error_t err = srs_success; @@ -341,7 +340,7 @@ srs_error_t SrsRtcSource::create_consumer(SrsRtcConsumer*& consumer) return err; } -srs_error_t SrsRtcSource::consumer_dumps(SrsRtcConsumer* consumer, bool ds, bool dm, bool dg) +srs_error_t SrsRtcStream::consumer_dumps(SrsRtcConsumer* consumer, bool ds, bool dm, bool dg) { srs_error_t err = srs_success; @@ -351,7 +350,7 @@ srs_error_t SrsRtcSource::consumer_dumps(SrsRtcConsumer* consumer, bool ds, bool return err; } -void SrsRtcSource::on_consumer_destroy(SrsRtcConsumer* consumer) +void SrsRtcStream::on_consumer_destroy(SrsRtcConsumer* consumer) { std::vector::iterator it; it = std::find(consumers.begin(), consumers.end(), consumer); @@ -360,12 +359,12 @@ void SrsRtcSource::on_consumer_destroy(SrsRtcConsumer* consumer) } } -bool SrsRtcSource::can_publish(bool is_edge) +bool SrsRtcStream::can_publish(bool is_edge) { return _can_publish; } -srs_error_t SrsRtcSource::on_publish() +srs_error_t SrsRtcStream::on_publish() { srs_error_t err = srs_success; @@ -385,7 +384,7 @@ srs_error_t SrsRtcSource::on_publish() return err; } -void SrsRtcSource::on_unpublish() +void SrsRtcStream::on_unpublish() { // ignore when already unpublished. if (_can_publish) { @@ -395,22 +394,22 @@ void SrsRtcSource::on_unpublish() srs_trace("cleanup when unpublish"); _can_publish = true; - _source_id = -1; + _source_id = SrsContextId(); // TODO: FIXME: Handle by statistic. } -ISrsRtcPublisher* SrsRtcSource::rtc_publisher() +ISrsRtcPublishStream* SrsRtcStream::publish_stream() { - return rtc_publisher_; + return publish_stream_; } -void SrsRtcSource::set_rtc_publisher(ISrsRtcPublisher* v) +void SrsRtcStream::set_publish_stream(ISrsRtcPublishStream* v) { - rtc_publisher_ = v; + publish_stream_ = v; } -srs_error_t SrsRtcSource::on_rtp(SrsRtpPacket2* pkt) +srs_error_t SrsRtcStream::on_rtp(SrsRtpPacket2* pkt) { srs_error_t err = srs_success; @@ -425,7 +424,7 @@ srs_error_t SrsRtcSource::on_rtp(SrsRtpPacket2* pkt) } #ifdef SRS_FFMPEG_FIT -SrsRtcFromRtmpBridger::SrsRtcFromRtmpBridger(SrsRtcSource* source) +SrsRtcFromRtmpBridger::SrsRtcFromRtmpBridger(SrsRtcStream* source) { req = NULL; source_ = source; @@ -728,7 +727,7 @@ srs_error_t SrsRtcFromRtmpBridger::filter(SrsSharedPtrMessage* msg, SrsFormat* f return err; } -srs_error_t SrsRtcFromRtmpBridger::package_stap_a(SrsRtcSource* source, SrsSharedPtrMessage* msg, SrsRtpPacket2** ppkt) +srs_error_t SrsRtcFromRtmpBridger::package_stap_a(SrsRtcStream* source, SrsSharedPtrMessage* msg, SrsRtpPacket2** ppkt) { srs_error_t err = srs_success; diff --git a/trunk/src/app/srs_app_rtc_source.hpp b/trunk/src/app/srs_app_rtc_source.hpp index de4bc4daa..66215dbab 100644 --- a/trunk/src/app/srs_app_rtc_source.hpp +++ b/trunk/src/app/srs_app_rtc_source.hpp @@ -38,7 +38,7 @@ class SrsMetaCache; class SrsSharedPtrMessage; class SrsCommonMessage; class SrsMessageArray; -class SrsRtcSource; +class SrsRtcStream; class SrsRtcFromRtmpBridger; class SrsAudioRecode; class SrsRtpPacket2; @@ -47,7 +47,7 @@ class SrsSample; class SrsRtcConsumer { private: - SrsRtcSource* source; + SrsRtcStream* source; std::vector queue; // when source id changed, notice all consumers bool should_update_source_id; @@ -57,7 +57,7 @@ private: bool mw_waiting; int mw_min_msgs; public: - SrsRtcConsumer(SrsRtcSource* s); + SrsRtcConsumer(SrsRtcStream* s); virtual ~SrsRtcConsumer(); public: // When source id changed, notice client to print. @@ -71,49 +71,51 @@ public: virtual void wait(int nb_msgs); }; -class SrsRtcSourceManager +class SrsRtcStreamManager { private: srs_mutex_t lock; - std::map pool; + std::map pool; public: - SrsRtcSourceManager(); - virtual ~SrsRtcSourceManager(); + SrsRtcStreamManager(); + virtual ~SrsRtcStreamManager(); public: // create source when fetch from cache failed. // @param r the client request. // @param pps the matched source, if success never be NULL. - virtual srs_error_t fetch_or_create(SrsRequest* r, SrsRtcSource** pps); + virtual srs_error_t fetch_or_create(SrsRequest* r, SrsRtcStream** pps); private: // Get the exists source, NULL when not exists. // update the request and return the exists source. - virtual SrsRtcSource* fetch(SrsRequest* r); + virtual SrsRtcStream* fetch(SrsRequest* r); }; // Global singleton instance. -extern SrsRtcSourceManager* _srs_rtc_sources; +extern SrsRtcStreamManager* _srs_rtc_sources; -class ISrsRtcPublisher +// A publish stream interface, for source to callback with. +class ISrsRtcPublishStream { public: - ISrsRtcPublisher(); - virtual ~ISrsRtcPublisher(); + ISrsRtcPublishStream(); + virtual ~ISrsRtcPublishStream(); public: virtual void request_keyframe() = 0; }; -class SrsRtcSource +// A Source is a stream, to publish and to play with, binding to SrsRtcPublishStream and SrsRtcPlayStream. +class SrsRtcStream { private: // For publish, it's the publish client id. // For edge, it's the edge ingest id. // when source id changed, for example, the edge reconnect, // invoke the on_source_id_changed() to let all clients know. - std::string _source_id; + SrsContextId _source_id; // previous source id. - std::string _pre_source_id; + SrsContextId _pre_source_id; SrsRequest* req; - ISrsRtcPublisher* rtc_publisher_; + ISrsRtcPublishStream* publish_stream_; // Transmux RTMP to RTC. ISrsSourceBridger* bridger_; private: @@ -122,17 +124,17 @@ private: // Whether source is avaiable for publishing. bool _can_publish; public: - SrsRtcSource(); - virtual ~SrsRtcSource(); + SrsRtcStream(); + virtual ~SrsRtcStream(); public: virtual srs_error_t initialize(SrsRequest* r); // Update the authentication information in request. virtual void update_auth(SrsRequest* r); // The source id changed. - virtual srs_error_t on_source_id_changed(std::string id); + virtual srs_error_t on_source_id_changed(SrsContextId id); // Get current source id. - virtual std::string source_id(); - virtual std::string pre_source_id(); + virtual SrsContextId source_id(); + virtual SrsContextId pre_source_id(); // Get the bridger. ISrsSourceBridger* bridger(); public: @@ -153,8 +155,8 @@ public: virtual void on_unpublish(); public: // Get and set the publisher, passed to consumer to process requests such as PLI. - ISrsRtcPublisher* rtc_publisher(); - void set_rtc_publisher(ISrsRtcPublisher* v); + ISrsRtcPublishStream* publish_stream(); + void set_publish_stream(ISrsRtcPublishStream* v); // Consume the shared RTP packet, user must free it. srs_error_t on_rtp(SrsRtpPacket2* pkt); }; @@ -164,7 +166,7 @@ class SrsRtcFromRtmpBridger : public ISrsSourceBridger { private: SrsRequest* req; - SrsRtcSource* source_; + SrsRtcStream* source_; // The format, codec information. SrsRtmpFormat* format; // The metadata cache. @@ -178,7 +180,7 @@ private: uint16_t audio_sequence; uint16_t video_sequence; public: - SrsRtcFromRtmpBridger(SrsRtcSource* source); + SrsRtcFromRtmpBridger(SrsRtcStream* source); virtual ~SrsRtcFromRtmpBridger(); public: virtual srs_error_t initialize(SrsRequest* r); @@ -192,7 +194,7 @@ public: virtual srs_error_t on_video(SrsSharedPtrMessage* msg); private: srs_error_t filter(SrsSharedPtrMessage* msg, SrsFormat* format, bool& has_idr, std::vector& samples); - srs_error_t package_stap_a(SrsRtcSource* source, SrsSharedPtrMessage* msg, SrsRtpPacket2** ppkt); + srs_error_t package_stap_a(SrsRtcStream* source, SrsSharedPtrMessage* msg, SrsRtpPacket2** ppkt); srs_error_t package_nalus(SrsSharedPtrMessage* msg, const std::vector& samples, std::vector& pkts); srs_error_t package_single_nalu(SrsSharedPtrMessage* msg, SrsSample* sample, std::vector& pkts); srs_error_t package_fu_a(SrsSharedPtrMessage* msg, SrsSample* sample, int fu_payload_size, std::vector& pkts); diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index b21d2cd24..3fa14461e 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -816,9 +816,6 @@ srs_error_t SrsServer::initialize_st() _srs_config->get_max_connections(), __MMAP_MAX_CONNECTIONS); } - // set current log id. - _srs_context->generate_id(); - // check asprocess. bool asprocess = _srs_config->get_asprocess(); if (asprocess && ppid == 1) { diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index 2fe07696c..389156e9c 100755 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -1728,7 +1728,7 @@ srs_error_t SrsSourceManager::fetch_or_create(SrsRequest* r, ISrsSourceHandler* bool rtc_enabled = _srs_config->get_rtc_enabled(r->vhost); // Get the RTC source and bridger. - SrsRtcSource* rtc = NULL; + SrsRtcStream* rtc = NULL; if (rtc_server_enabled && rtc_enabled) { if ((err = _srs_rtc_sources->fetch_or_create(r, &rtc)) != srs_success) { err = srs_error_wrap(err, "init rtc %s", r->get_stream_url().c_str()); @@ -1792,7 +1792,7 @@ void SrsSourceManager::dispose() srs_error_t SrsSourceManager::cycle() { - std::string cid = _srs_context->get_id(); + SrsContextId cid = _srs_context->get_id(); srs_error_t err = do_cycle(); _srs_context->set_id(cid); @@ -1866,7 +1866,6 @@ SrsSource::SrsSource() mix_queue = new SrsMixQueue(); _can_publish = true; - _pre_source_id = _source_id = ""; die_at = 0; handler = NULL; @@ -2065,17 +2064,17 @@ srs_error_t SrsSource::on_reload_vhost_play(string vhost) return err; } -srs_error_t SrsSource::on_source_id_changed(string id) +srs_error_t SrsSource::on_source_id_changed(SrsContextId id) { srs_error_t err = srs_success; - if (_source_id == id) { + if (!_source_id.compare(id)) { return err; } - if (_pre_source_id == "") { + if (_pre_source_id.empty()) { _pre_source_id = id; - } else if (_pre_source_id != _source_id) { + } else if (_pre_source_id.compare(_source_id)) { _pre_source_id = _source_id; } @@ -2091,12 +2090,12 @@ srs_error_t SrsSource::on_source_id_changed(string id) return err; } -string SrsSource::source_id() +SrsContextId SrsSource::source_id() { return _source_id; } -string SrsSource::pre_source_id() +SrsContextId SrsSource::pre_source_id() { return _pre_source_id; } @@ -2560,7 +2559,7 @@ void SrsSource::on_unpublish() srs_trace("cleanup when unpublish"); _can_publish = true; - _source_id = -1; + _source_id = SrsContextId(); // notify the handler. srs_assert(handler); diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index 4cdb4be09..ededa3d7c 100644 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -508,9 +508,9 @@ private: // For edge, it's the edge ingest id. // when source id changed, for example, the edge reconnect, // invoke the on_source_id_changed() to let all clients know. - std::string _source_id; + SrsContextId _source_id; // previous source id. - std::string _pre_source_id; + SrsContextId _pre_source_id; // deep copy of client request. SrsRequest* req; // To delivery stream to clients. @@ -567,10 +567,10 @@ public: virtual srs_error_t on_reload_vhost_play(std::string vhost); public: // The source id changed. - virtual srs_error_t on_source_id_changed(std::string id); + virtual srs_error_t on_source_id_changed(SrsContextId id); // Get current source id. - virtual std::string source_id(); - virtual std::string pre_source_id(); + virtual SrsContextId source_id(); + virtual SrsContextId pre_source_id(); // Whether source is inactive, which means there is no publishing stream source. // @remark For edge, it's inactive util stream has been pulled from origin. virtual bool inactive(); diff --git a/trunk/src/app/srs_app_st.cpp b/trunk/src/app/srs_app_st.cpp index 36b2942e0..5fc7fe528 100755 --- a/trunk/src/app/srs_app_st.cpp +++ b/trunk/src/app/srs_app_st.cpp @@ -74,18 +74,27 @@ srs_error_t SrsDummyCoroutine::pull() return srs_error_new(ERROR_THREAD_DUMMY, "dummy pull"); } -string SrsDummyCoroutine::cid() +SrsContextId SrsDummyCoroutine::cid() { - return ""; + return SrsContextId(); } _ST_THREAD_CREATE_PFN _pfn_st_thread_create = (_ST_THREAD_CREATE_PFN)st_thread_create; -SrsSTCoroutine::SrsSTCoroutine(string n, ISrsCoroutineHandler* h, std::string cid) +SrsSTCoroutine::SrsSTCoroutine(string n, ISrsCoroutineHandler* h) { name = n; handler = h; - context = cid; + trd = NULL; + trd_err = srs_success; + started = interrupted = disposed = cycle_done = false; +} + +SrsSTCoroutine::SrsSTCoroutine(string n, ISrsCoroutineHandler* h, SrsContextId cid) +{ + name = n; + handler = h; + cid_ = cid; trd = NULL; trd_err = srs_success; started = interrupted = disposed = cycle_done = false; @@ -180,19 +189,18 @@ srs_error_t SrsSTCoroutine::pull() return srs_error_copy(trd_err); } -string SrsSTCoroutine::cid() +SrsContextId SrsSTCoroutine::cid() { - return context; + return cid_; } srs_error_t SrsSTCoroutine::cycle() { if (_srs_context) { - if (!context.empty()) { - _srs_context->set_id(context); - } else { - context = _srs_context->generate_id(); + if (cid_.empty()) { + cid_ = _srs_context->generate_id(); } + _srs_context->set_id(cid_); } srs_error_t err = handler->cycle(); diff --git a/trunk/src/app/srs_app_st.hpp b/trunk/src/app/srs_app_st.hpp index cdf4ca170..2db3ccf56 100644 --- a/trunk/src/app/srs_app_st.hpp +++ b/trunk/src/app/srs_app_st.hpp @@ -28,6 +28,7 @@ #include +#include #include #include @@ -80,7 +81,7 @@ public: // @return a copy of error, which should be freed by user. // NULL if not terminated and user should pull again. virtual srs_error_t pull() = 0; - virtual std::string cid() = 0; + virtual SrsContextId cid() = 0; }; // An empty coroutine, user can default to this object before create any real coroutine. @@ -95,7 +96,7 @@ public: virtual void stop(); virtual void interrupt(); virtual srs_error_t pull(); - virtual std::string cid(); + virtual SrsContextId cid(); }; // For utest to mock the thread create. @@ -121,7 +122,7 @@ private: ISrsCoroutineHandler* handler; private: srs_thread_t trd; - std::string context; + SrsContextId cid_; srs_error_t trd_err; private: bool started; @@ -132,7 +133,8 @@ private: public: // Create a thread with name n and handler h. // @remark User can specify a cid for thread to use, or we will allocate a new one. - SrsSTCoroutine(std::string n, ISrsCoroutineHandler* h, std::string cid = ""); + SrsSTCoroutine(std::string n, ISrsCoroutineHandler* h); + SrsSTCoroutine(std::string n, ISrsCoroutineHandler* h, SrsContextId cid); virtual ~SrsSTCoroutine(); public: // Start the thread. @@ -154,7 +156,7 @@ public: // @remark Return ERROR_THREAD_INTERRUPED when thread is interrupted. virtual srs_error_t pull(); // Get the context id of thread. - virtual std::string cid(); + virtual SrsContextId cid(); private: virtual srs_error_t cycle(); static void* pfn(void* arg); diff --git a/trunk/src/app/srs_app_statistic.cpp b/trunk/src/app/srs_app_statistic.cpp index 7c505b220..6da79f2a2 100644 --- a/trunk/src/app/srs_app_statistic.cpp +++ b/trunk/src/app/srs_app_statistic.cpp @@ -101,8 +101,7 @@ SrsStatisticStream::SrsStatisticStream() id = srs_generate_id(); vhost = NULL; active = false; - connection_cid = -1; - + has_video = false; vcodec = SrsVideoCodecIdReserved; avc_profile = SrsAvcProfileReserved; @@ -184,7 +183,7 @@ srs_error_t SrsStatisticStream::dumps(SrsJsonObject* obj) return err; } -void SrsStatisticStream::publish(string cid) +void SrsStatisticStream::publish(SrsContextId cid) { connection_cid = cid; active = true; @@ -337,10 +336,10 @@ SrsStatisticStream* SrsStatistic::find_stream(string sid) return NULL; } -SrsStatisticClient* SrsStatistic::find_client(string cid) +SrsStatisticClient* SrsStatistic::find_client(string client_id) { std::map::iterator it; - if ((it = clients.find(cid)) != clients.end()) { + if ((it = clients.find(client_id)) != clients.end()) { return it->second; } return NULL; @@ -392,7 +391,7 @@ srs_error_t SrsStatistic::on_video_frames(SrsRequest* req, int nb_frames) return err; } -void SrsStatistic::on_stream_publish(SrsRequest* req, string cid) +void SrsStatistic::on_stream_publish(SrsRequest* req, SrsContextId cid) { SrsStatisticVhost* vhost = create_vhost(req); SrsStatisticStream* stream = create_stream(vhost, req); @@ -423,10 +422,13 @@ void SrsStatistic::on_stream_close(SrsRequest* req) } } -srs_error_t SrsStatistic::on_client(std::string id, SrsRequest* req, SrsConnection* conn, SrsRtmpConnType type) +srs_error_t SrsStatistic::on_client(SrsContextId cid, SrsRequest* req, SrsConnection* conn, SrsRtmpConnType type) { srs_error_t err = srs_success; - + + // TODO: FIXME: We should use UUID for client ID. + std::string id = cid.c_str(); + SrsStatisticVhost* vhost = create_vhost(req); SrsStatisticStream* stream = create_stream(vhost, req); @@ -451,8 +453,11 @@ srs_error_t SrsStatistic::on_client(std::string id, SrsRequest* req, SrsConnecti return err; } -void SrsStatistic::on_disconnect(std::string id) +void SrsStatistic::on_disconnect(SrsContextId cid) { + // TODO: FIXME: We should use UUID for client ID. + std::string id = cid.c_str(); + std::map::iterator it; if ((it = clients.find(id)) == clients.end()) { return; @@ -471,7 +476,8 @@ void SrsStatistic::on_disconnect(std::string id) void SrsStatistic::kbps_add_delta(SrsConnection* conn) { - std::string id = conn->srs_id(); + // TODO: FIXME: Should not use context id as connection id. + std::string id = conn->srs_id().c_str(); if (clients.find(id) == clients.end()) { return; } diff --git a/trunk/src/app/srs_app_statistic.hpp b/trunk/src/app/srs_app_statistic.hpp index 7f0de259e..2a4aaf3aa 100644 --- a/trunk/src/app/srs_app_statistic.hpp +++ b/trunk/src/app/srs_app_statistic.hpp @@ -67,7 +67,7 @@ public: std::string stream; std::string url; bool active; - std::string connection_cid; + SrsContextId connection_cid; int nb_clients; uint64_t nb_frames; public: @@ -101,7 +101,7 @@ public: virtual srs_error_t dumps(SrsJsonObject* obj); public: // Publish the stream. - virtual void publish(std::string cid); + virtual void publish(SrsContextId cid); // Close the stream. virtual void close(); }; @@ -181,7 +181,7 @@ public: public: virtual SrsStatisticVhost* find_vhost(std::string vid); virtual SrsStatisticStream* find_stream(std::string sid); - virtual SrsStatisticClient* find_client(std::string cid); + virtual SrsStatisticClient* find_client(std::string client_id); public: // When got video info for stream. virtual srs_error_t on_video_info(SrsRequest* req, SrsVideoCodecId vcodec, SrsAvcProfile avc_profile, @@ -195,7 +195,7 @@ public: // When publish stream. // @param req the request object of publish connection. // @param cid the cid of publish connection. - virtual void on_stream_publish(SrsRequest* req, std::string cid); + virtual void on_stream_publish(SrsRequest* req, SrsContextId cid); // When close stream. virtual void on_stream_close(SrsRequest* req); public: @@ -204,12 +204,14 @@ public: // @param req, the client request object. // @param conn, the physical absract connection object. // @param type, the type of connection. - virtual srs_error_t on_client(std::string id, SrsRequest* req, SrsConnection* conn, SrsRtmpConnType type); + // TODO: FIXME: We should not use context id as client id. + virtual srs_error_t on_client(SrsContextId id, SrsRequest* req, SrsConnection* conn, SrsRtmpConnType type); // Client disconnect // @remark the on_disconnect always call, while the on_client is call when // only got the request object, so the client specified by id maybe not // exists in stat. - virtual void on_disconnect(std::string id); + // TODO: FIXME: We should not use context id as client id. + virtual void on_disconnect(SrsContextId id); // Sample the kbps, add delta bytes of conn. // Use kbps_sample() to get all result of kbps stat. // TODO: FIXME: the add delta must use ISrsKbpsDelta interface instead. diff --git a/trunk/src/core/srs_core.cpp b/trunk/src/core/srs_core.cpp index 57259c0de..539f4e267 100644 --- a/trunk/src/core/srs_core.cpp +++ b/trunk/src/core/srs_core.cpp @@ -23,4 +23,42 @@ #include +_SrsContextId::_SrsContextId() +{ +} + +_SrsContextId::_SrsContextId(std::string v) +{ + v_ = v; +} + +_SrsContextId::_SrsContextId(const _SrsContextId& cp) +{ + v_ = cp.v_; +} + +_SrsContextId& _SrsContextId::operator=(const _SrsContextId& cp) +{ + v_ = cp.v_; + return *this; +} + +_SrsContextId::~_SrsContextId() +{ +} + +const char* _SrsContextId::c_str() const +{ + return v_.c_str(); +} + +bool _SrsContextId::empty() const +{ + return v_.empty(); +} + +int _SrsContextId::compare(const _SrsContextId& to) const +{ + return v_.compare(to.v_); +} diff --git a/trunk/src/core/srs_core.hpp b/trunk/src/core/srs_core.hpp index 051eb7ae0..bb983c2af 100644 --- a/trunk/src/core/srs_core.hpp +++ b/trunk/src/core/srs_core.hpp @@ -113,4 +113,33 @@ class SrsCplxError; typedef SrsCplxError* srs_error_t; +#include +// The context ID, it default to a string object, we can also use other objects. +// @remark User can directly user string as SrsContextId, we user struct to ensure the context is an object. +#if 1 +class _SrsContextId +{ +private: + std::string v_; +public: + _SrsContextId(); + _SrsContextId(std::string v); + _SrsContextId(const _SrsContextId& cp); + _SrsContextId& operator=(const _SrsContextId& cp); + virtual ~_SrsContextId(); +public: + const char* c_str() const; + bool empty() const; + // Compare the two context id. @see http://www.cplusplus.com/reference/string/string/compare/ + // 0 They compare equal + // <0 Either the value of the first character that does not match is lower in the compared string, or all compared characters match but the compared string is shorter. + // >0 Either the value of the first character that does not match is greater in the compared string, or all compared characters match but the compared string is longer. + int compare(const _SrsContextId& to) const; +}; +typedef _SrsContextId SrsContextId; +#else +// Actually, we can directly user string as SrsContextId. +typedef std::string SrsContextId; +#endif + #endif diff --git a/trunk/src/kernel/srs_kernel_error.cpp b/trunk/src/kernel/srs_kernel_error.cpp index dd5efc35a..f6d7340f8 100644 --- a/trunk/src/kernel/srs_kernel_error.cpp +++ b/trunk/src/kernel/srs_kernel_error.cpp @@ -57,7 +57,7 @@ SrsCplxError::SrsCplxError() { code = ERROR_SUCCESS; wrapped = NULL; - cid = rerrno = line = 0; + rerrno = line = 0; } SrsCplxError::~SrsCplxError() @@ -79,7 +79,7 @@ std::string SrsCplxError::description() { next = this; while (next) { - ss << "thread [" << getpid() << "][" << next->cid << "]: " + ss << "thread [" << getpid() << "][" << next->cid.c_str() << "]: " << next->func << "() [" << next->file << ":" << next->line << "]" << "[errno=" << next->rerrno << "]"; diff --git a/trunk/src/kernel/srs_kernel_error.hpp b/trunk/src/kernel/srs_kernel_error.hpp index 509c27ca2..8231fbd05 100644 --- a/trunk/src/kernel/srs_kernel_error.hpp +++ b/trunk/src/kernel/srs_kernel_error.hpp @@ -414,7 +414,7 @@ private: std::string file; int line; - std::string cid; + SrsContextId cid; int rerrno; std::string desc; diff --git a/trunk/src/kernel/srs_kernel_log.hpp b/trunk/src/kernel/srs_kernel_log.hpp index 9d47c3371..dbf149cf2 100644 --- a/trunk/src/kernel/srs_kernel_log.hpp +++ b/trunk/src/kernel/srs_kernel_log.hpp @@ -65,37 +65,38 @@ public: virtual void reopen() = 0; public: // The log for verbose, very verbose information. - virtual void verbose(const char* tag, const char* context_id, const char* fmt, ...) = 0; + virtual void verbose(const char* tag, SrsContextId context_id, const char* fmt, ...) = 0; // The log for debug, detail information. - virtual void info(const char* tag, const char* context_id, const char* fmt, ...) = 0; + virtual void info(const char* tag, SrsContextId context_id, const char* fmt, ...) = 0; // The log for trace, important information. - virtual void trace(const char* tag, const char* context_id, const char* fmt, ...) = 0; + virtual void trace(const char* tag, SrsContextId context_id, const char* fmt, ...) = 0; // The log for warn, warn is something should take attention, but not a error. - virtual void warn(const char* tag, const char* context_id, const char* fmt, ...) = 0; + virtual void warn(const char* tag, SrsContextId context_id, const char* fmt, ...) = 0; // The log for error, something error occur, do something about the error, ie. close the connection, // but we will donot abort the program. - virtual void error(const char* tag, const char* context_id, const char* fmt, ...) = 0; + virtual void error(const char* tag, SrsContextId context_id, const char* fmt, ...) = 0; }; -// The logic context for a RTMP connection, or RTC Session. +// The logic context, for example, a RTMP connection, or RTC Session, etc. // We can grep the context id to identify the logic unit, for debugging. // For example: -// _srs_context->generate_id(); // Generate a new context. -// _srs_context->get_id(); // Get current context. -// int old_id = _srs_context->set_id("1000"); // Change the context. +// SrsContextId cid = _srs_context->get_id(); // Get current context id. +// SrsContextId new_cid = _srs_context->generate_id(); // Generate a new context id. +// SrsContextId old_cid = _srs_context->set_id(new_cid); // Change the context id. class ISrsContext { public: ISrsContext(); virtual ~ISrsContext(); public: - // Generate the id for current context. - virtual std::string generate_id() = 0; - // Get the generated id of current context. - virtual std::string get_id() = 0; - // Set the id of current context. - // @return the previous id value; 0 if no context. - virtual std::string set_id(std::string v) = 0; + // Generate a new context id. + // @remark We do not set to current thread, user should do this. + virtual SrsContextId generate_id() = 0; + // Get the context id of current thread. + virtual SrsContextId get_id() = 0; + // Set the context id of current thread. + // @return the previous context id. + virtual SrsContextId set_id(SrsContextId v) = 0; }; // @global User must provides a log object @@ -108,25 +109,25 @@ extern ISrsContext* _srs_context; // Use __FUNCTION__ to print c method // Use __PRETTY_FUNCTION__ to print c++ class:method #if 1 - #define srs_verbose(msg, ...) _srs_log->verbose(NULL, _srs_context->get_id().c_str(), msg, ##__VA_ARGS__) - #define srs_info(msg, ...) _srs_log->info(NULL, _srs_context->get_id().c_str(), msg, ##__VA_ARGS__) - #define srs_trace(msg, ...) _srs_log->trace(NULL, _srs_context->get_id().c_str(), msg, ##__VA_ARGS__) - #define srs_warn(msg, ...) _srs_log->warn(NULL, _srs_context->get_id().c_str(), msg, ##__VA_ARGS__) - #define srs_error(msg, ...) _srs_log->error(NULL, _srs_context->get_id().c_str(), msg, ##__VA_ARGS__) + #define srs_verbose(msg, ...) _srs_log->verbose(NULL, _srs_context->get_id(), msg, ##__VA_ARGS__) + #define srs_info(msg, ...) _srs_log->info(NULL, _srs_context->get_id(), msg, ##__VA_ARGS__) + #define srs_trace(msg, ...) _srs_log->trace(NULL, _srs_context->get_id(), msg, ##__VA_ARGS__) + #define srs_warn(msg, ...) _srs_log->warn(NULL, _srs_context->get_id(), msg, ##__VA_ARGS__) + #define srs_error(msg, ...) _srs_log->error(NULL, _srs_context->get_id(), msg, ##__VA_ARGS__) #endif #if 0 - #define srs_verbose(msg, ...) _srs_log->verbose(__FUNCTION__, _srs_context->get_id().c_str(), msg, ##__VA_ARGS__) - #define srs_info(msg, ...) _srs_log->info(__FUNCTION__, _srs_context->get_id().c_str(), msg, ##__VA_ARGS__) - #define srs_trace(msg, ...) _srs_log->trace(__FUNCTION__, _srs_context->get_id().c_str(), msg, ##__VA_ARGS__) - #define srs_warn(msg, ...) _srs_log->warn(__FUNCTION__, _srs_context->get_id().c_str(), msg, ##__VA_ARGS__) - #define srs_error(msg, ...) _srs_log->error(__FUNCTION__, _srs_context->get_id().c_str(), msg, ##__VA_ARGS__) + #define srs_verbose(msg, ...) _srs_log->verbose(__FUNCTION__, _srs_context->get_id(), msg, ##__VA_ARGS__) + #define srs_info(msg, ...) _srs_log->info(__FUNCTION__, _srs_context->get_id(), msg, ##__VA_ARGS__) + #define srs_trace(msg, ...) _srs_log->trace(__FUNCTION__, _srs_context->get_id(), msg, ##__VA_ARGS__) + #define srs_warn(msg, ...) _srs_log->warn(__FUNCTION__, _srs_context->get_id(), msg, ##__VA_ARGS__) + #define srs_error(msg, ...) _srs_log->error(__FUNCTION__, _srs_context->get_id(), msg, ##__VA_ARGS__) #endif #if 0 - #define srs_verbose(msg, ...) _srs_log->verbose(__PRETTY_FUNCTION__, _srs_context->get_id().c_str(), msg, ##__VA_ARGS__) - #define srs_info(msg, ...) _srs_log->info(__PRETTY_FUNCTION__, _srs_context->get_id().c_str(), msg, ##__VA_ARGS__) - #define srs_trace(msg, ...) _srs_log->trace(__PRETTY_FUNCTION__, _srs_context->get_id().c_str(), msg, ##__VA_ARGS__) - #define srs_warn(msg, ...) _srs_log->warn(__PRETTY_FUNCTION__, _srs_context->get_id().c_str(), msg, ##__VA_ARGS__) - #define srs_error(msg, ...) _srs_log->error(__PRETTY_FUNCTION__, _srs_context->get_id().c_str(), msg, ##__VA_ARGS__) + #define srs_verbose(msg, ...) _srs_log->verbose(__PRETTY_FUNCTION__, _srs_context->get_id(), msg, ##__VA_ARGS__) + #define srs_info(msg, ...) _srs_log->info(__PRETTY_FUNCTION__, _srs_context->get_id(), msg, ##__VA_ARGS__) + #define srs_trace(msg, ...) _srs_log->trace(__PRETTY_FUNCTION__, _srs_context->get_id(), msg, ##__VA_ARGS__) + #define srs_warn(msg, ...) _srs_log->warn(__PRETTY_FUNCTION__, _srs_context->get_id(), msg, ##__VA_ARGS__) + #define srs_error(msg, ...) _srs_log->error(__PRETTY_FUNCTION__, _srs_context->get_id(), msg, ##__VA_ARGS__) #endif // TODO: FIXME: Add more verbose and info logs. diff --git a/trunk/src/main/srs_main_server.cpp b/trunk/src/main/srs_main_server.cpp index e9d6e51be..6ec4d01f7 100644 --- a/trunk/src/main/srs_main_server.cpp +++ b/trunk/src/main/srs_main_server.cpp @@ -213,7 +213,7 @@ srs_error_t do_main(int argc, char** argv) int main(int argc, char** argv) { // For background context id. - _srs_context->generate_id(); + _srs_context->set_id(_srs_context->generate_id()); srs_error_t err = do_main(argc, argv); diff --git a/trunk/src/protocol/srs_protocol_utility.cpp b/trunk/src/protocol/srs_protocol_utility.cpp index 57e65ecf0..69b5bc3bd 100644 --- a/trunk/src/protocol/srs_protocol_utility.cpp +++ b/trunk/src/protocol/srs_protocol_utility.cpp @@ -148,20 +148,39 @@ void srs_parse_query_string(string q, map& query) } } +static bool _random_initialized = false; + void srs_random_generate(char* bytes, int size) { - static bool _random_initialized = false; if (!_random_initialized) { - srand(0); _random_initialized = true; + ::srandom((unsigned long)(srs_update_system_time() | (::getpid()<<13))); } for (int i = 0; i < size; i++) { // the common value in [0x0f, 0xf0] - bytes[i] = 0x0f + (rand() % (256 - 0x0f - 0x0f)); + bytes[i] = 0x0f + (random() % (256 - 0x0f - 0x0f)); } } +std::string srs_random_str(int len) +{ + if (!_random_initialized) { + _random_initialized = true; + ::srandom((unsigned long)(srs_update_system_time() | (::getpid()<<13))); + } + + static string random_table = "01234567890123456789012345678901234567890123456789abcdefghijklmnopqrstuvwxyz"; + + string ret; + ret.reserve(len); + for (int i = 0; i < len; ++i) { + ret.append(1, random_table[random() % random_table.size()]); + } + + return ret; +} + string srs_generate_tc_url(string host, string vhost, string app, int port) { string tcUrl = "rtmp://"; diff --git a/trunk/src/protocol/srs_protocol_utility.hpp b/trunk/src/protocol/srs_protocol_utility.hpp index 3f4cc03d2..aaf907ec3 100644 --- a/trunk/src/protocol/srs_protocol_utility.hpp +++ b/trunk/src/protocol/srs_protocol_utility.hpp @@ -65,11 +65,12 @@ extern void srs_discovery_tc_url(std::string tcUrl, std::string& schema, std::st // must format as key=value&...&keyN=valueN extern void srs_parse_query_string(std::string q, std::map& query); -/** - * generate ramdom data for handshake. - */ +// Generate ramdom data for handshake. extern void srs_random_generate(char* bytes, int size); +// Generate random string [0-9a-z] in size of len bytes. +extern std::string srs_random_str(int len); + /** * generate the tcUrl without param. * @remark Use host as tcUrl.vhost if vhost is default vhost. diff --git a/trunk/src/protocol/srs_service_log.cpp b/trunk/src/protocol/srs_service_log.cpp index 3036dfa25..3d711343b 100644 --- a/trunk/src/protocol/srs_service_log.cpp +++ b/trunk/src/protocol/srs_service_log.cpp @@ -31,6 +31,7 @@ using namespace std; #include #include +#include #define SRS_BASIC_LOG_SIZE 1024 @@ -42,31 +43,22 @@ SrsThreadContext::~SrsThreadContext() { } -string SrsThreadContext::generate_id() +SrsContextId SrsThreadContext::generate_id() { - static int id = 0; - - if (id == 0) { - id = (100 + ((uint32_t)(int64_t)this)%1000); - } - int gid = id++; - - stringstream ss; - ss << gid; - cache[srs_thread_self()] = ss.str(); - return ss.str(); + SrsContextId cid = SrsContextId(srs_random_str(8)); + return cid; } -string SrsThreadContext::get_id() +SrsContextId SrsThreadContext::get_id() { return cache[srs_thread_self()]; } -string SrsThreadContext::set_id(string v) +SrsContextId SrsThreadContext::set_id(SrsContextId v) { srs_thread_t self = srs_thread_self(); - string ov; + SrsContextId ov; if (cache.find(self) != cache.end()) { ov = cache[self]; } @@ -79,7 +71,7 @@ string SrsThreadContext::set_id(string v) void SrsThreadContext::clear_cid() { srs_thread_t self = srs_thread_self(); - std::map::iterator it = cache.find(self); + std::map::iterator it = cache.find(self); if (it != cache.end()) { cache.erase(it); } @@ -108,7 +100,7 @@ void SrsConsoleLog::reopen() { } -void SrsConsoleLog::verbose(const char* tag, const char* context_id, const char* fmt, ...) +void SrsConsoleLog::verbose(const char* tag, SrsContextId context_id, const char* fmt, ...) { if (level > SrsLogLevelVerbose) { return; @@ -128,7 +120,7 @@ void SrsConsoleLog::verbose(const char* tag, const char* context_id, const char* fprintf(stdout, "%s\n", buffer); } -void SrsConsoleLog::info(const char* tag, const char* context_id, const char* fmt, ...) +void SrsConsoleLog::info(const char* tag, SrsContextId context_id, const char* fmt, ...) { if (level > SrsLogLevelInfo) { return; @@ -148,7 +140,7 @@ void SrsConsoleLog::info(const char* tag, const char* context_id, const char* fm fprintf(stdout, "%s\n", buffer); } -void SrsConsoleLog::trace(const char* tag, const char* context_id, const char* fmt, ...) +void SrsConsoleLog::trace(const char* tag, SrsContextId context_id, const char* fmt, ...) { if (level > SrsLogLevelTrace) { return; @@ -168,7 +160,7 @@ void SrsConsoleLog::trace(const char* tag, const char* context_id, const char* f fprintf(stdout, "%s\n", buffer); } -void SrsConsoleLog::warn(const char* tag, const char* context_id, const char* fmt, ...) +void SrsConsoleLog::warn(const char* tag, SrsContextId context_id, const char* fmt, ...) { if (level > SrsLogLevelWarn) { return; @@ -188,7 +180,7 @@ void SrsConsoleLog::warn(const char* tag, const char* context_id, const char* fm fprintf(stderr, "%s\n", buffer); } -void SrsConsoleLog::error(const char* tag, const char* context_id, const char* fmt, ...) +void SrsConsoleLog::error(const char* tag, SrsContextId context_id, const char* fmt, ...) { if (level > SrsLogLevelError) { return; @@ -214,7 +206,7 @@ void SrsConsoleLog::error(const char* tag, const char* context_id, const char* f } // LCOV_EXCL_STOP -bool srs_log_header(char* buffer, int size, bool utc, bool dangerous, const char* tag, const char* cid, const char* level, int* psize) +bool srs_log_header(char* buffer, int size, bool utc, bool dangerous, const char* tag, SrsContextId cid, const char* level, int* psize) { // clock time timeval tv; @@ -240,24 +232,24 @@ bool srs_log_header(char* buffer, int size, bool utc, bool dangerous, const char written = snprintf(buffer, size, "[%d-%02d-%02d %02d:%02d:%02d.%03d][%s][%s][%d][%s][%d] ", 1900 + tm->tm_year, 1 + tm->tm_mon, tm->tm_mday, tm->tm_hour, tm->tm_min, tm->tm_sec, (int)(tv.tv_usec / 1000), - level, tag, getpid(), cid, errno); + level, tag, getpid(), cid.c_str(), errno); } else { written = snprintf(buffer, size, "[%d-%02d-%02d %02d:%02d:%02d.%03d][%s][%d][%s][%d] ", 1900 + tm->tm_year, 1 + tm->tm_mon, tm->tm_mday, tm->tm_hour, tm->tm_min, tm->tm_sec, (int)(tv.tv_usec / 1000), - level, getpid(), cid, errno); + level, getpid(), cid.c_str(), errno); } } else { if (tag) { written = snprintf(buffer, size, "[%d-%02d-%02d %02d:%02d:%02d.%03d][%s][%s][%d][%s] ", 1900 + tm->tm_year, 1 + tm->tm_mon, tm->tm_mday, tm->tm_hour, tm->tm_min, tm->tm_sec, (int)(tv.tv_usec / 1000), - level, tag, getpid(), cid); + level, tag, getpid(), cid.c_str()); } else { written = snprintf(buffer, size, "[%d-%02d-%02d %02d:%02d:%02d.%03d][%s][%d][%s] ", 1900 + tm->tm_year, 1 + tm->tm_mon, tm->tm_mday, tm->tm_hour, tm->tm_min, tm->tm_sec, (int)(tv.tv_usec / 1000), - level, getpid(), cid); + level, getpid(), cid.c_str()); } } diff --git a/trunk/src/protocol/srs_service_log.hpp b/trunk/src/protocol/srs_service_log.hpp index a0bd141ab..656723e64 100644 --- a/trunk/src/protocol/srs_service_log.hpp +++ b/trunk/src/protocol/srs_service_log.hpp @@ -37,14 +37,14 @@ class SrsThreadContext : public ISrsContext { private: - std::map cache; + std::map cache; public: SrsThreadContext(); virtual ~SrsThreadContext(); public: - virtual std::string generate_id(); - virtual std::string get_id(); - virtual std::string set_id(std::string v); + virtual SrsContextId generate_id(); + virtual SrsContextId get_id(); + virtual SrsContextId set_id(SrsContextId v); public: virtual void clear_cid(); }; @@ -64,11 +64,11 @@ public: public: virtual srs_error_t initialize(); virtual void reopen(); - virtual void verbose(const char* tag, const char* context_id, const char* fmt, ...); - virtual void info(const char* tag, const char* context_id, const char* fmt, ...); - virtual void trace(const char* tag, const char* context_id, const char* fmt, ...); - virtual void warn(const char* tag, const char* context_id, const char* fmt, ...); - virtual void error(const char* tag, const char* context_id, const char* fmt, ...); + virtual void verbose(const char* tag, SrsContextId context_id, const char* fmt, ...); + virtual void info(const char* tag, SrsContextId context_id, const char* fmt, ...); + virtual void trace(const char* tag, SrsContextId context_id, const char* fmt, ...); + virtual void warn(const char* tag, SrsContextId context_id, const char* fmt, ...); + virtual void error(const char* tag, SrsContextId context_id, const char* fmt, ...); }; // Generate the log header. @@ -76,6 +76,6 @@ public: // @param utc Whether use UTC time format in the log header. // @param psize Output the actual header size. // @remark It's a internal API. -bool srs_log_header(char* buffer, int size, bool utc, bool dangerous, const char* tag, const char* cid, const char* level, int* psize); +bool srs_log_header(char* buffer, int size, bool utc, bool dangerous, const char* tag, SrsContextId cid, const char* level, int* psize); #endif diff --git a/trunk/src/protocol/srs_service_st.cpp b/trunk/src/protocol/srs_service_st.cpp index ff40b8a03..f2fbf20e5 100644 --- a/trunk/src/protocol/srs_service_st.cpp +++ b/trunk/src/protocol/srs_service_st.cpp @@ -70,8 +70,8 @@ srs_error_t srs_st_init() return srs_error_new(ERROR_ST_SET_EPOLL, "st enable st failed, current is %s", st_get_eventsys_name()); } - // Before ST init, we might have already inited the background cid. - string cid = _srs_context->get_id(); + // Before ST init, we might have already initialized the background cid. + SrsContextId cid = _srs_context->get_id(); if (cid.empty()) { cid = _srs_context->generate_id(); } diff --git a/trunk/src/utest/srs_utest.cpp b/trunk/src/utest/srs_utest.cpp index 9d8678518..20a691a15 100644 --- a/trunk/src/utest/srs_utest.cpp +++ b/trunk/src/utest/srs_utest.cpp @@ -137,3 +137,47 @@ VOID TEST(SampleTest, StringEQTest) EXPECT_STREQ("100", str.c_str()); } +class MockSrsContextId +{ +public: + MockSrsContextId() { + bind_ = NULL; + } + MockSrsContextId(const MockSrsContextId& cp){ + bind_ = NULL; + if (cp.bind_) { + bind_ = cp.bind_->copy(); + } + } + MockSrsContextId& operator= (const MockSrsContextId& cp) { + srs_freep(bind_); + if (cp.bind_) { + bind_ = cp.bind_->copy(); + } + return *this; + } + virtual ~MockSrsContextId() { + srs_freep(bind_); + } +public: + MockSrsContextId* copy() const { + MockSrsContextId* cp = new MockSrsContextId(); + if (bind_) { + cp->bind_ = bind_->copy(); + } + return cp; + } +private: + MockSrsContextId* bind_; +}; + +VOID TEST(SampleTest, ContextTest) +{ + MockSrsContextId cid; + cid.bind_ = new MockSrsContextId(); + + static std::map cache; + cache[0] = cid; + cache[0] = cid; +} + diff --git a/trunk/src/utest/srs_utest_app.cpp b/trunk/src/utest/srs_utest_app.cpp index 25f63af95..e04a859de 100644 --- a/trunk/src/utest/srs_utest_app.cpp +++ b/trunk/src/utest/srs_utest_app.cpp @@ -36,7 +36,7 @@ VOID TEST(AppCoroutineTest, Dummy) SrsDummyCoroutine dc; if (true) { - EXPECT_EQ("", dc.cid()); + EXPECT_TRUE(dc.cid().empty()); srs_error_t err = dc.pull(); EXPECT_TRUE(err != srs_success); @@ -52,7 +52,7 @@ VOID TEST(AppCoroutineTest, Dummy) if (true) { dc.stop(); - EXPECT_EQ("", dc.cid()); + EXPECT_TRUE(dc.cid().empty()); srs_error_t err = dc.pull(); EXPECT_TRUE(err != srs_success); @@ -68,7 +68,7 @@ VOID TEST(AppCoroutineTest, Dummy) if (true) { dc.interrupt(); - EXPECT_EQ("", dc.cid()); + EXPECT_TRUE(dc.cid().empty()); srs_error_t err = dc.pull(); EXPECT_TRUE(err != srs_success); @@ -88,7 +88,7 @@ public: srs_error_t err; srs_cond_t running; srs_cond_t exited; - std::string cid; + SrsContextId cid; // Quit without error. bool quit; public: @@ -128,12 +128,12 @@ VOID TEST(AppCoroutineTest, StartStop) MockCoroutineHandler ch; SrsSTCoroutine sc("test", &ch); ch.trd = ≻ - EXPECT_EQ("", sc.cid()); + EXPECT_TRUE(sc.cid().empty()); // Thread stop after created. sc.stop(); - EXPECT_EQ("", sc.cid()); + EXPECT_TRUE(sc.cid().empty()); srs_error_t err = sc.pull(); EXPECT_TRUE(srs_success != err); @@ -151,7 +151,7 @@ VOID TEST(AppCoroutineTest, StartStop) MockCoroutineHandler ch; SrsSTCoroutine sc("test", &ch); ch.trd = ≻ - EXPECT_EQ("", sc.cid()); + EXPECT_TRUE(sc.cid().empty()); EXPECT_TRUE(srs_success == sc.start()); EXPECT_TRUE(srs_success == sc.pull()); @@ -178,7 +178,7 @@ VOID TEST(AppCoroutineTest, StartStop) MockCoroutineHandler ch; SrsSTCoroutine sc("test", &ch); ch.trd = ≻ - EXPECT_EQ("", sc.cid()); + EXPECT_TRUE(sc.cid().empty()); EXPECT_TRUE(srs_success == sc.start()); EXPECT_TRUE(srs_success == sc.pull()); @@ -220,16 +220,16 @@ VOID TEST(AppCoroutineTest, Cycle) if (true) { MockCoroutineHandler ch; - SrsSTCoroutine sc("test", &ch, "250"); + SrsSTCoroutine sc("test", &ch, SrsContextId("250")); ch.trd = ≻ - EXPECT_TRUE("250" == sc.cid()); + EXPECT_TRUE(!sc.cid().compare(SrsContextId("250"))); EXPECT_TRUE(srs_success == sc.start()); EXPECT_TRUE(srs_success == sc.pull()); // After running, the cid in cycle should equal to the thread. srs_cond_timedwait(ch.running, 100 * SRS_UTIME_MILLISECONDS); - EXPECT_TRUE("250" == ch.cid); + EXPECT_TRUE(!ch.cid.compare(SrsContextId("250"))); } if (true) { diff --git a/trunk/src/utest/srs_utest_service.cpp b/trunk/src/utest/srs_utest_service.cpp index 7ae98d8b2..ab5250422 100644 --- a/trunk/src/utest/srs_utest_service.cpp +++ b/trunk/src/utest/srs_utest_service.cpp @@ -898,7 +898,7 @@ class MockOnCycleThread : public ISrsCoroutineHandler public: SrsSTCoroutine trd; srs_cond_t cond; - MockOnCycleThread() : trd("mock", this, "0") { + MockOnCycleThread() : trd("mock", this) { cond = srs_cond_new(); }; virtual ~MockOnCycleThread() { @@ -938,7 +938,7 @@ class MockOnCycleThread2 : public ISrsCoroutineHandler public: SrsSTCoroutine trd; srs_mutex_t lock; - MockOnCycleThread2() : trd("mock", this, "0") { + MockOnCycleThread2() : trd("mock", this) { lock = srs_mutex_new(); }; virtual ~MockOnCycleThread2() { @@ -979,7 +979,7 @@ class MockOnCycleThread3 : public ISrsCoroutineHandler public: SrsSTCoroutine trd; srs_netfd_t fd; - MockOnCycleThread3() : trd("mock", this, "0") { + MockOnCycleThread3() : trd("mock", this) { fd = NULL; }; virtual ~MockOnCycleThread3() { @@ -1230,7 +1230,7 @@ class MockOnCycleThread4 : public ISrsCoroutineHandler public: SrsSTCoroutine trd; srs_netfd_t fd; - MockOnCycleThread4() : trd("mock", this, "0") { + MockOnCycleThread4() : trd("mock", this) { fd = NULL; }; virtual ~MockOnCycleThread4() { @@ -1382,19 +1382,19 @@ VOID TEST(TCPServerTest, ContextUtility) if (true) { SrsThreadContext ctx; - EXPECT_TRUE("" == ctx.set_id("100")); - EXPECT_TRUE("100" == ctx.set_id("1000")); - EXPECT_TRUE("1000" == ctx.get_id()); + EXPECT_TRUE(ctx.set_id(SrsContextId("100")).empty()); + EXPECT_TRUE(!ctx.set_id(SrsContextId("1000")).compare(SrsContextId("100"))); + EXPECT_TRUE(!ctx.get_id().compare(SrsContextId("1000"))); ctx.clear_cid(); - EXPECT_TRUE("" == ctx.set_id("100")); + EXPECT_TRUE(ctx.set_id(SrsContextId("100")).empty()); } int base_size = 0; if (true) { errno = 0; int size = 0; char buf[1024]; HELPER_ARRAY_INIT(buf, 1024, 0); - ASSERT_TRUE(srs_log_header(buf, 1024, true, true, "SRS", "100", "Trace", &size)); + ASSERT_TRUE(srs_log_header(buf, 1024, true, true, "SRS", SrsContextId("100"), "Trace", &size)); base_size = size; EXPECT_TRUE(base_size > 0); } @@ -1402,21 +1402,21 @@ VOID TEST(TCPServerTest, ContextUtility) if (true) { errno = 0; int size = 0; char buf[1024]; HELPER_ARRAY_INIT(buf, 1024, 0); - ASSERT_TRUE(srs_log_header(buf, 1024, false, true, "SRS", "100", "Trace", &size)); + ASSERT_TRUE(srs_log_header(buf, 1024, false, true, "SRS", SrsContextId("100"), "Trace", &size)); EXPECT_EQ(base_size, size); } if (true) { errno = 0; int size = 0; char buf[1024]; HELPER_ARRAY_INIT(buf, 1024, 0); - ASSERT_TRUE(srs_log_header(buf, 1024, false, true, NULL, "100", "Trace", &size)); + ASSERT_TRUE(srs_log_header(buf, 1024, false, true, NULL, SrsContextId("100"), "Trace", &size)); EXPECT_EQ(base_size - 5, size); } if (true) { errno = 0; int size = 0; char buf[1024]; HELPER_ARRAY_INIT(buf, 1024, 0); - ASSERT_TRUE(srs_log_header(buf, 1024, false, false, NULL, "100", "Trace", &size)); + ASSERT_TRUE(srs_log_header(buf, 1024, false, false, NULL, SrsContextId("100"), "Trace", &size)); EXPECT_EQ(base_size - 8, size); }