1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

Merge branch 'feature/rtc' into develop

This commit is contained in:
winlin 2020-07-11 22:23:18 +08:00
commit 778e546a11
45 changed files with 609 additions and 492 deletions

View file

@ -201,7 +201,7 @@ srs_error_t SrsConnection::cycle()
return srs_success;
}
string SrsConnection::srs_id()
SrsContextId SrsConnection::srs_id()
{
return trd->cid();
}

View file

@ -93,7 +93,8 @@ public:
virtual srs_error_t cycle();
public:
// Get the srs id which identify the client.
virtual std::string srs_id();
// TODO: FIXME: Rename to cid.
virtual SrsContextId srs_id();
// Get the remote ip of peer.
virtual std::string remote_ip();
// Set connection to expired.

View file

@ -533,7 +533,7 @@ srs_error_t SrsDvrMp4Segmenter::close_encoder()
return err;
}
SrsDvrAsyncCallOnDvr::SrsDvrAsyncCallOnDvr(std::string c, SrsRequest* r, string p)
SrsDvrAsyncCallOnDvr::SrsDvrAsyncCallOnDvr(SrsContextId c, SrsRequest* r, string p)
{
cid = c;
req = r->copy();
@ -673,7 +673,7 @@ srs_error_t SrsDvrPlan::on_reap_segment()
{
srs_error_t err = srs_success;
std::string cid = _srs_context->get_id();
SrsContextId cid = _srs_context->get_id();
SrsFragment* fragment = segment->current();
string fullpath = fragment->fullpath();

View file

@ -159,11 +159,11 @@ protected:
class SrsDvrAsyncCallOnDvr : public ISrsAsyncCallTask
{
private:
std::string cid;
SrsContextId cid;
std::string path;
SrsRequest* req;
public:
SrsDvrAsyncCallOnDvr(std::string c, SrsRequest* r, std::string p);
SrsDvrAsyncCallOnDvr(SrsContextId c, SrsRequest* r, std::string p);
virtual ~SrsDvrAsyncCallOnDvr();
public:
virtual srs_error_t call();

View file

@ -82,7 +82,7 @@ void SrsHlsSegment::config_cipher(unsigned char* key,unsigned char* iv)
fw->config_cipher(key, iv);
}
SrsDvrAsyncCallOnHls::SrsDvrAsyncCallOnHls(string c, SrsRequest* r, string p, string t, string m, string mu, int s, srs_utime_t d)
SrsDvrAsyncCallOnHls::SrsDvrAsyncCallOnHls(SrsContextId c, SrsRequest* r, string p, string t, string m, string mu, int s, srs_utime_t d)
{
req = r->copy();
cid = c;
@ -137,7 +137,7 @@ string SrsDvrAsyncCallOnHls::to_string()
return "on_hls: " + path;
}
SrsDvrAsyncCallOnHlsNotify::SrsDvrAsyncCallOnHlsNotify(string c, SrsRequest* r, string u)
SrsDvrAsyncCallOnHlsNotify::SrsDvrAsyncCallOnHlsNotify(SrsContextId c, SrsRequest* r, string u)
{
cid = c;
req = r->copy();

View file

@ -80,7 +80,7 @@ public:
class SrsDvrAsyncCallOnHls : public ISrsAsyncCallTask
{
private:
std::string cid;
SrsContextId cid;
std::string path;
std::string ts_url;
std::string m3u8;
@ -90,7 +90,7 @@ private:
srs_utime_t duration;
public:
// TODO: FIXME: Use TBN 1000.
SrsDvrAsyncCallOnHls(std::string c, SrsRequest* r, std::string p, std::string t, std::string m, std::string mu, int s, srs_utime_t d);
SrsDvrAsyncCallOnHls(SrsContextId c, SrsRequest* r, std::string p, std::string t, std::string m, std::string mu, int s, srs_utime_t d);
virtual ~SrsDvrAsyncCallOnHls();
public:
virtual srs_error_t call();
@ -101,11 +101,11 @@ public:
class SrsDvrAsyncCallOnHlsNotify : public ISrsAsyncCallTask
{
private:
std::string cid;
SrsContextId cid;
std::string ts_url;
SrsRequest* req;
public:
SrsDvrAsyncCallOnHlsNotify(std::string c, SrsRequest* r, std::string u);
SrsDvrAsyncCallOnHlsNotify(SrsContextId c, SrsRequest* r, std::string u);
virtual ~SrsDvrAsyncCallOnHlsNotify();
public:
virtual srs_error_t call();

View file

@ -811,10 +811,10 @@ srs_error_t SrsGoApiClients::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessa
// path: {pattern}{client_id}
// e.g. /api/v1/clients/100 pattern= /api/v1/clients/, client_id=100
std::string cid = r->parse_rest_id(entry->pattern);
std::string client_id = r->parse_rest_id(entry->pattern);
SrsStatisticClient* client = NULL;
if (cid != "" && (client = stat->find_client(cid)) == NULL) {
if (client_id != "" && (client = stat->find_client(client_id)) == NULL) {
return srs_api_response_code(w, r, ERROR_RTMP_CLIENT_NOT_FOUND);
}
@ -854,7 +854,7 @@ srs_error_t SrsGoApiClients::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessa
}
client->conn->expire();
srs_warn("kickoff client id=%s ok", cid.c_str());
srs_warn("kickoff client id=%s ok", client_id.c_str());
} else {
return srs_go_http_error(w, SRS_CONSTS_HTTP_MethodNotAllowed);
}

View file

@ -60,15 +60,13 @@ srs_error_t SrsHttpHooks::on_connect(string url, SrsRequest* req)
{
srs_error_t err = srs_success;
// TODO: FIXME: check client_id must be int?
std::string client_id = _srs_context->get_id();
SrsContextId cid = _srs_context->get_id();
SrsJsonObject* obj = SrsJsonAny::object();
SrsAutoFree(SrsJsonObject, obj);
obj->set("action", SrsJsonAny::str("on_connect"));
// obj->set("client_id", SrsJsonAny::integer(client_id));
obj->set("client_id", SrsJsonAny::str(client_id.c_str()));
obj->set("client_id", SrsJsonAny::str(cid.c_str()));
obj->set("ip", SrsJsonAny::str(req->ip.c_str()));
obj->set("vhost", SrsJsonAny::str(req->vhost.c_str()));
obj->set("app", SrsJsonAny::str(req->app.c_str()));
@ -82,11 +80,11 @@ srs_error_t SrsHttpHooks::on_connect(string url, SrsRequest* req)
SrsHttpClient http;
if ((err = do_post(&http, url, data, status_code, res)) != srs_success) {
return srs_error_wrap(err, "http: on_connect failed, client_id=%s, url=%s, request=%s, response=%s, code=%d",
client_id.c_str(), url.c_str(), data.c_str(), res.c_str(), status_code);
cid.c_str(), url.c_str(), data.c_str(), res.c_str(), status_code);
}
srs_trace("http: on_connect ok, client_id=%s, url=%s, request=%s, response=%s",
client_id.c_str(), url.c_str(), data.c_str(), res.c_str());
cid.c_str(), url.c_str(), data.c_str(), res.c_str());
return err;
}
@ -95,13 +93,13 @@ void SrsHttpHooks::on_close(string url, SrsRequest* req, int64_t send_bytes, int
{
srs_error_t err = srs_success;
std::string client_id = _srs_context->get_id();
SrsContextId cid = _srs_context->get_id();
SrsJsonObject* obj = SrsJsonAny::object();
SrsAutoFree(SrsJsonObject, obj);
obj->set("action", SrsJsonAny::str("on_close"));
obj->set("client_id", SrsJsonAny::str(client_id.c_str()));
obj->set("client_id", SrsJsonAny::str(cid.c_str()));
obj->set("ip", SrsJsonAny::str(req->ip.c_str()));
obj->set("vhost", SrsJsonAny::str(req->vhost.c_str()));
obj->set("app", SrsJsonAny::str(req->app.c_str()));
@ -117,12 +115,12 @@ void SrsHttpHooks::on_close(string url, SrsRequest* req, int64_t send_bytes, int
int ret = srs_error_code(err);
srs_freep(err);
srs_warn("http: ignore on_close failed, client_id=%s, url=%s, request=%s, response=%s, code=%d, ret=%d",
client_id.c_str(), url.c_str(), data.c_str(), res.c_str(), status_code, ret);
cid.c_str(), url.c_str(), data.c_str(), res.c_str(), status_code, ret);
return;
}
srs_trace("http: on_close ok, client_id=%s, url=%s, request=%s, response=%s",
client_id.c_str(), url.c_str(), data.c_str(), res.c_str());
cid.c_str(), url.c_str(), data.c_str(), res.c_str());
return;
}
@ -131,13 +129,13 @@ srs_error_t SrsHttpHooks::on_publish(string url, SrsRequest* req)
{
srs_error_t err = srs_success;
std::string client_id = _srs_context->get_id();
SrsContextId cid = _srs_context->get_id();
SrsJsonObject* obj = SrsJsonAny::object();
SrsAutoFree(SrsJsonObject, obj);
obj->set("action", SrsJsonAny::str("on_publish"));
obj->set("client_id", SrsJsonAny::str(client_id.c_str()));
obj->set("client_id", SrsJsonAny::str(cid.c_str()));
obj->set("ip", SrsJsonAny::str(req->ip.c_str()));
obj->set("vhost", SrsJsonAny::str(req->vhost.c_str()));
obj->set("app", SrsJsonAny::str(req->app.c_str()));
@ -152,11 +150,11 @@ srs_error_t SrsHttpHooks::on_publish(string url, SrsRequest* req)
SrsHttpClient http;
if ((err = do_post(&http, url, data, status_code, res)) != srs_success) {
return srs_error_wrap(err, "http: on_publish failed, client_id=%s, url=%s, request=%s, response=%s, code=%d",
client_id.c_str(), url.c_str(), data.c_str(), res.c_str(), status_code);
cid.c_str(), url.c_str(), data.c_str(), res.c_str(), status_code);
}
srs_trace("http: on_publish ok, client_id=%s, url=%s, request=%s, response=%s",
client_id.c_str(), url.c_str(), data.c_str(), res.c_str());
cid.c_str(), url.c_str(), data.c_str(), res.c_str());
return err;
}
@ -165,13 +163,13 @@ void SrsHttpHooks::on_unpublish(string url, SrsRequest* req)
{
srs_error_t err = srs_success;
std::string client_id = _srs_context->get_id();
SrsContextId cid = _srs_context->get_id();
SrsJsonObject* obj = SrsJsonAny::object();
SrsAutoFree(SrsJsonObject, obj);
obj->set("action", SrsJsonAny::str("on_unpublish"));
obj->set("client_id", SrsJsonAny::str(client_id.c_str()));
obj->set("client_id", SrsJsonAny::str(cid.c_str()));
obj->set("ip", SrsJsonAny::str(req->ip.c_str()));
obj->set("vhost", SrsJsonAny::str(req->vhost.c_str()));
obj->set("app", SrsJsonAny::str(req->app.c_str()));
@ -187,12 +185,12 @@ void SrsHttpHooks::on_unpublish(string url, SrsRequest* req)
int ret = srs_error_code(err);
srs_freep(err);
srs_warn("http: ignore on_unpublish failed, client_id=%s, url=%s, request=%s, response=%s, status=%d, ret=%d",
client_id.c_str(), url.c_str(), data.c_str(), res.c_str(), status_code, ret);
cid.c_str(), url.c_str(), data.c_str(), res.c_str(), status_code, ret);
return;
}
srs_trace("http: on_unpublish ok, client_id=%s, url=%s, request=%s, response=%s",
client_id.c_str(), url.c_str(), data.c_str(), res.c_str());
cid.c_str(), url.c_str(), data.c_str(), res.c_str());
return;
}
@ -201,13 +199,13 @@ srs_error_t SrsHttpHooks::on_play(string url, SrsRequest* req)
{
srs_error_t err = srs_success;
std::string client_id = _srs_context->get_id();
SrsContextId cid = _srs_context->get_id();
SrsJsonObject* obj = SrsJsonAny::object();
SrsAutoFree(SrsJsonObject, obj);
obj->set("action", SrsJsonAny::str("on_play"));
obj->set("client_id", SrsJsonAny::str(client_id.c_str()));
obj->set("client_id", SrsJsonAny::str(cid.c_str()));
obj->set("ip", SrsJsonAny::str(req->ip.c_str()));
obj->set("vhost", SrsJsonAny::str(req->vhost.c_str()));
obj->set("app", SrsJsonAny::str(req->app.c_str()));
@ -222,11 +220,11 @@ srs_error_t SrsHttpHooks::on_play(string url, SrsRequest* req)
SrsHttpClient http;
if ((err = do_post(&http, url, data, status_code, res)) != srs_success) {
return srs_error_wrap(err, "http: on_play failed, client_id=%s, url=%s, request=%s, response=%s, status=%d",
client_id.c_str(), url.c_str(), data.c_str(), res.c_str(), status_code);
cid.c_str(), url.c_str(), data.c_str(), res.c_str(), status_code);
}
srs_trace("http: on_play ok, client_id=%s, url=%s, request=%s, response=%s",
client_id.c_str(), url.c_str(), data.c_str(), res.c_str());
cid.c_str(), url.c_str(), data.c_str(), res.c_str());
return err;
}
@ -235,13 +233,13 @@ void SrsHttpHooks::on_stop(string url, SrsRequest* req)
{
srs_error_t err = srs_success;
std::string client_id = _srs_context->get_id();
SrsContextId cid = _srs_context->get_id();
SrsJsonObject* obj = SrsJsonAny::object();
SrsAutoFree(SrsJsonObject, obj);
obj->set("action", SrsJsonAny::str("on_stop"));
obj->set("client_id", SrsJsonAny::str(client_id.c_str()));
obj->set("client_id", SrsJsonAny::str(cid.c_str()));
obj->set("ip", SrsJsonAny::str(req->ip.c_str()));
obj->set("vhost", SrsJsonAny::str(req->vhost.c_str()));
obj->set("app", SrsJsonAny::str(req->app.c_str()));
@ -257,28 +255,28 @@ void SrsHttpHooks::on_stop(string url, SrsRequest* req)
int ret = srs_error_code(err);
srs_freep(err);
srs_warn("http: ignore on_stop failed, client_id=%s, url=%s, request=%s, response=%s, code=%d, ret=%d",
client_id.c_str(), url.c_str(), data.c_str(), res.c_str(), status_code, ret);
cid.c_str(), url.c_str(), data.c_str(), res.c_str(), status_code, ret);
return;
}
srs_trace("http: on_stop ok, client_id=%s, url=%s, request=%s, response=%s",
client_id.c_str(), url.c_str(), data.c_str(), res.c_str());
cid.c_str(), url.c_str(), data.c_str(), res.c_str());
return;
}
srs_error_t SrsHttpHooks::on_dvr(std::string cid, string url, SrsRequest* req, string file)
srs_error_t SrsHttpHooks::on_dvr(SrsContextId c, string url, SrsRequest* req, string file)
{
srs_error_t err = srs_success;
std::string client_id = cid;
SrsContextId cid = c;
std::string cwd = _srs_config->cwd();
SrsJsonObject* obj = SrsJsonAny::object();
SrsAutoFree(SrsJsonObject, obj);
obj->set("action", SrsJsonAny::str("on_dvr"));
obj->set("client_id", SrsJsonAny::str(client_id.c_str()));
obj->set("client_id", SrsJsonAny::str(cid.c_str()));
obj->set("ip", SrsJsonAny::str(req->ip.c_str()));
obj->set("vhost", SrsJsonAny::str(req->vhost.c_str()));
obj->set("app", SrsJsonAny::str(req->app.c_str()));
@ -294,20 +292,20 @@ srs_error_t SrsHttpHooks::on_dvr(std::string cid, string url, SrsRequest* req, s
SrsHttpClient http;
if ((err = do_post(&http, url, data, status_code, res)) != srs_success) {
return srs_error_wrap(err, "http post on_dvr uri failed, client_id=%s, url=%s, request=%s, response=%s, code=%d",
client_id.c_str(), url.c_str(), data.c_str(), res.c_str(), status_code);
cid.c_str(), url.c_str(), data.c_str(), res.c_str(), status_code);
}
srs_trace("http hook on_dvr success. client_id=%s, url=%s, request=%s, response=%s",
client_id.c_str(), url.c_str(), data.c_str(), res.c_str());
cid.c_str(), url.c_str(), data.c_str(), res.c_str());
return err;
}
srs_error_t SrsHttpHooks::on_hls(std::string cid, string url, SrsRequest* req, string file, string ts_url, string m3u8, string m3u8_url, int sn, srs_utime_t duration)
srs_error_t SrsHttpHooks::on_hls(SrsContextId c, string url, SrsRequest* req, string file, string ts_url, string m3u8, string m3u8_url, int sn, srs_utime_t duration)
{
srs_error_t err = srs_success;
std::string client_id = cid;
SrsContextId cid = c;
std::string cwd = _srs_config->cwd();
// the ts_url is under the same dir of m3u8_url.
@ -320,7 +318,7 @@ srs_error_t SrsHttpHooks::on_hls(std::string cid, string url, SrsRequest* req, s
SrsAutoFree(SrsJsonObject, obj);
obj->set("action", SrsJsonAny::str("on_hls"));
obj->set("client_id", SrsJsonAny::str(client_id.c_str()));
obj->set("client_id", SrsJsonAny::str(cid.c_str()));
obj->set("ip", SrsJsonAny::str(req->ip.c_str()));
obj->set("vhost", SrsJsonAny::str(req->vhost.c_str()));
obj->set("app", SrsJsonAny::str(req->app.c_str()));
@ -344,16 +342,16 @@ srs_error_t SrsHttpHooks::on_hls(std::string cid, string url, SrsRequest* req, s
}
srs_trace("http: on_hls ok, client_id=%s, url=%s, request=%s, response=%s",
client_id.c_str(), url.c_str(), data.c_str(), res.c_str());
cid.c_str(), url.c_str(), data.c_str(), res.c_str());
return err;
}
srs_error_t SrsHttpHooks::on_hls_notify(std::string cid, std::string url, SrsRequest* req, std::string ts_url, int nb_notify)
srs_error_t SrsHttpHooks::on_hls_notify(SrsContextId c, std::string url, SrsRequest* req, std::string ts_url, int nb_notify)
{
srs_error_t err = srs_success;
std::string client_id = cid;
SrsContextId cid = c;
std::string cwd = _srs_config->cwd();
if (srs_string_is_http(ts_url)) {
@ -409,7 +407,7 @@ srs_error_t SrsHttpHooks::on_hls_notify(std::string cid, std::string url, SrsReq
int spenttime = (int)(srsu2ms(srs_update_system_time()) - starttime);
srs_trace("http hook on_hls_notify success. client_id=%s, url=%s, code=%d, spent=%dms, read=%dB, err=%s",
client_id.c_str(), url.c_str(), msg->status_code(), spenttime, nb_read, srs_error_desc(err).c_str());
cid.c_str(), url.c_str(), msg->status_code(), spenttime, nb_read, srs_error_desc(err).c_str());
// ignore any error for on_hls_notify.
srs_error_reset(err);

View file

@ -74,7 +74,7 @@ public:
// ignore if empty.
// @param file the file path, can be relative or absolute path.
// @param cid the source connection cid, for the on_dvr is async call.
static srs_error_t on_dvr(std::string cid, std::string url, SrsRequest* req, std::string file);
static srs_error_t on_dvr(SrsContextId cid, std::string url, SrsRequest* req, std::string file);
// When hls reap segment, callback.
// @param url the api server url, to process the event.
// ignore if empty.
@ -85,7 +85,7 @@ public:
// @param sn the seq_no, the sequence number of ts in hls/m3u8.
// @param duration the segment duration in srs_utime_t.
// @param cid the source connection cid, for the on_dvr is async call.
static srs_error_t on_hls(std::string cid, std::string url, SrsRequest* req, std::string file, std::string ts_url,
static srs_error_t on_hls(SrsContextId cid, std::string url, SrsRequest* req, std::string file, std::string ts_url,
std::string m3u8, std::string m3u8_url, int sn, srs_utime_t duration);
// When hls reap segment, callback.
// @param url the api server url, to process the event.
@ -93,7 +93,7 @@ public:
// @param ts_url the ts uri, used to replace the variable [ts_url] in url.
// @param nb_notify the max bytes to read from notify server.
// @param cid the source connection cid, for the on_dvr is async call.
static srs_error_t on_hls_notify(std::string cid, std::string url, SrsRequest* req, std::string ts_url, int nb_notify);
static srs_error_t on_hls_notify(SrsContextId cid, std::string url, SrsRequest* req, std::string ts_url, int nb_notify);
// Discover co-workers for origin cluster.
static srs_error_t discover_co_workers(std::string url, std::string& host, int& port);
private:

View file

@ -94,7 +94,7 @@ void SrsFileLog::reopen()
open_log_file();
}
void SrsFileLog::verbose(const char* tag, const char* context_id, const char* fmt, ...)
void SrsFileLog::verbose(const char* tag, SrsContextId context_id, const char* fmt, ...)
{
if (level > SrsLogLevelVerbose) {
return;
@ -114,7 +114,7 @@ void SrsFileLog::verbose(const char* tag, const char* context_id, const char* fm
write_log(fd, log_data, size, SrsLogLevelVerbose);
}
void SrsFileLog::info(const char* tag, const char* context_id, const char* fmt, ...)
void SrsFileLog::info(const char* tag, SrsContextId context_id, const char* fmt, ...)
{
if (level > SrsLogLevelInfo) {
return;
@ -134,7 +134,7 @@ void SrsFileLog::info(const char* tag, const char* context_id, const char* fmt,
write_log(fd, log_data, size, SrsLogLevelInfo);
}
void SrsFileLog::trace(const char* tag, const char* context_id, const char* fmt, ...)
void SrsFileLog::trace(const char* tag, SrsContextId context_id, const char* fmt, ...)
{
if (level > SrsLogLevelTrace) {
return;
@ -154,7 +154,7 @@ void SrsFileLog::trace(const char* tag, const char* context_id, const char* fmt,
write_log(fd, log_data, size, SrsLogLevelTrace);
}
void SrsFileLog::warn(const char* tag, const char* context_id, const char* fmt, ...)
void SrsFileLog::warn(const char* tag, SrsContextId context_id, const char* fmt, ...)
{
if (level > SrsLogLevelWarn) {
return;
@ -174,7 +174,7 @@ void SrsFileLog::warn(const char* tag, const char* context_id, const char* fmt,
write_log(fd, log_data, size, SrsLogLevelWarn);
}
void SrsFileLog::error(const char* tag, const char* context_id, const char* fmt, ...)
void SrsFileLog::error(const char* tag, SrsContextId context_id, const char* fmt, ...)
{
if (level > SrsLogLevelError) {
return;

View file

@ -55,11 +55,11 @@ public:
public:
virtual srs_error_t initialize();
virtual void reopen();
virtual void verbose(const char* tag, const char* context_id, const char* fmt, ...);
virtual void info(const char* tag, const char* context_id, const char* fmt, ...);
virtual void trace(const char* tag, const char* context_id, const char* fmt, ...);
virtual void warn(const char* tag, const char* context_id, const char* fmt, ...);
virtual void error(const char* tag, const char* context_id, const char* fmt, ...);
virtual void verbose(const char* tag, SrsContextId context_id, const char* fmt, ...);
virtual void info(const char* tag, SrsContextId context_id, const char* fmt, ...);
virtual void trace(const char* tag, SrsContextId context_id, const char* fmt, ...);
virtual void warn(const char* tag, SrsContextId context_id, const char* fmt, ...);
virtual void error(const char* tag, SrsContextId context_id, const char* fmt, ...);
// Interface ISrsReloadHandler.
public:
virtual srs_error_t on_reload_utc_time();

View file

@ -180,7 +180,7 @@ srs_error_t SrsProcess::start()
srs_info("fork process: %s", cli.c_str());
// for log
std::string cid = _srs_context->get_id();
SrsContextId cid = _srs_context->get_id();
int ppid = getpid();
// TODO: fork or vfork?

View file

@ -57,7 +57,7 @@ ISrsMessagePumper::~ISrsMessagePumper()
{
}
SrsRecvThread::SrsRecvThread(ISrsMessagePumper* p, SrsRtmpServer* r, srs_utime_t tm, std::string parent_cid)
SrsRecvThread::SrsRecvThread(ISrsMessagePumper* p, SrsRtmpServer* r, srs_utime_t tm, SrsContextId parent_cid)
{
rtmp = r;
pumper = p;
@ -71,7 +71,7 @@ SrsRecvThread::~SrsRecvThread()
srs_freep(trd);
}
std::string SrsRecvThread::cid()
SrsContextId SrsRecvThread::cid()
{
return trd->cid();
}
@ -161,7 +161,7 @@ srs_error_t SrsRecvThread::do_cycle()
return err;
}
SrsQueueRecvThread::SrsQueueRecvThread(SrsConsumer* consumer, SrsRtmpServer* rtmp_sdk, srs_utime_t tm, std::string parent_cid)
SrsQueueRecvThread::SrsQueueRecvThread(SrsConsumer* consumer, SrsRtmpServer* rtmp_sdk, srs_utime_t tm, SrsContextId parent_cid)
: trd(this, rtmp_sdk, tm, parent_cid)
{
_consumer = consumer;
@ -278,7 +278,7 @@ void SrsQueueRecvThread::on_stop()
}
SrsPublishRecvThread::SrsPublishRecvThread(SrsRtmpServer* rtmp_sdk, SrsRequest* _req,
int mr_sock_fd, srs_utime_t tm, SrsRtmpConn* conn, SrsSource* source, std::string parent_cid)
int mr_sock_fd, srs_utime_t tm, SrsRtmpConn* conn, SrsSource* source, SrsContextId parent_cid)
: trd(this, rtmp_sdk, tm, parent_cid)
{
rtmp = rtmp_sdk;
@ -290,8 +290,7 @@ SrsPublishRecvThread::SrsPublishRecvThread(SrsRtmpServer* rtmp_sdk, SrsRequest*
_nb_msgs = 0;
video_frames = 0;
error = srs_cond_new();
ncid = cid = "";
req = _req;
mr_fd = mr_sock_fd;
@ -346,7 +345,7 @@ void SrsPublishRecvThread::set_cid(std::string v)
ncid = v;
}
std::string SrsPublishRecvThread::get_cid()
SrsContextId SrsPublishRecvThread::get_cid()
{
return ncid;
}
@ -374,7 +373,7 @@ srs_error_t SrsPublishRecvThread::consume(SrsCommonMessage* msg)
srs_error_t err = srs_success;
// when cid changed, change it.
if (ncid != cid) {
if (ncid.compare(cid)) {
_srs_context->set_id(ncid);
cid = ncid;
}

View file

@ -81,16 +81,16 @@ protected:
SrsCoroutine* trd;
ISrsMessagePumper* pumper;
SrsRtmpServer* rtmp;
std::string _parent_cid;
SrsContextId _parent_cid;
// The recv timeout in srs_utime_t.
srs_utime_t timeout;
public:
// Constructor.
// @param tm The receive timeout in srs_utime_t.
SrsRecvThread(ISrsMessagePumper* p, SrsRtmpServer* r, srs_utime_t tm, std::string parent_cid);
SrsRecvThread(ISrsMessagePumper* p, SrsRtmpServer* r, srs_utime_t tm, SrsContextId parent_cid);
virtual ~SrsRecvThread();
public:
virtual std::string cid();
virtual SrsContextId cid();
public:
virtual srs_error_t start();
virtual void stop();
@ -117,7 +117,7 @@ private:
SrsConsumer* _consumer;
public:
// TODO: FIXME: Refine timeout in time unit.
SrsQueueRecvThread(SrsConsumer* consumer, SrsRtmpServer* rtmp_sdk, srs_utime_t tm, std::string parent_cid);
SrsQueueRecvThread(SrsConsumer* consumer, SrsRtmpServer* rtmp_sdk, srs_utime_t tm, SrsContextId parent_cid);
virtual ~SrsQueueRecvThread();
public:
virtual srs_error_t start();
@ -168,11 +168,11 @@ private:
// @see https://github.com/ossrs/srs/issues/244
srs_cond_t error;
// The merged context id.
std::string cid;
std::string ncid;
SrsContextId cid;
SrsContextId ncid;
public:
SrsPublishRecvThread(SrsRtmpServer* rtmp_sdk, SrsRequest* _req,
int mr_sock_fd, srs_utime_t tm, SrsRtmpConn* conn, SrsSource* source, std::string parent_cid);
int mr_sock_fd, srs_utime_t tm, SrsRtmpConn* conn, SrsSource* source, SrsContextId parent_cid);
virtual ~SrsPublishRecvThread();
public:
// Wait for error for some timeout.
@ -181,7 +181,7 @@ public:
virtual uint64_t nb_video_frames();
virtual srs_error_t error_code();
virtual void set_cid(std::string v);
virtual std::string get_cid();
virtual SrsContextId get_cid();
public:
virtual srs_error_t start();
virtual void stop();

View file

@ -183,7 +183,7 @@ srs_error_t SrsGoApiRtcPlay::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMe
}
// TODO: FIXME: When server enabled, but vhost disabled, should report error.
SrsRtcSession* session = NULL;
SrsRtcConnection* session = NULL;
if ((err = server_->create_session(&request, remote_sdp, local_sdp, eip, false, &session)) != srs_success) {
return srs_error_wrap(err, "create session");
}
@ -541,7 +541,7 @@ srs_error_t SrsGoApiRtcPublish::do_serve_http(ISrsHttpResponseWriter* w, ISrsHtt
}
// TODO: FIXME: When server enabled, but vhost disabled, should report error.
SrsRtcSession* session = NULL;
SrsRtcConnection* session = NULL;
if ((err = server_->create_session(&request, remote_sdp, local_sdp, eip, true, &session)) != srs_success) {
return srs_error_wrap(err, "create session");
}
@ -808,7 +808,7 @@ srs_error_t SrsGoApiRtcNACK::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMe
return srs_error_new(ERROR_RTC_INVALID_PARAMS, "invalid drop=%s/%d", dropv.c_str(), drop);
}
SrsRtcSession* session = server_->find_session_by_username(username);
SrsRtcConnection* session = server_->find_session_by_username(username);
if (!session) {
return srs_error_new(ERROR_RTC_NO_SESSION, "no session username=%s", username.c_str());
}

View file

@ -57,20 +57,6 @@ using namespace std;
#include <srs_app_rtc_server.hpp>
#include <srs_app_rtc_source.hpp>
// TODO: FIXME: Move to utility.
string gen_random_str(int len)
{
static string random_table = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
string ret;
ret.reserve(len);
for (int i = 0; i < len; ++i) {
ret.append(1, random_table[random() % random_table.size()]);
}
return ret;
}
uint64_t SrsNtp::kMagicNtpFractionalUnit = 1ULL << 32;
SrsNtp::SrsNtp()
@ -107,7 +93,7 @@ SrsNtp SrsNtp::to_time_ms(uint64_t ntp)
}
SrsSecurityTransport::SrsSecurityTransport(SrsRtcSession* s)
SrsSecurityTransport::SrsSecurityTransport(SrsRtcConnection* s)
{
session_ = s;
@ -151,7 +137,7 @@ srs_error_t SrsSecurityTransport::write_dtls_data(void* data, int size)
if (session_->blackhole && session_->blackhole_addr && session_->blackhole_stfd) {
// Ignore any error for black-hole.
void* p = data; int len = size; SrsRtcSession* s = session_;
void* p = data; int len = size; SrsRtcConnection* s = session_;
srs_sendto(s->blackhole_stfd, p, len, (sockaddr*)s->blackhole_addr, sizeof(sockaddr_in), SRS_UTIME_NO_TIMEOUT);
}
@ -271,7 +257,7 @@ SrsRtcOutgoingInfo::~SrsRtcOutgoingInfo()
{
}
SrsRtcPlayer::SrsRtcPlayer(SrsRtcSession* s, string parent_cid)
SrsRtcPlayStream::SrsRtcPlayStream(SrsRtcConnection* s, SrsContextId parent_cid)
{
_parent_cid = parent_cid;
trd = new SrsDummyCoroutine();
@ -291,7 +277,7 @@ SrsRtcPlayer::SrsRtcPlayer(SrsRtcSession* s, string parent_cid)
_srs_config->subscribe(this);
}
SrsRtcPlayer::~SrsRtcPlayer()
SrsRtcPlayStream::~SrsRtcPlayStream()
{
_srs_config->unsubscribe(this);
@ -300,7 +286,7 @@ SrsRtcPlayer::~SrsRtcPlayer()
srs_freep(video_queue_);
}
srs_error_t SrsRtcPlayer::initialize(uint32_t vssrc, uint32_t assrc, uint16_t v_pt, uint16_t a_pt)
srs_error_t SrsRtcPlayStream::initialize(uint32_t vssrc, uint32_t assrc, uint16_t v_pt, uint16_t a_pt)
{
srs_error_t err = srs_success;
@ -324,7 +310,7 @@ srs_error_t SrsRtcPlayer::initialize(uint32_t vssrc, uint32_t assrc, uint16_t v_
return err;
}
srs_error_t SrsRtcPlayer::on_reload_vhost_play(string vhost)
srs_error_t SrsRtcPlayStream::on_reload_vhost_play(string vhost)
{
SrsRequest* req = session_->req;
@ -340,17 +326,17 @@ srs_error_t SrsRtcPlayer::on_reload_vhost_play(string vhost)
return srs_success;
}
srs_error_t SrsRtcPlayer::on_reload_vhost_realtime(string vhost)
srs_error_t SrsRtcPlayStream::on_reload_vhost_realtime(string vhost)
{
return on_reload_vhost_play(vhost);
}
std::string SrsRtcPlayer::cid()
SrsContextId SrsRtcPlayStream::cid()
{
return trd->cid();
}
srs_error_t SrsRtcPlayer::start()
srs_error_t SrsRtcPlayStream::start()
{
srs_error_t err = srs_success;
@ -364,21 +350,21 @@ srs_error_t SrsRtcPlayer::start()
return err;
}
void SrsRtcPlayer::stop()
void SrsRtcPlayStream::stop()
{
trd->stop();
}
void SrsRtcPlayer::stop_loop()
void SrsRtcPlayStream::stop_loop()
{
trd->interrupt();
}
srs_error_t SrsRtcPlayer::cycle()
srs_error_t SrsRtcPlayStream::cycle()
{
srs_error_t err = srs_success;
SrsRtcSource* source = NULL;
SrsRtcStream* source = NULL;
SrsRequest* req = session_->req;
if ((err = _srs_rtc_sources->fetch_or_create(req, &source)) != srs_success) {
@ -470,7 +456,7 @@ srs_error_t SrsRtcPlayer::cycle()
}
}
srs_error_t SrsRtcPlayer::send_packets(SrsRtcSource* source, const vector<SrsRtpPacket2*>& pkts, SrsRtcOutgoingInfo& info)
srs_error_t SrsRtcPlayStream::send_packets(SrsRtcStream* source, const vector<SrsRtpPacket2*>& pkts, SrsRtcOutgoingInfo& info)
{
srs_error_t err = srs_success;
@ -512,7 +498,7 @@ srs_error_t SrsRtcPlayer::send_packets(SrsRtcSource* source, const vector<SrsRtp
return err;
}
srs_error_t SrsRtcPlayer::do_send_packets(const std::vector<SrsRtpPacket2*>& pkts, SrsRtcOutgoingInfo& info)
srs_error_t SrsRtcPlayStream::do_send_packets(const std::vector<SrsRtpPacket2*>& pkts, SrsRtcOutgoingInfo& info)
{
srs_error_t err = srs_success;
@ -596,7 +582,7 @@ srs_error_t SrsRtcPlayer::do_send_packets(const std::vector<SrsRtpPacket2*>& pkt
return err;
}
void SrsRtcPlayer::nack_fetch(vector<SrsRtpPacket2*>& pkts, uint32_t ssrc, uint16_t seq)
void SrsRtcPlayStream::nack_fetch(vector<SrsRtpPacket2*>& pkts, uint32_t ssrc, uint16_t seq)
{
SrsRtpPacket2* pkt = NULL;
@ -611,12 +597,12 @@ void SrsRtcPlayer::nack_fetch(vector<SrsRtpPacket2*>& pkts, uint32_t ssrc, uint1
}
}
void SrsRtcPlayer::simulate_nack_drop(int nn)
void SrsRtcPlayStream::simulate_nack_drop(int nn)
{
nn_simulate_nack_drop = nn;
}
void SrsRtcPlayer::simulate_drop_packet(SrsRtpHeader* h, int nn_bytes)
void SrsRtcPlayStream::simulate_drop_packet(SrsRtpHeader* h, int nn_bytes)
{
srs_warn("RTC NACK simulator #%d drop seq=%u, ssrc=%u/%s, ts=%u, %d bytes", nn_simulate_nack_drop,
h->get_sequence(), h->get_ssrc(), (h->get_ssrc()==video_ssrc? "Video":"Audio"), h->get_timestamp(),
@ -625,7 +611,7 @@ void SrsRtcPlayer::simulate_drop_packet(SrsRtpHeader* h, int nn_bytes)
nn_simulate_nack_drop--;
}
srs_error_t SrsRtcPlayer::on_rtcp(char* data, int nb_data)
srs_error_t SrsRtcPlayStream::on_rtcp(char* data, int nb_data)
{
srs_error_t err = srs_success;
@ -690,21 +676,21 @@ srs_error_t SrsRtcPlayer::on_rtcp(char* data, int nb_data)
return err;
}
srs_error_t SrsRtcPlayer::on_rtcp_sr(char* buf, int nb_buf)
srs_error_t SrsRtcPlayStream::on_rtcp_sr(char* buf, int nb_buf)
{
srs_error_t err = srs_success;
// TODO: FIXME: Implements it.
return err;
}
srs_error_t SrsRtcPlayer::on_rtcp_xr(char* buf, int nb_buf)
srs_error_t SrsRtcPlayStream::on_rtcp_xr(char* buf, int nb_buf)
{
srs_error_t err = srs_success;
// TODO: FIXME: Implements it.
return err;
}
srs_error_t SrsRtcPlayer::on_rtcp_feedback(char* buf, int nb_buf)
srs_error_t SrsRtcPlayStream::on_rtcp_feedback(char* buf, int nb_buf)
{
srs_error_t err = srs_success;
@ -790,7 +776,7 @@ srs_error_t SrsRtcPlayer::on_rtcp_feedback(char* buf, int nb_buf)
return err;
}
srs_error_t SrsRtcPlayer::on_rtcp_ps_feedback(char* buf, int nb_buf)
srs_error_t SrsRtcPlayStream::on_rtcp_ps_feedback(char* buf, int nb_buf)
{
srs_error_t err = srs_success;
@ -813,7 +799,7 @@ srs_error_t SrsRtcPlayer::on_rtcp_ps_feedback(char* buf, int nb_buf)
switch (fmt) {
case kPLI: {
ISrsRtcPublisher* publisher = session_->source_->rtc_publisher();
ISrsRtcPublishStream* publisher = session_->source_->publish_stream();
if (publisher) {
publisher->request_keyframe();
srs_trace("RTC request PLI");
@ -840,14 +826,14 @@ srs_error_t SrsRtcPlayer::on_rtcp_ps_feedback(char* buf, int nb_buf)
return err;
}
srs_error_t SrsRtcPlayer::on_rtcp_rr(char* data, int nb_data)
srs_error_t SrsRtcPlayStream::on_rtcp_rr(char* data, int nb_data)
{
srs_error_t err = srs_success;
// TODO: FIXME: Implements it.
return err;
}
SrsRtcPublisher::SrsRtcPublisher(SrsRtcSession* session)
SrsRtcPublishStream::SrsRtcPublishStream(SrsRtcConnection* session)
{
report_timer = new SrsHourGlass(this, 200 * SRS_UTIME_MILLISECONDS);
@ -869,11 +855,11 @@ SrsRtcPublisher::SrsRtcPublisher(SrsRtcSession* session)
twcc_fb_count_ = 0;
}
SrsRtcPublisher::~SrsRtcPublisher()
SrsRtcPublishStream::~SrsRtcPublishStream()
{
// TODO: FIXME: Do unpublish when session timeout.
if (source) {
source->set_rtc_publisher(NULL);
source->set_publish_stream(NULL);
source->on_unpublish();
}
@ -884,7 +870,7 @@ SrsRtcPublisher::~SrsRtcPublisher()
srs_freep(audio_queue_);
}
srs_error_t SrsRtcPublisher::initialize(uint32_t vssrc, uint32_t assrc, int twcc_id, SrsRequest* r)
srs_error_t SrsRtcPublishStream::initialize(uint32_t vssrc, uint32_t assrc, int twcc_id, SrsRequest* r)
{
srs_error_t err = srs_success;
@ -923,7 +909,7 @@ srs_error_t SrsRtcPublisher::initialize(uint32_t vssrc, uint32_t assrc, int twcc
return srs_error_wrap(err, "on publish");
}
source->set_rtc_publisher(this);
source->set_publish_stream(this);
if (_srs_rtc_hijacker) {
if ((err = _srs_rtc_hijacker->on_start_publish(session_, this, req)) != srs_success) {
@ -934,7 +920,7 @@ srs_error_t SrsRtcPublisher::initialize(uint32_t vssrc, uint32_t assrc, int twcc
return err;
}
void SrsRtcPublisher::check_send_nacks(SrsRtpNackForReceiver* nack, uint32_t ssrc)
void SrsRtcPublishStream::check_send_nacks(SrsRtpNackForReceiver* nack, uint32_t ssrc)
{
// If DTLS is not OK, drop all messages.
if (!session_->transport_) {
@ -967,7 +953,7 @@ void SrsRtcPublisher::check_send_nacks(SrsRtpNackForReceiver* nack, uint32_t ssr
if (session_->blackhole && session_->blackhole_addr && session_->blackhole_stfd) {
// Ignore any error for black-hole.
void* p = stream.data(); int len = stream.pos(); SrsRtcSession* s = session_;
void* p = stream.data(); int len = stream.pos(); SrsRtcConnection* s = session_;
srs_sendto(s->blackhole_stfd, p, len, (sockaddr*)s->blackhole_addr, sizeof(sockaddr_in), SRS_UTIME_NO_TIMEOUT);
}
@ -984,7 +970,7 @@ void SrsRtcPublisher::check_send_nacks(SrsRtpNackForReceiver* nack, uint32_t ssr
}
}
srs_error_t SrsRtcPublisher::send_rtcp_rr(uint32_t ssrc, SrsRtpRingBuffer* rtp_queue)
srs_error_t SrsRtcPublishStream::send_rtcp_rr(uint32_t ssrc, SrsRtpRingBuffer* rtp_queue)
{
srs_error_t err = srs_success;
@ -1042,7 +1028,7 @@ srs_error_t SrsRtcPublisher::send_rtcp_rr(uint32_t ssrc, SrsRtpRingBuffer* rtp_q
return err;
}
srs_error_t SrsRtcPublisher::send_rtcp_xr_rrtr(uint32_t ssrc)
srs_error_t SrsRtcPublishStream::send_rtcp_xr_rrtr(uint32_t ssrc)
{
srs_error_t err = srs_success;
@ -1103,7 +1089,7 @@ srs_error_t SrsRtcPublisher::send_rtcp_xr_rrtr(uint32_t ssrc)
return err;
}
srs_error_t SrsRtcPublisher::send_rtcp_fb_pli(uint32_t ssrc)
srs_error_t SrsRtcPublishStream::send_rtcp_fb_pli(uint32_t ssrc)
{
srs_error_t err = srs_success;
@ -1124,7 +1110,7 @@ srs_error_t SrsRtcPublisher::send_rtcp_fb_pli(uint32_t ssrc)
if (session_->blackhole && session_->blackhole_addr && session_->blackhole_stfd) {
// Ignore any error for black-hole.
void* p = stream.data(); int len = stream.pos(); SrsRtcSession* s = session_;
void* p = stream.data(); int len = stream.pos(); SrsRtcConnection* s = session_;
srs_sendto(s->blackhole_stfd, p, len, (sockaddr*)s->blackhole_addr, sizeof(sockaddr_in), SRS_UTIME_NO_TIMEOUT);
}
@ -1140,12 +1126,12 @@ srs_error_t SrsRtcPublisher::send_rtcp_fb_pli(uint32_t ssrc)
return err;
}
srs_error_t SrsRtcPublisher::on_twcc(uint16_t sn) {
srs_error_t SrsRtcPublishStream::on_twcc(uint16_t sn) {
srs_utime_t now = srs_get_system_time();
return rtcp_twcc_.recv_packet(sn, now);
}
srs_error_t SrsRtcPublisher::on_rtp(char* data, int nb_data)
srs_error_t SrsRtcPublishStream::on_rtp(char* data, int nb_data)
{
srs_error_t err = srs_success;
@ -1201,7 +1187,7 @@ srs_error_t SrsRtcPublisher::on_rtp(char* data, int nb_data)
if (session_->blackhole && session_->blackhole_addr && session_->blackhole_stfd) {
// Ignore any error for black-hole.
void* p = unprotected_buf; int len = nb_unprotected_buf; SrsRtcSession* s = session_;
void* p = unprotected_buf; int len = nb_unprotected_buf; SrsRtcConnection* s = session_;
srs_sendto(s->blackhole_stfd, p, len, (sockaddr*)s->blackhole_addr, sizeof(sockaddr_in), SRS_UTIME_NO_TIMEOUT);
}
@ -1254,7 +1240,7 @@ srs_error_t SrsRtcPublisher::on_rtp(char* data, int nb_data)
return err;
}
void SrsRtcPublisher::on_before_decode_payload(SrsRtpPacket2* pkt, SrsBuffer* buf, ISrsRtpPayloader** ppayload)
void SrsRtcPublishStream::on_before_decode_payload(SrsRtpPacket2* pkt, SrsBuffer* buf, ISrsRtpPayloader** ppayload)
{
// No payload, ignore.
if (buf->empty()) {
@ -1276,7 +1262,7 @@ void SrsRtcPublisher::on_before_decode_payload(SrsRtpPacket2* pkt, SrsBuffer* bu
}
}
srs_error_t SrsRtcPublisher::on_audio(SrsRtpPacket2* pkt)
srs_error_t SrsRtcPublishStream::on_audio(SrsRtpPacket2* pkt)
{
srs_error_t err = srs_success;
@ -1288,7 +1274,7 @@ srs_error_t SrsRtcPublisher::on_audio(SrsRtpPacket2* pkt)
return err;
}
srs_error_t SrsRtcPublisher::on_video(SrsRtpPacket2* pkt)
srs_error_t SrsRtcPublishStream::on_video(SrsRtpPacket2* pkt)
{
srs_error_t err = srs_success;
@ -1307,7 +1293,7 @@ srs_error_t SrsRtcPublisher::on_video(SrsRtpPacket2* pkt)
return err;
}
srs_error_t SrsRtcPublisher::on_nack(SrsRtpPacket2* pkt)
srs_error_t SrsRtcPublishStream::on_nack(SrsRtpPacket2* pkt)
{
srs_error_t err = srs_success;
@ -1352,7 +1338,7 @@ srs_error_t SrsRtcPublisher::on_nack(SrsRtpPacket2* pkt)
return err;
}
srs_error_t SrsRtcPublisher::send_periodic_twcc()
srs_error_t SrsRtcPublishStream::send_periodic_twcc()
{
srs_error_t err = srs_success;
srs_utime_t now = srs_get_system_time();
@ -1381,7 +1367,7 @@ srs_error_t SrsRtcPublisher::send_periodic_twcc()
return err;
}
srs_error_t SrsRtcPublisher::on_rtcp(char* data, int nb_data)
srs_error_t SrsRtcPublishStream::on_rtcp(char* data, int nb_data)
{
srs_error_t err = srs_success;
@ -1446,7 +1432,7 @@ srs_error_t SrsRtcPublisher::on_rtcp(char* data, int nb_data)
return err;
}
srs_error_t SrsRtcPublisher::on_rtcp_sr(char* buf, int nb_buf)
srs_error_t SrsRtcPublishStream::on_rtcp_sr(char* buf, int nb_buf)
{
srs_error_t err = srs_success;
@ -1537,7 +1523,7 @@ block +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
return err;
}
srs_error_t SrsRtcPublisher::on_rtcp_xr(char* buf, int nb_buf)
srs_error_t SrsRtcPublishStream::on_rtcp_xr(char* buf, int nb_buf)
{
srs_error_t err = srs_success;
@ -1601,14 +1587,14 @@ srs_error_t SrsRtcPublisher::on_rtcp_xr(char* buf, int nb_buf)
return err;
}
srs_error_t SrsRtcPublisher::on_rtcp_feedback(char* buf, int nb_buf)
srs_error_t SrsRtcPublishStream::on_rtcp_feedback(char* buf, int nb_buf)
{
srs_error_t err = srs_success;
// TODO: FIXME: Implements it.
return err;
}
srs_error_t SrsRtcPublisher::on_rtcp_ps_feedback(char* buf, int nb_buf)
srs_error_t SrsRtcPublishStream::on_rtcp_ps_feedback(char* buf, int nb_buf)
{
srs_error_t err = srs_success;
@ -1654,7 +1640,7 @@ srs_error_t SrsRtcPublisher::on_rtcp_ps_feedback(char* buf, int nb_buf)
return err;
}
srs_error_t SrsRtcPublisher::on_rtcp_rr(char* buf, int nb_buf)
srs_error_t SrsRtcPublishStream::on_rtcp_rr(char* buf, int nb_buf)
{
srs_error_t err = srs_success;
@ -1723,16 +1709,16 @@ block +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
return err;
}
void SrsRtcPublisher::request_keyframe()
void SrsRtcPublishStream::request_keyframe()
{
std::string scid = _srs_context->get_id();
std::string pcid = session_->context_id();
SrsContextId scid = _srs_context->get_id();
SrsContextId pcid = session_->context_id();
srs_trace("RTC play=[%d][%s] request keyframe from publish=[%d][%s]", ::getpid(), scid.c_str(), ::getpid(), pcid.c_str());
request_keyframe_ = true;
}
srs_error_t SrsRtcPublisher::notify(int type, srs_utime_t interval, srs_utime_t tick)
srs_error_t SrsRtcPublishStream::notify(int type, srs_utime_t interval, srs_utime_t tick)
{
srs_error_t err = srs_success;
@ -1750,12 +1736,12 @@ srs_error_t SrsRtcPublisher::notify(int type, srs_utime_t interval, srs_utime_t
return err;
}
void SrsRtcPublisher::simulate_nack_drop(int nn)
void SrsRtcPublishStream::simulate_nack_drop(int nn)
{
nn_simulate_nack_drop = nn;
}
void SrsRtcPublisher::simulate_drop_packet(SrsRtpHeader* h, int nn_bytes)
void SrsRtcPublishStream::simulate_drop_packet(SrsRtpHeader* h, int nn_bytes)
{
srs_warn("RTC NACK simulator #%d drop seq=%u, ssrc=%u/%s, ts=%u, %d bytes", nn_simulate_nack_drop,
h->get_sequence(), h->get_ssrc(), (h->get_ssrc()==video_ssrc? "Video":"Audio"), h->get_timestamp(),
@ -1764,10 +1750,9 @@ void SrsRtcPublisher::simulate_drop_packet(SrsRtpHeader* h, int nn_bytes)
nn_simulate_nack_drop--;
}
SrsRtcSession::SrsRtcSession(SrsRtcServer* s)
SrsRtcConnection::SrsRtcConnection(SrsRtcServer* s)
{
req = NULL;
cid = "";
is_publisher_ = false;
encrypt = true;
@ -1788,7 +1773,7 @@ SrsRtcSession::SrsRtcSession(SrsRtcServer* s)
blackhole_stfd = NULL;
}
SrsRtcSession::~SrsRtcSession()
SrsRtcConnection::~SrsRtcConnection()
{
srs_freep(player_);
srs_freep(publisher_);
@ -1799,73 +1784,73 @@ SrsRtcSession::~SrsRtcSession()
srs_freep(sendonly_skt);
}
SrsSdp* SrsRtcSession::get_local_sdp()
SrsSdp* SrsRtcConnection::get_local_sdp()
{
return &local_sdp;
}
void SrsRtcSession::set_local_sdp(const SrsSdp& sdp)
void SrsRtcConnection::set_local_sdp(const SrsSdp& sdp)
{
local_sdp = sdp;
}
SrsSdp* SrsRtcSession::get_remote_sdp()
SrsSdp* SrsRtcConnection::get_remote_sdp()
{
return &remote_sdp;
}
void SrsRtcSession::set_remote_sdp(const SrsSdp& sdp)
void SrsRtcConnection::set_remote_sdp(const SrsSdp& sdp)
{
remote_sdp = sdp;
}
SrsRtcSessionStateType SrsRtcSession::state()
SrsRtcConnectionStateType SrsRtcConnection::state()
{
return state_;
}
void SrsRtcSession::set_state(SrsRtcSessionStateType state)
void SrsRtcConnection::set_state(SrsRtcConnectionStateType state)
{
state_ = state;
}
string SrsRtcSession::id()
string SrsRtcConnection::id()
{
return peer_id_ + "/" + username_;
}
string SrsRtcSession::peer_id()
string SrsRtcConnection::peer_id()
{
return peer_id_;
}
void SrsRtcSession::set_peer_id(string v)
void SrsRtcConnection::set_peer_id(string v)
{
peer_id_ = v;
}
string SrsRtcSession::username()
string SrsRtcConnection::username()
{
return username_;
}
void SrsRtcSession::set_encrypt(bool v)
void SrsRtcConnection::set_encrypt(bool v)
{
encrypt = v;
}
void SrsRtcSession::switch_to_context()
void SrsRtcConnection::switch_to_context()
{
_srs_context->set_id(cid);
}
std::string SrsRtcSession::context_id()
SrsContextId SrsRtcConnection::context_id()
{
return cid;
}
srs_error_t SrsRtcSession::initialize(SrsRtcSource* source, SrsRequest* r, bool is_publisher, string username, std::string context_id)
srs_error_t SrsRtcConnection::initialize(SrsRtcStream* source, SrsRequest* r, bool is_publisher, string username, SrsContextId context_id)
{
srs_error_t err = srs_success;
@ -1912,7 +1897,7 @@ srs_error_t SrsRtcSession::initialize(SrsRtcSource* source, SrsRequest* r, bool
return err;
}
srs_error_t SrsRtcSession::on_stun(SrsUdpMuxSocket* skt, SrsStunPacket* r)
srs_error_t SrsRtcConnection::on_stun(SrsUdpMuxSocket* skt, SrsStunPacket* r)
{
srs_error_t err = srs_success;
@ -1942,12 +1927,12 @@ srs_error_t SrsRtcSession::on_stun(SrsUdpMuxSocket* skt, SrsStunPacket* r)
return err;
}
srs_error_t SrsRtcSession::on_dtls(char* data, int nb_data)
srs_error_t SrsRtcConnection::on_dtls(char* data, int nb_data)
{
return transport_->on_dtls(data, nb_data);
}
srs_error_t SrsRtcSession::on_rtcp(char* data, int nb_data)
srs_error_t SrsRtcConnection::on_rtcp(char* data, int nb_data)
{
srs_error_t err = srs_success;
@ -1978,7 +1963,7 @@ srs_error_t SrsRtcSession::on_rtcp(char* data, int nb_data)
return err;
}
srs_error_t SrsRtcSession::on_rtp(char* data, int nb_data)
srs_error_t SrsRtcConnection::on_rtp(char* data, int nb_data)
{
if (publisher_ == NULL) {
return srs_error_new(ERROR_RTC_RTCP, "rtc publisher null");
@ -1991,7 +1976,7 @@ srs_error_t SrsRtcSession::on_rtp(char* data, int nb_data)
return publisher_->on_rtp(data, nb_data);
}
srs_error_t SrsRtcSession::on_connection_established()
srs_error_t SrsRtcConnection::on_connection_established()
{
srs_error_t err = srs_success;
@ -2011,7 +1996,7 @@ srs_error_t SrsRtcSession::on_connection_established()
return err;
}
srs_error_t SrsRtcSession::start_play()
srs_error_t SrsRtcConnection::start_play()
{
srs_error_t err = srs_success;
@ -2020,7 +2005,7 @@ srs_error_t SrsRtcSession::start_play()
if (player_) {
return err;
}
player_ = new SrsRtcPlayer(this, _srs_context->get_id());
player_ = new SrsRtcPlayStream(this, _srs_context->get_id());
uint32_t video_ssrc = 0;
uint32_t audio_ssrc = 0;
@ -2038,17 +2023,17 @@ srs_error_t SrsRtcSession::start_play()
}
if ((err = player_->initialize(video_ssrc, audio_ssrc, video_payload_type, audio_payload_type)) != srs_success) {
return srs_error_wrap(err, "SrsRtcPlayer init");
return srs_error_wrap(err, "SrsRtcPlayStream init");
}
if ((err = player_->start()) != srs_success) {
return srs_error_wrap(err, "start SrsRtcPlayer");
return srs_error_wrap(err, "start SrsRtcPlayStream");
}
return err;
}
srs_error_t SrsRtcSession::start_publish()
srs_error_t SrsRtcConnection::start_publish()
{
srs_error_t err = srs_success;
@ -2057,7 +2042,7 @@ srs_error_t SrsRtcSession::start_publish()
if (publisher_) {
return err;
}
publisher_ = new SrsRtcPublisher(this);
publisher_ = new SrsRtcPublishStream(this);
// Request PLI for exists players?
//publisher_->request_keyframe();
@ -2100,12 +2085,12 @@ srs_error_t SrsRtcSession::start_publish()
return err;
}
bool SrsRtcSession::is_stun_timeout()
bool SrsRtcConnection::is_stun_timeout()
{
return last_stun_time + sessionStunTimeout < srs_get_system_time();
}
void SrsRtcSession::update_sendonly_socket(SrsUdpMuxSocket* skt)
void SrsRtcConnection::update_sendonly_socket(SrsUdpMuxSocket* skt)
{
if (sendonly_skt) {
srs_trace("session %s address changed, update %s -> %s",
@ -2116,7 +2101,7 @@ void SrsRtcSession::update_sendonly_socket(SrsUdpMuxSocket* skt)
sendonly_skt = skt->copy_sendonly();
}
void SrsRtcSession::simulate_nack_drop(int nn)
void SrsRtcConnection::simulate_nack_drop(int nn)
{
if (player_) {
player_->simulate_nack_drop(nn);
@ -2134,7 +2119,7 @@ void SrsRtcSession::simulate_nack_drop(int nn)
#define be32toh ntohl
#endif
srs_error_t SrsRtcSession::on_binding_request(SrsStunPacket* r)
srs_error_t SrsRtcConnection::on_binding_request(SrsStunPacket* r)
{
srs_error_t err = srs_success;

View file

@ -48,9 +48,9 @@ class SrsUdpMuxSocket;
class SrsConsumer;
class SrsStunPacket;
class SrsRtcServer;
class SrsRtcSession;
class SrsRtcConnection;
class SrsSharedPtrMessage;
class SrsRtcSource;
class SrsRtcStream;
class SrsRtpPacket2;
class ISrsCodec;
class SrsRtpNackForReceiver;
@ -75,9 +75,6 @@ const uint8_t kSLI = 2;
const uint8_t kRPSI = 3;
const uint8_t kAFB = 15;
// TODO: FIXME: Move to utility.
extern std::string gen_random_str(int len);
class SrsNtp
{
public:
@ -95,7 +92,7 @@ public:
static uint64_t kMagicNtpFractionalUnit;
};
enum SrsRtcSessionStateType
enum SrsRtcConnectionStateType
{
// TODO: FIXME: Should prefixed by enum name.
INIT = -1,
@ -109,12 +106,12 @@ enum SrsRtcSessionStateType
class SrsSecurityTransport : public ISrsDtlsCallback
{
private:
SrsRtcSession* session_;
SrsRtcConnection* session_;
SrsDtls* dtls_;
SrsSRTP* srtp_;
bool handshake_done;
public:
SrsSecurityTransport(SrsRtcSession* s);
SrsSecurityTransport(SrsRtcConnection* s);
virtual ~SrsSecurityTransport();
srs_error_t initialize(SrsSessionConfig* cfg);
@ -182,12 +179,13 @@ public:
virtual ~SrsRtcOutgoingInfo();
};
class SrsRtcPlayer : virtual public ISrsCoroutineHandler, virtual public ISrsReloadHandler
// A RTC play stream, client pull and play stream from SRS.
class SrsRtcPlayStream : virtual public ISrsCoroutineHandler, virtual public ISrsReloadHandler
{
protected:
std::string _parent_cid;
SrsContextId _parent_cid;
SrsCoroutine* trd;
SrsRtcSession* session_;
SrsRtcConnection* session_;
private:
// TODO: FIXME: How to handle timestamp overflow?
// Information for audio.
@ -208,8 +206,8 @@ private:
// Whether enabled nack.
bool nack_enabled_;
public:
SrsRtcPlayer(SrsRtcSession* s, std::string parent_cid);
virtual ~SrsRtcPlayer();
SrsRtcPlayStream(SrsRtcConnection* s, SrsContextId parent_cid);
virtual ~SrsRtcPlayStream();
public:
srs_error_t initialize(uint32_t vssrc, uint32_t assrc, uint16_t v_pt, uint16_t a_pt);
// interface ISrsReloadHandler
@ -217,7 +215,7 @@ public:
virtual srs_error_t on_reload_vhost_play(std::string vhost);
virtual srs_error_t on_reload_vhost_realtime(std::string vhost);
public:
virtual std::string cid();
virtual SrsContextId cid();
public:
virtual srs_error_t start();
virtual void stop();
@ -225,7 +223,7 @@ public:
public:
virtual srs_error_t cycle();
private:
srs_error_t send_packets(SrsRtcSource* source, const std::vector<SrsRtpPacket2*>& pkts, SrsRtcOutgoingInfo& info);
srs_error_t send_packets(SrsRtcStream* source, const std::vector<SrsRtpPacket2*>& pkts, SrsRtcOutgoingInfo& info);
srs_error_t do_send_packets(const std::vector<SrsRtpPacket2*>& pkts, SrsRtcOutgoingInfo& info);
public:
void nack_fetch(std::vector<SrsRtpPacket2*>& pkts, uint32_t ssrc, uint16_t seq);
@ -242,13 +240,14 @@ private:
srs_error_t on_rtcp_rr(char* data, int nb_data);
};
class SrsRtcPublisher : virtual public ISrsHourGlass, virtual public ISrsRtpPacketDecodeHandler, virtual public ISrsRtcPublisher
// A RTC publish stream, client push and publish stream to SRS.
class SrsRtcPublishStream : virtual public ISrsHourGlass, virtual public ISrsRtpPacketDecodeHandler, virtual public ISrsRtcPublishStream
{
private:
SrsHourGlass* report_timer;
uint64_t nn_audio_frames;
private:
SrsRtcSession* session_;
SrsRtcConnection* session_;
uint32_t video_ssrc;
uint32_t audio_ssrc;
uint16_t pt_to_drop_;
@ -262,7 +261,7 @@ private:
SrsRtpNackForReceiver* audio_nack_;
private:
SrsRequest* req;
SrsRtcSource* source;
SrsRtcStream* source;
// Simulators.
int nn_simulate_nack_drop;
private:
@ -275,8 +274,8 @@ private:
SrsRtcpTWCC rtcp_twcc_;
SrsRtpExtensionTypes extension_types_;
public:
SrsRtcPublisher(SrsRtcSession* session);
virtual ~SrsRtcPublisher();
SrsRtcPublishStream(SrsRtcConnection* session);
virtual ~SrsRtcPublishStream();
public:
srs_error_t initialize(uint32_t vssrc, uint32_t assrc, int twcc_id, SrsRequest* req);
private:
@ -313,19 +312,20 @@ private:
srs_error_t on_twcc(uint16_t sn);
};
class SrsRtcSession
// A RTC Peer Connection, SDP level object.
class SrsRtcConnection
{
friend class SrsSecurityTransport;
friend class SrsRtcPlayer;
friend class SrsRtcPublisher;
friend class SrsRtcPlayStream;
friend class SrsRtcPublishStream;
public:
bool disposing_;
private:
SrsRtcServer* server_;
SrsRtcSessionStateType state_;
SrsRtcConnectionStateType state_;
SrsSecurityTransport* transport_;
SrsRtcPlayer* player_;
SrsRtcPublisher* publisher_;
SrsRtcPlayStream* player_;
SrsRtcPublishStream* publisher_;
bool is_publisher_;
private:
SrsUdpMuxSocket* sendonly_skt;
@ -337,14 +337,14 @@ private:
srs_utime_t last_stun_time;
private:
// For each RTC session, we use a specified cid for debugging logs.
std::string cid;
SrsContextId cid;
// For each RTC session, whether requires encrypt.
// Read config value, rtc_server.encrypt, default to on.
// Sepcifies by HTTP API, query encrypt, optional.
// TODO: FIXME: Support reload.
bool encrypt;
SrsRequest* req;
SrsRtcSource* source_;
SrsRtcStream* source_;
SrsSdp remote_sdp;
SrsSdp local_sdp;
public:
@ -357,25 +357,25 @@ private:
sockaddr_in* blackhole_addr;
srs_netfd_t blackhole_stfd;
public:
SrsRtcSession(SrsRtcServer* s);
virtual ~SrsRtcSession();
SrsRtcConnection(SrsRtcServer* s);
virtual ~SrsRtcConnection();
public:
SrsSdp* get_local_sdp();
void set_local_sdp(const SrsSdp& sdp);
SrsSdp* get_remote_sdp();
void set_remote_sdp(const SrsSdp& sdp);
SrsRtcSessionStateType state();
void set_state(SrsRtcSessionStateType state);
SrsRtcConnectionStateType state();
void set_state(SrsRtcConnectionStateType state);
std::string id();
std::string peer_id();
void set_peer_id(std::string v);
std::string username();
void set_encrypt(bool v);
void switch_to_context();
std::string context_id();
SrsContextId context_id();
public:
// Before initialize, user must set the local SDP, which is used to inititlize DTLS.
srs_error_t initialize(SrsRtcSource* source, SrsRequest* r, bool is_publisher, std::string username, std::string context_id);
srs_error_t initialize(SrsRtcStream* source, SrsRequest* r, bool is_publisher, std::string username, SrsContextId context_id);
// The peer address may change, we can identify that by STUN messages.
srs_error_t on_stun(SrsUdpMuxSocket* skt, SrsStunPacket* r);
srs_error_t on_dtls(char* data, int nb_data);
@ -401,13 +401,13 @@ public:
virtual ~ISrsRtcHijacker();
public:
// When start publisher by RTC.
virtual srs_error_t on_start_publish(SrsRtcSession* session, SrsRtcPublisher* publisher, SrsRequest* req) = 0;
virtual srs_error_t on_start_publish(SrsRtcConnection* session, SrsRtcPublishStream* publisher, SrsRequest* req) = 0;
// When got RTP plaintext packet.
virtual srs_error_t on_rtp_packet(SrsRtcSession* session, SrsRtcPublisher* publisher, SrsRequest* req, SrsRtpPacket2* pkt) = 0;
virtual srs_error_t on_rtp_packet(SrsRtcConnection* session, SrsRtcPublishStream* publisher, SrsRequest* req, SrsRtpPacket2* pkt) = 0;
// When start player by RTC.
virtual srs_error_t on_start_play(SrsRtcSession* session, SrsRtcPlayer* player, SrsRequest* req) = 0;
virtual srs_error_t on_start_play(SrsRtcConnection* session, SrsRtcPlayStream* player, SrsRequest* req) = 0;
// When start consuming for player for RTC.
virtual srs_error_t on_start_consume(SrsRtcSession* session, SrsRtcPlayer* player, SrsRequest* req, SrsRtcConsumer* consumer) = 0;
virtual srs_error_t on_start_consume(SrsRtcConnection* session, SrsRtcPlayStream* player, SrsRequest* req, SrsRtcConsumer* consumer) = 0;
};
extern ISrsRtcHijacker* _srs_rtc_hijacker;

View file

@ -281,7 +281,6 @@ SSL_CTX* SrsDtls::build_dtls_ctx()
#endif
if (_srs_rtc_dtls_certificate->is_ecdsa()) { // By ECDSA, https://stackoverflow.com/a/6006898
#if OPENSSL_VERSION_NUMBER >= 0x10002000L // v1.0.2
// For ECDSA, we could set the curves list.
// @see https://www.openssl.org/docs/man1.0.2/man3/SSL_CTX_set1_curves_list.html

View file

@ -37,6 +37,7 @@
#include <srs_app_http_api.hpp>
#include <srs_app_rtc_dtls.hpp>
#include <srs_service_utility.hpp>
#include <srs_protocol_utility.hpp>
#include <srs_app_rtc_source.hpp>
#include <srs_app_rtc_api.hpp>
@ -154,9 +155,9 @@ SrsRtcServer::~SrsRtcServer()
}
if (true) {
std::vector<SrsRtcSession*>::iterator it;
std::vector<SrsRtcConnection*>::iterator it;
for (it = zombies_.begin(); it != zombies_.end(); ++it) {
SrsRtcSession* session = *it;
SrsRtcConnection* session = *it;
srs_freep(session);
}
}
@ -221,7 +222,7 @@ srs_error_t SrsRtcServer::on_udp_packet(SrsUdpMuxSocket* skt)
srs_error_t err = srs_success;
char* data = skt->data(); int size = skt->size();
SrsRtcSession* session = find_session_by_peer_id(skt->peer_id());
SrsRtcConnection* session = find_session_by_peer_id(skt->peer_id());
if (session) {
// Now, we got the RTC session to handle the packet, switch to its context
@ -296,11 +297,11 @@ srs_error_t SrsRtcServer::listen_api()
srs_error_t SrsRtcServer::create_session(
SrsRequest* req, const SrsSdp& remote_sdp, SrsSdp& local_sdp, const std::string& mock_eip, bool publish,
SrsRtcSession** psession
SrsRtcConnection** psession
) {
srs_error_t err = srs_success;
SrsRtcSource* source = NULL;
SrsRtcStream* source = NULL;
if ((err = _srs_rtc_sources->fetch_or_create(req, &source)) != srs_success) {
return srs_error_wrap(err, "create source");
}
@ -310,12 +311,12 @@ srs_error_t SrsRtcServer::create_session(
return srs_error_new(ERROR_RTC_SOURCE_BUSY, "stream %s busy", req->get_stream_url().c_str());
}
std::string local_pwd = gen_random_str(32);
std::string local_pwd = srs_random_str(32);
std::string local_ufrag = "";
// TODO: FIXME: Rename for a better name, it's not an username.
std::string username = "";
while (true) {
local_ufrag = gen_random_str(8);
local_ufrag = srs_random_str(8);
username = local_ufrag + ":" + remote_sdp.get_ice_ufrag();
if (!map_username_session.count(username)) {
@ -338,13 +339,13 @@ srs_error_t SrsRtcServer::create_session(
}
}
SrsRtcSession* session = new SrsRtcSession(this);
SrsRtcConnection* session = new SrsRtcConnection(this);
session->set_remote_sdp(remote_sdp);
// We must setup the local SDP, then initialize the session object.
session->set_local_sdp(local_sdp);
session->set_state(WAITING_STUN);
std::string cid = _srs_context->get_id();
SrsContextId cid = _srs_context->get_id();
// Before session initialize, we must setup the local SDP.
if ((err = session->initialize(source, req, publish, username, cid)) != srs_success) {
srs_freep(session);
@ -357,15 +358,15 @@ srs_error_t SrsRtcServer::create_session(
return err;
}
srs_error_t SrsRtcServer::create_session2(SrsSdp& local_sdp, SrsRtcSession** psession)
srs_error_t SrsRtcServer::create_session2(SrsSdp& local_sdp, SrsRtcConnection** psession)
{
srs_error_t err = srs_success;
std::string local_pwd = gen_random_str(32);
std::string local_pwd = srs_random_str(32);
// TODO: FIXME: Collision detect.
std::string local_ufrag = gen_random_str(8);
std::string local_ufrag = srs_random_str(8);
SrsRtcSession* session = new SrsRtcSession(this);
SrsRtcConnection* session = new SrsRtcConnection(this);
*psession = session;
local_sdp.set_ice_ufrag(local_ufrag);
@ -385,7 +386,7 @@ srs_error_t SrsRtcServer::create_session2(SrsSdp& local_sdp, SrsRtcSession** pse
return err;
}
srs_error_t SrsRtcServer::setup_session2(SrsRtcSession* session, SrsRequest* req, const SrsSdp& remote_sdp)
srs_error_t SrsRtcServer::setup_session2(SrsRtcConnection* session, SrsRequest* req, const SrsSdp& remote_sdp)
{
srs_error_t err = srs_success;
@ -393,7 +394,7 @@ srs_error_t SrsRtcServer::setup_session2(SrsRtcSession* session, SrsRequest* req
return err;
}
SrsRtcSource* source = NULL;
SrsRtcStream* source = NULL;
if ((err = _srs_rtc_sources->fetch_or_create(req, &source)) != srs_success) {
return srs_error_wrap(err, "create source");
}
@ -401,7 +402,7 @@ srs_error_t SrsRtcServer::setup_session2(SrsRtcSession* session, SrsRequest* req
// TODO: FIXME: Collision detect.
string username = session->get_local_sdp()->get_ice_ufrag() + ":" + remote_sdp.get_ice_ufrag();
std::string cid = _srs_context->get_id();
SrsContextId cid = _srs_context->get_id();
if ((err = session->initialize(source, req, false, username, cid)) != srs_success) {
return srs_error_wrap(err, "init");
}
@ -414,14 +415,14 @@ srs_error_t SrsRtcServer::setup_session2(SrsRtcSession* session, SrsRequest* req
return err;
}
void SrsRtcServer::destroy(SrsRtcSession* session)
void SrsRtcServer::destroy(SrsRtcConnection* session)
{
if (session->disposing_) {
return;
}
session->disposing_ = true;
std::map<std::string, SrsRtcSession*>::iterator it;
std::map<std::string, SrsRtcConnection*>::iterator it;
if ((it = map_username_session.find(session->username())) != map_username_session.end()) {
map_username_session.erase(it);
@ -434,16 +435,16 @@ void SrsRtcServer::destroy(SrsRtcSession* session)
zombies_.push_back(session);
}
bool SrsRtcServer::insert_into_id_sessions(const string& peer_id, SrsRtcSession* session)
bool SrsRtcServer::insert_into_id_sessions(const string& peer_id, SrsRtcConnection* session)
{
return map_id_session.insert(make_pair(peer_id, session)).second;
}
void SrsRtcServer::check_and_clean_timeout_session()
{
map<string, SrsRtcSession*>::iterator iter = map_username_session.begin();
map<string, SrsRtcConnection*>::iterator iter = map_username_session.begin();
while (iter != map_username_session.end()) {
SrsRtcSession* session = iter->second;
SrsRtcConnection* session = iter->second;
srs_assert(session);
if (!session->is_stun_timeout()) {
@ -474,9 +475,9 @@ int SrsRtcServer::nn_sessions()
return (int)map_username_session.size();
}
SrsRtcSession* SrsRtcServer::find_session_by_peer_id(const string& peer_id)
SrsRtcConnection* SrsRtcServer::find_session_by_peer_id(const string& peer_id)
{
map<string, SrsRtcSession*>::iterator iter = map_id_session.find(peer_id);
map<string, SrsRtcConnection*>::iterator iter = map_id_session.find(peer_id);
if (iter == map_id_session.end()) {
return NULL;
}
@ -484,9 +485,9 @@ SrsRtcSession* SrsRtcServer::find_session_by_peer_id(const string& peer_id)
return iter->second;
}
SrsRtcSession* SrsRtcServer::find_session_by_username(const std::string& username)
SrsRtcConnection* SrsRtcServer::find_session_by_username(const std::string& username)
{
map<string, SrsRtcSession*>::iterator iter = map_username_session.find(username);
map<string, SrsRtcConnection*>::iterator iter = map_username_session.find(username);
if (iter == map_username_session.end()) {
return NULL;
}
@ -506,12 +507,12 @@ srs_error_t SrsRtcServer::notify(int type, srs_utime_t interval, srs_utime_t tic
return err;
}
std::vector<SrsRtcSession*> zombies;
std::vector<SrsRtcConnection*> zombies;
zombies.swap(zombies_);
std::vector<SrsRtcSession*>::iterator it;
std::vector<SrsRtcConnection*>::iterator it;
for (it = zombies.begin(); it != zombies.end(); ++it) {
SrsRtcSession* session = *it;
SrsRtcConnection* session = *it;
srs_freep(session);
}

View file

@ -36,7 +36,7 @@
class SrsRtcServer;
class SrsHourGlass;
class SrsRtcSession;
class SrsRtcConnection;
class SrsRequest;
class SrsSdp;
@ -47,7 +47,7 @@ public:
virtual ~ISrsRtcServerHandler();
public:
// When server detect the timeout for session object.
virtual void on_timeout(SrsRtcSession* session) = 0;
virtual void on_timeout(SrsRtcConnection* session) = 0;
};
class SrsRtcServer : virtual public ISrsUdpMuxHandler, virtual public ISrsHourGlass
@ -57,10 +57,10 @@ private:
std::vector<SrsUdpMuxListener*> listeners;
ISrsRtcServerHandler* handler;
private:
std::map<std::string, SrsRtcSession*> map_username_session; // key: username(local_ufrag + ":" + remote_ufrag)
std::map<std::string, SrsRtcSession*> map_id_session; // key: peerip(ip + ":" + port)
std::map<std::string, SrsRtcConnection*> map_username_session; // key: username(local_ufrag + ":" + remote_ufrag)
std::map<std::string, SrsRtcConnection*> map_id_session; // key: peerip(ip + ":" + port)
// The zombie sessions, we will free them.
std::vector<SrsRtcSession*> zombies_;
std::vector<SrsRtcConnection*> zombies_;
public:
SrsRtcServer();
virtual ~SrsRtcServer();
@ -78,20 +78,20 @@ public:
// Peer start offering, we answer it.
srs_error_t create_session(
SrsRequest* req, const SrsSdp& remote_sdp, SrsSdp& local_sdp, const std::string& mock_eip, bool publish,
SrsRtcSession** psession
SrsRtcConnection** psession
);
// We start offering, create_session2 to generate offer, setup_session2 to handle answer.
srs_error_t create_session2(SrsSdp& local_sdp, SrsRtcSession** psession);
srs_error_t setup_session2(SrsRtcSession* session, SrsRequest* req, const SrsSdp& remote_sdp);
srs_error_t create_session2(SrsSdp& local_sdp, SrsRtcConnection** psession);
srs_error_t setup_session2(SrsRtcConnection* session, SrsRequest* req, const SrsSdp& remote_sdp);
// Destroy the session from server.
void destroy(SrsRtcSession* session);
void destroy(SrsRtcConnection* session);
public:
bool insert_into_id_sessions(const std::string& peer_id, SrsRtcSession* session);
bool insert_into_id_sessions(const std::string& peer_id, SrsRtcConnection* session);
void check_and_clean_timeout_session();
int nn_sessions();
SrsRtcSession* find_session_by_username(const std::string& ufrag);
SrsRtcConnection* find_session_by_username(const std::string& ufrag);
private:
SrsRtcSession* find_session_by_peer_id(const std::string& peer_id);
SrsRtcConnection* find_session_by_peer_id(const std::string& peer_id);
// interface ISrsHourGlass
public:
virtual srs_error_t notify(int type, srs_utime_t interval, srs_utime_t tick);

View file

@ -90,7 +90,7 @@ srs_error_t aac_raw_append_adts_header(SrsSharedPtrMessage* shared_audio, SrsFor
return err;
}
SrsRtcConsumer::SrsRtcConsumer(SrsRtcSource* s)
SrsRtcConsumer::SrsRtcConsumer(SrsRtcStream* s)
{
source = s;
should_update_source_id = false;
@ -140,7 +140,7 @@ srs_error_t SrsRtcConsumer::dump_packets(std::vector<SrsRtpPacket2*>& pkts)
srs_error_t err = srs_success;
if (should_update_source_id) {
srs_trace("update source_id=%d[%d]", source->source_id().c_str(), source->source_id().c_str());
srs_trace("update source_id=%s[%s]", source->source_id().c_str(), source->source_id().c_str());
should_update_source_id = false;
}
@ -165,21 +165,21 @@ void SrsRtcConsumer::wait(int nb_msgs)
srs_cond_wait(mw_wait);
}
SrsRtcSourceManager::SrsRtcSourceManager()
SrsRtcStreamManager::SrsRtcStreamManager()
{
lock = NULL;
}
SrsRtcSourceManager::~SrsRtcSourceManager()
SrsRtcStreamManager::~SrsRtcStreamManager()
{
srs_mutex_destroy(lock);
}
srs_error_t SrsRtcSourceManager::fetch_or_create(SrsRequest* r, SrsRtcSource** pps)
srs_error_t SrsRtcStreamManager::fetch_or_create(SrsRequest* r, SrsRtcStream** pps)
{
srs_error_t err = srs_success;
// Lazy create lock, because ST is not ready in SrsRtcSourceManager constructor.
// Lazy create lock, because ST is not ready in SrsRtcStreamManager constructor.
if (!lock) {
lock = srs_mutex_new();
}
@ -188,7 +188,7 @@ srs_error_t SrsRtcSourceManager::fetch_or_create(SrsRequest* r, SrsRtcSource** p
// @bug https://github.com/ossrs/srs/issues/1230
SrsLocker(lock);
SrsRtcSource* source = NULL;
SrsRtcStream* source = NULL;
if ((source = fetch(r)) != NULL) {
*pps = source;
return err;
@ -202,7 +202,7 @@ srs_error_t SrsRtcSourceManager::fetch_or_create(SrsRequest* r, SrsRtcSource** p
srs_trace("new source, stream_url=%s", stream_url.c_str());
source = new SrsRtcSource();
source = new SrsRtcStream();
if ((err = source->initialize(r)) != srs_success) {
return srs_error_wrap(err, "init source %s", r->get_stream_url().c_str());
}
@ -214,9 +214,9 @@ srs_error_t SrsRtcSourceManager::fetch_or_create(SrsRequest* r, SrsRtcSource** p
return err;
}
SrsRtcSource* SrsRtcSourceManager::fetch(SrsRequest* r)
SrsRtcStream* SrsRtcStreamManager::fetch(SrsRequest* r)
{
SrsRtcSource* source = NULL;
SrsRtcStream* source = NULL;
string stream_url = r->get_stream_url();
if (pool.find(stream_url) == pool.end()) {
@ -233,21 +233,20 @@ SrsRtcSource* SrsRtcSourceManager::fetch(SrsRequest* r)
return source;
}
SrsRtcSourceManager* _srs_rtc_sources = new SrsRtcSourceManager();
SrsRtcStreamManager* _srs_rtc_sources = new SrsRtcStreamManager();
ISrsRtcPublisher::ISrsRtcPublisher()
ISrsRtcPublishStream::ISrsRtcPublishStream()
{
}
ISrsRtcPublisher::~ISrsRtcPublisher()
ISrsRtcPublishStream::~ISrsRtcPublishStream()
{
}
SrsRtcSource::SrsRtcSource()
SrsRtcStream::SrsRtcStream()
{
_source_id = _pre_source_id = "";
_can_publish = true;
rtc_publisher_ = NULL;
publish_stream_ = NULL;
req = NULL;
#ifdef SRS_FFMPEG_FIT
@ -257,7 +256,7 @@ SrsRtcSource::SrsRtcSource()
#endif
}
SrsRtcSource::~SrsRtcSource()
SrsRtcStream::~SrsRtcStream()
{
// never free the consumers,
// for all consumers are auto free.
@ -267,7 +266,7 @@ SrsRtcSource::~SrsRtcSource()
srs_freep(bridger_);
}
srs_error_t SrsRtcSource::initialize(SrsRequest* r)
srs_error_t SrsRtcStream::initialize(SrsRequest* r)
{
srs_error_t err = srs_success;
@ -283,22 +282,22 @@ srs_error_t SrsRtcSource::initialize(SrsRequest* r)
return err;
}
void SrsRtcSource::update_auth(SrsRequest* r)
void SrsRtcStream::update_auth(SrsRequest* r)
{
req->update_auth(r);
}
srs_error_t SrsRtcSource::on_source_id_changed(std::string id)
srs_error_t SrsRtcStream::on_source_id_changed(SrsContextId id)
{
srs_error_t err = srs_success;
if (_source_id == id) {
if (!_source_id.compare(id)) {
return err;
}
if (_pre_source_id == "") {
if (_pre_source_id.empty()) {
_pre_source_id = id;
} else if (_pre_source_id != _source_id) {
} else if (_pre_source_id.compare(_source_id)) {
_pre_source_id = _source_id;
}
@ -314,22 +313,22 @@ srs_error_t SrsRtcSource::on_source_id_changed(std::string id)
return err;
}
std::string SrsRtcSource::source_id()
SrsContextId SrsRtcStream::source_id()
{
return _source_id;
}
std::string SrsRtcSource::pre_source_id()
SrsContextId SrsRtcStream::pre_source_id()
{
return _pre_source_id;
}
ISrsSourceBridger* SrsRtcSource::bridger()
ISrsSourceBridger* SrsRtcStream::bridger()
{
return bridger_;
}
srs_error_t SrsRtcSource::create_consumer(SrsRtcConsumer*& consumer)
srs_error_t SrsRtcStream::create_consumer(SrsRtcConsumer*& consumer)
{
srs_error_t err = srs_success;
@ -341,7 +340,7 @@ srs_error_t SrsRtcSource::create_consumer(SrsRtcConsumer*& consumer)
return err;
}
srs_error_t SrsRtcSource::consumer_dumps(SrsRtcConsumer* consumer, bool ds, bool dm, bool dg)
srs_error_t SrsRtcStream::consumer_dumps(SrsRtcConsumer* consumer, bool ds, bool dm, bool dg)
{
srs_error_t err = srs_success;
@ -351,7 +350,7 @@ srs_error_t SrsRtcSource::consumer_dumps(SrsRtcConsumer* consumer, bool ds, bool
return err;
}
void SrsRtcSource::on_consumer_destroy(SrsRtcConsumer* consumer)
void SrsRtcStream::on_consumer_destroy(SrsRtcConsumer* consumer)
{
std::vector<SrsRtcConsumer*>::iterator it;
it = std::find(consumers.begin(), consumers.end(), consumer);
@ -360,12 +359,12 @@ void SrsRtcSource::on_consumer_destroy(SrsRtcConsumer* consumer)
}
}
bool SrsRtcSource::can_publish(bool is_edge)
bool SrsRtcStream::can_publish(bool is_edge)
{
return _can_publish;
}
srs_error_t SrsRtcSource::on_publish()
srs_error_t SrsRtcStream::on_publish()
{
srs_error_t err = srs_success;
@ -385,7 +384,7 @@ srs_error_t SrsRtcSource::on_publish()
return err;
}
void SrsRtcSource::on_unpublish()
void SrsRtcStream::on_unpublish()
{
// ignore when already unpublished.
if (_can_publish) {
@ -395,22 +394,22 @@ void SrsRtcSource::on_unpublish()
srs_trace("cleanup when unpublish");
_can_publish = true;
_source_id = -1;
_source_id = SrsContextId();
// TODO: FIXME: Handle by statistic.
}
ISrsRtcPublisher* SrsRtcSource::rtc_publisher()
ISrsRtcPublishStream* SrsRtcStream::publish_stream()
{
return rtc_publisher_;
return publish_stream_;
}
void SrsRtcSource::set_rtc_publisher(ISrsRtcPublisher* v)
void SrsRtcStream::set_publish_stream(ISrsRtcPublishStream* v)
{
rtc_publisher_ = v;
publish_stream_ = v;
}
srs_error_t SrsRtcSource::on_rtp(SrsRtpPacket2* pkt)
srs_error_t SrsRtcStream::on_rtp(SrsRtpPacket2* pkt)
{
srs_error_t err = srs_success;
@ -425,7 +424,7 @@ srs_error_t SrsRtcSource::on_rtp(SrsRtpPacket2* pkt)
}
#ifdef SRS_FFMPEG_FIT
SrsRtcFromRtmpBridger::SrsRtcFromRtmpBridger(SrsRtcSource* source)
SrsRtcFromRtmpBridger::SrsRtcFromRtmpBridger(SrsRtcStream* source)
{
req = NULL;
source_ = source;
@ -728,7 +727,7 @@ srs_error_t SrsRtcFromRtmpBridger::filter(SrsSharedPtrMessage* msg, SrsFormat* f
return err;
}
srs_error_t SrsRtcFromRtmpBridger::package_stap_a(SrsRtcSource* source, SrsSharedPtrMessage* msg, SrsRtpPacket2** ppkt)
srs_error_t SrsRtcFromRtmpBridger::package_stap_a(SrsRtcStream* source, SrsSharedPtrMessage* msg, SrsRtpPacket2** ppkt)
{
srs_error_t err = srs_success;

View file

@ -38,7 +38,7 @@ class SrsMetaCache;
class SrsSharedPtrMessage;
class SrsCommonMessage;
class SrsMessageArray;
class SrsRtcSource;
class SrsRtcStream;
class SrsRtcFromRtmpBridger;
class SrsAudioRecode;
class SrsRtpPacket2;
@ -47,7 +47,7 @@ class SrsSample;
class SrsRtcConsumer
{
private:
SrsRtcSource* source;
SrsRtcStream* source;
std::vector<SrsRtpPacket2*> queue;
// when source id changed, notice all consumers
bool should_update_source_id;
@ -57,7 +57,7 @@ private:
bool mw_waiting;
int mw_min_msgs;
public:
SrsRtcConsumer(SrsRtcSource* s);
SrsRtcConsumer(SrsRtcStream* s);
virtual ~SrsRtcConsumer();
public:
// When source id changed, notice client to print.
@ -71,49 +71,51 @@ public:
virtual void wait(int nb_msgs);
};
class SrsRtcSourceManager
class SrsRtcStreamManager
{
private:
srs_mutex_t lock;
std::map<std::string, SrsRtcSource*> pool;
std::map<std::string, SrsRtcStream*> pool;
public:
SrsRtcSourceManager();
virtual ~SrsRtcSourceManager();
SrsRtcStreamManager();
virtual ~SrsRtcStreamManager();
public:
// create source when fetch from cache failed.
// @param r the client request.
// @param pps the matched source, if success never be NULL.
virtual srs_error_t fetch_or_create(SrsRequest* r, SrsRtcSource** pps);
virtual srs_error_t fetch_or_create(SrsRequest* r, SrsRtcStream** pps);
private:
// Get the exists source, NULL when not exists.
// update the request and return the exists source.
virtual SrsRtcSource* fetch(SrsRequest* r);
virtual SrsRtcStream* fetch(SrsRequest* r);
};
// Global singleton instance.
extern SrsRtcSourceManager* _srs_rtc_sources;
extern SrsRtcStreamManager* _srs_rtc_sources;
class ISrsRtcPublisher
// A publish stream interface, for source to callback with.
class ISrsRtcPublishStream
{
public:
ISrsRtcPublisher();
virtual ~ISrsRtcPublisher();
ISrsRtcPublishStream();
virtual ~ISrsRtcPublishStream();
public:
virtual void request_keyframe() = 0;
};
class SrsRtcSource
// A Source is a stream, to publish and to play with, binding to SrsRtcPublishStream and SrsRtcPlayStream.
class SrsRtcStream
{
private:
// For publish, it's the publish client id.
// For edge, it's the edge ingest id.
// when source id changed, for example, the edge reconnect,
// invoke the on_source_id_changed() to let all clients know.
std::string _source_id;
SrsContextId _source_id;
// previous source id.
std::string _pre_source_id;
SrsContextId _pre_source_id;
SrsRequest* req;
ISrsRtcPublisher* rtc_publisher_;
ISrsRtcPublishStream* publish_stream_;
// Transmux RTMP to RTC.
ISrsSourceBridger* bridger_;
private:
@ -122,17 +124,17 @@ private:
// Whether source is avaiable for publishing.
bool _can_publish;
public:
SrsRtcSource();
virtual ~SrsRtcSource();
SrsRtcStream();
virtual ~SrsRtcStream();
public:
virtual srs_error_t initialize(SrsRequest* r);
// Update the authentication information in request.
virtual void update_auth(SrsRequest* r);
// The source id changed.
virtual srs_error_t on_source_id_changed(std::string id);
virtual srs_error_t on_source_id_changed(SrsContextId id);
// Get current source id.
virtual std::string source_id();
virtual std::string pre_source_id();
virtual SrsContextId source_id();
virtual SrsContextId pre_source_id();
// Get the bridger.
ISrsSourceBridger* bridger();
public:
@ -153,8 +155,8 @@ public:
virtual void on_unpublish();
public:
// Get and set the publisher, passed to consumer to process requests such as PLI.
ISrsRtcPublisher* rtc_publisher();
void set_rtc_publisher(ISrsRtcPublisher* v);
ISrsRtcPublishStream* publish_stream();
void set_publish_stream(ISrsRtcPublishStream* v);
// Consume the shared RTP packet, user must free it.
srs_error_t on_rtp(SrsRtpPacket2* pkt);
};
@ -164,7 +166,7 @@ class SrsRtcFromRtmpBridger : public ISrsSourceBridger
{
private:
SrsRequest* req;
SrsRtcSource* source_;
SrsRtcStream* source_;
// The format, codec information.
SrsRtmpFormat* format;
// The metadata cache.
@ -178,7 +180,7 @@ private:
uint16_t audio_sequence;
uint16_t video_sequence;
public:
SrsRtcFromRtmpBridger(SrsRtcSource* source);
SrsRtcFromRtmpBridger(SrsRtcStream* source);
virtual ~SrsRtcFromRtmpBridger();
public:
virtual srs_error_t initialize(SrsRequest* r);
@ -192,7 +194,7 @@ public:
virtual srs_error_t on_video(SrsSharedPtrMessage* msg);
private:
srs_error_t filter(SrsSharedPtrMessage* msg, SrsFormat* format, bool& has_idr, std::vector<SrsSample*>& samples);
srs_error_t package_stap_a(SrsRtcSource* source, SrsSharedPtrMessage* msg, SrsRtpPacket2** ppkt);
srs_error_t package_stap_a(SrsRtcStream* source, SrsSharedPtrMessage* msg, SrsRtpPacket2** ppkt);
srs_error_t package_nalus(SrsSharedPtrMessage* msg, const std::vector<SrsSample*>& samples, std::vector<SrsRtpPacket2*>& pkts);
srs_error_t package_single_nalu(SrsSharedPtrMessage* msg, SrsSample* sample, std::vector<SrsRtpPacket2*>& pkts);
srs_error_t package_fu_a(SrsSharedPtrMessage* msg, SrsSample* sample, int fu_payload_size, std::vector<SrsRtpPacket2*>& pkts);

View file

@ -816,9 +816,6 @@ srs_error_t SrsServer::initialize_st()
_srs_config->get_max_connections(), __MMAP_MAX_CONNECTIONS);
}
// set current log id.
_srs_context->generate_id();
// check asprocess.
bool asprocess = _srs_config->get_asprocess();
if (asprocess && ppid == 1) {

View file

@ -1728,7 +1728,7 @@ srs_error_t SrsSourceManager::fetch_or_create(SrsRequest* r, ISrsSourceHandler*
bool rtc_enabled = _srs_config->get_rtc_enabled(r->vhost);
// Get the RTC source and bridger.
SrsRtcSource* rtc = NULL;
SrsRtcStream* rtc = NULL;
if (rtc_server_enabled && rtc_enabled) {
if ((err = _srs_rtc_sources->fetch_or_create(r, &rtc)) != srs_success) {
err = srs_error_wrap(err, "init rtc %s", r->get_stream_url().c_str());
@ -1792,7 +1792,7 @@ void SrsSourceManager::dispose()
srs_error_t SrsSourceManager::cycle()
{
std::string cid = _srs_context->get_id();
SrsContextId cid = _srs_context->get_id();
srs_error_t err = do_cycle();
_srs_context->set_id(cid);
@ -1866,7 +1866,6 @@ SrsSource::SrsSource()
mix_queue = new SrsMixQueue();
_can_publish = true;
_pre_source_id = _source_id = "";
die_at = 0;
handler = NULL;
@ -2065,17 +2064,17 @@ srs_error_t SrsSource::on_reload_vhost_play(string vhost)
return err;
}
srs_error_t SrsSource::on_source_id_changed(string id)
srs_error_t SrsSource::on_source_id_changed(SrsContextId id)
{
srs_error_t err = srs_success;
if (_source_id == id) {
if (!_source_id.compare(id)) {
return err;
}
if (_pre_source_id == "") {
if (_pre_source_id.empty()) {
_pre_source_id = id;
} else if (_pre_source_id != _source_id) {
} else if (_pre_source_id.compare(_source_id)) {
_pre_source_id = _source_id;
}
@ -2091,12 +2090,12 @@ srs_error_t SrsSource::on_source_id_changed(string id)
return err;
}
string SrsSource::source_id()
SrsContextId SrsSource::source_id()
{
return _source_id;
}
string SrsSource::pre_source_id()
SrsContextId SrsSource::pre_source_id()
{
return _pre_source_id;
}
@ -2560,7 +2559,7 @@ void SrsSource::on_unpublish()
srs_trace("cleanup when unpublish");
_can_publish = true;
_source_id = -1;
_source_id = SrsContextId();
// notify the handler.
srs_assert(handler);

View file

@ -508,9 +508,9 @@ private:
// For edge, it's the edge ingest id.
// when source id changed, for example, the edge reconnect,
// invoke the on_source_id_changed() to let all clients know.
std::string _source_id;
SrsContextId _source_id;
// previous source id.
std::string _pre_source_id;
SrsContextId _pre_source_id;
// deep copy of client request.
SrsRequest* req;
// To delivery stream to clients.
@ -567,10 +567,10 @@ public:
virtual srs_error_t on_reload_vhost_play(std::string vhost);
public:
// The source id changed.
virtual srs_error_t on_source_id_changed(std::string id);
virtual srs_error_t on_source_id_changed(SrsContextId id);
// Get current source id.
virtual std::string source_id();
virtual std::string pre_source_id();
virtual SrsContextId source_id();
virtual SrsContextId pre_source_id();
// Whether source is inactive, which means there is no publishing stream source.
// @remark For edge, it's inactive util stream has been pulled from origin.
virtual bool inactive();

View file

@ -74,18 +74,27 @@ srs_error_t SrsDummyCoroutine::pull()
return srs_error_new(ERROR_THREAD_DUMMY, "dummy pull");
}
string SrsDummyCoroutine::cid()
SrsContextId SrsDummyCoroutine::cid()
{
return "";
return SrsContextId();
}
_ST_THREAD_CREATE_PFN _pfn_st_thread_create = (_ST_THREAD_CREATE_PFN)st_thread_create;
SrsSTCoroutine::SrsSTCoroutine(string n, ISrsCoroutineHandler* h, std::string cid)
SrsSTCoroutine::SrsSTCoroutine(string n, ISrsCoroutineHandler* h)
{
name = n;
handler = h;
context = cid;
trd = NULL;
trd_err = srs_success;
started = interrupted = disposed = cycle_done = false;
}
SrsSTCoroutine::SrsSTCoroutine(string n, ISrsCoroutineHandler* h, SrsContextId cid)
{
name = n;
handler = h;
cid_ = cid;
trd = NULL;
trd_err = srs_success;
started = interrupted = disposed = cycle_done = false;
@ -180,19 +189,18 @@ srs_error_t SrsSTCoroutine::pull()
return srs_error_copy(trd_err);
}
string SrsSTCoroutine::cid()
SrsContextId SrsSTCoroutine::cid()
{
return context;
return cid_;
}
srs_error_t SrsSTCoroutine::cycle()
{
if (_srs_context) {
if (!context.empty()) {
_srs_context->set_id(context);
} else {
context = _srs_context->generate_id();
if (cid_.empty()) {
cid_ = _srs_context->generate_id();
}
_srs_context->set_id(cid_);
}
srs_error_t err = handler->cycle();

View file

@ -28,6 +28,7 @@
#include <string>
#include <srs_kernel_log.hpp>
#include <srs_service_st.hpp>
#include <srs_protocol_io.hpp>
@ -80,7 +81,7 @@ public:
// @return a copy of error, which should be freed by user.
// NULL if not terminated and user should pull again.
virtual srs_error_t pull() = 0;
virtual std::string cid() = 0;
virtual SrsContextId cid() = 0;
};
// An empty coroutine, user can default to this object before create any real coroutine.
@ -95,7 +96,7 @@ public:
virtual void stop();
virtual void interrupt();
virtual srs_error_t pull();
virtual std::string cid();
virtual SrsContextId cid();
};
// For utest to mock the thread create.
@ -121,7 +122,7 @@ private:
ISrsCoroutineHandler* handler;
private:
srs_thread_t trd;
std::string context;
SrsContextId cid_;
srs_error_t trd_err;
private:
bool started;
@ -132,7 +133,8 @@ private:
public:
// Create a thread with name n and handler h.
// @remark User can specify a cid for thread to use, or we will allocate a new one.
SrsSTCoroutine(std::string n, ISrsCoroutineHandler* h, std::string cid = "");
SrsSTCoroutine(std::string n, ISrsCoroutineHandler* h);
SrsSTCoroutine(std::string n, ISrsCoroutineHandler* h, SrsContextId cid);
virtual ~SrsSTCoroutine();
public:
// Start the thread.
@ -154,7 +156,7 @@ public:
// @remark Return ERROR_THREAD_INTERRUPED when thread is interrupted.
virtual srs_error_t pull();
// Get the context id of thread.
virtual std::string cid();
virtual SrsContextId cid();
private:
virtual srs_error_t cycle();
static void* pfn(void* arg);

View file

@ -101,8 +101,7 @@ SrsStatisticStream::SrsStatisticStream()
id = srs_generate_id();
vhost = NULL;
active = false;
connection_cid = -1;
has_video = false;
vcodec = SrsVideoCodecIdReserved;
avc_profile = SrsAvcProfileReserved;
@ -184,7 +183,7 @@ srs_error_t SrsStatisticStream::dumps(SrsJsonObject* obj)
return err;
}
void SrsStatisticStream::publish(string cid)
void SrsStatisticStream::publish(SrsContextId cid)
{
connection_cid = cid;
active = true;
@ -337,10 +336,10 @@ SrsStatisticStream* SrsStatistic::find_stream(string sid)
return NULL;
}
SrsStatisticClient* SrsStatistic::find_client(string cid)
SrsStatisticClient* SrsStatistic::find_client(string client_id)
{
std::map<std::string, SrsStatisticClient*>::iterator it;
if ((it = clients.find(cid)) != clients.end()) {
if ((it = clients.find(client_id)) != clients.end()) {
return it->second;
}
return NULL;
@ -392,7 +391,7 @@ srs_error_t SrsStatistic::on_video_frames(SrsRequest* req, int nb_frames)
return err;
}
void SrsStatistic::on_stream_publish(SrsRequest* req, string cid)
void SrsStatistic::on_stream_publish(SrsRequest* req, SrsContextId cid)
{
SrsStatisticVhost* vhost = create_vhost(req);
SrsStatisticStream* stream = create_stream(vhost, req);
@ -423,10 +422,13 @@ void SrsStatistic::on_stream_close(SrsRequest* req)
}
}
srs_error_t SrsStatistic::on_client(std::string id, SrsRequest* req, SrsConnection* conn, SrsRtmpConnType type)
srs_error_t SrsStatistic::on_client(SrsContextId cid, SrsRequest* req, SrsConnection* conn, SrsRtmpConnType type)
{
srs_error_t err = srs_success;
// TODO: FIXME: We should use UUID for client ID.
std::string id = cid.c_str();
SrsStatisticVhost* vhost = create_vhost(req);
SrsStatisticStream* stream = create_stream(vhost, req);
@ -451,8 +453,11 @@ srs_error_t SrsStatistic::on_client(std::string id, SrsRequest* req, SrsConnecti
return err;
}
void SrsStatistic::on_disconnect(std::string id)
void SrsStatistic::on_disconnect(SrsContextId cid)
{
// TODO: FIXME: We should use UUID for client ID.
std::string id = cid.c_str();
std::map<std::string, SrsStatisticClient*>::iterator it;
if ((it = clients.find(id)) == clients.end()) {
return;
@ -471,7 +476,8 @@ void SrsStatistic::on_disconnect(std::string id)
void SrsStatistic::kbps_add_delta(SrsConnection* conn)
{
std::string id = conn->srs_id();
// TODO: FIXME: Should not use context id as connection id.
std::string id = conn->srs_id().c_str();
if (clients.find(id) == clients.end()) {
return;
}

View file

@ -67,7 +67,7 @@ public:
std::string stream;
std::string url;
bool active;
std::string connection_cid;
SrsContextId connection_cid;
int nb_clients;
uint64_t nb_frames;
public:
@ -101,7 +101,7 @@ public:
virtual srs_error_t dumps(SrsJsonObject* obj);
public:
// Publish the stream.
virtual void publish(std::string cid);
virtual void publish(SrsContextId cid);
// Close the stream.
virtual void close();
};
@ -181,7 +181,7 @@ public:
public:
virtual SrsStatisticVhost* find_vhost(std::string vid);
virtual SrsStatisticStream* find_stream(std::string sid);
virtual SrsStatisticClient* find_client(std::string cid);
virtual SrsStatisticClient* find_client(std::string client_id);
public:
// When got video info for stream.
virtual srs_error_t on_video_info(SrsRequest* req, SrsVideoCodecId vcodec, SrsAvcProfile avc_profile,
@ -195,7 +195,7 @@ public:
// When publish stream.
// @param req the request object of publish connection.
// @param cid the cid of publish connection.
virtual void on_stream_publish(SrsRequest* req, std::string cid);
virtual void on_stream_publish(SrsRequest* req, SrsContextId cid);
// When close stream.
virtual void on_stream_close(SrsRequest* req);
public:
@ -204,12 +204,14 @@ public:
// @param req, the client request object.
// @param conn, the physical absract connection object.
// @param type, the type of connection.
virtual srs_error_t on_client(std::string id, SrsRequest* req, SrsConnection* conn, SrsRtmpConnType type);
// TODO: FIXME: We should not use context id as client id.
virtual srs_error_t on_client(SrsContextId id, SrsRequest* req, SrsConnection* conn, SrsRtmpConnType type);
// Client disconnect
// @remark the on_disconnect always call, while the on_client is call when
// only got the request object, so the client specified by id maybe not
// exists in stat.
virtual void on_disconnect(std::string id);
// TODO: FIXME: We should not use context id as client id.
virtual void on_disconnect(SrsContextId id);
// Sample the kbps, add delta bytes of conn.
// Use kbps_sample() to get all result of kbps stat.
// TODO: FIXME: the add delta must use ISrsKbpsDelta interface instead.

View file

@ -23,4 +23,42 @@
#include <srs_core.hpp>
_SrsContextId::_SrsContextId()
{
}
_SrsContextId::_SrsContextId(std::string v)
{
v_ = v;
}
_SrsContextId::_SrsContextId(const _SrsContextId& cp)
{
v_ = cp.v_;
}
_SrsContextId& _SrsContextId::operator=(const _SrsContextId& cp)
{
v_ = cp.v_;
return *this;
}
_SrsContextId::~_SrsContextId()
{
}
const char* _SrsContextId::c_str() const
{
return v_.c_str();
}
bool _SrsContextId::empty() const
{
return v_.empty();
}
int _SrsContextId::compare(const _SrsContextId& to) const
{
return v_.compare(to.v_);
}

View file

@ -113,4 +113,33 @@
class SrsCplxError;
typedef SrsCplxError* srs_error_t;
#include <string>
// The context ID, it default to a string object, we can also use other objects.
// @remark User can directly user string as SrsContextId, we user struct to ensure the context is an object.
#if 1
class _SrsContextId
{
private:
std::string v_;
public:
_SrsContextId();
_SrsContextId(std::string v);
_SrsContextId(const _SrsContextId& cp);
_SrsContextId& operator=(const _SrsContextId& cp);
virtual ~_SrsContextId();
public:
const char* c_str() const;
bool empty() const;
// Compare the two context id. @see http://www.cplusplus.com/reference/string/string/compare/
// 0 They compare equal
// <0 Either the value of the first character that does not match is lower in the compared string, or all compared characters match but the compared string is shorter.
// >0 Either the value of the first character that does not match is greater in the compared string, or all compared characters match but the compared string is longer.
int compare(const _SrsContextId& to) const;
};
typedef _SrsContextId SrsContextId;
#else
// Actually, we can directly user string as SrsContextId.
typedef std::string SrsContextId;
#endif
#endif

View file

@ -57,7 +57,7 @@ SrsCplxError::SrsCplxError()
{
code = ERROR_SUCCESS;
wrapped = NULL;
cid = rerrno = line = 0;
rerrno = line = 0;
}
SrsCplxError::~SrsCplxError()
@ -79,7 +79,7 @@ std::string SrsCplxError::description() {
next = this;
while (next) {
ss << "thread [" << getpid() << "][" << next->cid << "]: "
ss << "thread [" << getpid() << "][" << next->cid.c_str() << "]: "
<< next->func << "() [" << next->file << ":" << next->line << "]"
<< "[errno=" << next->rerrno << "]";

View file

@ -414,7 +414,7 @@ private:
std::string file;
int line;
std::string cid;
SrsContextId cid;
int rerrno;
std::string desc;

View file

@ -65,37 +65,38 @@ public:
virtual void reopen() = 0;
public:
// The log for verbose, very verbose information.
virtual void verbose(const char* tag, const char* context_id, const char* fmt, ...) = 0;
virtual void verbose(const char* tag, SrsContextId context_id, const char* fmt, ...) = 0;
// The log for debug, detail information.
virtual void info(const char* tag, const char* context_id, const char* fmt, ...) = 0;
virtual void info(const char* tag, SrsContextId context_id, const char* fmt, ...) = 0;
// The log for trace, important information.
virtual void trace(const char* tag, const char* context_id, const char* fmt, ...) = 0;
virtual void trace(const char* tag, SrsContextId context_id, const char* fmt, ...) = 0;
// The log for warn, warn is something should take attention, but not a error.
virtual void warn(const char* tag, const char* context_id, const char* fmt, ...) = 0;
virtual void warn(const char* tag, SrsContextId context_id, const char* fmt, ...) = 0;
// The log for error, something error occur, do something about the error, ie. close the connection,
// but we will donot abort the program.
virtual void error(const char* tag, const char* context_id, const char* fmt, ...) = 0;
virtual void error(const char* tag, SrsContextId context_id, const char* fmt, ...) = 0;
};
// The logic context for a RTMP connection, or RTC Session.
// The logic context, for example, a RTMP connection, or RTC Session, etc.
// We can grep the context id to identify the logic unit, for debugging.
// For example:
// _srs_context->generate_id(); // Generate a new context.
// _srs_context->get_id(); // Get current context.
// int old_id = _srs_context->set_id("1000"); // Change the context.
// SrsContextId cid = _srs_context->get_id(); // Get current context id.
// SrsContextId new_cid = _srs_context->generate_id(); // Generate a new context id.
// SrsContextId old_cid = _srs_context->set_id(new_cid); // Change the context id.
class ISrsContext
{
public:
ISrsContext();
virtual ~ISrsContext();
public:
// Generate the id for current context.
virtual std::string generate_id() = 0;
// Get the generated id of current context.
virtual std::string get_id() = 0;
// Set the id of current context.
// @return the previous id value; 0 if no context.
virtual std::string set_id(std::string v) = 0;
// Generate a new context id.
// @remark We do not set to current thread, user should do this.
virtual SrsContextId generate_id() = 0;
// Get the context id of current thread.
virtual SrsContextId get_id() = 0;
// Set the context id of current thread.
// @return the previous context id.
virtual SrsContextId set_id(SrsContextId v) = 0;
};
// @global User must provides a log object
@ -108,25 +109,25 @@ extern ISrsContext* _srs_context;
// Use __FUNCTION__ to print c method
// Use __PRETTY_FUNCTION__ to print c++ class:method
#if 1
#define srs_verbose(msg, ...) _srs_log->verbose(NULL, _srs_context->get_id().c_str(), msg, ##__VA_ARGS__)
#define srs_info(msg, ...) _srs_log->info(NULL, _srs_context->get_id().c_str(), msg, ##__VA_ARGS__)
#define srs_trace(msg, ...) _srs_log->trace(NULL, _srs_context->get_id().c_str(), msg, ##__VA_ARGS__)
#define srs_warn(msg, ...) _srs_log->warn(NULL, _srs_context->get_id().c_str(), msg, ##__VA_ARGS__)
#define srs_error(msg, ...) _srs_log->error(NULL, _srs_context->get_id().c_str(), msg, ##__VA_ARGS__)
#define srs_verbose(msg, ...) _srs_log->verbose(NULL, _srs_context->get_id(), msg, ##__VA_ARGS__)
#define srs_info(msg, ...) _srs_log->info(NULL, _srs_context->get_id(), msg, ##__VA_ARGS__)
#define srs_trace(msg, ...) _srs_log->trace(NULL, _srs_context->get_id(), msg, ##__VA_ARGS__)
#define srs_warn(msg, ...) _srs_log->warn(NULL, _srs_context->get_id(), msg, ##__VA_ARGS__)
#define srs_error(msg, ...) _srs_log->error(NULL, _srs_context->get_id(), msg, ##__VA_ARGS__)
#endif
#if 0
#define srs_verbose(msg, ...) _srs_log->verbose(__FUNCTION__, _srs_context->get_id().c_str(), msg, ##__VA_ARGS__)
#define srs_info(msg, ...) _srs_log->info(__FUNCTION__, _srs_context->get_id().c_str(), msg, ##__VA_ARGS__)
#define srs_trace(msg, ...) _srs_log->trace(__FUNCTION__, _srs_context->get_id().c_str(), msg, ##__VA_ARGS__)
#define srs_warn(msg, ...) _srs_log->warn(__FUNCTION__, _srs_context->get_id().c_str(), msg, ##__VA_ARGS__)
#define srs_error(msg, ...) _srs_log->error(__FUNCTION__, _srs_context->get_id().c_str(), msg, ##__VA_ARGS__)
#define srs_verbose(msg, ...) _srs_log->verbose(__FUNCTION__, _srs_context->get_id(), msg, ##__VA_ARGS__)
#define srs_info(msg, ...) _srs_log->info(__FUNCTION__, _srs_context->get_id(), msg, ##__VA_ARGS__)
#define srs_trace(msg, ...) _srs_log->trace(__FUNCTION__, _srs_context->get_id(), msg, ##__VA_ARGS__)
#define srs_warn(msg, ...) _srs_log->warn(__FUNCTION__, _srs_context->get_id(), msg, ##__VA_ARGS__)
#define srs_error(msg, ...) _srs_log->error(__FUNCTION__, _srs_context->get_id(), msg, ##__VA_ARGS__)
#endif
#if 0
#define srs_verbose(msg, ...) _srs_log->verbose(__PRETTY_FUNCTION__, _srs_context->get_id().c_str(), msg, ##__VA_ARGS__)
#define srs_info(msg, ...) _srs_log->info(__PRETTY_FUNCTION__, _srs_context->get_id().c_str(), msg, ##__VA_ARGS__)
#define srs_trace(msg, ...) _srs_log->trace(__PRETTY_FUNCTION__, _srs_context->get_id().c_str(), msg, ##__VA_ARGS__)
#define srs_warn(msg, ...) _srs_log->warn(__PRETTY_FUNCTION__, _srs_context->get_id().c_str(), msg, ##__VA_ARGS__)
#define srs_error(msg, ...) _srs_log->error(__PRETTY_FUNCTION__, _srs_context->get_id().c_str(), msg, ##__VA_ARGS__)
#define srs_verbose(msg, ...) _srs_log->verbose(__PRETTY_FUNCTION__, _srs_context->get_id(), msg, ##__VA_ARGS__)
#define srs_info(msg, ...) _srs_log->info(__PRETTY_FUNCTION__, _srs_context->get_id(), msg, ##__VA_ARGS__)
#define srs_trace(msg, ...) _srs_log->trace(__PRETTY_FUNCTION__, _srs_context->get_id(), msg, ##__VA_ARGS__)
#define srs_warn(msg, ...) _srs_log->warn(__PRETTY_FUNCTION__, _srs_context->get_id(), msg, ##__VA_ARGS__)
#define srs_error(msg, ...) _srs_log->error(__PRETTY_FUNCTION__, _srs_context->get_id(), msg, ##__VA_ARGS__)
#endif
// TODO: FIXME: Add more verbose and info logs.

View file

@ -213,7 +213,7 @@ srs_error_t do_main(int argc, char** argv)
int main(int argc, char** argv) {
// For background context id.
_srs_context->generate_id();
_srs_context->set_id(_srs_context->generate_id());
srs_error_t err = do_main(argc, argv);

View file

@ -148,20 +148,39 @@ void srs_parse_query_string(string q, map<string,string>& query)
}
}
static bool _random_initialized = false;
void srs_random_generate(char* bytes, int size)
{
static bool _random_initialized = false;
if (!_random_initialized) {
srand(0);
_random_initialized = true;
::srandom((unsigned long)(srs_update_system_time() | (::getpid()<<13)));
}
for (int i = 0; i < size; i++) {
// the common value in [0x0f, 0xf0]
bytes[i] = 0x0f + (rand() % (256 - 0x0f - 0x0f));
bytes[i] = 0x0f + (random() % (256 - 0x0f - 0x0f));
}
}
std::string srs_random_str(int len)
{
if (!_random_initialized) {
_random_initialized = true;
::srandom((unsigned long)(srs_update_system_time() | (::getpid()<<13)));
}
static string random_table = "01234567890123456789012345678901234567890123456789abcdefghijklmnopqrstuvwxyz";
string ret;
ret.reserve(len);
for (int i = 0; i < len; ++i) {
ret.append(1, random_table[random() % random_table.size()]);
}
return ret;
}
string srs_generate_tc_url(string host, string vhost, string app, int port)
{
string tcUrl = "rtmp://";

View file

@ -65,11 +65,12 @@ extern void srs_discovery_tc_url(std::string tcUrl, std::string& schema, std::st
// must format as key=value&...&keyN=valueN
extern void srs_parse_query_string(std::string q, std::map<std::string, std::string>& query);
/**
* generate ramdom data for handshake.
*/
// Generate ramdom data for handshake.
extern void srs_random_generate(char* bytes, int size);
// Generate random string [0-9a-z] in size of len bytes.
extern std::string srs_random_str(int len);
/**
* generate the tcUrl without param.
* @remark Use host as tcUrl.vhost if vhost is default vhost.

View file

@ -31,6 +31,7 @@ using namespace std;
#include <srs_kernel_error.hpp>
#include <srs_kernel_utility.hpp>
#include <srs_protocol_utility.hpp>
#define SRS_BASIC_LOG_SIZE 1024
@ -42,31 +43,22 @@ SrsThreadContext::~SrsThreadContext()
{
}
string SrsThreadContext::generate_id()
SrsContextId SrsThreadContext::generate_id()
{
static int id = 0;
if (id == 0) {
id = (100 + ((uint32_t)(int64_t)this)%1000);
}
int gid = id++;
stringstream ss;
ss << gid;
cache[srs_thread_self()] = ss.str();
return ss.str();
SrsContextId cid = SrsContextId(srs_random_str(8));
return cid;
}
string SrsThreadContext::get_id()
SrsContextId SrsThreadContext::get_id()
{
return cache[srs_thread_self()];
}
string SrsThreadContext::set_id(string v)
SrsContextId SrsThreadContext::set_id(SrsContextId v)
{
srs_thread_t self = srs_thread_self();
string ov;
SrsContextId ov;
if (cache.find(self) != cache.end()) {
ov = cache[self];
}
@ -79,7 +71,7 @@ string SrsThreadContext::set_id(string v)
void SrsThreadContext::clear_cid()
{
srs_thread_t self = srs_thread_self();
std::map<srs_thread_t, string>::iterator it = cache.find(self);
std::map<srs_thread_t, SrsContextId>::iterator it = cache.find(self);
if (it != cache.end()) {
cache.erase(it);
}
@ -108,7 +100,7 @@ void SrsConsoleLog::reopen()
{
}
void SrsConsoleLog::verbose(const char* tag, const char* context_id, const char* fmt, ...)
void SrsConsoleLog::verbose(const char* tag, SrsContextId context_id, const char* fmt, ...)
{
if (level > SrsLogLevelVerbose) {
return;
@ -128,7 +120,7 @@ void SrsConsoleLog::verbose(const char* tag, const char* context_id, const char*
fprintf(stdout, "%s\n", buffer);
}
void SrsConsoleLog::info(const char* tag, const char* context_id, const char* fmt, ...)
void SrsConsoleLog::info(const char* tag, SrsContextId context_id, const char* fmt, ...)
{
if (level > SrsLogLevelInfo) {
return;
@ -148,7 +140,7 @@ void SrsConsoleLog::info(const char* tag, const char* context_id, const char* fm
fprintf(stdout, "%s\n", buffer);
}
void SrsConsoleLog::trace(const char* tag, const char* context_id, const char* fmt, ...)
void SrsConsoleLog::trace(const char* tag, SrsContextId context_id, const char* fmt, ...)
{
if (level > SrsLogLevelTrace) {
return;
@ -168,7 +160,7 @@ void SrsConsoleLog::trace(const char* tag, const char* context_id, const char* f
fprintf(stdout, "%s\n", buffer);
}
void SrsConsoleLog::warn(const char* tag, const char* context_id, const char* fmt, ...)
void SrsConsoleLog::warn(const char* tag, SrsContextId context_id, const char* fmt, ...)
{
if (level > SrsLogLevelWarn) {
return;
@ -188,7 +180,7 @@ void SrsConsoleLog::warn(const char* tag, const char* context_id, const char* fm
fprintf(stderr, "%s\n", buffer);
}
void SrsConsoleLog::error(const char* tag, const char* context_id, const char* fmt, ...)
void SrsConsoleLog::error(const char* tag, SrsContextId context_id, const char* fmt, ...)
{
if (level > SrsLogLevelError) {
return;
@ -214,7 +206,7 @@ void SrsConsoleLog::error(const char* tag, const char* context_id, const char* f
}
// LCOV_EXCL_STOP
bool srs_log_header(char* buffer, int size, bool utc, bool dangerous, const char* tag, const char* cid, const char* level, int* psize)
bool srs_log_header(char* buffer, int size, bool utc, bool dangerous, const char* tag, SrsContextId cid, const char* level, int* psize)
{
// clock time
timeval tv;
@ -240,24 +232,24 @@ bool srs_log_header(char* buffer, int size, bool utc, bool dangerous, const char
written = snprintf(buffer, size,
"[%d-%02d-%02d %02d:%02d:%02d.%03d][%s][%s][%d][%s][%d] ",
1900 + tm->tm_year, 1 + tm->tm_mon, tm->tm_mday, tm->tm_hour, tm->tm_min, tm->tm_sec, (int)(tv.tv_usec / 1000),
level, tag, getpid(), cid, errno);
level, tag, getpid(), cid.c_str(), errno);
} else {
written = snprintf(buffer, size,
"[%d-%02d-%02d %02d:%02d:%02d.%03d][%s][%d][%s][%d] ",
1900 + tm->tm_year, 1 + tm->tm_mon, tm->tm_mday, tm->tm_hour, tm->tm_min, tm->tm_sec, (int)(tv.tv_usec / 1000),
level, getpid(), cid, errno);
level, getpid(), cid.c_str(), errno);
}
} else {
if (tag) {
written = snprintf(buffer, size,
"[%d-%02d-%02d %02d:%02d:%02d.%03d][%s][%s][%d][%s] ",
1900 + tm->tm_year, 1 + tm->tm_mon, tm->tm_mday, tm->tm_hour, tm->tm_min, tm->tm_sec, (int)(tv.tv_usec / 1000),
level, tag, getpid(), cid);
level, tag, getpid(), cid.c_str());
} else {
written = snprintf(buffer, size,
"[%d-%02d-%02d %02d:%02d:%02d.%03d][%s][%d][%s] ",
1900 + tm->tm_year, 1 + tm->tm_mon, tm->tm_mday, tm->tm_hour, tm->tm_min, tm->tm_sec, (int)(tv.tv_usec / 1000),
level, getpid(), cid);
level, getpid(), cid.c_str());
}
}

View file

@ -37,14 +37,14 @@
class SrsThreadContext : public ISrsContext
{
private:
std::map<srs_thread_t, std::string> cache;
std::map<srs_thread_t, SrsContextId> cache;
public:
SrsThreadContext();
virtual ~SrsThreadContext();
public:
virtual std::string generate_id();
virtual std::string get_id();
virtual std::string set_id(std::string v);
virtual SrsContextId generate_id();
virtual SrsContextId get_id();
virtual SrsContextId set_id(SrsContextId v);
public:
virtual void clear_cid();
};
@ -64,11 +64,11 @@ public:
public:
virtual srs_error_t initialize();
virtual void reopen();
virtual void verbose(const char* tag, const char* context_id, const char* fmt, ...);
virtual void info(const char* tag, const char* context_id, const char* fmt, ...);
virtual void trace(const char* tag, const char* context_id, const char* fmt, ...);
virtual void warn(const char* tag, const char* context_id, const char* fmt, ...);
virtual void error(const char* tag, const char* context_id, const char* fmt, ...);
virtual void verbose(const char* tag, SrsContextId context_id, const char* fmt, ...);
virtual void info(const char* tag, SrsContextId context_id, const char* fmt, ...);
virtual void trace(const char* tag, SrsContextId context_id, const char* fmt, ...);
virtual void warn(const char* tag, SrsContextId context_id, const char* fmt, ...);
virtual void error(const char* tag, SrsContextId context_id, const char* fmt, ...);
};
// Generate the log header.
@ -76,6 +76,6 @@ public:
// @param utc Whether use UTC time format in the log header.
// @param psize Output the actual header size.
// @remark It's a internal API.
bool srs_log_header(char* buffer, int size, bool utc, bool dangerous, const char* tag, const char* cid, const char* level, int* psize);
bool srs_log_header(char* buffer, int size, bool utc, bool dangerous, const char* tag, SrsContextId cid, const char* level, int* psize);
#endif

View file

@ -70,8 +70,8 @@ srs_error_t srs_st_init()
return srs_error_new(ERROR_ST_SET_EPOLL, "st enable st failed, current is %s", st_get_eventsys_name());
}
// Before ST init, we might have already inited the background cid.
string cid = _srs_context->get_id();
// Before ST init, we might have already initialized the background cid.
SrsContextId cid = _srs_context->get_id();
if (cid.empty()) {
cid = _srs_context->generate_id();
}

View file

@ -137,3 +137,47 @@ VOID TEST(SampleTest, StringEQTest)
EXPECT_STREQ("100", str.c_str());
}
class MockSrsContextId
{
public:
MockSrsContextId() {
bind_ = NULL;
}
MockSrsContextId(const MockSrsContextId& cp){
bind_ = NULL;
if (cp.bind_) {
bind_ = cp.bind_->copy();
}
}
MockSrsContextId& operator= (const MockSrsContextId& cp) {
srs_freep(bind_);
if (cp.bind_) {
bind_ = cp.bind_->copy();
}
return *this;
}
virtual ~MockSrsContextId() {
srs_freep(bind_);
}
public:
MockSrsContextId* copy() const {
MockSrsContextId* cp = new MockSrsContextId();
if (bind_) {
cp->bind_ = bind_->copy();
}
return cp;
}
private:
MockSrsContextId* bind_;
};
VOID TEST(SampleTest, ContextTest)
{
MockSrsContextId cid;
cid.bind_ = new MockSrsContextId();
static std::map<int, MockSrsContextId> cache;
cache[0] = cid;
cache[0] = cid;
}

View file

@ -36,7 +36,7 @@ VOID TEST(AppCoroutineTest, Dummy)
SrsDummyCoroutine dc;
if (true) {
EXPECT_EQ("", dc.cid());
EXPECT_TRUE(dc.cid().empty());
srs_error_t err = dc.pull();
EXPECT_TRUE(err != srs_success);
@ -52,7 +52,7 @@ VOID TEST(AppCoroutineTest, Dummy)
if (true) {
dc.stop();
EXPECT_EQ("", dc.cid());
EXPECT_TRUE(dc.cid().empty());
srs_error_t err = dc.pull();
EXPECT_TRUE(err != srs_success);
@ -68,7 +68,7 @@ VOID TEST(AppCoroutineTest, Dummy)
if (true) {
dc.interrupt();
EXPECT_EQ("", dc.cid());
EXPECT_TRUE(dc.cid().empty());
srs_error_t err = dc.pull();
EXPECT_TRUE(err != srs_success);
@ -88,7 +88,7 @@ public:
srs_error_t err;
srs_cond_t running;
srs_cond_t exited;
std::string cid;
SrsContextId cid;
// Quit without error.
bool quit;
public:
@ -128,12 +128,12 @@ VOID TEST(AppCoroutineTest, StartStop)
MockCoroutineHandler ch;
SrsSTCoroutine sc("test", &ch);
ch.trd = &sc;
EXPECT_EQ("", sc.cid());
EXPECT_TRUE(sc.cid().empty());
// Thread stop after created.
sc.stop();
EXPECT_EQ("", sc.cid());
EXPECT_TRUE(sc.cid().empty());
srs_error_t err = sc.pull();
EXPECT_TRUE(srs_success != err);
@ -151,7 +151,7 @@ VOID TEST(AppCoroutineTest, StartStop)
MockCoroutineHandler ch;
SrsSTCoroutine sc("test", &ch);
ch.trd = &sc;
EXPECT_EQ("", sc.cid());
EXPECT_TRUE(sc.cid().empty());
EXPECT_TRUE(srs_success == sc.start());
EXPECT_TRUE(srs_success == sc.pull());
@ -178,7 +178,7 @@ VOID TEST(AppCoroutineTest, StartStop)
MockCoroutineHandler ch;
SrsSTCoroutine sc("test", &ch);
ch.trd = &sc;
EXPECT_EQ("", sc.cid());
EXPECT_TRUE(sc.cid().empty());
EXPECT_TRUE(srs_success == sc.start());
EXPECT_TRUE(srs_success == sc.pull());
@ -220,16 +220,16 @@ VOID TEST(AppCoroutineTest, Cycle)
if (true) {
MockCoroutineHandler ch;
SrsSTCoroutine sc("test", &ch, "250");
SrsSTCoroutine sc("test", &ch, SrsContextId("250"));
ch.trd = &sc;
EXPECT_TRUE("250" == sc.cid());
EXPECT_TRUE(!sc.cid().compare(SrsContextId("250")));
EXPECT_TRUE(srs_success == sc.start());
EXPECT_TRUE(srs_success == sc.pull());
// After running, the cid in cycle should equal to the thread.
srs_cond_timedwait(ch.running, 100 * SRS_UTIME_MILLISECONDS);
EXPECT_TRUE("250" == ch.cid);
EXPECT_TRUE(!ch.cid.compare(SrsContextId("250")));
}
if (true) {

View file

@ -898,7 +898,7 @@ class MockOnCycleThread : public ISrsCoroutineHandler
public:
SrsSTCoroutine trd;
srs_cond_t cond;
MockOnCycleThread() : trd("mock", this, "0") {
MockOnCycleThread() : trd("mock", this) {
cond = srs_cond_new();
};
virtual ~MockOnCycleThread() {
@ -938,7 +938,7 @@ class MockOnCycleThread2 : public ISrsCoroutineHandler
public:
SrsSTCoroutine trd;
srs_mutex_t lock;
MockOnCycleThread2() : trd("mock", this, "0") {
MockOnCycleThread2() : trd("mock", this) {
lock = srs_mutex_new();
};
virtual ~MockOnCycleThread2() {
@ -979,7 +979,7 @@ class MockOnCycleThread3 : public ISrsCoroutineHandler
public:
SrsSTCoroutine trd;
srs_netfd_t fd;
MockOnCycleThread3() : trd("mock", this, "0") {
MockOnCycleThread3() : trd("mock", this) {
fd = NULL;
};
virtual ~MockOnCycleThread3() {
@ -1230,7 +1230,7 @@ class MockOnCycleThread4 : public ISrsCoroutineHandler
public:
SrsSTCoroutine trd;
srs_netfd_t fd;
MockOnCycleThread4() : trd("mock", this, "0") {
MockOnCycleThread4() : trd("mock", this) {
fd = NULL;
};
virtual ~MockOnCycleThread4() {
@ -1382,19 +1382,19 @@ VOID TEST(TCPServerTest, ContextUtility)
if (true) {
SrsThreadContext ctx;
EXPECT_TRUE("" == ctx.set_id("100"));
EXPECT_TRUE("100" == ctx.set_id("1000"));
EXPECT_TRUE("1000" == ctx.get_id());
EXPECT_TRUE(ctx.set_id(SrsContextId("100")).empty());
EXPECT_TRUE(!ctx.set_id(SrsContextId("1000")).compare(SrsContextId("100")));
EXPECT_TRUE(!ctx.get_id().compare(SrsContextId("1000")));
ctx.clear_cid();
EXPECT_TRUE("" == ctx.set_id("100"));
EXPECT_TRUE(ctx.set_id(SrsContextId("100")).empty());
}
int base_size = 0;
if (true) {
errno = 0;
int size = 0; char buf[1024]; HELPER_ARRAY_INIT(buf, 1024, 0);
ASSERT_TRUE(srs_log_header(buf, 1024, true, true, "SRS", "100", "Trace", &size));
ASSERT_TRUE(srs_log_header(buf, 1024, true, true, "SRS", SrsContextId("100"), "Trace", &size));
base_size = size;
EXPECT_TRUE(base_size > 0);
}
@ -1402,21 +1402,21 @@ VOID TEST(TCPServerTest, ContextUtility)
if (true) {
errno = 0;
int size = 0; char buf[1024]; HELPER_ARRAY_INIT(buf, 1024, 0);
ASSERT_TRUE(srs_log_header(buf, 1024, false, true, "SRS", "100", "Trace", &size));
ASSERT_TRUE(srs_log_header(buf, 1024, false, true, "SRS", SrsContextId("100"), "Trace", &size));
EXPECT_EQ(base_size, size);
}
if (true) {
errno = 0;
int size = 0; char buf[1024]; HELPER_ARRAY_INIT(buf, 1024, 0);
ASSERT_TRUE(srs_log_header(buf, 1024, false, true, NULL, "100", "Trace", &size));
ASSERT_TRUE(srs_log_header(buf, 1024, false, true, NULL, SrsContextId("100"), "Trace", &size));
EXPECT_EQ(base_size - 5, size);
}
if (true) {
errno = 0;
int size = 0; char buf[1024]; HELPER_ARRAY_INIT(buf, 1024, 0);
ASSERT_TRUE(srs_log_header(buf, 1024, false, false, NULL, "100", "Trace", &size));
ASSERT_TRUE(srs_log_header(buf, 1024, false, false, NULL, SrsContextId("100"), "Trace", &size));
EXPECT_EQ(base_size - 8, size);
}