diff --git a/trunk/src/app/srs_app_recv_thread.cpp b/trunk/src/app/srs_app_recv_thread.cpp index b28e71a50..869a40a01 100644 --- a/trunk/src/app/srs_app_recv_thread.cpp +++ b/trunk/src/app/srs_app_recv_thread.cpp @@ -254,15 +254,13 @@ void SrsQueueRecvThread::on_stop() SrsPublishRecvThread::SrsPublishRecvThread( SrsRtmpServer* rtmp_sdk, SrsRequest* _req, int mr_sock_fd, int timeout_ms, - SrsRtmpConn* conn, SrsSource* source, bool is_fmle, bool is_edge + SrsRtmpConn* conn, SrsSource* source ): trd(this, rtmp_sdk, timeout_ms) { rtmp = rtmp_sdk; _conn = conn; _source = source; - _is_fmle = is_fmle; - _is_edge = is_edge; recv_error_code = ERROR_SUCCESS; _nb_msgs = 0; @@ -351,7 +349,7 @@ int SrsPublishRecvThread::consume(SrsCommonMessage* msg) srs_update_system_time_ms(), msg->header.timestamp, msg->size); // the rtmp connection will handle this message - ret = _conn->handle_publish_message(_source, msg, _is_fmle, _is_edge); + ret = _conn->handle_publish_message(_source, msg); // must always free it, // the source will copy it if need to use. diff --git a/trunk/src/app/srs_app_recv_thread.hpp b/trunk/src/app/srs_app_recv_thread.hpp index b97dbbc83..f268b0ebf 100644 --- a/trunk/src/app/srs_app_recv_thread.hpp +++ b/trunk/src/app/srs_app_recv_thread.hpp @@ -181,8 +181,6 @@ private: SrsRtmpConn* _conn; // the params for conn callback. SrsSource* _source; - bool _is_fmle; - bool _is_edge; // the error timeout cond // @see https://github.com/ossrs/srs/issues/244 st_cond_t error; @@ -192,7 +190,7 @@ private: public: SrsPublishRecvThread(SrsRtmpServer* rtmp_sdk, SrsRequest* _req, int mr_sock_fd, int timeout_ms, - SrsRtmpConn* conn, SrsSource* source, bool is_fmle, bool is_edge); + SrsRtmpConn* conn, SrsSource* source); virtual ~SrsPublishRecvThread(); public: /** diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index f01f11efe..b92c2fb7f 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -283,13 +283,25 @@ void SrsSimpleRtmpClient::set_recv_timeout(int64_t timeout) transport->set_recv_timeout(timeout); } +SrsClientInfo::SrsClientInfo() +{ + edge = false; + req = new SrsRequest(); + res = new SrsResponse(); + type = SrsRtmpConnUnknown; +} + +SrsClientInfo::~SrsClientInfo() +{ + srs_freep(req); + srs_freep(res); +} + SrsRtmpConn::SrsRtmpConn(SrsServer* svr, st_netfd_t c, string cip) : SrsConnection(svr, c, cip) { server = svr; - req = new SrsRequest(); - res = new SrsResponse(); skt = new SrsStSocket(c); rtmp = new SrsRtmpServer(skt); refer = new SrsRefer(); @@ -305,7 +317,7 @@ SrsRtmpConn::SrsRtmpConn(SrsServer* svr, st_netfd_t c, string cip) realtime = SRS_PERF_MIN_LATENCY_ENABLED; send_min_interval = 0; tcp_nodelay = false; - client_type = SrsRtmpConnUnknown; + info = new SrsClientInfo(); _srs_config->subscribe(this); } @@ -314,8 +326,7 @@ SrsRtmpConn::~SrsRtmpConn() { _srs_config->unsubscribe(this); - srs_freep(req); - srs_freep(res); + srs_freep(info); srs_freep(rtmp); srs_freep(skt); srs_freep(refer); @@ -358,6 +369,7 @@ int SrsRtmpConn::do_cycle() } srs_verbose("rtmp handshake success"); + SrsRequest* req = info->req; if ((ret = rtmp->connect_app(req)) != ERROR_SUCCESS) { srs_error("rtmp connect vhost/app failed. ret=%d", ret); return ret; @@ -440,6 +452,8 @@ int SrsRtmpConn::on_reload_vhost_removed(string vhost) { int ret = ERROR_SUCCESS; + SrsRequest* req = info->req; + if (req->vhost != vhost) { return ret; } @@ -460,6 +474,8 @@ int SrsRtmpConn::on_reload_vhost_play(string vhost) { int ret = ERROR_SUCCESS; + SrsRequest* req = info->req; + if (req->vhost != vhost) { return ret; } @@ -480,6 +496,8 @@ int SrsRtmpConn::on_reload_vhost_tcp_nodelay(string vhost) { int ret = ERROR_SUCCESS; + SrsRequest* req = info->req; + if (req->vhost != vhost) { return ret; } @@ -493,6 +511,8 @@ int SrsRtmpConn::on_reload_vhost_realtime(string vhost) { int ret = ERROR_SUCCESS; + SrsRequest* req = info->req; + if (req->vhost != vhost) { return ret; } @@ -510,6 +530,8 @@ int SrsRtmpConn::on_reload_vhost_publish(string vhost) { int ret = ERROR_SUCCESS; + SrsRequest* req = info->req; + if (req->vhost != vhost) { return ret; } @@ -553,6 +575,8 @@ int SrsRtmpConn::service_cycle() { int ret = ERROR_SUCCESS; + SrsRequest* req = info->req; + int out_ack_size = _srs_config->get_out_ack_size(req->vhost); if (out_ack_size && (ret = rtmp->set_window_ack_size(out_ack_size)) != ERROR_SUCCESS) { srs_error("set output window acknowledgement size failed. ret=%d", ret); @@ -582,9 +606,9 @@ int SrsRtmpConn::service_cycle() // do token traverse before serve it. // @see https://github.com/ossrs/srs/pull/239 if (true) { - bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost); + info->edge = _srs_config->get_vhost_is_edge(req->vhost); bool edge_traverse = _srs_config->get_vhost_edge_token_traverse(req->vhost); - if (vhost_is_edge && edge_traverse) { + if (info->edge && edge_traverse) { if ((ret = check_edge_token_traverse_auth()) != ERROR_SUCCESS) { srs_warn("token auth failed, ret=%d", ret); return ret; @@ -666,9 +690,10 @@ int SrsRtmpConn::service_cycle() int SrsRtmpConn::stream_service_cycle() { int ret = ERROR_SUCCESS; - - SrsRtmpConnType type; - if ((ret = rtmp->identify_client(res->stream_id, type, req->stream, req->duration)) != ERROR_SUCCESS) { + + SrsRequest* req = info->req; + + if ((ret = rtmp->identify_client(info->res->stream_id, info->type, req->stream, req->duration)) != ERROR_SUCCESS) { if (!srs_is_client_gracefully_close(ret)) { srs_error("identify client failed. ret=%d", ret); } @@ -676,10 +701,10 @@ int SrsRtmpConn::stream_service_cycle() } req->strip(); srs_trace("client identified, type=%s, stream_name=%s, duration=%.2f", - srs_client_type_string(type).c_str(), req->stream.c_str(), req->duration); + srs_client_type_string(info->type).c_str(), req->stream.c_str(), req->duration); // security check - if ((ret = security->check(type, ip, req)) != ERROR_SUCCESS) { + if ((ret = security->check(info->type, ip, req)) != ERROR_SUCCESS) { srs_error("security check failed. ret=%d", ret); return ret; } @@ -698,25 +723,23 @@ int SrsRtmpConn::stream_service_cycle() // update the statistic when source disconveried. SrsStatistic* stat = SrsStatistic::instance(); - if ((ret = stat->on_client(_srs_context->get_id(), req, this, type)) != ERROR_SUCCESS) { + if ((ret = stat->on_client(_srs_context->get_id(), req, this, info->type)) != ERROR_SUCCESS) { srs_error("stat client failed. ret=%d", ret); return ret; } - bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost); bool enabled_cache = _srs_config->get_gop_cache(req->vhost); srs_trace("source url=%s, ip=%s, cache=%d, is_edge=%d, source_id=%d[%d]", - req->get_stream_url().c_str(), ip.c_str(), enabled_cache, vhost_is_edge, + req->get_stream_url().c_str(), ip.c_str(), enabled_cache, info->edge, source->source_id(), source->source_id()); source->set_cache(enabled_cache); - client_type = type; - switch (type) { + switch (info->type) { case SrsRtmpConnPlay: { srs_verbose("start to play stream %s.", req->stream.c_str()); // response connection start play - if ((ret = rtmp->start_play(res->stream_id)) != ERROR_SUCCESS) { + if ((ret = rtmp->start_play(info->res->stream_id)) != ERROR_SUCCESS) { srs_error("start to play stream failed. ret=%d", ret); return ret; } @@ -734,7 +757,7 @@ int SrsRtmpConn::stream_service_cycle() case SrsRtmpConnFMLEPublish: { srs_verbose("FMLE start to publish stream %s.", req->stream.c_str()); - if ((ret = rtmp->start_fmle_publish(res->stream_id)) != ERROR_SUCCESS) { + if ((ret = rtmp->start_fmle_publish(info->res->stream_id)) != ERROR_SUCCESS) { srs_error("start to publish stream failed. ret=%d", ret); return ret; } @@ -744,7 +767,7 @@ int SrsRtmpConn::stream_service_cycle() case SrsRtmpConnFlashPublish: { srs_verbose("flash start to publish stream %s.", req->stream.c_str()); - if ((ret = rtmp->start_flash_publish(res->stream_id)) != ERROR_SUCCESS) { + if ((ret = rtmp->start_flash_publish(info->res->stream_id)) != ERROR_SUCCESS) { srs_error("flash start to publish stream failed. ret=%d", ret); return ret; } @@ -765,6 +788,7 @@ int SrsRtmpConn::check_vhost(bool try_default_vhost) { int ret = ERROR_SUCCESS; + SrsRequest* req = info->req; srs_assert(req != NULL); SrsConfDirective* vhost = _srs_config->get_vhost(req->vhost, try_default_vhost); @@ -845,6 +869,7 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRe srs_assert(consumer != NULL); + SrsRequest* req = info->req; if (_srs_config->get_refer_enabled(req->vhost)) { if ((ret = refer->check(req->pageUrl, _srs_config->get_refer_play(req->vhost))) != ERROR_SUCCESS) { srs_error("check play_refer failed. ret=%d", ret); @@ -991,7 +1016,7 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRe // sendout messages, all messages are freed by send_and_free_messages(). // no need to assert msg, for the rtmp will assert it. - if (count > 0 && (ret = rtmp->send_and_free_messages(msgs.msgs, count, res->stream_id)) != ERROR_SUCCESS) { + if (count > 0 && (ret = rtmp->send_and_free_messages(msgs.msgs, count, info->res->stream_id)) != ERROR_SUCCESS) { if (!srs_is_client_gracefully_close(ret)) { srs_error("send messages to client failed. ret=%d", ret); } @@ -1021,6 +1046,8 @@ int SrsRtmpConn::publishing(SrsSource* source) { int ret = ERROR_SUCCESS; + SrsRequest* req = info->req; + if (_srs_config->get_refer_enabled(req->vhost)) { if ((ret = refer->check(req->pageUrl, _srs_config->get_refer_publish(req->vhost))) != ERROR_SUCCESS) { srs_error("check publish_refer failed. ret=%d", ret); @@ -1034,14 +1061,10 @@ int SrsRtmpConn::publishing(SrsSource* source) return ret; } - bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost); - if ((ret = acquire_publish(source, vhost_is_edge)) == ERROR_SUCCESS) { + if ((ret = acquire_publish(source)) == ERROR_SUCCESS) { // use isolate thread to recv, // @see: https://github.com/ossrs/srs/issues/237 - SrsPublishRecvThread trd(rtmp, req, - st_netfd_fileno(stfd), 0, this, source, - client_type == SrsRtmpConnFMLEPublish, - vhost_is_edge); + SrsPublishRecvThread trd(rtmp, req, st_netfd_fileno(stfd), 0, this, source); srs_info("start to publish stream %s success", req->stream.c_str()); ret = do_publishing(source, &trd); @@ -1056,7 +1079,7 @@ int SrsRtmpConn::publishing(SrsSource* source) // @see https://github.com/ossrs/srs/issues/474 // @remark when stream is busy, should never release it. if (ret != ERROR_SYSTEM_STREAM_BUSY) { - release_publish(source, vhost_is_edge); + release_publish(source); } http_hooks_on_unpublish(); @@ -1068,6 +1091,7 @@ int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd) { int ret = ERROR_SUCCESS; + SrsRequest* req = info->req; SrsPithyPrint* pprint = SrsPithyPrint::create_rtmp_publish(); SrsAutoFree(SrsPithyPrint, pprint); @@ -1150,11 +1174,13 @@ int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd) return ret; } -int SrsRtmpConn::acquire_publish(SrsSource* source, bool is_edge) +int SrsRtmpConn::acquire_publish(SrsSource* source) { int ret = ERROR_SUCCESS; - - if (!source->can_publish(is_edge)) { + + SrsRequest* req = info->req; + + if (!source->can_publish(info->edge)) { ret = ERROR_SYSTEM_STREAM_BUSY; srs_warn("stream %s is already publishing. ret=%d", req->get_stream_url().c_str(), ret); @@ -1162,7 +1188,7 @@ int SrsRtmpConn::acquire_publish(SrsSource* source, bool is_edge) } // when edge, ignore the publish event, directly proxy it. - if (is_edge) { + if (info->edge) { if ((ret = source->on_edge_start_publish()) != ERROR_SUCCESS) { srs_error("notice edge start publish stream failed. ret=%d", ret); return ret; @@ -1177,18 +1203,18 @@ int SrsRtmpConn::acquire_publish(SrsSource* source, bool is_edge) return ret; } -void SrsRtmpConn::release_publish(SrsSource* source, bool is_edge) +void SrsRtmpConn::release_publish(SrsSource* source) { // when edge, notice edge to change state. // when origin, notice all service to unpublish. - if (is_edge) { + if (info->edge) { source->on_edge_proxy_unpublish(); } else { source->on_unpublish(); } } -int SrsRtmpConn::handle_publish_message(SrsSource* source, SrsCommonMessage* msg, bool is_fmle, bool vhost_is_edge) +int SrsRtmpConn::handle_publish_message(SrsSource* source, SrsCommonMessage* msg) { int ret = ERROR_SUCCESS; @@ -1202,7 +1228,7 @@ int SrsRtmpConn::handle_publish_message(SrsSource* source, SrsCommonMessage* msg SrsAutoFree(SrsPacket, pkt); // for flash, any packet is republish. - if (!is_fmle) { + if (info->type == SrsRtmpConnFlashPublish) { // flash unpublish. // TODO: maybe need to support republish. srs_trace("flash flash publish finished."); @@ -1212,7 +1238,7 @@ int SrsRtmpConn::handle_publish_message(SrsSource* source, SrsCommonMessage* msg // for fmle, drop others except the fmle start packet. if (dynamic_cast(pkt)) { SrsFMLEStartPacket* unpublish = dynamic_cast(pkt); - if ((ret = rtmp->fmle_unpublish(res->stream_id, unpublish->transaction_id)) != ERROR_SUCCESS) { + if ((ret = rtmp->fmle_unpublish(info->res->stream_id, unpublish->transaction_id)) != ERROR_SUCCESS) { return ret; } return ERROR_CONTROL_REPUBLISH; @@ -1223,7 +1249,7 @@ int SrsRtmpConn::handle_publish_message(SrsSource* source, SrsCommonMessage* msg } // video, audio, data message - if ((ret = process_publish_message(source, msg, vhost_is_edge)) != ERROR_SUCCESS) { + if ((ret = process_publish_message(source, msg)) != ERROR_SUCCESS) { srs_error("fmle process publish message failed. ret=%d", ret); return ret; } @@ -1231,12 +1257,12 @@ int SrsRtmpConn::handle_publish_message(SrsSource* source, SrsCommonMessage* msg return ret; } -int SrsRtmpConn::process_publish_message(SrsSource* source, SrsCommonMessage* msg, bool vhost_is_edge) +int SrsRtmpConn::process_publish_message(SrsSource* source, SrsCommonMessage* msg) { int ret = ERROR_SUCCESS; // for edge, directly proxy message to origin. - if (vhost_is_edge) { + if (info->edge) { if ((ret = source->on_edge_proxy_publish(msg)) != ERROR_SUCCESS) { srs_error("edge publish proxy msg failed. ret=%d", ret); return ret; @@ -1354,7 +1380,7 @@ int SrsRtmpConn::process_play_control_msg(SrsConsumer* consumer, SrsCommonMessag // pause SrsPausePacket* pause = dynamic_cast(pkt); if (pause) { - if ((ret = rtmp->on_play_client_pause(res->stream_id, pause->is_pause)) != ERROR_SUCCESS) { + if ((ret = rtmp->on_play_client_pause(info->res->stream_id, pause->is_pause)) != ERROR_SUCCESS) { srs_error("rtmp process play client pause failed. ret=%d", ret); return ret; } @@ -1425,6 +1451,8 @@ void SrsRtmpConn::change_mw_sleep(int sleep_ms) void SrsRtmpConn::set_sock_options() { + SrsRequest* req = info->req; + bool nvalue = _srs_config->get_tcp_nodelay(req->vhost); if (nvalue != tcp_nodelay) { tcp_nodelay = nvalue; @@ -1454,6 +1482,7 @@ int SrsRtmpConn::check_edge_token_traverse_auth() { int ret = ERROR_SUCCESS; + SrsRequest* req = info->req; srs_assert(req); vector args = _srs_config->get_vhost_edge_origin(req->vhost)->args; @@ -1492,6 +1521,7 @@ int SrsRtmpConn::do_token_traverse_auth(SrsRtmpClient* client) { int ret = ERROR_SUCCESS; + SrsRequest* req = info->req; srs_assert(client); client->set_recv_timeout(SRS_CONSTS_RTMP_TMMS); @@ -1535,6 +1565,8 @@ int SrsRtmpConn::http_hooks_on_connect() { int ret = ERROR_SUCCESS; + SrsRequest* req = info->req; + #ifdef SRS_AUTO_HTTP_CALLBACK if (!_srs_config->get_vhost_http_hooks_enabled(req->vhost)) { return ret; @@ -1571,6 +1603,8 @@ int SrsRtmpConn::http_hooks_on_connect() void SrsRtmpConn::http_hooks_on_close() { #ifdef SRS_AUTO_HTTP_CALLBACK + SrsRequest* req = info->req; + if (!_srs_config->get_vhost_http_hooks_enabled(req->vhost)) { return; } @@ -1603,6 +1637,8 @@ int SrsRtmpConn::http_hooks_on_publish() int ret = ERROR_SUCCESS; #ifdef SRS_AUTO_HTTP_CALLBACK + SrsRequest* req = info->req; + if (!_srs_config->get_vhost_http_hooks_enabled(req->vhost)) { return ret; } @@ -1638,6 +1674,8 @@ int SrsRtmpConn::http_hooks_on_publish() void SrsRtmpConn::http_hooks_on_unpublish() { #ifdef SRS_AUTO_HTTP_CALLBACK + SrsRequest* req = info->req; + if (!_srs_config->get_vhost_http_hooks_enabled(req->vhost)) { return; } @@ -1670,6 +1708,8 @@ int SrsRtmpConn::http_hooks_on_play() int ret = ERROR_SUCCESS; #ifdef SRS_AUTO_HTTP_CALLBACK + SrsRequest* req = info->req; + if (!_srs_config->get_vhost_http_hooks_enabled(req->vhost)) { return ret; } @@ -1705,6 +1745,8 @@ int SrsRtmpConn::http_hooks_on_play() void SrsRtmpConn::http_hooks_on_stop() { #ifdef SRS_AUTO_HTTP_CALLBACK + SrsRequest* req = info->req; + if (!_srs_config->get_vhost_http_hooks_enabled(req->vhost)) { return; } diff --git a/trunk/src/app/srs_app_rtmp_conn.hpp b/trunk/src/app/srs_app_rtmp_conn.hpp index 42f6fda51..7c9c83a6e 100644 --- a/trunk/src/app/srs_app_rtmp_conn.hpp +++ b/trunk/src/app/srs_app_rtmp_conn.hpp @@ -113,6 +113,25 @@ public: virtual void set_recv_timeout(int64_t timeout); }; +/** + * Some information of client. + */ +class SrsClientInfo +{ +public: + // The type of client, play or publish. + SrsRtmpConnType type; + // Whether the client connected at the edge server. + bool edge; + // Original request object from client. + SrsRequest* req; + // Response object to client. + SrsResponse* res; +public: + SrsClientInfo(); + virtual ~SrsClientInfo(); +}; + /** * the client provides the main logic control for RTMP clients. */ @@ -122,8 +141,6 @@ class SrsRtmpConn : public virtual SrsConnection, public virtual ISrsReloadHandl friend class SrsPublishRecvThread; private: SrsServer* server; - SrsRequest* req; - SrsResponse* res; SrsStSocket* skt; SrsRtmpServer* rtmp; SrsRefer* refer; @@ -151,8 +168,8 @@ private: int publish_normal_timeout; // whether enable the tcp_nodelay. bool tcp_nodelay; - // The type of client, play or publish. - SrsRtmpConnType client_type; + // About the rtmp client. + SrsClientInfo* info; public: SrsRtmpConn(SrsServer* svr, st_netfd_t c, std::string cip); virtual ~SrsRtmpConn(); @@ -183,10 +200,10 @@ private: virtual int do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRecvThread* trd); virtual int publishing(SrsSource* source); virtual int do_publishing(SrsSource* source, SrsPublishRecvThread* trd); - virtual int acquire_publish(SrsSource* source, bool is_edge); - virtual void release_publish(SrsSource* source, bool is_edge); - virtual int handle_publish_message(SrsSource* source, SrsCommonMessage* msg, bool is_fmle, bool vhost_is_edge); - virtual int process_publish_message(SrsSource* source, SrsCommonMessage* msg, bool vhost_is_edge); + virtual int acquire_publish(SrsSource* source); + virtual void release_publish(SrsSource* source); + virtual int handle_publish_message(SrsSource* source, SrsCommonMessage* msg); + virtual int process_publish_message(SrsSource* source, SrsCommonMessage* msg); virtual int process_play_control_msg(SrsConsumer* consumer, SrsCommonMessage* msg); virtual void change_mw_sleep(int sleep_ms); virtual void set_sock_options();