From ade2376da063d54ea3c57f6fcb52f9fe956a70a0 Mon Sep 17 00:00:00 2001 From: winlin Date: Sun, 3 Aug 2014 21:22:40 +0800 Subject: [PATCH] fix #57, use lock(acquire/release publish) to avoid duplicated publishing. 0.9.188. --- README.md | 1 + trunk/src/app/srs_app_rtmp_conn.cpp | 110 ++++++++++++++++++++-------- trunk/src/app/srs_app_rtmp_conn.hpp | 2 + trunk/src/app/srs_app_source.cpp | 20 +++++ trunk/src/app/srs_app_source.hpp | 7 ++ trunk/src/core/srs_core.hpp | 2 +- 6 files changed, 109 insertions(+), 33 deletions(-) diff --git a/README.md b/README.md index 84fbdce2c..f9170dc32 100755 --- a/README.md +++ b/README.md @@ -207,6 +207,7 @@ Supported operating systems and hardware: * 2013-10-17, Created.
## History +* v1.0, 2014-08-03, fix [#57](https://github.com/winlinvip/simple-rtmp-server/issues/57), use lock(acquire/release publish) to avoid duplicated publishing. 0.9.188. * v1.0, 2014-08-03, fix [#85](https://github.com/winlinvip/simple-rtmp-server/issues/85), fix the segment-dvr sequence header missing. 0.9.187. * v1.0, 2014-08-03, fix [#145](https://github.com/winlinvip/simple-rtmp-server/issues/145), refine ffmpeg log, check abitrate for libaacplus. 0.9.186. * v1.0, 2014-08-03, fix [#143](https://github.com/winlinvip/simple-rtmp-server/issues/143), fix retrieve sys stat bug for all linux. 0.9.185. diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index ae3ff7af3..6ea5181fa 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -74,6 +74,9 @@ using namespace std; // when edge timeout, retry next. #define SRS_EDGE_TOKEN_TRAVERSE_TIMEOUT_US (int64_t)(3*1000*1000LL) +// to get msgs then totally send out. +#define SYS_MAX_PLAY_SEND_MSGS 128 + SrsRtmpConn::SrsRtmpConn(SrsServer* srs_server, st_netfd_t client_stfd) : SrsConnection(srs_server, client_stfd) { @@ -318,10 +321,11 @@ int SrsRtmpConn::stream_service_cycle() } srs_assert(source != NULL); - // check publish available - // for edge, never check it, for edge use proxy mode. - if (!vhost_is_edge) { - if (type != SrsRtmpConnPlay && !source->can_publish()) { + // check ASAP, to fail it faster if invalid. + if (type != SrsRtmpConnPlay && !vhost_is_edge) { + // check publish available + // for edge, never check it, for edge use proxy mode. + if (!source->can_publish()) { ret = ERROR_SYSTEM_STREAM_BUSY; srs_warn("stream %s is already publishing. ret=%d", req->get_stream_url().c_str(), ret); @@ -379,23 +383,18 @@ int SrsRtmpConn::stream_service_cycle() return ret; } - if ((ret = http_hooks_on_publish()) != ERROR_SUCCESS) { - srs_error("http hook on_publish failed. ret=%d", ret); - return ret; + if (!vhost_is_edge) { + if ((ret = source->acquire_publish()) != ERROR_SUCCESS) { + return ret; + } } - - srs_info("start to publish stream %s success", req->stream.c_str()); + ret = fmle_publishing(source); - - // when edge, notice edge to change state. - // when origin, notice all service to unpublish. - if (vhost_is_edge) { - source->on_edge_proxy_unpublish(); - } else { - source->on_unpublish(); + + if (!vhost_is_edge) { + source->release_publish(); } - - http_hooks_on_unpublish(); + return ret; } case SrsRtmpConnFlashPublish: { @@ -413,23 +412,18 @@ int SrsRtmpConn::stream_service_cycle() return ret; } - if ((ret = http_hooks_on_publish()) != ERROR_SUCCESS) { - srs_error("http hook on_publish failed. ret=%d", ret); - return ret; + if (!vhost_is_edge) { + if ((ret = source->acquire_publish()) != ERROR_SUCCESS) { + return ret; + } } - srs_info("flash start to publish stream %s success", req->stream.c_str()); ret = flash_publishing(source); - - // when edge, notice edge to change state. - // when origin, notice all service to unpublish. - if (vhost_is_edge) { - source->on_edge_proxy_unpublish(); - } else { - source->on_unpublish(); + + if (!vhost_is_edge) { + source->release_publish(); } - http_hooks_on_unpublish(); return ret; } default: { @@ -479,8 +473,6 @@ int SrsRtmpConn::check_vhost() return ret; } -#define SYS_MAX_PLAY_SEND_MSGS 128 - int SrsRtmpConn::playing(SrsSource* source) { int ret = ERROR_SUCCESS; @@ -605,6 +597,33 @@ int SrsRtmpConn::fmle_publishing(SrsSource* source) { int ret = ERROR_SUCCESS; + bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost); + + if ((ret = http_hooks_on_publish()) != ERROR_SUCCESS) { + srs_error("http hook on_publish failed. ret=%d", ret); + return ret; + } + + srs_info("start to publish stream %s success", req->stream.c_str()); + ret = do_fmle_publishing(source); + + // when edge, notice edge to change state. + // when origin, notice all service to unpublish. + if (vhost_is_edge) { + source->on_edge_proxy_unpublish(); + } else { + source->on_unpublish(); + } + + http_hooks_on_unpublish(); + + return ret; +} + +int SrsRtmpConn::do_fmle_publishing(SrsSource* source) +{ + int ret = ERROR_SUCCESS; + if ((ret = refer->check(req->pageUrl, _srs_config->get_refer_publish(req->vhost))) != ERROR_SUCCESS) { srs_error("fmle check publish_refer failed. ret=%d", ret); return ret; @@ -684,6 +703,33 @@ int SrsRtmpConn::flash_publishing(SrsSource* source) { int ret = ERROR_SUCCESS; + bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost); + + if ((ret = http_hooks_on_publish()) != ERROR_SUCCESS) { + srs_error("http hook on_publish failed. ret=%d", ret); + return ret; + } + + srs_info("flash start to publish stream %s success", req->stream.c_str()); + ret = do_flash_publishing(source); + + // when edge, notice edge to change state. + // when origin, notice all service to unpublish. + if (vhost_is_edge) { + source->on_edge_proxy_unpublish(); + } else { + source->on_unpublish(); + } + + http_hooks_on_unpublish(); + + return ret; +} + +int SrsRtmpConn::do_flash_publishing(SrsSource* source) +{ + int ret = ERROR_SUCCESS; + if ((ret = refer->check(req->pageUrl, _srs_config->get_refer_publish(req->vhost))) != ERROR_SUCCESS) { srs_error("flash check publish_refer failed. ret=%d", ret); return ret; diff --git a/trunk/src/app/srs_app_rtmp_conn.hpp b/trunk/src/app/srs_app_rtmp_conn.hpp index 7888620a0..4e48b33d6 100644 --- a/trunk/src/app/srs_app_rtmp_conn.hpp +++ b/trunk/src/app/srs_app_rtmp_conn.hpp @@ -89,7 +89,9 @@ private: virtual int check_vhost(); virtual int playing(SrsSource* source); virtual int fmle_publishing(SrsSource* source); + virtual int do_fmle_publishing(SrsSource* source); virtual int flash_publishing(SrsSource* source); + virtual int do_flash_publishing(SrsSource* source); virtual int process_publish_message(SrsSource* source, SrsMessage* msg, bool vhost_is_edge); virtual int process_play_control_msg(SrsConsumer* consumer, SrsMessage* msg); private: diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index 4d66b90ce..e977aa0d8 100644 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -1347,6 +1347,26 @@ int SrsSource::on_aggregate(SrsMessage* msg) return ret; } +int SrsSource::acquire_publish() +{ + int ret = ERROR_SUCCESS; + + if (!_can_publish) { + ret = ERROR_SYSTEM_STREAM_BUSY; + srs_warn("publish lock stream failed, ret=%d", ret); + return ret; + } + + _can_publish = false; + + return ret; +} + +void SrsSource::release_publish() +{ + _can_publish = true; +} + int SrsSource::on_publish() { int ret = ERROR_SUCCESS; diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index 2e9ae8d69..40000cee2 100644 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -369,6 +369,13 @@ public: virtual int on_video(SrsMessage* video); virtual int on_aggregate(SrsMessage* msg); /** + * the pre-publish is we are very sure we are + * trying to publish stream, please lock the resource, + * and we use release_publish() to release the resource. + */ + virtual int acquire_publish(); + virtual void release_publish(); + /** * publish stream event notify. * @param _req the request from client, the source will deep copy it, * for when reload the request of client maybe invalid. diff --git a/trunk/src/core/srs_core.hpp b/trunk/src/core/srs_core.hpp index 9c11bc8c7..e79100e32 100644 --- a/trunk/src/core/srs_core.hpp +++ b/trunk/src/core/srs_core.hpp @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. // current release version #define VERSION_MAJOR "0" #define VERSION_MINOR "9" -#define VERSION_REVISION "187" +#define VERSION_REVISION "188" #define RTMP_SIG_SRS_VERSION VERSION_MAJOR"."VERSION_MINOR"."VERSION_REVISION // server info. #define RTMP_SIG_SRS_KEY "SRS"