diff --git a/trunk/src/app/srs_app_heartbeat.cpp b/trunk/src/app/srs_app_heartbeat.cpp index 349545f3c..75a502b63 100644 --- a/trunk/src/app/srs_app_heartbeat.cpp +++ b/trunk/src/app/srs_app_heartbeat.cpp @@ -84,9 +84,8 @@ void SrsHttpHeartbeat::heartbeat() ISrsHttpMessage* msg = NULL; if ((ret = http.post(uri.get_path(), req, &msg)) != ERROR_SUCCESS) { - srs_info("http post hartbeart uri failed. " - "url=%s, request=%s, response=%s, ret=%d", - url.c_str(), req.c_str(), res.c_str(), ret); + srs_info("http post hartbeart uri failed. url=%s, request=%s, ret=%d", + url.c_str(), req.c_str(), ret); return; } SrsAutoFree(ISrsHttpMessage, msg); @@ -96,9 +95,8 @@ void SrsHttpHeartbeat::heartbeat() return; } - srs_info("http hook hartbeart success. " - "url=%s, request=%s, status_code=%d, response=%s, ret=%d", - url.c_str(), req.c_str(), status_code, res.c_str(), ret); + srs_info("http hook hartbeart success. url=%s, request=%s, response=%s, ret=%d", + url.c_str(), req.c_str(), res.c_str(), ret); return; } diff --git a/trunk/src/app/srs_app_http_client.cpp b/trunk/src/app/srs_app_http_client.cpp index 23ef1f3af..fd39cff84 100644 --- a/trunk/src/app/srs_app_http_client.cpp +++ b/trunk/src/app/srs_app_http_client.cpp @@ -187,7 +187,7 @@ int SrsHttpClient::connect() host.c_str(), port, timeout_us, ret); return ret; } - srs_info("connect to server success. server=%s, port=%d", host, port); + srs_info("connect to server success. server=%s, port=%d", host.c_str(), port); srs_assert(!skt); skt = new SrsStSocket(stfd); diff --git a/trunk/src/app/srs_app_http_stream.cpp b/trunk/src/app/srs_app_http_stream.cpp index 43be4bc4d..cea0138af 100644 --- a/trunk/src/app/srs_app_http_stream.cpp +++ b/trunk/src/app/srs_app_http_stream.cpp @@ -134,7 +134,7 @@ int SrsStreamCache::cycle() } if (count <= 0) { - srs_info("http: mw sleep %dms for no msg", mw_sleep); + srs_info("http: sleep %dms for no msg", SRS_CONSTS_RTMP_PULSE_TIMEOUT_US); // directly use sleep, donot use consumer wait. st_usleep(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US); @@ -522,7 +522,7 @@ int SrsLiveStream::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) } if (count <= 0) { - srs_info("http: mw sleep %dms for no msg", mw_sleep); + srs_info("http: sleep %dms for no msg", SRS_CONSTS_RTMP_PULSE_TIMEOUT_US); // directly use sleep, donot use consumer wait. st_usleep(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US); diff --git a/trunk/src/app/srs_app_pithy_print.hpp b/trunk/src/app/srs_app_pithy_print.hpp index 9e333b83d..96c0f7055 100644 --- a/trunk/src/app/srs_app_pithy_print.hpp +++ b/trunk/src/app/srs_app_pithy_print.hpp @@ -55,22 +55,25 @@ public: }; /** -* the stage is used for a collection of object to do print, -* the print time in a stage is constant and not changed. -* for example, stage #1 for all play clients, print time is 3s, -* if there is 10clients, then all clients should print in 10*3s. -* Usage: - SrsPithyPrint* pprint = SrsPithyPrint::create_rtmp_play(); - SrsAutoFree(SrsPithyPrint, pprint); - while (true) { - pprint->elapse(); - if (pprint->can_print()) { - // print pithy message. - // user can get the elapse time by: pprint->age() - } - // read and write RTMP messages. - } -*/ + * the stage is used for a collection of object to do print, + * the print time in a stage is constant and not changed, + * that is, we always got one message to print every specified time. + * + * for example, stage #1 for all play clients, print time is 3s, + * if there is 1client, it will print every 3s. + * if there is 10clients, random select one to print every 3s. + * Usage: + SrsPithyPrint* pprint = SrsPithyPrint::create_rtmp_play(); + SrsAutoFree(SrsPithyPrint, pprint); + while (true) { + pprint->elapse(); + if (pprint->can_print()) { + // print pithy message. + // user can get the elapse time by: pprint->age() + } + // read and write RTMP messages. + } + */ class SrsPithyPrint { private: diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp old mode 100644 new mode 100755 index 3ff909724..9cf15db59 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -414,8 +414,6 @@ int SrsRtmpConn::stream_service_cycle() } srs_info("set chunk_size=%d success", chunk_size); - bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost); - // find a source to serve. SrsSource* source = SrsSource::fetch(req); if (!source) { @@ -432,19 +430,7 @@ int SrsRtmpConn::stream_service_cycle() return ret; } - // check ASAP, to fail it faster if invalid. - if (type != SrsRtmpConnPlay) { - // check publish available - if (!source->can_publish(vhost_is_edge)) { - ret = ERROR_SYSTEM_STREAM_BUSY; - srs_warn("stream %s is already publishing. ret=%d", - req->get_stream_url().c_str(), ret); - // to delay request - st_usleep(SRS_STREAM_BUSY_SLEEP_US); - return ret; - } - } - + bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost); bool enabled_cache = _srs_config->get_gop_cache(req->vhost); srs_trace("source url=%s, ip=%s, cache=%d, is_edge=%d, source_id=%d[%d]", req->get_stream_url().c_str(), ip.c_str(), enabled_cache, vhost_is_edge, @@ -479,19 +465,7 @@ int SrsRtmpConn::stream_service_cycle() return ret; } - if (!vhost_is_edge) { - if ((ret = source->acquire_publish()) != ERROR_SUCCESS) { - return ret; - } - } - - ret = fmle_publishing(source); - - if (!vhost_is_edge) { - source->release_publish(); - } - - return ret; + return publishing(source); } case SrsRtmpConnFlashPublish: { srs_verbose("flash start to publish stream %s.", req->stream.c_str()); @@ -501,19 +475,7 @@ int SrsRtmpConn::stream_service_cycle() return ret; } - if (!vhost_is_edge) { - if ((ret = source->acquire_publish()) != ERROR_SUCCESS) { - return ret; - } - } - - ret = flash_publishing(source); - - if (!vhost_is_edge) { - source->release_publish(); - } - - return ret; + return publishing(source); } default: { ret = ERROR_SYSTEM_CLIENT_INVALID; @@ -767,69 +729,35 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRe return ret; } -int SrsRtmpConn::fmle_publishing(SrsSource* source) +int SrsRtmpConn::publishing(SrsSource* source) { int ret = ERROR_SUCCESS; - - bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost); - - if ((ret = http_hooks_on_publish()) != ERROR_SUCCESS) { - srs_error("http hook on_publish failed. ret=%d", ret); + + if ((ret = refer->check(req->pageUrl, _srs_config->get_refer_publish(req->vhost))) != ERROR_SUCCESS) { + srs_error("check publish_refer failed. ret=%d", ret); return ret; } - - // use isolate thread to recv, - // @see: https://github.com/simple-rtmp-server/srs/issues/237 - SrsPublishRecvThread trd(rtmp, req, - st_netfd_fileno(stfd), 0, this, source, true, vhost_is_edge); - - srs_info("start to publish stream %s success", req->stream.c_str()); - ret = do_publishing(source, &trd); - - // stop isolate recv thread - trd.stop(); - - // when edge, notice edge to change state. - // when origin, notice all service to unpublish. - if (vhost_is_edge) { - source->on_edge_proxy_unpublish(); - } else { - source->on_unpublish(); - } - - http_hooks_on_unpublish(); - - return ret; -} - -int SrsRtmpConn::flash_publishing(SrsSource* source) -{ - int ret = ERROR_SUCCESS; - - bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost); + srs_verbose("check publish_refer success."); if ((ret = http_hooks_on_publish()) != ERROR_SUCCESS) { srs_error("http hook on_publish failed. ret=%d", ret); return ret; } - // use isolate thread to recv, - // @see: https://github.com/simple-rtmp-server/srs/issues/237 - SrsPublishRecvThread trd(rtmp, req, - st_netfd_fileno(stfd), 0, this, source, true, vhost_is_edge); + bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost); + if ((ret = acquire_publish(source, vhost_is_edge)) == ERROR_SUCCESS) { + // use isolate thread to recv, + // @see: https://github.com/simple-rtmp-server/srs/issues/237 + SrsPublishRecvThread trd(rtmp, req, + st_netfd_fileno(stfd), 0, this, source, true, vhost_is_edge); - srs_info("start to publish stream %s success", req->stream.c_str()); - ret = do_publishing(source, &trd); + srs_info("start to publish stream %s success", req->stream.c_str()); + ret = do_publishing(source, &trd); - // stop isolate recv thread - trd.stop(); + // stop isolate recv thread + trd.stop(); - // when edge, notice edge to change state. - // when origin, notice all service to unpublish. - if (vhost_is_edge) { - source->on_edge_proxy_unpublish(); - } else { - source->on_unpublish(); + release_publish(source, vhost_is_edge); } http_hooks_on_unpublish(); @@ -840,33 +768,10 @@ int SrsRtmpConn::flash_publishing(SrsSource* source) int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd) { int ret = ERROR_SUCCESS; - - if ((ret = refer->check(req->pageUrl, _srs_config->get_refer_publish(req->vhost))) != ERROR_SUCCESS) { - srs_error("check publish_refer failed. ret=%d", ret); - return ret; - } - srs_verbose("check publish_refer success."); SrsPithyPrint* pprint = SrsPithyPrint::create_rtmp_publish(); SrsAutoFree(SrsPithyPrint, pprint); - bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost); - - // when edge, ignore the publish event, directly proxy it. - if (!vhost_is_edge) { - // notify the hls to prepare when publish start. - if ((ret = source->on_publish()) != ERROR_SUCCESS) { - srs_error("hls on_publish failed. ret=%d", ret); - return ret; - } - srs_verbose("hls on_publish success."); - } else { - if ((ret = source->on_edge_start_publish()) != ERROR_SUCCESS) { - srs_error("notice edge start publish stream failed. ret=%d", ret); - return ret; - } - } - // start isolate recv thread. if ((ret = trd->start()) != ERROR_SUCCESS) { srs_error("start isolate recv thread failed. ret=%d", ret); @@ -914,6 +819,43 @@ int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd) return ret; } +int SrsRtmpConn::acquire_publish(SrsSource* source, bool is_edge) +{ + int ret = ERROR_SUCCESS; + + if (!source->can_publish(is_edge)) { + ret = ERROR_SYSTEM_STREAM_BUSY; + srs_warn("stream %s is already publishing. ret=%d", + req->get_stream_url().c_str(), ret); + return ret; + } + + // when edge, ignore the publish event, directly proxy it. + if (is_edge) { + if ((ret = source->on_edge_start_publish()) != ERROR_SUCCESS) { + srs_error("notice edge start publish stream failed. ret=%d", ret); + } + return ret; + } else { + if ((ret = source->on_publish()) != ERROR_SUCCESS) { + srs_error("notify publish failed. ret=%d", ret); + } + } + + return ret; +} + +void SrsRtmpConn::release_publish(SrsSource* source, bool is_edge) +{ + // when edge, notice edge to change state. + // when origin, notice all service to unpublish. + if (is_edge) { + source->on_edge_proxy_unpublish(); + } else { + source->on_unpublish(); + } +} + int SrsRtmpConn::handle_publish_message(SrsSource* source, SrsCommonMessage* msg, bool is_fmle, bool vhost_is_edge) { int ret = ERROR_SUCCESS; diff --git a/trunk/src/app/srs_app_rtmp_conn.hpp b/trunk/src/app/srs_app_rtmp_conn.hpp old mode 100644 new mode 100755 index 96db60ac7..ddaa7e349 --- a/trunk/src/app/srs_app_rtmp_conn.hpp +++ b/trunk/src/app/srs_app_rtmp_conn.hpp @@ -111,9 +111,10 @@ private: virtual int check_vhost(); virtual int playing(SrsSource* source); virtual int do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRecvThread* trd); - virtual int fmle_publishing(SrsSource* source); - virtual int flash_publishing(SrsSource* source); + virtual int publishing(SrsSource* source); virtual int do_publishing(SrsSource* source, SrsPublishRecvThread* trd); + virtual int acquire_publish(SrsSource* source, bool is_edge); + virtual void release_publish(SrsSource* source, bool is_edge); virtual int handle_publish_message(SrsSource* source, SrsCommonMessage* msg, bool is_fmle, bool vhost_is_edge); virtual int process_publish_message(SrsSource* source, SrsCommonMessage* msg, bool vhost_is_edge); virtual int process_play_control_msg(SrsConsumer* consumer, SrsCommonMessage* msg); diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp old mode 100644 new mode 100755 index d63154fe2..4b25901f1 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -1958,26 +1958,6 @@ int SrsSource::on_aggregate(SrsCommonMessage* msg) return ret; } -int SrsSource::acquire_publish() -{ - int ret = ERROR_SUCCESS; - - if (!_can_publish) { - ret = ERROR_SYSTEM_STREAM_BUSY; - srs_warn("publish lock stream failed, ret=%d", ret); - return ret; - } - - _can_publish = false; - - return ret; -} - -void SrsSource::release_publish() -{ - _can_publish = true; -} - int SrsSource::on_publish() { int ret = ERROR_SUCCESS; diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp old mode 100644 new mode 100755 index 86661b190..372803dd5 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -550,13 +550,6 @@ private: public: virtual int on_aggregate(SrsCommonMessage* msg); /** - * the pre-publish is we are very sure we are - * trying to publish stream, please lock the resource, - * and we use release_publish() to release the resource. - */ - virtual int acquire_publish(); - virtual void release_publish(); - /** * publish stream event notify. * @param _req the request from client, the source will deep copy it, * for when reload the request of client maybe invalid.