From e13d16439eb8d625efefb62b7b151f0fd223bb9b Mon Sep 17 00:00:00 2001 From: hondaxiao Date: Wed, 20 Apr 2022 22:23:46 +0800 Subject: [PATCH] SRT: support rtmp to srt --- trunk/src/app/srs_app_config.cpp | 96 ++++++++-- trunk/src/app/srs_app_config.hpp | 5 + trunk/src/app/srs_app_rtc_source.cpp | 2 +- trunk/src/app/srs_app_rtmp_conn.cpp | 26 +++ trunk/src/app/srs_app_source.cpp | 6 +- trunk/src/app/srs_app_source.hpp | 17 ++ trunk/src/app/srs_app_srt_conn.cpp | 207 ++++++++++++---------- trunk/src/app/srs_app_srt_conn.hpp | 19 ++ trunk/src/app/srs_app_srt_server.cpp | 4 +- trunk/src/app/srs_app_srt_source.cpp | 171 ++++++++++++++---- trunk/src/app/srs_app_srt_source.hpp | 39 ++-- trunk/src/kernel/srs_kernel_error.hpp | 2 + trunk/src/protocol/srs_service_st_srt.cpp | 17 +- trunk/src/protocol/srs_service_st_srt.hpp | 3 + 14 files changed, 453 insertions(+), 161 deletions(-) diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index 01f157cab..8497586b4 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -2723,7 +2723,7 @@ srs_error_t SrsConfig::check_normal_config() && n != "play" && n != "publish" && n != "cluster" && n != "security" && n != "http_remux" && n != "dash" && n != "http_static" && n != "hds" && n != "exec" - && n != "in_ack_size" && n != "out_ack_size" && n != "rtc") { + && n != "in_ack_size" && n != "out_ack_size" && n != "rtc" && n != "srt") { return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal vhost.%s", n.c_str()); } // for each sub directives of vhost. @@ -2880,6 +2880,13 @@ srs_error_t SrsConfig::check_normal_config() return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal vhost.rtc.%s of %s", m.c_str(), vhost->arg0().c_str()); } } + } else if (n == "srt") { + for (int j = 0; j < (int)conf->directives.size(); j++) { + string m = conf->at(j)->name; + if (m != "enabled" && m != "rtmp_to_srt" && m != "srt_to_rtmp") { + return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal vhost.srt.%s of %s", m.c_str(), vhost->arg0().c_str()); + } + } } } } @@ -6757,20 +6764,6 @@ unsigned short SrsConfig::get_srt_listen_port() return (unsigned short)atoi(conf->arg0().c_str()); } -bool SrsConfig::get_srt_mix_correct() { - static bool DEFAULT = true; - SrsConfDirective* conf = root->get("srt_server"); - if (!conf) { - return DEFAULT; - } - - conf = conf->get("mix_correct"); - if (!conf || conf->arg0().empty()) { - return DEFAULT; - } - return SRS_CONF_PERFER_TRUE(conf->arg0()); -} - int SrsConfig::get_srto_maxbw() { static int64_t DEFAULT = -1; SrsConfDirective* conf = root->get("srt_server"); @@ -6971,6 +6964,79 @@ string SrsConfig::get_default_app_name() { return conf->arg0(); } +bool SrsConfig::get_srt_mix_correct() { + static bool DEFAULT = true; + SrsConfDirective* conf = root->get("srt_server"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("mix_correct"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + return SRS_CONF_PERFER_TRUE(conf->arg0()); +} + +SrsConfDirective* SrsConfig::get_srt(std::string vhost) +{ + SrsConfDirective* conf = get_vhost(vhost); + return conf? conf->get("srt") : NULL; +} + +bool SrsConfig::get_srt_enabled(std::string vhost) +{ + static bool DEFAULT = false; + + SrsConfDirective* conf = get_srt(vhost); + + if (!conf) { + return DEFAULT; + } + + conf = conf->get("enabled"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + + return SRS_CONF_PERFER_FALSE(conf->arg0()); +} + +bool SrsConfig::get_srt_to_rtmp(std::string vhost) +{ + static bool DEFAULT = true; + + SrsConfDirective* conf = get_srt(vhost); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("srt_to_rtmp"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + + return SRS_CONF_PERFER_FALSE(conf->arg0()); +} + +bool SrsConfig::get_srt_from_rtmp(std::string vhost) +{ + static bool DEFAULT = false; + + SrsConfDirective* conf = get_srt(vhost); + + if (!conf) { + return DEFAULT; + } + + conf = conf->get("rtmp_to_srt"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + + return SRS_CONF_PERFER_FALSE(conf->arg0()); +} + bool SrsConfig::get_http_stream_enabled() { SrsConfDirective* conf = root->get("http_server"); diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index 7cebd5c80..78a815111 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -662,6 +662,11 @@ public: virtual std::string get_default_app_name(); // Get the mix_correct virtual bool get_srt_mix_correct(); +public: + SrsConfDirective* get_srt(std::string vhost); + bool get_srt_enabled(std::string vhost); + bool get_srt_to_rtmp(std::string vhost); + bool get_srt_from_rtmp(std::string vhost); // http_hooks section private: diff --git a/trunk/src/app/srs_app_rtc_source.cpp b/trunk/src/app/srs_app_rtc_source.cpp index a5bc4e9a3..47af65645 100644 --- a/trunk/src/app/srs_app_rtc_source.cpp +++ b/trunk/src/app/srs_app_rtc_source.cpp @@ -270,7 +270,7 @@ srs_error_t SrsRtcSourceManager::fetch_or_create(SrsRequest* r, SrsRtcSource** p // should always not exists for create a source. srs_assert (pool.find(stream_url) == pool.end()); - srs_trace("new source, stream_url=%s", stream_url.c_str()); + srs_trace("new rtc source, stream_url=%s", stream_url.c_str()); source = new SrsRtcSource(); if ((err = source->initialize(r)) != srs_success) { diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index 26ccb0d9d..69b135190 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -39,6 +39,7 @@ using namespace std; #include #include #include +#include // the timeout in srs_utime_t to wait encoder to republish // if timeout, close the connection. @@ -950,6 +951,31 @@ srs_error_t SrsRtmpConn::acquire_publish(SrsLiveSource* source) if (!source->can_publish(info->edge)) { return srs_error_new(ERROR_SYSTEM_STREAM_BUSY, "rtmp: stream %s is busy", req->get_stream_url().c_str()); } + +#ifdef SRS_SRT + if (_srs_config->get_rtc_from_rtmp(req->vhost)) { + SrsSrtSource *srt = NULL; + if (!info->edge) { + if ((err = _srs_srt_sources->fetch_or_create(req, &srt)) != srs_success) { + return srs_error_wrap(err, "create source"); + } + + if (!srt->can_publish()) { + return srs_error_new(ERROR_SYSTEM_STREAM_BUSY, "srt stream %s busy", req->get_stream_url().c_str()); + } + } + + if (srt) { + SrsSrtFromRtmpBridge *bridger = new SrsSrtFromRtmpBridge(srt); + if ((err = bridger->initialize(req)) != srs_success) { + srs_freep(bridger); + return srs_error_wrap(err, "bridger init"); + } + + source->set_bridger(bridger); + } + } +#endif // Check whether RTC stream is busy. #ifdef SRS_RTC diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index c440dc0c0..409eddffb 100755 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -1802,7 +1802,7 @@ srs_error_t SrsLiveSourceManager::fetch_or_create(SrsRequest* r, ISrsLiveSourceH // should always not exists for create a source. srs_assert (pool.find(stream_url) == pool.end()); - srs_trace("new source, stream_url=%s", stream_url.c_str()); + srs_trace("new live source, stream_url=%s", stream_url.c_str()); source = new SrsLiveSource(); if ((err = source->initialize(r, h)) != srs_success) { @@ -1929,7 +1929,7 @@ SrsLiveSource::SrsLiveSource() handler = NULL; bridge_ = NULL; - + play_edge = new SrsPlayEdge(); publish_edge = new SrsPublishEdge(); gop_cache = new SrsGopCache(); @@ -2635,7 +2635,7 @@ void SrsLiveSource::on_unpublish() bridge_->on_unpublish(); srs_freep(bridge_); } - + // no consumer, stream is die. if (consumers.empty()) { die_at = srs_get_system_time(); diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index 5ebdb4614..6bfc246c6 100644 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -468,6 +468,23 @@ public: // Global singleton instance. extern SrsLiveSourceManager* _srs_sources; +// Destination type. +enum SrsBridgeDestType { + SrsBridgeDestTypeRtmp = 1, + SrsBridgeDestTypeRTC = 2, + SrsBridgeDestTypeSRT = 3, +}; + +class ISrsBridge { +public: + ISrsBridge(SrsBridgeDestType type); + virtual ~ISrsBridge(); +public: + SrsBridgeDestType get_type() const; +protected: + SrsBridgeDestType type_; +}; + // For RTMP2RTC, bridge SrsLiveSource to SrsRtcSource class ISrsLiveSourceBridge { diff --git a/trunk/src/app/srs_app_srt_conn.cpp b/trunk/src/app/srs_app_srt_conn.cpp index a0a279c07..8986128ed 100644 --- a/trunk/src/app/srs_app_srt_conn.cpp +++ b/trunk/src/app/srs_app_srt_conn.cpp @@ -87,6 +87,67 @@ srs_error_t SrsSrtConnection::writev(const iovec *iov, int iov_size, ssize_t* nw return srs_error_new(ERROR_SRT_CONN, "unsupport method"); } +SrsSrtRecvThread::SrsSrtRecvThread(SrsSrtConnection* srt_conn) +{ + srt_conn_ = srt_conn; + trd_ = new SrsSTCoroutine("srt-recv", this, _srs_context->get_id()); + recv_err_ = srs_success; +} + +SrsSrtRecvThread::~SrsSrtRecvThread() +{ + srs_freep(trd_); + srs_error_reset(recv_err_); +} + +srs_error_t SrsSrtRecvThread::cycle() +{ + srs_error_t err = srs_success; + + if ((err = do_cycle()) != srs_success) { + recv_err_ = srs_error_copy(err); + } + + return err; +} + +srs_error_t SrsSrtRecvThread::do_cycle() +{ + srs_error_t err = srs_success; + + while (true) { + if ((err = trd_->pull()) != srs_success) { + return srs_error_wrap(err, "srt: thread quit"); + } + + char buf[1316]; + ssize_t nb = 0; + if ((err = srt_conn_->read(buf, sizeof(buf), &nb)) != srs_success) { + if (srs_error_code(err) != ERROR_SRT_TIMEOUT) { + return srs_error_wrap(err, "srt read"); + } + } + } + + return err; +} + +srs_error_t SrsSrtRecvThread::start() +{ + srs_error_t err = srs_success; + + if ((err = trd_->start()) != srs_success) { + return srs_error_wrap(err, "start srt recv thread"); + } + + return err; +} + +srs_error_t SrsSrtRecvThread::get_recv_err() +{ + return srs_error_copy(recv_err_); +} + SrsMpegtsSrtConn::SrsMpegtsSrtConn(SrsSrtServer* srt_server, SRTSOCKET srt_fd, std::string ip, int port) { // Create a identify for this client. @@ -178,7 +239,7 @@ srs_error_t SrsMpegtsSrtConn::do_cycle() { srs_error_t err = srs_success; - if ((err != fetch_or_create_source()) != srs_success) { + if ((err = fetch_or_create_source()) != srs_success) { return srs_error_wrap(err, "fetch or create srt source"); } @@ -216,6 +277,10 @@ srs_error_t SrsMpegtsSrtConn::fetch_or_create_source() return srs_error_new(ERROR_SRT_CONN, "invalid srt streamid=%s", streamid.c_str()); } + if (! _srs_config->get_srt_enabled(req_->vhost)) { + return srs_error_new(ERROR_SRT_CONN, "srt disabled, vhost=%s", req_->vhost.c_str()); + } + srs_trace("@srt, streamid=%s, stream_url=%s, vhost=%s, app=%s, stream=%s, param=%s", streamid.c_str(), req_->get_stream_url().c_str(), req_->vhost.c_str(), req_->app.c_str(), req_->stream.c_str(), req_->param.c_str()); @@ -267,23 +332,25 @@ srs_error_t SrsMpegtsSrtConn::acquire_publish() return srs_error_new(ERROR_SRT_SOURCE_BUSY, "srt stream %s busy", req_->get_stream_url().c_str()); } - // Check rtmp stream is busy. - SrsLiveSource *live_source = _srs_sources->fetch(req_); - if (live_source && !live_source->can_publish(false)) { - return srs_error_new(ERROR_SYSTEM_STREAM_BUSY, "live_source stream %s busy", req_->get_stream_url().c_str()); - } + if (_srs_config->get_srt_to_rtmp(req_->vhost)) { + // Check rtmp stream is busy. + SrsLiveSource *live_source = _srs_sources->fetch(req_); + if (live_source && !live_source->can_publish(false)) { + return srs_error_new(ERROR_SYSTEM_STREAM_BUSY, "live_source stream %s busy", req_->get_stream_url().c_str()); + } - if ((err = _srs_sources->fetch_or_create(req_, _srs_hybrid->srs()->instance(), &live_source)) != srs_success) { - return srs_error_wrap(err, "create source"); - } + if ((err = _srs_sources->fetch_or_create(req_, _srs_hybrid->srs()->instance(), &live_source)) != srs_success) { + return srs_error_wrap(err, "create source"); + } - SrsRtmpFromTsBridge *bridger = new SrsRtmpFromTsBridge(live_source); - if ((err = bridger->initialize(req_)) != srs_success) { - srs_freep(bridger); - return srs_error_wrap(err, "create bridger"); - } + SrsRtmpFromSrtBridge *bridger = new SrsRtmpFromSrtBridge(live_source); + if ((err = bridger->initialize(req_)) != srs_success) { + srs_freep(bridger); + return srs_error_wrap(err, "create bridger"); + } - srt_source_->set_bridger(bridger); + srt_source_->set_bridger(bridger); + } if ((err = srt_source_->on_publish()) != srs_success) { return srs_error_wrap(err, "srt source publish"); @@ -297,83 +364,6 @@ void SrsMpegtsSrtConn::release_publish() srt_source_->on_unpublish(); } -/* -srs_error_t SrsMpegtsSrtConn::do_cycle() -{ - srs_error_t err = srs_success; - - string streamid = ""; - if ((err = srs_srt_get_streamid(srt_fd_, streamid)) != srs_success) { - return srs_error_wrap(err, "get srt streamid"); - } - - // Must have streamid, because srt ts packet will convert to rtmp or rtc. - if (streamid.empty()) { - return srs_error_new(ERROR_SRT_CONN, "empty srt streamid"); - } - - // Detect streamid of srt to request. - if (! srs_srt_streamid_to_request(streamid, mode_, req_)) { - return srs_error_new(ERROR_SRT_CONN, "invalid srt streamid=%s", streamid.c_str()); - } - - srs_trace("@srt, streamid=%s, stream_url=%s, vhost=%s, app=%s, stream=%s, param=%s", - streamid.c_str(), req_->get_stream_url().c_str(), req_->vhost.c_str(), req_->app.c_str(), req_->stream.c_str(), req_->param.c_str()); - - if ((err = _srs_srt_sources->fetch_or_create(req_, &srt_source_)) != srs_success) { - return srs_error_wrap(err, "fetch srt source"); - } - - if (mode_ == SrtModePush) { - if ((err = http_hooks_on_publish()) != srs_success) { - return srs_error_wrap(err, "srt: callback on publish"); - } - // Do srt publish. - if (! srt_source_->can_publish()) { - return srs_error_new(ERROR_SRT_SOURCE_BUSY, "srt stream %s busy", req_->get_stream_url().c_str()); - } - - SrsLiveSource *live_source = _srs_sources->fetch(req_); - if (live_source && !live_source->can_publish(false)) { - return srs_error_new(ERROR_SYSTEM_STREAM_BUSY, "live_source stream %s busy", req_->get_stream_url().c_str()); - } - - if ((err = _srs_sources->fetch_or_create(req_, _srs_hybrid->srs()->instance(), &live_source)) != srs_success) { - return srs_error_wrap(err, "create source"); - } - - SrsRtmpFromTsBridge *bridger = new SrsRtmpFromTsBridge(live_source); - if ((err = bridger->initialize(req_)) != srs_success) { - srs_freep(bridger); - return srs_error_wrap(err, "create bridger"); - } - - srt_source_->set_bridger(bridger); - - if ((err = srt_source_->on_publish()) != srs_success) { - return srs_error_wrap(err, "srt source publish"); - } - - err = do_publish_cycle(); - - srt_source_->on_unpublish(); - http_hooks_on_unpublish(); - } else if (mode_ == SrtModePull) { - if ((err = http_hooks_on_play()) != srs_success) { - return srs_error_wrap(err, "srt: callback on play"); - } - // Do srt play. - err = do_play_cycle(); - - http_hooks_on_stop(); - } else { - srs_assert(false); - } - - return err; -} -*/ - srs_error_t SrsMpegtsSrtConn::do_publishing() { srs_error_t err = srs_success; @@ -394,7 +384,18 @@ srs_error_t SrsMpegtsSrtConn::do_publishing() // reportable if (pprint->can_print()) { + SRT_TRACEBSTATS srt_stats; + srs_error_t err_tmp = srs_srt_get_stats(srt_fd_, &srt_stats, true); + if (err_tmp != srs_success) { + srs_freep(err_tmp); + } else { + srs_trace("<- " SRS_CONSTS_LOG_SRT_PUBLISH " Transport Stats # " + "pktRecv=%ld, pktRcvLoss=%d, pktRcvRetrans=%d, pktRcvDrop=%d", + srt_stats.pktRecv, srt_stats.pktRcvLoss, srt_stats.pktRcvRetrans, srt_stats.pktRcvDrop); + } + kbps_->sample(); + srs_trace("<- " SRS_CONSTS_LOG_SRT_PUBLISH " time=%d, packets=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d", (int)pprint->age(), nb_packets, kbps_->get_send_kbps(), kbps_->get_send_kbps_30s(), kbps_->get_send_kbps_5m(), kbps_->get_recv_kbps(), kbps_->get_recv_kbps_30s(), kbps_->get_recv_kbps_5m()); @@ -436,6 +437,11 @@ srs_error_t SrsMpegtsSrtConn::do_playing() SrsPithyPrint* pprint = SrsPithyPrint::create_srt_play(); SrsAutoFree(SrsPithyPrint, pprint); + SrsSrtRecvThread srt_recv_trd(srt_conn_); + if ((err = srt_recv_trd.start()) != srs_success) { + return srs_error_wrap(err, "start srt recv trd"); + } + int nb_packets = 0; while (true) { @@ -443,6 +449,10 @@ srs_error_t SrsMpegtsSrtConn::do_playing() return srs_error_wrap(err, "srt play thread"); } + if ((err = srt_recv_trd.get_recv_err()) != srs_success) { + return srs_error_wrap(err, "srt play recv thread"); + } + pprint->elapse(); // Wait for amount of packets. @@ -451,13 +461,24 @@ srs_error_t SrsMpegtsSrtConn::do_playing() consumer->dump_packet(&pkt); if (!pkt) { // TODO: FIXME: We should check the quit event. - consumer->wait(1); + consumer->wait(1, 1000 * SRS_UTIME_MILLISECONDS); continue; } // reportable if (pprint->can_print()) { + SRT_TRACEBSTATS srt_stats; + srs_error_t err_tmp = srs_srt_get_stats(srt_fd_, &srt_stats, true); + if (err_tmp != srs_success) { + srs_freep(err_tmp); + } else { + srs_trace("-> " SRS_CONSTS_LOG_SRT_PLAY " Transport Stats # " + "pktSent=%ld, pktSndLoss=%d, pktRetrans=%d, pktSndDrop=%d", + srt_stats.pktSent, srt_stats.pktSndLoss, srt_stats.pktRetrans, srt_stats.pktSndDrop); + } + kbps_->sample(); + srs_trace("-> " SRS_CONSTS_LOG_SRT_PLAY " time=%d, packets=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d", (int)pprint->age(), nb_packets, kbps_->get_send_kbps(), kbps_->get_send_kbps_30s(), kbps_->get_send_kbps_5m(), kbps_->get_recv_kbps(), kbps_->get_recv_kbps_30s(), kbps_->get_recv_kbps_5m()); diff --git a/trunk/src/app/srs_app_srt_conn.hpp b/trunk/src/app/srs_app_srt_conn.hpp index 351894f70..40bf99743 100644 --- a/trunk/src/app/srs_app_srt_conn.hpp +++ b/trunk/src/app/srs_app_srt_conn.hpp @@ -51,6 +51,25 @@ private: SrsSrtSocket* srt_skt_; }; +class SrsSrtRecvThread : public ISrsCoroutineHandler +{ +public: + SrsSrtRecvThread(SrsSrtConnection* srt_conn); + ~SrsSrtRecvThread(); +// Interface ISrsCoroutineHandler +public: + virtual srs_error_t cycle(); +private: + srs_error_t do_cycle(); +public: + srs_error_t start(); + srs_error_t get_recv_err(); +private: + SrsSrtConnection* srt_conn_; + SrsCoroutine* trd_; + srs_error_t recv_err_; +}; + class SrsMpegtsSrtConn : public ISrsStartableConneciton, public ISrsCoroutineHandler { public: diff --git a/trunk/src/app/srs_app_srt_server.cpp b/trunk/src/app/srs_app_srt_server.cpp index 7e709706d..a550fe357 100644 --- a/trunk/src/app/srs_app_srt_server.cpp +++ b/trunk/src/app/srs_app_srt_server.cpp @@ -380,12 +380,14 @@ srs_error_t SrsSrtEventLoop::cycle() return srs_error_wrap(err, "srt listener"); } + // Check events fired, return directly. if ((err = srt_poller_->wait(0)) != srs_success) { srs_error("srt poll wait failed, err=%s", srs_error_desc(err).c_str()); srs_error_reset(err); } - srs_usleep(10 * SRS_UTIME_MILLISECONDS); + // Schedule srt event by state-thread. + srs_usleep(1 * SRS_UTIME_MILLISECONDS); } return err; diff --git a/trunk/src/app/srs_app_srt_source.cpp b/trunk/src/app/srs_app_srt_source.cpp index f3025ca4a..0f03ac820 100644 --- a/trunk/src/app/srs_app_srt_source.cpp +++ b/trunk/src/app/srs_app_srt_source.cpp @@ -125,7 +125,7 @@ srs_error_t SrsSrtSourceManager::fetch_or_create(SrsRequest* r, SrsSrtSource** p // should always not exists for create a source. srs_assert (pool.find(stream_url) == pool.end()); - srs_trace("new ts source, stream_url=%s", stream_url.c_str()); + srs_trace("new srt source, stream_url=%s", stream_url.c_str()); source = new SrsSrtSource(); if ((err = source->initialize(r)) != srs_success) { @@ -218,7 +218,7 @@ srs_error_t SrsSrtConsumer::dump_packet(SrsSrtPacket** ppkt) return err; } -void SrsSrtConsumer::wait(int nb_msgs) +void SrsSrtConsumer::wait(int nb_msgs, srs_utime_t timeout) { mw_min_msgs = nb_msgs; @@ -231,18 +231,19 @@ void SrsSrtConsumer::wait(int nb_msgs) mw_waiting = true; // use cond block wait for high performance mode. - srs_cond_wait(mw_wait); + srs_cond_timedwait(mw_wait, timeout); } -ISrsTsSourceBridger::ISrsTsSourceBridger() +ISrsSrtSourceBridge::ISrsSrtSourceBridge(SrsBridgeDestType type) : ISrsBridge(type) { } -ISrsTsSourceBridger::~ISrsTsSourceBridger() +ISrsSrtSourceBridge::~ISrsSrtSourceBridge() { } -SrsRtmpFromTsBridge::SrsRtmpFromTsBridge(SrsLiveSource* source) +SrsRtmpFromSrtBridge::SrsRtmpFromSrtBridge(SrsLiveSource* source) + : ISrsSrtSourceBridge(SrsBridgeDestTypeRtmp) { ts_ctx_ = new SrsTsContext(); @@ -254,13 +255,13 @@ SrsRtmpFromTsBridge::SrsRtmpFromTsBridge(SrsLiveSource* source) req_ = NULL; } -SrsRtmpFromTsBridge::~SrsRtmpFromTsBridge() +SrsRtmpFromSrtBridge::~SrsRtmpFromSrtBridge() { srs_freep(ts_ctx_); srs_freep(req_); } -srs_error_t SrsRtmpFromTsBridge::on_publish() +srs_error_t SrsRtmpFromSrtBridge::on_publish() { srs_error_t err = srs_success; @@ -271,7 +272,7 @@ srs_error_t SrsRtmpFromTsBridge::on_publish() return err; } -srs_error_t SrsRtmpFromTsBridge::on_packet(SrsSrtPacket *pkt) +srs_error_t SrsRtmpFromSrtBridge::on_packet(SrsSrtPacket *pkt) { srs_error_t err = srs_success; @@ -297,12 +298,12 @@ srs_error_t SrsRtmpFromTsBridge::on_packet(SrsSrtPacket *pkt) return err; } -void SrsRtmpFromTsBridge::on_unpublish() +void SrsRtmpFromSrtBridge::on_unpublish() { live_source_->on_unpublish(); } -srs_error_t SrsRtmpFromTsBridge::initialize(SrsRequest* req) +srs_error_t SrsRtmpFromSrtBridge::initialize(SrsRequest* req) { srs_error_t err = srs_success; @@ -312,7 +313,7 @@ srs_error_t SrsRtmpFromTsBridge::initialize(SrsRequest* req) return err; } -srs_error_t SrsRtmpFromTsBridge::on_ts_message(SrsTsMessage* msg) +srs_error_t SrsRtmpFromSrtBridge::on_ts_message(SrsTsMessage* msg) { srs_error_t err = srs_success; @@ -352,7 +353,7 @@ srs_error_t SrsRtmpFromTsBridge::on_ts_message(SrsTsMessage* msg) return err; } -srs_error_t SrsRtmpFromTsBridge::on_ts_video(SrsTsMessage* msg, SrsBuffer* avs) +srs_error_t SrsRtmpFromSrtBridge::on_ts_video(SrsTsMessage* msg, SrsBuffer* avs) { srs_error_t err = srs_success; @@ -419,7 +420,7 @@ srs_error_t SrsRtmpFromTsBridge::on_ts_video(SrsTsMessage* msg, SrsBuffer* avs) return on_h264_frame(msg, ipb_frames); } -srs_error_t SrsRtmpFromTsBridge::check_sps_pps_change(SrsTsMessage* msg) +srs_error_t SrsRtmpFromSrtBridge::check_sps_pps_change(SrsTsMessage* msg) { srs_error_t err = srs_success; @@ -464,7 +465,7 @@ srs_error_t SrsRtmpFromTsBridge::check_sps_pps_change(SrsTsMessage* msg) return err; } -srs_error_t SrsRtmpFromTsBridge::on_h264_frame(SrsTsMessage* msg, vector >& ipb_frames) +srs_error_t SrsRtmpFromSrtBridge::on_h264_frame(SrsTsMessage* msg, vector >& ipb_frames) { srs_error_t err = srs_success; @@ -520,7 +521,7 @@ srs_error_t SrsRtmpFromTsBridge::on_h264_frame(SrsTsMessage* msg, vector::iterator iter = bridgers_.begin(); iter != bridgers_.end(); ++iter) { + ISrsSrtSourceBridge* bridge = *iter; + srs_freep(bridge); + } + bridgers_.clear(); } srs_error_t SrsSrtSource::initialize(SrsRequest* r) @@ -702,10 +706,18 @@ void SrsSrtSource::update_auth(SrsRequest* r) req->update_auth(r); } -void SrsSrtSource::set_bridger(ISrsTsSourceBridger *bridger) +void SrsSrtSource::set_bridger(ISrsSrtSourceBridge *bridger) { - srs_freep(bridger_); - bridger_ = bridger; + for (vector::iterator iter = bridgers_.begin(); iter != bridgers_.end(); ++iter) { + ISrsSrtSourceBridge* b = *iter; + if (b->get_type() == bridger->get_type()) { + srs_freep(b); + *iter = bridger; + return; + } + } + + bridgers_.push_back(bridger); } srs_error_t SrsSrtSource::create_consumer(SrsSrtConsumer*& consumer) @@ -752,8 +764,9 @@ srs_error_t SrsSrtSource::on_publish() return srs_error_wrap(err, "source id change"); } - if (bridger_) { - if ((err = bridger_->on_publish()) != srs_success) { + for (vector::iterator iter = bridgers_.begin(); iter != bridgers_.end(); ++iter) { + ISrsSrtSourceBridge* bridge = *iter; + if ((err = bridge->on_publish()) != srs_success) { return srs_error_wrap(err, "bridger on publish"); } } @@ -773,10 +786,12 @@ void SrsSrtSource::on_unpublish() can_publish_ = true; - if (bridger_) { - bridger_->on_unpublish(); - srs_freep(bridger_); + for (vector::iterator iter = bridgers_.begin(); iter != bridgers_.end(); ++iter) { + ISrsSrtSourceBridge* bridge = *iter; + bridge->on_unpublish(); + srs_freep(bridge); } + bridgers_.clear(); } srs_error_t SrsSrtSource::on_packet(SrsSrtPacket* packet) @@ -790,8 +805,104 @@ srs_error_t SrsSrtSource::on_packet(SrsSrtPacket* packet) } } - if (bridger_ && (err = bridger_->on_packet(packet)) != srs_success) { - return srs_error_wrap(err, "bridger consume message"); + for (vector::iterator iter = bridgers_.begin(); iter != bridgers_.end(); ++iter) { + ISrsSrtSourceBridge* bridge = *iter; + if ((err = bridge->on_packet(packet)) != srs_success) { + return srs_error_wrap(err, "bridger consume message"); + } + } + + return err; +} + +SrsSrtFromRtmpBridge::SrsSrtFromRtmpBridge(SrsSrtSource* source) + : ISrsLiveSourceBridger(SrsBridgeDestTypeSRT) +{ + srt_source_ = source; + ts_muxer_ = NULL; + offset_ = 0; +} + +SrsSrtFromRtmpBridge::~SrsSrtFromRtmpBridge() +{ + srs_freep(ts_muxer_); +} + +srs_error_t SrsSrtFromRtmpBridge::initialize(SrsRequest* r) +{ + srs_error_t err = srs_success; + + // TODO: FIXME: check config. + req_ = r; + + ts_muxer_ = new SrsTsTransmuxer(); + if ((err = ts_muxer_->initialize(this)) != srs_success) { + return srs_error_wrap(err, "init ts muxer"); + } + + return err; +} + +srs_error_t SrsSrtFromRtmpBridge::on_publish() +{ + srs_error_t err = srs_success; + + // TODO: FIXME: check if enable rtmp_to_srt + + if ((err = srt_source_->on_publish()) != srs_success) { + return srs_error_wrap(err, "source publish"); + } + + return err; +} + +void SrsSrtFromRtmpBridge::on_unpublish() +{ + // TODO: FIXME: check if enable rtmp_to_srt + srt_source_->on_unpublish(); +} + +srs_error_t SrsSrtFromRtmpBridge::on_audio(SrsSharedPtrMessage* msg) +{ + srs_error_t err = srs_success; + + if ((err = ts_muxer_->write_audio(msg->timestamp, msg->payload, msg->size)) != srs_success) { + return srs_error_wrap(err, "rtmp to srt, ts mux audio"); + } + + return err; +} + +srs_error_t SrsSrtFromRtmpBridge::on_video(SrsSharedPtrMessage* msg) +{ + srs_error_t err = srs_success; + + if ((err = ts_muxer_->write_video(msg->timestamp, msg->payload, msg->size)) != srs_success) { + return srs_error_wrap(err, "rtmp to srt, ts mux video"); + } + + return err; +} + +srs_error_t SrsSrtFromRtmpBridge::write(void* buf, size_t size, ssize_t* nwrite) +{ + srs_error_t err = srs_success; + + if (size % SRS_TS_PACKET_SIZE != 0) { + return srs_error_new(ERROR_RTMP_TO_SRT, "invalid ts size=%u", size); + } + + for (int i = 0; i < size; i += SRS_TS_PACKET_SIZE) { + memcpy(ts_buf_ + offset_, (const char*)buf + i, SRS_TS_PACKET_SIZE); + offset_ += SRS_TS_PACKET_SIZE; + if (offset_ >= 1316) { + offset_ = 0; + SrsSrtPacket* packet = new SrsSrtPacket(); + SrsAutoFree(SrsSrtPacket, packet); + packet->wrap(ts_buf_, 1316); + + srt_source_->on_packet(packet); + } } return err; diff --git a/trunk/src/app/srs_app_srt_source.hpp b/trunk/src/app/srs_app_srt_source.hpp index d285143c9..14c2a5517 100644 --- a/trunk/src/app/srs_app_srt_source.hpp +++ b/trunk/src/app/srs_app_srt_source.hpp @@ -14,6 +14,7 @@ #include #include +#include class SrsSharedPtrMessage; class SrsRequest; @@ -86,25 +87,25 @@ public: // For SRT, we only got one packet, because there is not many packets in queue. virtual srs_error_t dump_packet(SrsSrtPacket** ppkt); // Wait for at-least some messages incoming in queue. - virtual void wait(int nb_msgs); + virtual void wait(int nb_msgs, srs_utime_t timeout); }; -class ISrsTsSourceBridger +class ISrsSrtSourceBridge : public ISrsBridge { public: - ISrsTsSourceBridger(); - virtual ~ISrsTsSourceBridger(); + ISrsSrtSourceBridge(SrsBridgeDestType type); + virtual ~ISrsSrtSourceBridge(); public: virtual srs_error_t on_publish() = 0; virtual srs_error_t on_packet(SrsSrtPacket *pkt) = 0; virtual void on_unpublish() = 0; }; -class SrsRtmpFromTsBridge : public ISrsTsSourceBridger, public ISrsTsHandler +class SrsRtmpFromSrtBridge : public ISrsSrtSourceBridge, public ISrsTsHandler { public: - SrsRtmpFromTsBridge(SrsLiveSource* source); - virtual ~SrsRtmpFromTsBridge(); + SrsRtmpFromSrtBridge(SrsLiveSource* source); + virtual ~SrsRtmpFromSrtBridge(); public: virtual srs_error_t on_publish(); virtual srs_error_t on_packet(SrsSrtPacket *pkt); @@ -153,7 +154,7 @@ public: // Update the authentication information in request. virtual void update_auth(SrsRequest* r); public: - void set_bridger(ISrsTsSourceBridger *bridger); + void set_bridger(ISrsSrtSourceBridge *bridger); public: // Create consumer // @param consumer, output the create consumer. @@ -178,18 +179,14 @@ private: // To delivery packets to clients. std::vector consumers; bool can_publish_; - ISrsTsSourceBridger* bridger_; + std::vector bridgers_; }; -/* -class SrsTsFromRtmpBridger : public ISrsLiveSourceBridger +class SrsSrtFromRtmpBridge : public ISrsLiveSourceBridger, public ISrsStreamWriter { -private: - SrsRequest* req; - SrsSrtSource* source_; public: - SrsTsFromRtmpBridger(SrsSrtSource* source); - virtual ~SrsTsFromRtmpBridger(); + SrsSrtFromRtmpBridge(SrsSrtSource* source); + virtual ~SrsSrtFromRtmpBridge(); public: virtual srs_error_t initialize(SrsRequest* r); // Interface for ISrsLiveSourceBridger @@ -198,8 +195,16 @@ public: virtual void on_unpublish(); virtual srs_error_t on_audio(SrsSharedPtrMessage* msg); virtual srs_error_t on_video(SrsSharedPtrMessage* msg); +// Interface for ISrsStreamWriter +public: + virtual srs_error_t write(void* buf, size_t size, ssize_t* nwrite); +private: + SrsRequest* req_; + SrsSrtSource* srt_source_; + SrsTsTransmuxer* ts_muxer_; + char ts_buf_[1316]; + int offset_; }; -*/ #endif diff --git a/trunk/src/kernel/srs_kernel_error.hpp b/trunk/src/kernel/srs_kernel_error.hpp index 94f8c9c54..457afb346 100644 --- a/trunk/src/kernel/srs_kernel_error.hpp +++ b/trunk/src/kernel/srs_kernel_error.hpp @@ -358,6 +358,8 @@ #define ERROR_SRT_SOCKOPT 6005 #define ERROR_SRT_CONN 6006 #define ERROR_SRT_SOURCE_BUSY 6007 +#define ERROR_RTMP_TO_SRT 6008 +#define ERROR_SRT_STATS 6009 /////////////////////////////////////////////////////// // HTTP API error. diff --git a/trunk/src/protocol/srs_service_st_srt.cpp b/trunk/src/protocol/srs_service_st_srt.cpp index 99e9146c6..5809ddecb 100644 --- a/trunk/src/protocol/srs_service_st_srt.cpp +++ b/trunk/src/protocol/srs_service_st_srt.cpp @@ -376,6 +376,18 @@ srs_error_t srs_srt_get_remote_ip_port(SRTSOCKET srt_fd, std::string& ip, int& p return err; } +srs_error_t srs_srt_get_stats(SRTSOCKET srt_fd, SRT_TRACEBSTATS* srt_stats, bool clear) +{ + srs_error_t err = srs_success; + + int ret = srt_bstats(srt_fd, srt_stats, clear); + if (ret != 0) { + return srs_error_new(ERROR_SRT_STATS, "srt_bstats"); + } + + return err; +} + SrsSrtPoller::SrsSrtPoller() { srt_epoller_fd_ = -1; @@ -395,6 +407,9 @@ srs_error_t SrsSrtPoller::initialize() srt_epoller_fd_ = srt_epoll_create(); events_.resize(1024); + // Enable srt empty poller, avoid warning. + srt_epoll_set(srt_epoller_fd_, SRT_EPOLL_ENABLE_EMPTY); + return err; } @@ -442,7 +457,7 @@ srs_error_t SrsSrtPoller::wait(int timeout_ms) // wait srt event fired, will timeout after `timeout_ms` milliseconds. int ret = srt_epoll_uwait(srt_epoller_fd_, events_.data(), events_.size(), timeout_ms); if (ret < 0) { - return srs_error_new(ERROR_SRT_EPOLL, "srt_epoll_uwait, ret=%d", ret); + return srs_error_new(ERROR_SRT_EPOLL, "srt_epoll_uwait, ret=%d, err=%s", ret, srt_getlasterror_str()); } for (int i = 0; i < ret; ++i) { diff --git a/trunk/src/protocol/srs_service_st_srt.hpp b/trunk/src/protocol/srs_service_st_srt.hpp index 1e548b490..f2d1f7755 100644 --- a/trunk/src/protocol/srs_service_st_srt.hpp +++ b/trunk/src/protocol/srs_service_st_srt.hpp @@ -61,6 +61,9 @@ extern srs_error_t srs_srt_get_streamid(SRTSOCKET srt_fd, std::string& streamid) extern srs_error_t srs_srt_get_local_ip_port(SRTSOCKET srt_fd, std::string& ip, int& port); extern srs_error_t srs_srt_get_remote_ip_port(SRTSOCKET srt_fd, std::string& ip, int& port); +// Get SRT stats. +extern srs_error_t srs_srt_get_stats(SRTSOCKET srt_fd, SRT_TRACEBSTATS* srt_stats, bool clear); + class SrsSrtSocket; // Srt poller, subscribe/unsubscribed events and wait them fired.