From 74bb47c13f45a9ecb9a76b8ad3ab1b197ca7f786 Mon Sep 17 00:00:00 2001 From: winlin Date: Sat, 1 May 2021 22:15:57 +0800 Subject: [PATCH] SquashSRS4: Support RTC2RTMP. --- README.md | 24 +- trunk/conf/full.conf | 101 ++-- trunk/conf/rtc.conf | 9 +- trunk/conf/rtc2rtmp.conf | 42 ++ trunk/conf/rtc_live.conf | 47 -- trunk/doc/source.flv | 1 + trunk/src/app/srs_app_config.cpp | 75 ++- trunk/src/app/srs_app_config.hpp | 2 + trunk/src/app/srs_app_http_stream.hpp | 1 + trunk/src/app/srs_app_listener.cpp | 4 + trunk/src/app/srs_app_rtc_codec.cpp | 731 ++++++++++-------------- trunk/src/app/srs_app_rtc_codec.hpp | 125 ++-- trunk/src/app/srs_app_rtc_conn.cpp | 24 + trunk/src/app/srs_app_rtc_source.cpp | 674 ++++++++++++++++++---- trunk/src/app/srs_app_rtc_source.hpp | 86 ++- trunk/src/app/srs_app_rtmp_conn.cpp | 49 +- trunk/src/app/srs_app_source.cpp | 40 +- trunk/src/app/srs_app_source.hpp | 9 +- trunk/src/core/srs_core_version4.hpp | 2 +- trunk/src/kernel/srs_kernel_codec.hpp | 15 +- trunk/src/kernel/srs_kernel_rtc_rtp.cpp | 27 + trunk/src/kernel/srs_kernel_rtc_rtp.hpp | 2 + 22 files changed, 1246 insertions(+), 844 deletions(-) create mode 100644 trunk/conf/rtc2rtmp.conf delete mode 100644 trunk/conf/rtc_live.conf create mode 120000 trunk/doc/source.flv diff --git a/README.md b/README.md index 00820e063..6d5410643 100755 --- a/README.md +++ b/README.md @@ -22,13 +22,13 @@ docker run --rm -p 1935:1935 -p 1985:1985 -p 8080:8080 \ ossrs/srs:v4.0.85 ``` -> To enable WebRTC, user MUST set the env `CANDIDATE`, see [#307](https://github.com/ossrs/srs/issues/307#issue-76908382). +> To enable WebRTC, user MUST set the env `CANDIDATE`, see [wiki](https://github.com/ossrs/srs/wiki/v4_CN_WebRTC#config-candidate). 
Open [http://localhost:8080/](http://localhost:8080/) to check it, then publish -[stream](https://github.com/ossrs/srs/blob/3.0release/trunk/doc/source.200kbps.768x320.flv) by: +[stream](https://github.com/ossrs/srs/blob/3.0release/trunk/doc/source.flv) by: ```bash -docker run --rm --network=host ossrs/srs:encoder ffmpeg -re -i ./doc/source.200kbps.768x320.flv \ +docker run --rm --network=host ossrs/srs:encoder ffmpeg -re -i ./doc/source.flv \ -c copy -f flv -y rtmp://localhost/live/livestream ``` > Note: If WebRTC enabled, you can publish by [H5](http://localhost:8080/players/rtc_publisher.html?autostart=true). @@ -67,8 +67,7 @@ Fast index for Wikis: * How to deliver HTTP-FLV streaming?([CN][v4_CN_SampleHttpFlv], [EN][v4_EN_SampleHttpFlv]) * How to deliver HLS streaming?([CN][v4_CN_SampleHLS], [EN][v4_EN_SampleHLS]) * How to deliver low-latency streaming?([CN][v4_CN_SampleRealtime], [EN][v4_EN_SampleRealtime]) -* Usage: How to play WebRTC from SRS? [#307](https://github.com/ossrs/srs/issues/307) -* Usage: How to publish WebRTC to SRS? [#307](https://github.com/ossrs/srs/issues/307) +* How to use WebRTC? ([CN][v4_CN_WebRTC], [EN][v4_EN_WebRTC]) Other important wiki: @@ -127,6 +126,7 @@ Other important wiki: - [x] [Experimental] Support mux RTP/RTCP/DTLS/SRTP on one port for WebRTC, [#307][bug #307]. - [x] [Experimental] Support client address changing for WebRTC, [#307][bug #307]. - [x] [Experimental] Support transcode RTMP/AAC to WebRTC/Opus, [#307][bug #307]. +- [x] [Experimental] Support AV1 codec for WebRTC, [#2324][bug #2324]. - [x] [Experimental] Enhance HTTP Stream Server for HTTP-FLV, HTTPS, HLS etc. [#1657][bug #1657]. - [x] [Experimental] Support push stream by GB28181, [#1500][bug #1500]. - [x] [Experimental] Support DVR in MP4 format, read [#738][bug #738]. @@ -163,7 +163,10 @@ Other important wiki: ## V4 changes -* v4.0, 2021-04-29, RTC: Support av1 for Chrome M90. 4.0.91 +* v4.0, 2021-05-01, Support RTC2RTMP bridger and shared FastTimer. 
4.0.95 +* v4.0, 2021-05-01, Refine transcoder to support aac2opus and opus2aac. 4.0.94 +* v4.0, 2021-05-01, Timer: Extract and apply shared FastTimer. 4.0.93 +* v4.0, 2021-04-29, RTC: Support AV1 for Chrome M90. 4.0.91 * v4.0, 2021-04-24, Change push-RTSP as deprecated feature. * v4.0, 2021-04-24, Player: Change the default from RTMP to HTTP-FLV. * v4.0, 2021-04-24, Disable CherryPy by --cherrypy=off. 4.0.90 @@ -1263,8 +1266,8 @@ Maintainers of SRS project: * [Winlin](https://github.com/winlinvip): All areas of streaming server and documents. * [Wenjie](https://github.com/wenjiegit): The focus of his work is on the [HDS](https://github.com/simple-rtmp-server/srs/wiki/v4_CN_DeliveryHDS) module. * [Runner365](https://github.com/runner365): The focus of his work is on the [SRT](https://github.com/simple-rtmp-server/srs/wiki/v4_CN_SRTWiki) module. -* [John](https://github.com/xiaozhihong): Focus on [WebRTC](https://github.com/simple-rtmp-server/srs/wiki/v4_CN_RTCWiki) module. -* [B.P.Y(Bepartofyou)](https://github.com/Bepartofyou): Focus on [WebRTC](https://github.com/simple-rtmp-server/srs/wiki/v4_CN_RTCWiki) module. +* [John](https://github.com/xiaozhihong): Focus on [WebRTC](https://github.com/simple-rtmp-server/srs/wiki/v4_CN_WebRTC) module. +* [B.P.Y(Bepartofyou)](https://github.com/Bepartofyou): Focus on [WebRTC](https://github.com/simple-rtmp-server/srs/wiki/v4_CN_WebRTC) module. * [Lixin](https://github.com/xialixin): Focus on [GB28181](https://github.com/ossrs/srs/issues/1500) module. 
A big THANK YOU goes to: @@ -1387,6 +1390,8 @@ Winlin [v4_EN_SampleForward]: https://github.com/ossrs/srs/wiki/v4_EN_SampleForward [v4_CN_SampleRealtime]: https://github.com/ossrs/srs/wiki/v4_CN_SampleRealtime [v4_EN_SampleRealtime]: https://github.com/ossrs/srs/wiki/v4_EN_SampleRealtime +[v4_CN_WebRTC]: https://github.com/ossrs/srs/wiki/v4_CN_WebRTC +[v4_EN_WebRTC]: https://github.com/ossrs/srs/wiki/v4_EN_WebRTC [v4_CN_SampleARM]: https://github.com/ossrs/srs/wiki/v4_CN_SampleARM [v4_EN_SampleARM]: https://github.com/ossrs/srs/wiki/v4_EN_SampleARM [v4_CN_SampleIngest]: https://github.com/ossrs/srs/wiki/v4_CN_SampleIngest @@ -1817,7 +1822,6 @@ Winlin [bug #1543]: https://github.com/ossrs/srs/issues/1543 [bug #1509]: https://github.com/ossrs/srs/issues/1509 [bug #1575]: https://github.com/ossrs/srs/issues/1575 -[bug #307]: https://github.com/ossrs/srs/issues/307 [bug #1070]: https://github.com/ossrs/srs/issues/1070 [bug #1580]: https://github.com/ossrs/srs/issues/1580 [bug #1547]: https://github.com/ossrs/srs/issues/1547 @@ -1872,6 +1876,8 @@ Winlin [bug #1998]: https://github.com/ossrs/srs/issues/1998 [bug #2106]: https://github.com/ossrs/srs/issues/2106 [bug #2011]: https://github.com/ossrs/srs/issues/2011 +[bug #2324]: https://github.com/ossrs/srs/issues/2324 +[bug #1500]: https://github.com/ossrs/srs/issues/1500 [bug #zzzzzzzzzzzzz]: https://github.com/ossrs/srs/issues/zzzzzzzzzzzzz [exo #828]: https://github.com/google/ExoPlayer/pull/828 diff --git a/trunk/conf/full.conf b/trunk/conf/full.conf index b17085f03..99a2265f4 100644 --- a/trunk/conf/full.conf +++ b/trunk/conf/full.conf @@ -446,14 +446,14 @@ srt_server { rtc_server { # Whether enable WebRTC server. # default: off - enabled on; + enabled on; # The udp listen port, we will reuse it for connections. # default: 8000 - listen 8000; + listen 8000; # The exposed candidate IPs, response in SDP candidate line. It can be: # * Retrieve server IP automatically, from all network interfaces. 
# eth0 Retrieve server IP by specified network interface name. # TODO: Implements it. - # $CANDIDATE Read the IP from ENV variable, use * if not set, see https://github.com/ossrs/srs/issues/307#issuecomment-599028124 + # $CANDIDATE Read the IP from ENV variable, use * if not set. # x.x.x.x A specified IP address or DNS name, which can be access by client such as Chrome. # You can specific more than one interface name: # eth0 eth1 Use network interface eth0 and eth1. # TODO: Implements it. @@ -461,37 +461,38 @@ rtc_server { # 192.168.1.3 10.1.2.3 rtc.me # TODO: Implements it. # And by multiple ENV variables: # $CANDIDATE $EIP # TODO: Implements it. - # @remark For Firefox, the candidate MUST be IP, MUST NOT be DNS name, see also https://github.com/ossrs/srs/pull/1998#issuecomment-726442999 + # @remark For Firefox, the candidate MUST be IP, MUST NOT be DNS name. + # @see https://github.com/ossrs/srs/wiki/v4_CN_RTCWiki#config-candidate # default: * - candidate *; - # The IP family filter for candidate, it can be: + candidate *; + # The IP family filter for auto discover candidate, it can be: # ipv4 Filter IP v4 candidates. # ipv6 Filter IP v6 candidates. # all Filter all IP v4 or v6 candidates. # For example, if set to ipv4, we only use the IPv4 address as candidate. # default: ipv4 - ip_family ipv4; + ip_family ipv4; # Whether use ECDSA certificate. # If not, use RSA certificate. # default: on - ecdsa on; + ecdsa on; # Whether encrypt RTP packet by SRTP. # @remark Should always turn it on, or Chrome will fail. # default: on - encrypt on; + encrypt on; # We listen multiple times at the same port, by REUSEPORT, to increase the UDP queue. # Note that you can set to 1 and increase the system UDP buffer size by net.core.rmem_max # and net.core.rmem_default or just increase this to get larger UDP recv and send buffer. # default: 1 - reuseport 1; + reuseport 1; # Whether merge multiple NALUs into one. 
# @see https://github.com/ossrs/srs/issues/307#issuecomment-612806318 # default: off - merge_nalus off; + merge_nalus off; # Whether enable the perf stat at http://localhost:1985/api/v1/perf # TODO: FIXME: We should enable it when refined. # default: off - perf_stat off; + perf_stat off; # For RTP packet and its payload cache. rtp_cache { # Whether enable the RTP packet cache. @@ -533,61 +534,67 @@ vhost rtc.vhost.srs.com { rtc { # Whether enable WebRTC server. # default: off - enabled on; - # The strategy for bframe. - # keep Keep bframe, which may make browser with playing problems. - # discard Discard bframe, maybe cause browser with little problems. - # default: keep - bframe discard; - # The strategy for aac audio. - # transcode Transcode aac to opus. - # discard Discard aac audio packet. - # default: transcode - aac transcode; + enabled on; + # Whether support NACK. + # default: on + nack on; + # Whether directly use the packet, avoid copy. + # default: on + nack_no_copy on; + # Whether support TWCC. + # default: on + twcc on; # The timeout in seconds for session timeout. # Client will send ping(STUN binding request) to server, we use it as heartbeat. # default: 30 - stun_timeout 30; - # The strick check when process stun. + stun_timeout 30; + # The strict check when process stun. # default: off stun_strict_check on; # The role of dtls when peer is actpass: passive or active # default: passive - dtls_role passive; + dtls_role passive; # The version of dtls, support dtls1.0, dtls1.2, and auto # default: auto dtls_version auto; # Drop the packet with the pt(payload type), 0 never drop. # default: 0 drop_for_pt 0; + ############################################################### + # For transmuxing RTMP to RTC, the strategy for bframe. + # keep Keep bframe, which may make browser with playing problems. + # discard Discard bframe, maybe cause browser with little problems. 
+ # default: discard + bframe discard; + # For transmuxing RTMP to RTC, the strategy for aac audio. + # transcode Transcode aac to opus. + # discard Discard aac audio packet. + # default: transcode + aac transcode; + ############################################################### + # For transmuxing RTC to RTMP. + # Whether trans-mux RTC to RTMP streaming. + # Default: off + rtc_to_rtmp off; + # The PLI interval in seconds, for RTC to RTMP. + # Note the available range is [0.5, 30] + # Default: 6.0 + pli_for_rtmp 6.0; } - # whether enable min delay mode for vhost. + ############################################################### + # For transmuxing RTMP to RTC, it will impact the default values if RTC is on. + # Whether enable min delay mode for vhost. # default: on, for RTC. - min_latency on; + min_latency on; play { # set the MW(merged-write) latency in ms. # @remark For WebRTC, we enable pass-timestamp mode, so we ignore this config. # default: 0 (For WebRTC) - mw_latency 0; + mw_latency 0; # Set the MW(merged-write) min messages. - # default: 0 (For Real-Time, min_latency on) - # default: 1 (For WebRTC, min_latency off) - mw_msgs 0; - } - # For NACK. - nack { - # Whether support NACK. - # default: on - enabled on; - # Whether directly use the packet, avoid copy. - # default: on - no_copy on; - } - # For TWCC. - twcc { - # Whether support TWCC. - # default: on - enabled on; + # default: 0 (For Real-Time, that is min_latency on) + # default: 1 (For WebRTC, that is min_latency off) + mw_msgs 0; } } diff --git a/trunk/conf/rtc.conf b/trunk/conf/rtc.conf index b5a70eb07..0a3360c9f 100644 --- a/trunk/conf/rtc.conf +++ b/trunk/conf/rtc.conf @@ -18,21 +18,20 @@ stats { network 0; } rtc_server { - enabled on; + enabled on; # Listen at udp://8000 - listen 8000; + listen 8000; # # The $CANDIDATE means fetch from env, if not configed, use * as default. 
# # The * means retrieving server IP automatically, from all network interfaces, - # @see https://github.com/ossrs/srs/issues/307#issuecomment-599028124 - candidate $CANDIDATE; + # @see https://github.com/ossrs/srs/wiki/v4_CN_RTCWiki#config-candidate + candidate $CANDIDATE; } vhost __defaultVhost__ { rtc { enabled on; - bframe discard; } http_remux { enabled on; diff --git a/trunk/conf/rtc2rtmp.conf b/trunk/conf/rtc2rtmp.conf new file mode 100644 index 000000000..3410cdaea --- /dev/null +++ b/trunk/conf/rtc2rtmp.conf @@ -0,0 +1,42 @@ + +listen 1935; +max_connections 1000; +daemon off; +srs_log_tank console; + +http_server { + enabled on; + listen 8080; + dir ./objs/nginx/html; +} + +http_api { + enabled on; + listen 1985; +} +stats { + network 0; +} +rtc_server { + enabled on; + # Listen at udp://8000 + listen 8000; + # + # The $CANDIDATE means fetch from env, if not configed, use * as default. + # + # The * means retrieving server IP automatically, from all network interfaces, + # @see https://github.com/ossrs/srs/wiki/v4_CN_RTCWiki#config-candidate + candidate $CANDIDATE; +} + +vhost __defaultVhost__ { + rtc { + enabled on; + rtc_to_rtmp on; + } + http_remux { + enabled on; + mount [vhost]/[app]/[stream].flv; + } +} + diff --git a/trunk/conf/rtc_live.conf b/trunk/conf/rtc_live.conf deleted file mode 100644 index 66682d09d..000000000 --- a/trunk/conf/rtc_live.conf +++ /dev/null @@ -1,47 +0,0 @@ - -listen 1935; -max_connections 1000; -daemon off; -srs_log_tank console; - -http_server { - enabled on; - listen 8080; -} - -http_api { - enabled on; - listen 1985; -} - -rtc_server { - enabled on; - listen 8000; - candidate $CANDIDATE; -} - -vhost __defaultVhost__ { - rtc { - enabled on; - bframe discard; - } - - http_remux { - enabled on; - mount [vhost]/[app]/[stream].flv; - } - - tcp_nodelay on - min_latency on; - - play { - gop_cache off; - queue_length 10; - mw_latency 100; - } - - publish { - mr off; - } -} - diff --git a/trunk/doc/source.flv b/trunk/doc/source.flv 
new file mode 120000 index 000000000..32d9ecfc2 --- /dev/null +++ b/trunk/doc/source.flv @@ -0,0 +1 @@ +source.200kbps.768x320.flv \ No newline at end of file diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index 7c79f2cc9..8c575a01e 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -3789,8 +3789,7 @@ srs_error_t SrsConfig::check_normal_config() && n != "play" && n != "publish" && n != "cluster" && n != "security" && n != "http_remux" && n != "dash" && n != "http_static" && n != "hds" && n != "exec" - && n != "in_ack_size" && n != "out_ack_size" && n != "rtc" && n != "nack" - && n != "twcc") { + && n != "in_ack_size" && n != "out_ack_size" && n != "rtc") { return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal vhost.%s", n.c_str()); } // for each sub directives of vhost. @@ -3941,7 +3940,8 @@ srs_error_t SrsConfig::check_normal_config() for (int j = 0; j < (int)conf->directives.size(); j++) { string m = conf->at(j)->name; if (m != "enabled" && m != "bframe" && m != "aac" && m != "stun_timeout" && m != "stun_strict_check" - && m != "dtls_role" && m != "dtls_version" && m != "drop_for_pt") { + && m != "dtls_role" && m != "dtls_version" && m != "drop_for_pt" && m != "rtc_to_rtmp" + && m != "pli_for_rtmp") { return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal vhost.rtc.%s of %s", m.c_str(), vhost->arg0().c_str()); } } @@ -5104,7 +5104,7 @@ bool SrsConfig::get_rtc_enabled(string vhost) bool SrsConfig::get_rtc_bframe_discard(string vhost) { - static bool DEFAULT = false; + static bool DEFAULT = true; SrsConfDirective* conf = get_rtc(vhost); @@ -5117,7 +5117,7 @@ bool SrsConfig::get_rtc_bframe_discard(string vhost) return DEFAULT; } - return conf->arg0() == "discard"; + return conf->arg0() != "keep"; } bool SrsConfig::get_rtc_aac_discard(string vhost) @@ -5214,7 +5214,7 @@ int SrsConfig::get_rtc_drop_for_pt(string vhost) { static int DEFAULT = 0; - SrsConfDirective* conf = get_vhost(vhost); 
+ SrsConfDirective* conf = get_rtc(vhost); if (!conf) { return DEFAULT; } @@ -5227,11 +5227,51 @@ int SrsConfig::get_rtc_drop_for_pt(string vhost) return ::atoi(conf->arg0().c_str()); } +bool SrsConfig::get_rtc_to_rtmp(string vhost) +{ + static bool DEFAULT = false; + + SrsConfDirective* conf = get_rtc(vhost); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("rtc_to_rtmp"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + + return SRS_CONF_PERFER_FALSE(conf->arg0()); +} + +srs_utime_t SrsConfig::get_rtc_pli_for_rtmp(string vhost) +{ + static srs_utime_t DEFAULT = 6 * SRS_UTIME_SECONDS; + + SrsConfDirective* conf = get_rtc(vhost); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("pli_for_rtmp"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + + srs_utime_t v = (srs_utime_t)(::atof(conf->arg0().c_str()) * SRS_UTIME_SECONDS); + if (v < 500 * SRS_UTIME_MILLISECONDS || v > 30 * SRS_UTIME_SECONDS) { + srs_warn("Reset pli %dms to %dms", srsu2msi(v), srsu2msi(DEFAULT)); + return DEFAULT; + } + + return v; +} + bool SrsConfig::get_rtc_nack_enabled(string vhost) { static bool DEFAULT = true; - SrsConfDirective* conf = get_vhost(vhost); + SrsConfDirective* conf = get_rtc(vhost); if (!conf) { return DEFAULT; } @@ -5241,11 +5281,6 @@ bool SrsConfig::get_rtc_nack_enabled(string vhost) return DEFAULT; } - conf = conf->get("enabled"); - if (!conf || conf->arg0().empty()) { - return DEFAULT; - } - return SRS_CONF_PERFER_TRUE(conf->arg0()); } @@ -5253,17 +5288,12 @@ bool SrsConfig::get_rtc_nack_no_copy(string vhost) { static bool DEFAULT = true; - SrsConfDirective* conf = get_vhost(vhost); + SrsConfDirective* conf = get_rtc(vhost); if (!conf) { return DEFAULT; } - conf = conf->get("nack"); - if (!conf) { - return DEFAULT; - } - - conf = conf->get("no_copy"); + conf = conf->get("nack_no_copy"); if (!conf || conf->arg0().empty()) { return DEFAULT; } @@ -5275,7 +5305,7 @@ bool SrsConfig::get_rtc_twcc_enabled(string vhost) { static bool 
DEFAULT = true; - SrsConfDirective* conf = get_vhost(vhost); + SrsConfDirective* conf = get_rtc(vhost); if (!conf) { return DEFAULT; } @@ -5285,11 +5315,6 @@ bool SrsConfig::get_rtc_twcc_enabled(string vhost) return DEFAULT; } - conf = conf->get("enabled"); - if (!conf || conf->arg0().empty()) { - return DEFAULT; - } - return SRS_CONF_PERFER_TRUE(conf->arg0()); } diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index e0862a7c9..9e6d84880 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -562,6 +562,8 @@ public: std::string get_rtc_dtls_role(std::string vhost); std::string get_rtc_dtls_version(std::string vhost); int get_rtc_drop_for_pt(std::string vhost); + bool get_rtc_to_rtmp(std::string vhost); + srs_utime_t get_rtc_pli_for_rtmp(std::string vhost); bool get_rtc_nack_enabled(std::string vhost); bool get_rtc_nack_no_copy(std::string vhost); bool get_rtc_twcc_enabled(std::string vhost); diff --git a/trunk/src/app/srs_app_http_stream.hpp b/trunk/src/app/srs_app_http_stream.hpp index 0e56a63c1..1ba6c1e5a 100755 --- a/trunk/src/app/srs_app_http_stream.hpp +++ b/trunk/src/app/srs_app_http_stream.hpp @@ -179,6 +179,7 @@ public: }; // HTTP Live Streaming, to transmux RTMP to HTTP FLV or other format. +// TODO: FIXME: Rename to SrsHttpLive class SrsLiveStream : public ISrsHttpHandler { private: diff --git a/trunk/src/app/srs_app_listener.cpp b/trunk/src/app/srs_app_listener.cpp index c38609c9f..0037a6d9c 100755 --- a/trunk/src/app/srs_app_listener.cpp +++ b/trunk/src/app/srs_app_listener.cpp @@ -536,6 +536,10 @@ srs_error_t SrsUdpMuxListener::listen() srs_freep(trd); trd = new SrsSTCoroutine("udp", this, cid); + + //change stack size to 256K, fix crash when call some 3rd-part api. 
+ ((SrsSTCoroutine*)trd)->set_stack_size(1 << 18); + if ((err = trd->start()) != srs_success) { return srs_error_wrap(err, "start thread"); } diff --git a/trunk/src/app/srs_app_rtc_codec.cpp b/trunk/src/app/srs_app_rtc_codec.cpp index 2c9ebd2a9..c923deb5a 100644 --- a/trunk/src/app/srs_app_rtc_codec.cpp +++ b/trunk/src/app/srs_app_rtc_codec.cpp @@ -1,4 +1,3 @@ - /** * The MIT License (MIT) * @@ -22,14 +21,13 @@ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ -#include -#include #include -static const int kFrameBufMax = 40960; -static const int kPacketBufMax = 8192; +#include +#include +#include -static const char* id2codec_name(SrsAudioCodecId id) +static const char* id2codec_name(SrsAudioCodecId id) { switch (id) { case SrsAudioCodecIdAAC: @@ -41,506 +39,379 @@ static const char* id2codec_name(SrsAudioCodecId id) } } -SrsAudioDecoder::SrsAudioDecoder(SrsAudioCodecId codec) - : codec_id_(codec) +SrsAudioTranscoder::SrsAudioTranscoder() { - frame_ = NULL; - packet_ = NULL; - codec_ctx_ = NULL; + dec_ = NULL; + dec_frame_ = NULL; + dec_packet_ = NULL; + enc_ = NULL; + enc_frame_ = NULL; + enc_packet_ = NULL; + swr_ = NULL; + swr_data_ = NULL; + fifo_ = NULL; + new_pkt_pts_ = AV_NOPTS_VALUE; + next_out_pts_ = AV_NOPTS_VALUE; } -SrsAudioDecoder::~SrsAudioDecoder() +SrsAudioTranscoder::~SrsAudioTranscoder() { - if (codec_ctx_) { - avcodec_free_context(&codec_ctx_); - codec_ctx_ = NULL; + if (dec_) { + avcodec_free_context(&dec_); } - if (frame_) { - av_frame_free(&frame_); - frame_ = NULL; + + if (dec_frame_) { + av_frame_free(&dec_frame_); } - if (packet_) { - av_packet_free(&packet_); - packet_ = NULL; + + if (dec_packet_) { + av_packet_free(&dec_packet_); + } + + if (swr_) { + swr_free(&swr_); + } + + free_swr_samples(); + + if (enc_) { + avcodec_free_context(&enc_); + } + + if (enc_frame_) { + av_frame_free(&enc_frame_); + } + + if (enc_packet_) { + av_packet_free(&enc_packet_); + } + + if (fifo_) { + av_audio_fifo_free(fifo_); + 
fifo_ = NULL; } } -srs_error_t SrsAudioDecoder::initialize() +srs_error_t SrsAudioTranscoder::initialize(SrsAudioCodecId src_codec, SrsAudioCodecId dst_codec, int dst_channels, int dst_samplerate, int dst_bit_rate) { srs_error_t err = srs_success; - //check codec name,only support "aac","opus" - if (codec_id_ != SrsAudioCodecIdAAC && codec_id_ != SrsAudioCodecIdOpus) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "Invalid codec name %d", codec_id_); + if ((err = init_dec(src_codec)) != srs_success) { + return srs_error_wrap(err, "dec init codec:%d", src_codec); } - const char* codec_name = id2codec_name(codec_id_); + if ((err = init_enc(dst_codec, dst_channels, dst_samplerate, dst_bit_rate)) != srs_success) { + return srs_error_wrap(err, "enc init codec:%d, channels:%d, samplerate:%d, bitrate:%d", + dst_codec, dst_channels, dst_samplerate, dst_bit_rate); + } + + if ((err = init_fifo()) != srs_success) { + return srs_error_wrap(err, "fifo init"); + } + + return err; +} + +srs_error_t SrsAudioTranscoder::transcode(SrsAudioFrame *in_pkt, std::vector& out_pkts) +{ + srs_error_t err = srs_success; + + if ((err = decode_and_resample(in_pkt)) != srs_success) { + return srs_error_wrap(err, "decode and resample"); + } + + if ((err = encode(out_pkts)) != srs_success) { + return srs_error_wrap(err, "encode"); + } + + return err; +} + +void SrsAudioTranscoder::free_frames(std::vector& frames) +{ + for (std::vector::iterator it = frames.begin(); it != frames.end(); ++it) { + SrsAudioFrame* p = *it; + + for (int i = 0; i < p->nb_samples; i++) { + char* pa = p->samples[i].bytes; + srs_freepa(pa); + } + + srs_freep(p); + } +} + +void SrsAudioTranscoder::aac_codec_header(uint8_t **data, int *len) +{ + //srs_assert(dst_codec == SrsAudioCodecIdAAC); + *len = enc_->extradata_size; + *data = enc_->extradata; +} + +srs_error_t SrsAudioTranscoder::init_dec(SrsAudioCodecId src_codec) +{ + const char* codec_name = id2codec_name(src_codec); const AVCodec *codec = 
avcodec_find_decoder_by_name(codec_name); if (!codec) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "Codec not found by name %d(%s)", codec_id_, codec_name); + return srs_error_new(ERROR_RTC_RTP_MUXER, "Codec not found by name(%d,%s)", src_codec, codec_name); } - codec_ctx_ = avcodec_alloc_context3(codec); - if (!codec_ctx_) { + dec_ = avcodec_alloc_context3(codec); + if (!dec_) { return srs_error_new(ERROR_RTC_RTP_MUXER, "Could not allocate audio codec context"); } - if (avcodec_open2(codec_ctx_, codec, NULL) < 0) { + if (avcodec_open2(dec_, codec, NULL) < 0) { return srs_error_new(ERROR_RTC_RTP_MUXER, "Could not open codec"); } - frame_ = av_frame_alloc(); - if (!frame_) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "Could not allocate audio frame"); + dec_frame_ = av_frame_alloc(); + if (!dec_frame_) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "Could not allocate audio decode out frame"); } - packet_ = av_packet_alloc(); - if (!packet_) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "Could not allocate audio packet"); + dec_packet_ = av_packet_alloc(); + if (!dec_packet_) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "Could not allocate audio decode in packet"); } - return err; + new_pkt_pts_ = AV_NOPTS_VALUE; + return srs_success; } -srs_error_t SrsAudioDecoder::decode(SrsSample *pkt, char *buf, int &size) +srs_error_t SrsAudioTranscoder::init_enc(SrsAudioCodecId dst_codec, int dst_channels, int dst_samplerate, int dst_bit_rate) { - srs_error_t err = srs_success; - - packet_->data = (uint8_t *)pkt->bytes; - packet_->size = pkt->size; - - int ret = avcodec_send_packet(codec_ctx_, packet_); - if (ret < 0) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "Error submitting the packet to the decoder"); - } - - int max = size; - size = 0; - - while (ret >= 0) { - ret = avcodec_receive_frame(codec_ctx_, frame_); - if (ret == AVERROR(EAGAIN) || ret == AVERROR_EOF) { - return err; - } else if (ret < 0) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "Error during 
decoding"); - } - - int pcm_size = av_get_bytes_per_sample(codec_ctx_->sample_fmt); - if (pcm_size < 0) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "Failed to calculate data size"); - } - - // @see https://github.com/ossrs/srs/pull/2011/files - for (int i = 0; i < codec_ctx_->channels; i++) { - if (size + pcm_size * frame_->nb_samples <= max) { - memcpy(buf + size,frame_->data[i],pcm_size * frame_->nb_samples); - size += pcm_size * frame_->nb_samples; - } - } - } - - return err; -} - -AVCodecContext* SrsAudioDecoder::codec_ctx() -{ - return codec_ctx_; -} - -SrsAudioEncoder::SrsAudioEncoder(SrsAudioCodecId codec, int samplerate, int channels) - : channels_(channels), - sampling_rate_(samplerate), - codec_id_(codec), - want_bytes_(0) -{ - codec_ctx_ = NULL; -} - -SrsAudioEncoder::~SrsAudioEncoder() -{ - if (codec_ctx_) { - avcodec_free_context(&codec_ctx_); - } - - if (frame_) { - av_frame_free(&frame_); - } - -} - -srs_error_t SrsAudioEncoder::initialize() -{ - srs_error_t err = srs_success; - - if (codec_id_ != SrsAudioCodecIdAAC && codec_id_ != SrsAudioCodecIdOpus) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "Invalid codec name %d", codec_id_); - } - - frame_ = av_frame_alloc(); - if (!frame_) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "Could not allocate audio frame"); - } - - const char* codec_name = id2codec_name(codec_id_); + const char* codec_name = id2codec_name(dst_codec); const AVCodec *codec = avcodec_find_encoder_by_name(codec_name); if (!codec) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "Codec not found by name %d(%s)", codec_id_, codec_name); + return srs_error_new(ERROR_RTC_RTP_MUXER, "Codec not found by name(%d,%s)", dst_codec, codec_name); } - codec_ctx_ = avcodec_alloc_context3(codec); - if (!codec_ctx_) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "Could not allocate audio codec context"); + enc_ = avcodec_alloc_context3(codec); + if (!enc_) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "Could not allocate audio codec context(%d,%s)", 
dst_codec, codec_name); } - codec_ctx_->sample_rate = sampling_rate_; - codec_ctx_->channels = channels_; - codec_ctx_->channel_layout = av_get_default_channel_layout(channels_); - codec_ctx_->bit_rate = 48000; - if (codec_id_ == SrsAudioCodecIdOpus) { - codec_ctx_->sample_fmt = AV_SAMPLE_FMT_S16; + enc_->sample_rate = dst_samplerate; + enc_->channels = dst_channels; + enc_->channel_layout = av_get_default_channel_layout(dst_channels); + enc_->bit_rate = dst_bit_rate; + enc_->sample_fmt = codec->sample_fmts[0]; + enc_->time_base.num = 1; enc_->time_base.den = 1000; // {1, 1000} + if (dst_codec == SrsAudioCodecIdOpus) { //TODO: for more level setting - codec_ctx_->compression_level = 1; - } else if (codec_id_ == SrsAudioCodecIdAAC) { - codec_ctx_->sample_fmt = AV_SAMPLE_FMT_FLTP; + enc_->compression_level = 1; + } else if (dst_codec == SrsAudioCodecIdAAC) { + enc_->strict_std_compliance = FF_COMPLIANCE_EXPERIMENTAL; } // TODO: FIXME: Show detail error. - if (avcodec_open2(codec_ctx_, codec, NULL) < 0) { + if (avcodec_open2(enc_, codec, NULL) < 0) { return srs_error_new(ERROR_RTC_RTP_MUXER, "Could not open codec"); } - want_bytes_ = codec_ctx_->channels * codec_ctx_->frame_size * av_get_bytes_per_sample(codec_ctx_->sample_fmt); + enc_frame_ = av_frame_alloc(); + if (!enc_frame_) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "Could not allocate audio encode in frame"); + } - frame_->format = codec_ctx_->sample_fmt; - frame_->nb_samples = codec_ctx_->frame_size; - frame_->channel_layout = codec_ctx_->channel_layout; + enc_frame_->format = enc_->sample_fmt; + enc_frame_->nb_samples = enc_->frame_size; + enc_frame_->channel_layout = enc_->channel_layout; - if (av_frame_get_buffer(frame_, 0) < 0) { + if (av_frame_get_buffer(enc_frame_, 0) < 0) { return srs_error_new(ERROR_RTC_RTP_MUXER, "Could not get audio frame buffer"); } - return err; + enc_packet_ = av_packet_alloc(); + if (!enc_packet_) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "Could not allocate audio encode 
out packet"); + } + + next_out_pts_ = AV_NOPTS_VALUE; + return srs_success; } -int SrsAudioEncoder::want_bytes() +srs_error_t SrsAudioTranscoder::init_swr(AVCodecContext* decoder) { - return want_bytes_; + swr_ = swr_alloc_set_opts(NULL, enc_->channel_layout, enc_->sample_fmt, enc_->sample_rate, + decoder->channel_layout, decoder->sample_fmt, decoder->sample_rate, 0, NULL); + if (!swr_) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "alloc swr"); + } + + int error; + char err_buf[AV_ERROR_MAX_STRING_SIZE] = {0}; + if ((error = swr_init(swr_)) < 0) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "open swr(%d:%s)", error, + av_make_error_string(err_buf, AV_ERROR_MAX_STRING_SIZE, error)); + } + + /* Allocate as many pointers as there are audio channels. + * Each pointer will later point to the audio samples of the corresponding + * channels (although it may be NULL for interleaved formats). + */ + if (!(swr_data_ = (uint8_t **)calloc(enc_->channels, sizeof(*swr_data_)))) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "alloc swr buffer"); + } + + /* Allocate memory for the samples of all channels in one consecutive + * block for convenience. 
*/ + if ((error = av_samples_alloc(swr_data_, NULL, enc_->channels, enc_->frame_size, enc_->sample_fmt, 0)) < 0) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "alloc swr buffer(%d:%s)", error, + av_make_error_string(err_buf, AV_ERROR_MAX_STRING_SIZE, error)); + } + + return srs_success; } -srs_error_t SrsAudioEncoder::encode(SrsSample *frame, char *buf, int &size) +srs_error_t SrsAudioTranscoder::init_fifo() +{ + if (!(fifo_ = av_audio_fifo_alloc(enc_->sample_fmt, enc_->channels, 1))) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "Could not allocate FIFO"); + } + return srs_success; +} + +srs_error_t SrsAudioTranscoder::decode_and_resample(SrsAudioFrame *pkt) { srs_error_t err = srs_success; - if (want_bytes_ > 0 && frame->size != want_bytes_) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "invalid frame size %d, should be %d", frame->size, want_bytes_); + dec_packet_->data = (uint8_t *)pkt->samples[0].bytes; + dec_packet_->size = pkt->samples[0].size; + + char err_buf[AV_ERROR_MAX_STRING_SIZE] = {0}; + + int error = avcodec_send_packet(dec_, dec_packet_); + if (error < 0) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "submit to dec(%d,%s)", error, + av_make_error_string(err_buf, AV_ERROR_MAX_STRING_SIZE, error)); } - // TODO: Directly use frame? 
- memcpy(frame_->data[0], frame->bytes, frame->size); - - /* send the frame for encoding */ - int r0 = avcodec_send_frame(codec_ctx_, frame_); - if (r0 < 0) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "Error sending the frame to the encoder, %d", r0); - } - - AVPacket pkt; - av_init_packet(&pkt); - pkt.data = NULL; - pkt.size = 0; - - /* read all the available output packets (in general there may be any - * number of them */ - size = 0; - while (r0 >= 0) { - r0 = avcodec_receive_packet(codec_ctx_, &pkt); - if (r0 == AVERROR(EAGAIN) || r0 == AVERROR_EOF) { - break; - } else if (r0 < 0) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "Error during decoding %d", r0); + new_pkt_pts_ = pkt->dts + pkt->cts; + while (error >= 0) { + error = avcodec_receive_frame(dec_, dec_frame_); + if (error == AVERROR(EAGAIN) || error == AVERROR_EOF) { + return err; + } else if (error < 0) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "Error during decoding(%d,%s)", error, + av_make_error_string(err_buf, AV_ERROR_MAX_STRING_SIZE, error)); } - //TODO: fit encoder out more pkt - memcpy(buf, pkt.data, pkt.size); - size = pkt.size; - av_packet_unref(&pkt); + // Decoder is OK now, try to init swr if not initialized. + if (!swr_ && (err = init_swr(dec_)) != srs_success) { + return srs_error_wrap(err, "resample init"); + } - // TODO: FIXME: Refine api, got more than one packets. + int in_samples = dec_frame_->nb_samples; + const uint8_t **in_data = (const uint8_t**)dec_frame_->extended_data; + do { + /* Convert the samples using the resampler. 
*/ + int frame_size = swr_convert(swr_, swr_data_, enc_->frame_size, in_data, in_samples); + if ((error = frame_size) < 0) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "Could not convert input samples(%d,%s)", error, + av_make_error_string(err_buf, AV_ERROR_MAX_STRING_SIZE, error)); + } + + in_data = NULL; in_samples = 0; + if ((err = add_samples_to_fifo(swr_data_, frame_size)) != srs_success) { + return srs_error_wrap(err, "write samples"); + } + } while (swr_get_out_samples(swr_, in_samples) >= enc_->frame_size); } return err; } -AVCodecContext* SrsAudioEncoder::codec_ctx() +srs_error_t SrsAudioTranscoder::encode(std::vector &pkts) { - return codec_ctx_; -} + char err_buf[AV_ERROR_MAX_STRING_SIZE] = {0}; -SrsAudioResample::SrsAudioResample(int src_rate, int src_layout, enum AVSampleFormat src_fmt, - int src_nb, int dst_rate, int dst_layout, AVSampleFormat dst_fmt) - : src_rate_(src_rate), - src_ch_layout_(src_layout), - src_sample_fmt_(src_fmt), - src_nb_samples_(src_nb), - dst_rate_(dst_rate), - dst_ch_layout_(dst_layout), - dst_sample_fmt_(dst_fmt) -{ - src_nb_channels_ = 0; - dst_nb_channels_ = 0; - src_linesize_ = 0; - dst_linesize_ = 0; - dst_nb_samples_ = 0; - src_data_ = NULL; - dst_data_ = 0; - - max_dst_nb_samples_ = 0; - swr_ctx_ = NULL; -} - -SrsAudioResample::~SrsAudioResample() -{ - if (src_data_) { - av_freep(&src_data_[0]); - av_freep(&src_data_); - src_data_ = NULL; - } - if (dst_data_) { - av_freep(&dst_data_[0]); - av_freep(&dst_data_); - dst_data_ = NULL; - } - if (swr_ctx_) { - swr_free(&swr_ctx_); - swr_ctx_ = NULL; - } -} - -srs_error_t SrsAudioResample::initialize() -{ - srs_error_t err = srs_success; - - swr_ctx_ = swr_alloc(); - if (!swr_ctx_) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "Could not allocate resampler context"); - } - - av_opt_set_int(swr_ctx_, "in_channel_layout", src_ch_layout_, 0); - av_opt_set_int(swr_ctx_, "in_sample_rate", src_rate_, 0); - av_opt_set_sample_fmt(swr_ctx_, "in_sample_fmt", src_sample_fmt_, 0); - - 
av_opt_set_int(swr_ctx_, "out_channel_layout", dst_ch_layout_, 0); - av_opt_set_int(swr_ctx_, "out_sample_rate", dst_rate_, 0); - av_opt_set_sample_fmt(swr_ctx_, "out_sample_fmt", dst_sample_fmt_, 0); - - int ret; - if ((ret = swr_init(swr_ctx_)) < 0) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "Failed to initialize the resampling context"); - } - - src_nb_channels_ = av_get_channel_layout_nb_channels(src_ch_layout_); - ret = av_samples_alloc_array_and_samples(&src_data_, &src_linesize_, src_nb_channels_, - src_nb_samples_, src_sample_fmt_, 0); - if (ret < 0) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "Could not allocate source samples"); - } - - max_dst_nb_samples_ = dst_nb_samples_ = - av_rescale_rnd(src_nb_samples_, dst_rate_, src_rate_, AV_ROUND_UP); - - dst_nb_channels_ = av_get_channel_layout_nb_channels(dst_ch_layout_); - ret = av_samples_alloc_array_and_samples(&dst_data_, &dst_linesize_, dst_nb_channels_, - dst_nb_samples_, dst_sample_fmt_, 0); - if (ret < 0) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "Could not allocate destination samples"); - } - - return err; -} - -srs_error_t SrsAudioResample::resample(SrsSample *pcm, char *buf, int &size) -{ - srs_error_t err = srs_success; - - int ret, plane = 1; - if (src_sample_fmt_ == AV_SAMPLE_FMT_FLTP) { - plane = 2; - } - if (src_linesize_ * plane < pcm->size || pcm->size < 0) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "size not ok"); - } - memcpy(src_data_[0], pcm->bytes, pcm->size); - - dst_nb_samples_ = av_rescale_rnd(swr_get_delay(swr_ctx_, src_rate_) + - src_nb_samples_, dst_rate_, src_rate_, AV_ROUND_UP); - if (dst_nb_samples_ > max_dst_nb_samples_) { - av_freep(&dst_data_[0]); - ret = av_samples_alloc(dst_data_, &dst_linesize_, dst_nb_channels_, - dst_nb_samples_, dst_sample_fmt_, 1); - if (ret < 0) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "alloc error"); - } - max_dst_nb_samples_ = dst_nb_samples_; - } - - ret = swr_convert(swr_ctx_, dst_data_, dst_nb_samples_, (const uint8_t 
**)src_data_, src_nb_samples_); - if (ret < 0) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "Error while converting"); - } - - int dst_bufsize = av_samples_get_buffer_size(&dst_linesize_, dst_nb_channels_, - ret, dst_sample_fmt_, 1); - if (dst_bufsize < 0) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "Could not get sample buffer size"); - } - - int max = size; - size = 0; - if (max >= dst_bufsize) { - memcpy(buf, dst_data_[0], dst_bufsize); - size = dst_bufsize; - } - - return err; -} - -SrsAudioRecode::SrsAudioRecode(SrsAudioCodecId src_codec, SrsAudioCodecId dst_codec,int channels, int samplerate) - : dst_channels_(channels), - dst_samplerate_(samplerate), - src_codec_(src_codec), - dst_codec_(dst_codec) -{ - size_ = 0; - data_ = NULL; - - dec_ = NULL; - enc_ = NULL; - resample_ = NULL; -} - -SrsAudioRecode::~SrsAudioRecode() -{ - srs_freep(dec_); - srs_freep(enc_); - srs_freep(resample_); - srs_freepa(data_); -} - -srs_error_t SrsAudioRecode::initialize() -{ - srs_error_t err = srs_success; - - dec_ = new SrsAudioDecoder(src_codec_); - if ((err = dec_->initialize()) != srs_success) { - return srs_error_wrap(err, "dec init"); - } - - enc_ = new SrsAudioEncoder(dst_codec_, dst_samplerate_, dst_channels_); - if ((err = enc_->initialize()) != srs_success) { - return srs_error_wrap(err, "enc init"); - } - - enc_want_bytes_ = enc_->want_bytes(); - if (enc_want_bytes_ > 0) { - data_ = new char[enc_want_bytes_]; - srs_assert(data_); - } - - return err; -} - -srs_error_t SrsAudioRecode::transcode(SrsSample *pkt, char **buf, int *buf_len, int &n) -{ - srs_error_t err = srs_success; - - if (!dec_) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "dec_ nullptr"); - } - - int decode_len = kPacketBufMax; - static char decode_buffer[kPacketBufMax]; - if ((err = dec_->decode(pkt, decode_buffer, decode_len)) != srs_success) { - return srs_error_wrap(err, "decode error"); - } - - if (!resample_) { - int channel_layout = av_get_default_channel_layout(dst_channels_); - 
AVCodecContext *codec_ctx = dec_->codec_ctx(); - resample_ = new SrsAudioResample(codec_ctx->sample_rate, (int)codec_ctx->channel_layout, \ - codec_ctx->sample_fmt, codec_ctx->frame_size, dst_samplerate_, channel_layout, \ - enc_->codec_ctx()->sample_fmt); - if ((err = resample_->initialize()) != srs_success) { - return srs_error_wrap(err, "init resample"); + if (next_out_pts_ == AV_NOPTS_VALUE) { + next_out_pts_ = new_pkt_pts_; + } else { + int64_t diff = llabs(new_pkt_pts_ - next_out_pts_); + if (diff > 1000) { + srs_trace("time diff to large=%lld, next out=%lld, new pkt=%lld, set to new pkt", + diff, next_out_pts_, new_pkt_pts_); + next_out_pts_ = new_pkt_pts_; } } - SrsSample pcm; - pcm.bytes = decode_buffer; - pcm.size = decode_len; - int resample_len = kFrameBufMax; - static char resample_buffer[kFrameBufMax]; - static char encode_buffer[kPacketBufMax]; - if ((err = resample_->resample(&pcm, resample_buffer, resample_len)) != srs_success) { - return srs_error_wrap(err, "resample error"); - } - - n = 0; - - // We can encode it in one time. - if (enc_want_bytes_ <= 0) { - int encode_len; - pcm.bytes = (char *)data_; - pcm.size = size_; - if ((err = enc_->encode(&pcm, encode_buffer, encode_len)) != srs_success) { - return srs_error_wrap(err, "encode error"); + int frame_cnt = 0; + while (av_audio_fifo_size(fifo_) >= enc_->frame_size) { + /* Read as many samples from the FIFO buffer as required to fill the frame. + * The samples are stored in the frame temporarily. 
*/ + if (av_audio_fifo_read(fifo_, (void **)enc_frame_->data, enc_->frame_size) < enc_->frame_size) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "Could not read data from FIFO"); + } + /* send the frame for encoding */ + enc_frame_->pts = next_out_pts_ + av_rescale(enc_->frame_size * frame_cnt, 1000, enc_->sample_rate); + ++frame_cnt; + int error = avcodec_send_frame(enc_, enc_frame_); + if (error < 0) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "Error sending the frame to the encoder(%d,%s)", error, + av_make_error_string(err_buf, AV_ERROR_MAX_STRING_SIZE, error)); } - memcpy(buf[n], encode_buffer, encode_len); - buf_len[n] = encode_len; - n++; + av_init_packet(enc_packet_); + enc_packet_->data = NULL; + enc_packet_->size = 0; + /* read all the available output packets (in general there may be any + * number of them */ + while (error >= 0) { + error = avcodec_receive_packet(enc_, enc_packet_); + if (error == AVERROR(EAGAIN) || error == AVERROR_EOF) { + break; + } else if (error < 0) { + free_frames(pkts); + return srs_error_new(ERROR_RTC_RTP_MUXER, "Error during decoding(%d,%s)", error, + av_make_error_string(err_buf, AV_ERROR_MAX_STRING_SIZE, error)); + } - return err; - } - - // Need to refill the sample to data, because the frame size is not matched to encoder. 
- int data_left = resample_len; - if (size_ + data_left < enc_want_bytes_) { - memcpy(data_ + size_, resample_buffer, data_left); - size_ += data_left; - return err; - } - - int index = 0; - while (1) { - data_left = data_left - (enc_want_bytes_ - size_); - memcpy(data_ + size_, resample_buffer + index, enc_want_bytes_ - size_); - index += enc_want_bytes_ - size_; - size_ += enc_want_bytes_ - size_; - - int encode_len; - pcm.bytes = (char *)data_; - pcm.size = size_; - if ((err = enc_->encode(&pcm, encode_buffer, encode_len)) != srs_success) { - return srs_error_wrap(err, "encode error"); - } - - if (encode_len > 0) { - memcpy(buf[n], encode_buffer, encode_len); - buf_len[n] = encode_len; - n++; - } - - size_ = 0; - if(!data_left) { - break; - } - - if(data_left < enc_want_bytes_) { - memcpy(data_ + size_, resample_buffer + index, data_left); - size_ += data_left; - break; + SrsAudioFrame *out_frame = new SrsAudioFrame; + char *buf = new char[enc_packet_->size]; + memcpy(buf, enc_packet_->data, enc_packet_->size); + out_frame->add_sample(buf, enc_packet_->size); + out_frame->dts = enc_packet_->dts; + out_frame->cts = enc_packet_->pts - enc_packet_->dts; + pkts.push_back(out_frame); } } - return err; + next_out_pts_ += av_rescale(enc_->frame_size * frame_cnt, 1000, enc_->sample_rate); + + return srs_success; } + +srs_error_t SrsAudioTranscoder::add_samples_to_fifo(uint8_t **samples, int frame_size) +{ + char err_buf[AV_ERROR_MAX_STRING_SIZE] = {0}; + + int error; + + /* Make the FIFO as large as it needs to be to hold both, + * the old and the new samples. */ + if ((error = av_audio_fifo_realloc(fifo_, av_audio_fifo_size(fifo_) + frame_size)) < 0) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "Could not reallocate FIFO(%d,%s)", error, + av_make_error_string(err_buf, AV_ERROR_MAX_STRING_SIZE, error)); + } + + /* Store the new samples in the FIFO buffer. 
*/ + if ((error = av_audio_fifo_write(fifo_, (void **)samples, frame_size)) < frame_size) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "Could not write data to FIFO(%d,%s)", error, + av_make_error_string(err_buf, AV_ERROR_MAX_STRING_SIZE, error)); + } + + return srs_success; +} + +void SrsAudioTranscoder::free_swr_samples() +{ + if (swr_data_) { + av_freep(&swr_data_[0]); + free(swr_data_); + swr_data_ = NULL; + } +} + diff --git a/trunk/src/app/srs_app_rtc_codec.hpp b/trunk/src/app/srs_app_rtc_codec.hpp index b9da7daf5..e38f184a5 100644 --- a/trunk/src/app/srs_app_rtc_codec.hpp +++ b/trunk/src/app/srs_app_rtc_codec.hpp @@ -26,6 +26,8 @@ #include +#include + #include #ifdef __cplusplus @@ -39,98 +41,59 @@ extern "C" { #include #include #include +#include #ifdef __cplusplus } #endif -class SrsSample; - -class SrsAudioDecoder +class SrsAudioTranscoder { private: - AVFrame* frame_; - AVPacket* packet_; - AVCodecContext* codec_ctx_; - SrsAudioCodecId codec_id_; -public: - //Only support "aac","opus" - SrsAudioDecoder(SrsAudioCodecId codec); - virtual ~SrsAudioDecoder(); - srs_error_t initialize(); - virtual srs_error_t decode(SrsSample *pkt, char *buf, int &size); - AVCodecContext* codec_ctx(); -}; + AVCodecContext *dec_; + AVFrame *dec_frame_; + AVPacket *dec_packet_; -class SrsAudioEncoder -{ + AVCodecContext *enc_; + AVFrame *enc_frame_; + AVPacket *enc_packet_; + + SwrContext *swr_; + //buffer for swr out put + uint8_t **swr_data_; + AVAudioFifo *fifo_; + + int64_t new_pkt_pts_; + int64_t next_out_pts_; +public: + SrsAudioTranscoder(); + virtual ~SrsAudioTranscoder(); +public: + // Initialize the transcoder, transcode from codec as to codec. + // The channels specifies the number of output channels for encoder, for example, 2. + // The sample_rate specifies the sample rate of encoder, for example, 48000. + // The bit_rate specifies the bitrate of encoder, for example, 48000. 
+ srs_error_t initialize(SrsAudioCodecId from, SrsAudioCodecId to, int channels, int sample_rate, int bit_rate); + // Transcode the input audio frame in, as output audio frames outs. + virtual srs_error_t transcode(SrsAudioFrame* in, std::vector& outs); + // Free the generated audio frames by transcode. + void free_frames(std::vector& frames); +public: + // Get the aac codec header, for example, FLV sequence header. + // @remark User should never free the data, it's managed by this transcoder. + void aac_codec_header(uint8_t** data, int* len); private: - int channels_; - int sampling_rate_; - AVCodecContext* codec_ctx_; - SrsAudioCodecId codec_id_; - int want_bytes_; - AVFrame* frame_; -public: - //Only support "aac","opus" - SrsAudioEncoder(SrsAudioCodecId codec, int samplerate, int channelsy); - virtual ~SrsAudioEncoder(); - srs_error_t initialize(); - //The encoder wanted bytes to call encode, if > 0, caller must feed the same bytes - //Call after initialize successed - int want_bytes(); - virtual srs_error_t encode(SrsSample *frame, char *buf, int &size); - AVCodecContext* codec_ctx(); -}; + srs_error_t init_dec(SrsAudioCodecId from); + srs_error_t init_enc(SrsAudioCodecId to, int channels, int samplerate, int bit_rate); + srs_error_t init_swr(AVCodecContext* decoder); + srs_error_t init_fifo(); -class SrsAudioResample -{ -private: - int src_rate_; - int src_ch_layout_; - int src_nb_channels_; - enum AVSampleFormat src_sample_fmt_; - int src_linesize_; - int src_nb_samples_; - uint8_t **src_data_; + srs_error_t decode_and_resample(SrsAudioFrame* pkt); + srs_error_t encode(std::vector &pkts); - int dst_rate_; - int dst_ch_layout_; - int dst_nb_channels_; - enum AVSampleFormat dst_sample_fmt_; - int dst_linesize_; - int dst_nb_samples_; - uint8_t **dst_data_; - - int max_dst_nb_samples_; - struct SwrContext *swr_ctx_; -public: - SrsAudioResample(int src_rate, int src_layout, enum AVSampleFormat src_fmt, - int src_nb, int dst_rate, int dst_layout, enum 
AVSampleFormat dst_fmt); - virtual ~SrsAudioResample(); - srs_error_t initialize(); - virtual srs_error_t resample(SrsSample *pcm, char *buf, int &size); -}; - -// TODO: FIXME: Rename to Transcoder. -class SrsAudioRecode -{ -private: - SrsAudioDecoder *dec_; - SrsAudioEncoder *enc_; - SrsAudioResample *resample_; - int dst_channels_; - int dst_samplerate_; - int size_; - char *data_; - SrsAudioCodecId src_codec_; - SrsAudioCodecId dst_codec_; - int enc_want_bytes_; -public: - SrsAudioRecode(SrsAudioCodecId src_codec, SrsAudioCodecId dst_codec,int channels, int samplerate); - virtual ~SrsAudioRecode(); - srs_error_t initialize(); - virtual srs_error_t transcode(SrsSample *pkt, char **buf, int *buf_len, int &n); + srs_error_t add_samples_to_fifo(uint8_t** samples, int frame_size); + void free_swr_samples(); }; #endif /* SRS_APP_AUDIO_RECODE_HPP */ + diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index 2716ddd44..4523c0dde 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -998,6 +998,30 @@ srs_error_t SrsRtcPublishStream::initialize(SrsRequest* r, SrsRtcStreamDescripti } source->set_publish_stream(this); + // Bridge to rtmp +#if defined(SRS_RTC) && defined(SRS_FFMPEG_FIT) + bool rtc_to_rtmp = _srs_config->get_rtc_to_rtmp(req->vhost); + if (rtc_to_rtmp) { + SrsSource *rtmp = NULL; + if ((err = _srs_sources->fetch_or_create(r, _srs_hybrid->srs()->instance(), &rtmp)) != srs_success) { + return srs_error_wrap(err, "create source"); + } + + // TODO: FIMXE: Check it in SrsRtcConnection::add_publisher? 
+ if (!rtmp->can_publish(false)) { + return srs_error_new(ERROR_SYSTEM_STREAM_BUSY, "rtmp stream %s busy", r->get_stream_url().c_str()); + } + + SrsRtmpFromRtcBridger *bridger = new SrsRtmpFromRtcBridger(rtmp); + if ((err = bridger->initialize(r)) != srs_success) { + srs_freep(bridger); + return srs_error_wrap(err, "create bridger"); + } + + source->set_bridger(bridger); + } +#endif + return err; } diff --git a/trunk/src/app/srs_app_rtc_source.cpp b/trunk/src/app/srs_app_rtc_source.cpp index a1a3d699d..344d42168 100644 --- a/trunk/src/app/srs_app_rtc_source.cpp +++ b/trunk/src/app/srs_app_rtc_source.cpp @@ -70,11 +70,6 @@ const int kAudioSamplerate = 48000; const int kVideoPayloadType = 102; const int kVideoSamplerate = 90000; -// An AAC packet may be transcoded to many OPUS packets. -const int kMaxOpusPackets = 8; -// The max size for each OPUS packet. -const int kMaxOpusPacketSize = 4096; - // The RTP payload max size, reserved some paddings for SRTP as such: // kRtpPacketSize = kRtpMaxPayloadSize + paddings // For example, if kRtpPacketSize is 1500, recommend to set kRtpMaxPayloadSize to 1400, @@ -333,6 +328,14 @@ ISrsRtcStreamEventHandler::~ISrsRtcStreamEventHandler() { } +ISrsRtcSourceBridger::ISrsRtcSourceBridger() +{ +} + +ISrsRtcSourceBridger::~ISrsRtcSourceBridger() +{ +} + SrsRtcStream::SrsRtcStream() { is_created_ = false; @@ -342,7 +345,7 @@ SrsRtcStream::SrsRtcStream() stream_desc_ = NULL; req = NULL; - bridger_ = new SrsRtcDummyBridger(this); + bridger_ = NULL; } SrsRtcStream::~SrsRtcStream() @@ -413,9 +416,10 @@ SrsContextId SrsRtcStream::pre_source_id() return _pre_source_id; } -ISrsSourceBridger* SrsRtcStream::bridger() +void SrsRtcStream::set_bridger(ISrsRtcSourceBridger *bridger) { - return bridger_; + srs_freep(bridger_); + bridger_ = bridger; } srs_error_t SrsRtcStream::create_consumer(SrsRtcConsumer*& consumer) @@ -482,21 +486,22 @@ srs_error_t SrsRtcStream::on_publish() is_created_ = true; is_delivering_packets_ = true; - // Create a new 
bridger, because it's been disposed when unpublish. -#ifdef SRS_FFMPEG_FIT - SrsRtcFromRtmpBridger* impl = new SrsRtcFromRtmpBridger(this); - if ((err = impl->initialize(req)) != srs_success) { - return srs_error_wrap(err, "bridge initialize"); - } - - bridger_->setup(impl); -#endif - // Notify the consumers about stream change event. if ((err = on_source_changed()) != srs_success) { return srs_error_wrap(err, "source id change"); } + // If bridge to other source, handle event and start timer to request PLI. + if (bridger_) { + if ((err = bridger_->on_publish()) != srs_success) { + return srs_error_wrap(err, "bridger on publish"); + } + + // For SrsRtcStream::on_timer() + srs_utime_t pli_for_rtmp = _srs_config->get_rtc_pli_for_rtmp(req->vhost); + _srs_hybrid->timer()->subscribe(pli_for_rtmp, this); + } + // TODO: FIXME: Handle by statistic. return err; @@ -524,14 +529,18 @@ void SrsRtcStream::on_unpublish() h->on_unpublish(); } + //free bridger resource + if (bridger_) { + // For SrsRtcStream::on_timer() + _srs_hybrid->timer()->unsubscribe(this); + + bridger_->on_unpublish(); + srs_freep(bridger_); + } + // release unpublish stream description. set_stream_desc(NULL); - // Dispose the impl of bridger, to free memory. -#ifdef SRS_FFMPEG_FIT - bridger_->setup(NULL); -#endif - // TODO: FIXME: Handle by statistic. 
} @@ -572,6 +581,10 @@ srs_error_t SrsRtcStream::on_rtp(SrsRtpPacket2* pkt) } } + if (bridger_ && (err = bridger_->on_rtp(pkt)) != srs_success) { + return srs_error_wrap(err, "bridger consume message"); + } + return err; } @@ -613,6 +626,22 @@ std::vector SrsRtcStream::get_track_desc(std::string ty return track_descs; } +srs_error_t SrsRtcStream::on_timer(srs_utime_t interval, srs_utime_t tick) +{ + srs_error_t err = srs_success; + + if (!publish_stream_) { + return err; + } + + for (int i = 0; i < (int)stream_desc_->video_track_descs_.size(); i++) { + SrsRtcTrackDescription* desc = stream_desc_->video_track_descs_.at(i); + publish_stream_->request_keyframe(desc->ssrc_); + } + + return err; +} + SrsRtpPacketCacheHelper::SrsRtpPacketCacheHelper() { pkt = _srs_rtp_cache->allocate(); @@ -632,12 +661,11 @@ SrsRtcFromRtmpBridger::SrsRtcFromRtmpBridger(SrsRtcStream* source) req = NULL; source_ = source; format = new SrsRtmpFormat(); - codec = new SrsAudioRecode(SrsAudioCodecIdAAC, SrsAudioCodecIdOpus, kAudioChannel, kAudioSamplerate); + codec_ = new SrsAudioTranscoder(); discard_aac = false; discard_bframe = false; merge_nalus = false; meta = new SrsMetaCache(); - audio_timestamp = 0; audio_sequence = 0; video_sequence = 0; @@ -687,7 +715,7 @@ SrsRtcFromRtmpBridger::SrsRtcFromRtmpBridger(SrsRtcStream* source) SrsRtcFromRtmpBridger::~SrsRtcFromRtmpBridger() { srs_freep(format); - srs_freep(codec); + srs_freep(codec_); srs_freep(meta); } @@ -701,7 +729,8 @@ srs_error_t SrsRtcFromRtmpBridger::initialize(SrsRequest* r) return srs_error_wrap(err, "format initialize"); } - if ((err = codec->initialize()) != srs_success) { + int bitrate = 48000; // The output bitrate in bps. 
+ if ((err = codec_->initialize(SrsAudioCodecIdAAC, SrsAudioCodecIdOpus, kAudioChannel, kAudioSamplerate, bitrate)) != srs_success) { return srs_error_wrap(err, "init codec"); } @@ -779,72 +808,58 @@ srs_error_t SrsRtcFromRtmpBridger::on_audio(SrsSharedPtrMessage* msg) return srs_error_wrap(err, "aac append header"); } - if (adts_audio) { - err = transcode(adts_audio, nn_adts_audio); - srs_freep(adts_audio); + if (!adts_audio) { + return err; } + SrsAudioFrame aac; + aac.dts = format->audio->dts; + aac.cts = format->audio->cts; + if ((err = aac.add_sample(adts_audio, nn_adts_audio)) == srs_success) { + // If OK, transcode the AAC to Opus and consume it. + err = transcode(&aac); + } + + srs_freepa(adts_audio); + return err; } -srs_error_t SrsRtcFromRtmpBridger::transcode(char* adts_audio, int nn_adts_audio) +srs_error_t SrsRtcFromRtmpBridger::transcode(SrsAudioFrame* pkt) { srs_error_t err = srs_success; - // Opus packet cache. - static char* opus_payloads[kMaxOpusPackets]; - - static bool initialized = false; - if (!initialized) { - initialized = true; - - static char opus_packets_cache[kMaxOpusPackets][kMaxOpusPacketSize]; - opus_payloads[0] = &opus_packets_cache[0][0]; - for (int i = 1; i < kMaxOpusPackets; i++) { - opus_payloads[i] = opus_packets_cache[i]; - } - } - - // Transcode an aac packet to many opus packets. - SrsSample aac; - aac.bytes = adts_audio; - aac.size = nn_adts_audio; - - int nn_opus_packets = 0; - int opus_sizes[kMaxOpusPackets]; - if ((err = codec->transcode(&aac, opus_payloads, opus_sizes, nn_opus_packets)) != srs_success) { + std::vector out_pkts; + if ((err = codec_->transcode(pkt, out_pkts)) != srs_success) { return srs_error_wrap(err, "recode error"); } // Save OPUS packets in shared message. 
- if (nn_opus_packets <= 0) { + if (out_pkts.empty()) { return err; } - int nn_max_extra_payload = 0; - for (int i = 0; i < nn_opus_packets; i++) { - char* data = (char*)opus_payloads[i]; - int size = (int)opus_sizes[i]; - - // TODO: FIXME: Use it to padding audios. - nn_max_extra_payload = srs_max(nn_max_extra_payload, size); - + for (std::vector::iterator it = out_pkts.begin(); it != out_pkts.end(); ++it) { SrsRtpPacketCacheHelper* helper = new SrsRtpPacketCacheHelper(); SrsAutoFree(SrsRtpPacketCacheHelper, helper); - if ((err = package_opus(data, size, helper)) != srs_success) { - return srs_error_wrap(err, "package opus"); + if ((err = package_opus(*it, helper)) != srs_success) { + err = srs_error_wrap(err, "package opus"); + break; } if ((err = source_->on_rtp(helper->pkt)) != srs_success) { - return srs_error_wrap(err, "consume opus"); + err = srs_error_wrap(err, "consume opus"); + break; } } + codec_->free_frames(out_pkts); + return err; } -srs_error_t SrsRtcFromRtmpBridger::package_opus(char* data, int size, SrsRtpPacketCacheHelper* helper) +srs_error_t SrsRtcFromRtmpBridger::package_opus(SrsAudioFrame* audio, SrsRtpPacketCacheHelper* helper) { srs_error_t err = srs_success; @@ -854,16 +869,14 @@ srs_error_t SrsRtcFromRtmpBridger::package_opus(char* data, int size, SrsRtpPack pkt->frame_type = SrsFrameTypeAudio; pkt->header.set_marker(true); pkt->header.set_sequence(audio_sequence++); - pkt->header.set_timestamp(audio_timestamp); - - // TODO: FIXME: Why 960? Need Refactoring? 
- audio_timestamp += 960; + pkt->header.set_timestamp(audio->dts * 48); SrsRtpRawPayload* raw = _srs_rtp_raw_cache->allocate(); pkt->set_payload(raw, SrsRtpPacketPayloadTypeRaw); - raw->payload = pkt->wrap(data, size); - raw->nn_payload = size; + srs_assert(audio->nb_samples == 1); + raw->payload = pkt->wrap(audio->samples[0].bytes, audio->samples[0].size); + raw->nn_payload = audio->samples[0].size; return err; } @@ -1215,58 +1228,475 @@ srs_error_t SrsRtcFromRtmpBridger::consume_packets(vectorinitialize(from, to, channels, sample_rate, bitrate)) != srs_success) { + return srs_error_wrap(err, "bridge initialize"); + } + + if ((err = format->initialize()) != srs_success) { + return srs_error_wrap(err, "format initialize"); + } + + return err; +} + +srs_error_t SrsRtmpFromRtcBridger::on_publish() +{ + srs_error_t err = srs_success; + + is_first_audio = true; + is_first_video = true; + + // TODO: FIXME: Should sync with bridger? + if ((err = source_->on_publish()) != srs_success) { + return srs_error_wrap(err, "source publish"); + } + + return err; +} + +srs_error_t SrsRtmpFromRtcBridger::on_rtp(SrsRtpPacket2 *pkt) +{ + srs_error_t err = srs_success; + + if (!pkt->payload()) { + return err; + } + + if (pkt->is_audio()) { + err = trancode_audio(pkt); + } else { + err = packet_video(pkt); + } + + return err; +} + +void SrsRtmpFromRtcBridger::on_unpublish() +{ + // TODO: FIXME: Should sync with bridger? + source_->on_unpublish(); +} + +srs_error_t SrsRtmpFromRtcBridger::trancode_audio(SrsRtpPacket2 *pkt) +{ + srs_error_t err = srs_success; + + // to common message. 
+ uint32_t ts = pkt->header.get_timestamp()/(48000/1000); + if (is_first_audio) { + int header_len = 0; + uint8_t* header = NULL; + codec_->aac_codec_header(&header, &header_len); + + SrsCommonMessage out_rtmp; + packet_aac(&out_rtmp, (char *)header, header_len, ts, is_first_audio); + + if ((err = source_->on_audio(&out_rtmp)) != srs_success) { + return srs_error_wrap(err, "source on audio"); + } + + is_first_audio = false; + } + + std::vector out_pkts; + SrsRtpRawPayload *payload = dynamic_cast(pkt->payload()); + + SrsAudioFrame frame; + frame.add_sample(payload->payload, payload->nn_payload); + frame.dts = ts; + frame.cts = 0; + + err = codec_->transcode(&frame, out_pkts); + if (err != srs_success) { + return err; + } + + for (std::vector::iterator it = out_pkts.begin(); it != out_pkts.end(); ++it) { + SrsCommonMessage out_rtmp; + out_rtmp.header.timestamp = (*it)->dts*(48000/1000); + packet_aac(&out_rtmp, (*it)->samples[0].bytes, (*it)->samples[0].size, ts, is_first_audio); + + if ((err = source_->on_audio(&out_rtmp)) != srs_success) { + err = srs_error_wrap(err, "source on audio"); + break; + } + } + codec_->free_frames(out_pkts); + + return err; +} + +void SrsRtmpFromRtcBridger::packet_aac(SrsCommonMessage* audio, char* data, int len, uint32_t pts, bool is_header) +{ + int rtmp_len = len + 2; + audio->header.initialize_audio(rtmp_len, pts, 1); + audio->create_payload(rtmp_len); + SrsBuffer stream(audio->payload, rtmp_len); + uint8_t aac_flag = (SrsAudioCodecIdAAC << 4) | (SrsAudioSampleRate44100 << 2) | (SrsAudioSampleBits16bit << 1) | SrsAudioChannelsStereo; + stream.write_1bytes(aac_flag); + if (is_header) { + stream.write_1bytes(0); + } else { + stream.write_1bytes(1); + } + stream.write_bytes(data, len); + audio->size = rtmp_len; +} + +srs_error_t SrsRtmpFromRtcBridger::packet_video(SrsRtpPacket2* src) +{ + srs_error_t err = srs_success; + + // TODO: Only copy when need + SrsRtpPacket2* pkt = src->copy(); + + if (pkt->is_keyframe()) { + return 
packet_video_key_frame(pkt); + } + + // store in cache + int index = cache_index(pkt->header.get_sequence()); + cache_video_pkts_[index].in_use = true; + cache_video_pkts_[index].pkt = pkt; + cache_video_pkts_[index].sn = pkt->header.get_sequence(); + cache_video_pkts_[index].ts = pkt->header.get_timestamp(); + + // check whether to recovery lost packet and can construct a video frame + if (lost_sn_ == pkt->header.get_sequence()) { + uint16_t tail_sn = 0; + int sn = find_next_lost_sn(lost_sn_, tail_sn); + if (-1 == sn ) { + if (check_frame_complete(header_sn_, tail_sn)) { + if ((err = packet_video_rtmp(header_sn_, tail_sn)) != srs_success) { + err = srs_error_wrap(err, "fail to pack video frame"); + } + } + } else if (-2 == sn) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "video cache is overflow"); + } else { + lost_sn_ = (uint16_t)sn; + } + } + + return err; +} + +srs_error_t SrsRtmpFromRtcBridger::packet_video_key_frame(SrsRtpPacket2* pkt) +{ + srs_error_t err = srs_success; + + // TODO: handle sps and pps in 2 rtp packets + SrsRtpSTAPPayload* stap_payload = dynamic_cast(pkt->payload()); + if (stap_payload) { + SrsSample* sps = stap_payload->get_sps(); + SrsSample* pps = stap_payload->get_pps(); + if (NULL == sps || NULL == pps) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "no sps or pps in stap-a rtp. 
sps: %p, pps:%p", sps, pps); + } else { + //type_codec1 + avc_type + composition time + fix header + count of sps + len of sps + sps + count of pps + len of pps + pps + int nb_payload = 1 + 1 + 3 + 5 + 1 + 2 + sps->size + 1 + 2 + pps->size; + SrsCommonMessage rtmp; + rtmp.header.initialize_video(nb_payload, pkt->header.get_timestamp() / 90, 1); + rtmp.create_payload(nb_payload); + rtmp.size = nb_payload; + SrsBuffer payload(rtmp.payload, rtmp.size); + //TODO: call api + payload.write_1bytes(0x17);// type(4 bits): key frame; code(4bits): avc + payload.write_1bytes(0x0); // avc_type: sequence header + payload.write_1bytes(0x0); // composition time + payload.write_1bytes(0x0); + payload.write_1bytes(0x0); + payload.write_1bytes(0x01); // version + payload.write_1bytes(sps->bytes[1]); + payload.write_1bytes(sps->bytes[2]); + payload.write_1bytes(sps->bytes[3]); + payload.write_1bytes(0xff); + payload.write_1bytes(0xe1); + payload.write_2bytes(sps->size); + payload.write_bytes(sps->bytes, sps->size); + payload.write_1bytes(0x01); + payload.write_2bytes(pps->size); + payload.write_bytes(pps->bytes, pps->size); + if ((err = source_->on_video(&rtmp)) != srs_success) { + return err; + } + } + } + + if (-1 == key_frame_ts_) { + key_frame_ts_ = pkt->header.get_timestamp(); + header_sn_ = pkt->header.get_sequence(); + lost_sn_ = header_sn_ + 1; + // Received key frame and clean cache of old p frame pkts + clear_cached_video(); + srs_trace("set ts=%lld, header=%hu, lost=%hu", key_frame_ts_, header_sn_, lost_sn_); + } else if (key_frame_ts_ != pkt->header.get_timestamp()) { + //new key frame, clean cache + int64_t old_ts = key_frame_ts_; + uint16_t old_header_sn = header_sn_; + uint16_t old_lost_sn = lost_sn_; + key_frame_ts_ = pkt->header.get_timestamp(); + header_sn_ = pkt->header.get_sequence(); + lost_sn_ = header_sn_ + 1; + clear_cached_video(); + srs_trace("drop old ts=%lld, header=%hu, lost=%hu, set new ts=%lld, header=%hu, lost=%hu", + old_ts, old_header_sn, old_lost_sn, 
key_frame_ts_, header_sn_, lost_sn_); + } + + uint16_t index = cache_index(pkt->header.get_sequence()); + cache_video_pkts_[index].in_use = true; + cache_video_pkts_[index].pkt = pkt; + cache_video_pkts_[index].sn = pkt->header.get_sequence(); + cache_video_pkts_[index].ts = pkt->header.get_timestamp(); + + int32_t sn = lost_sn_; + uint16_t tail_sn = 0; + if (srs_rtp_seq_distance(header_sn_, pkt->header.get_sequence()) < 0){ + // When receive previous pkt in the same frame, update header sn; + header_sn_ = pkt->header.get_sequence(); + sn = find_next_lost_sn(header_sn_, tail_sn); + } else if (lost_sn_ == pkt->header.get_sequence()) { + sn = find_next_lost_sn(lost_sn_, tail_sn); + } + + if (-1 == sn) { + if (check_frame_complete(header_sn_, tail_sn)) { + if ((err = packet_video_rtmp(header_sn_, tail_sn)) != srs_success) { + err = srs_error_wrap(err, "fail to packet key frame"); + } + } + } else if (-2 == sn) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "video cache is overflow"); + } else { + lost_sn_ = (uint16_t)sn; + } + + return err; +} + +srs_error_t SrsRtmpFromRtcBridger::packet_video_rtmp(const uint16_t start, const uint16_t end) +{ + srs_error_t err = srs_success; + + //type_codec1 + avc_type + composition time + nalu size + nalu + int nb_payload = 1 + 1 + 3; + uint16_t cnt = end - start + 1; + + for (uint16_t i = 0; i < cnt; ++i) { + uint16_t sn = start + i; + uint16_t index = cache_index(sn); + SrsRtpPacket2* pkt = cache_video_pkts_[index].pkt; + // calculate nalu len + SrsRtpFUAPayload2* fua_payload = dynamic_cast(pkt->payload()); + if (fua_payload) { + if (fua_payload->start) { + nb_payload += 1 + 4; + } + nb_payload += fua_payload->size; + continue; + } + + SrsRtpSTAPPayload* stap_payload = dynamic_cast(pkt->payload()); + if (stap_payload) { + for (int j = 0; j < stap_payload->nalus.size(); ++j) { + SrsSample* sample = stap_payload->nalus.at(j); + nb_payload += 4 + sample->size; + } + continue; + } + + SrsRtpRawPayload* raw_payload = 
dynamic_cast(pkt->payload()); + if (raw_payload) { + nb_payload += 4 + raw_payload->nn_payload; + continue; + } + } + + SrsCommonMessage rtmp; + SrsRtpPacket2* header = cache_video_pkts_[cache_index(start)].pkt; + rtmp.header.initialize_video(nb_payload, header->header.get_timestamp() / 90, 1); + rtmp.create_payload(nb_payload); + rtmp.size = nb_payload; + SrsBuffer payload(rtmp.payload, rtmp.size); + if (header->is_keyframe()) { + payload.write_1bytes(0x17); // type(4 bits): key frame; code(4bits): avc + key_frame_ts_ = -1; + } else { + payload.write_1bytes(0x27); // type(4 bits): inter frame; code(4bits): avc + } + payload.write_1bytes(0x01); // avc_type: nalu + payload.write_1bytes(0x0); // composition time + payload.write_1bytes(0x0); + payload.write_1bytes(0x0); + + int nalu_len = 0; + for (uint16_t i = 0; i < cnt; ++i) { + uint16_t index = cache_index((start + i)); + SrsRtpPacket2* pkt = cache_video_pkts_[index].pkt; + cache_video_pkts_[index].in_use = false; + cache_video_pkts_[index].pkt = NULL; + cache_video_pkts_[index].ts = 0; + cache_video_pkts_[index].sn = 0; + + SrsRtpFUAPayload2* fua_payload = dynamic_cast(pkt->payload()); + if (fua_payload) { + if (fua_payload->start) { + nalu_len = fua_payload->size + 1; + //skip 4 bytes to write nalu_len future + payload.skip(4); + payload.write_1bytes(fua_payload->nri | fua_payload->nalu_type); + payload.write_bytes(fua_payload->payload, fua_payload->size); + } else { + nalu_len += fua_payload->size; + payload.write_bytes(fua_payload->payload, fua_payload->size); + if (fua_payload->end) { + //write nalu_len back + payload.skip(-(4 + nalu_len)); + payload.write_4bytes(nalu_len); + payload.skip(nalu_len); + } + } + srs_freep(pkt); + continue; + } + + SrsRtpSTAPPayload* stap_payload = dynamic_cast(pkt->payload()); + if (stap_payload) { + for (int j = 0; j < stap_payload->nalus.size(); ++j) { + SrsSample* sample = stap_payload->nalus.at(j); + payload.write_4bytes(sample->size); + payload.write_bytes(sample->bytes, 
sample->size); + } + srs_freep(pkt); + continue; + } + + SrsRtpRawPayload* raw_payload = dynamic_cast(pkt->payload()); + if (raw_payload) { + payload.write_4bytes(raw_payload->nn_payload); + payload.write_bytes(raw_payload->payload, raw_payload->nn_payload); + srs_freep(pkt); + continue; + } + + srs_freep(pkt); + } + + if ((err = source_->on_video(&rtmp)) != srs_success) { + srs_warn("fail to pack video frame"); + } + + header_sn_ = end + 1; + uint16_t tail_sn = 0; + int sn = find_next_lost_sn(header_sn_, tail_sn); + if (-1 == sn) { + if (check_frame_complete(header_sn_, tail_sn)) { + err = packet_video_rtmp(header_sn_, tail_sn); + } + } else if (-2 == sn) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "video cache is overflow"); + } else { + lost_sn_ = sn; + } + + return err; +} + +int32_t SrsRtmpFromRtcBridger::find_next_lost_sn(uint16_t current_sn, uint16_t& end_sn) +{ + uint32_t last_ts = cache_video_pkts_[cache_index(header_sn_)].ts; + for (int i = 0; i < s_cache_size; ++i) { + uint16_t lost_sn = current_sn + i; + int index = cache_index(lost_sn); + + if (!cache_video_pkts_[index].in_use) { + return lost_sn; + } + //check time first, avoid two small frame mixed case decode fail + if (last_ts != cache_video_pkts_[index].ts) { + end_sn = lost_sn - 1; + return -1; + } + + if (cache_video_pkts_[index].pkt->header.get_marker()) { + end_sn = lost_sn; + return -1; + } + } + + srs_error("the cache is mess. 
the packet count of video frame is more than %u", s_cache_size); + return -2; +} + +void SrsRtmpFromRtcBridger::clear_cached_video() +{ + for (size_t i = 0; i < s_cache_size; i++) + { + if (cache_video_pkts_[i].in_use) { + srs_freep(cache_video_pkts_[i].pkt); + cache_video_pkts_[i].sn = 0; + cache_video_pkts_[i].ts = 0; + cache_video_pkts_[i].in_use = false; + } + } +} + +bool SrsRtmpFromRtcBridger::check_frame_complete(const uint16_t start, const uint16_t end) +{ + uint16_t cnt = (end - start + 1); + uint16_t fu_s_c = 0; + uint16_t fu_e_c = 0; + for (uint16_t i = 0; i < cnt; ++i) { + int index = cache_index((start + i)); + SrsRtpPacket2* pkt = cache_video_pkts_[index].pkt; + SrsRtpFUAPayload2* fua_payload = dynamic_cast(pkt->payload()); + if (fua_payload) { + if (fua_payload->start) { + ++fu_s_c; + } + + if (fua_payload->end) { + ++fu_e_c; + } + } + } + + return fu_s_c == fu_e_c; +} #endif -SrsRtcDummyBridger::SrsRtcDummyBridger(SrsRtcStream* s) -{ - rtc_ = s; - impl_ = NULL; -} - -SrsRtcDummyBridger::~SrsRtcDummyBridger() -{ - srs_freep(impl_); -} - -srs_error_t SrsRtcDummyBridger::on_publish() -{ - if (impl_) { - return impl_->on_publish(); - } - return rtc_->on_publish(); -} - -srs_error_t SrsRtcDummyBridger::on_audio(SrsSharedPtrMessage* audio) -{ - if (impl_) { - return impl_->on_audio(audio); - } - return srs_success; -} - -srs_error_t SrsRtcDummyBridger::on_video(SrsSharedPtrMessage* video) -{ - if (impl_) { - return impl_->on_video(video); - } - return srs_success; -} - -void SrsRtcDummyBridger::on_unpublish() -{ - if (impl_) { - impl_->on_unpublish(); - return; - } - rtc_->on_unpublish(); -} - -void SrsRtcDummyBridger::setup(ISrsSourceBridger* impl) -{ - srs_freep(impl_); - impl_ = impl; -} - SrsCodecPayload::SrsCodecPayload() { pt_of_publisher_ = pt_ = 0; diff --git a/trunk/src/app/srs_app_rtc_source.hpp b/trunk/src/app/srs_app_rtc_source.hpp index e29711feb..2543daa6f 100644 --- a/trunk/src/app/srs_app_rtc_source.hpp +++ 
b/trunk/src/app/srs_app_rtc_source.hpp @@ -45,7 +45,7 @@ class SrsCommonMessage; class SrsMessageArray; class SrsRtcStream; class SrsRtcFromRtmpBridger; -class SrsAudioRecode; +class SrsAudioTranscoder; class SrsRtpPacket2; class SrsRtpPacketCacheHelper; class SrsSample; @@ -56,7 +56,6 @@ class SrsRtpRingBuffer; class SrsRtpNackForReceiver; class SrsJsonObject; class SrsErrorPithyPrint; -class SrsRtcDummyBridger; class SrsNtp { @@ -164,8 +163,20 @@ public: virtual void on_consumers_finished() = 0; }; +// SrsRtcStream bridge to SrsSource +class ISrsRtcSourceBridger +{ +public: + ISrsRtcSourceBridger(); + virtual ~ISrsRtcSourceBridger(); +public: + virtual srs_error_t on_publish() = 0; + virtual srs_error_t on_rtp(SrsRtpPacket2 *pkt) = 0; + virtual void on_unpublish() = 0; +}; + // A Source is a stream, to publish and to play with, binding to SrsRtcPublishStream and SrsRtcPlayStream. -class SrsRtcStream +class SrsRtcStream : public ISrsFastTimer { private: // For publish, it's the publish client id. @@ -177,10 +188,10 @@ private: SrsContextId _pre_source_id; SrsRequest* req; ISrsRtcPublishStream* publish_stream_; - // Transmux RTMP to RTC. - SrsRtcDummyBridger* bridger_; // Steam description for this steam. SrsRtcStreamDescription* stream_desc_; + // The Source bridger, bridger stream to other source. + ISrsRtcSourceBridger* bridger_; private: // To delivery stream to clients. std::vector consumers; @@ -204,8 +215,8 @@ public: // Get current source id. virtual SrsContextId source_id(); virtual SrsContextId pre_source_id(); - // Get the bridger. - ISrsSourceBridger* bridger(); +public: + void set_bridger(ISrsRtcSourceBridger *bridger); public: // Create consumer // @param consumer, output the create consumer. 
@@ -239,6 +250,9 @@ public: bool has_stream_desc(); void set_stream_desc(SrsRtcStreamDescription* stream_desc); std::vector get_track_desc(std::string type, std::string media_type); +// interface ISrsFastTimer +private: + srs_error_t on_timer(srs_utime_t interval, srs_utime_t tick); }; // A helper class, to release the packet to cache. @@ -263,10 +277,9 @@ private: SrsMetaCache* meta; private: bool discard_aac; - SrsAudioRecode* codec; + SrsAudioTranscoder* codec_; bool discard_bframe; bool merge_nalus; - uint32_t audio_timestamp; uint16_t audio_sequence; uint16_t video_sequence; uint32_t audio_ssrc; @@ -280,8 +293,8 @@ public: virtual void on_unpublish(); virtual srs_error_t on_audio(SrsSharedPtrMessage* msg); private: - srs_error_t transcode(char* adts_audio, int nn_adts_audio); - srs_error_t package_opus(char* data, int size, SrsRtpPacketCacheHelper* helper); + srs_error_t transcode(SrsAudioFrame* audio); + srs_error_t package_opus(SrsAudioFrame* audio, SrsRtpPacketCacheHelper* helper); public: virtual srs_error_t on_video(SrsSharedPtrMessage* msg); private: @@ -292,26 +305,53 @@ private: srs_error_t package_fu_a(SrsSharedPtrMessage* msg, SrsSample* sample, int fu_payload_size, std::vector& helpers); srs_error_t consume_packets(std::vector& helpers); }; -#endif -class SrsRtcDummyBridger : public ISrsSourceBridger +class SrsRtmpFromRtcBridger : public ISrsRtcSourceBridger { private: - SrsRtcStream* rtc_; - // The optional implementation bridger, ignore if NULL. - ISrsSourceBridger* impl_; + SrsSource *source_; + SrsAudioTranscoder *codec_; + bool is_first_audio; + bool is_first_video; + // The format, codec information. 
+ SrsRtmpFormat* format; + + //TODO:use SrsRtpRingBuffer + //TODO:jitter buffer class + struct RtcPacketCache { + bool in_use; + uint16_t sn; + uint32_t ts; + SrsRtpPacket2* pkt; + }; + const static uint16_t s_cache_size = 512; + RtcPacketCache cache_video_pkts_[s_cache_size]; + uint16_t header_sn_; + uint16_t lost_sn_; + int64_t key_frame_ts_; public: - SrsRtcDummyBridger(SrsRtcStream* s); - virtual ~SrsRtcDummyBridger(); + SrsRtmpFromRtcBridger(SrsSource *src); + virtual ~SrsRtmpFromRtcBridger(); +public: + srs_error_t initialize(SrsRequest* r); public: virtual srs_error_t on_publish(); - virtual srs_error_t on_audio(SrsSharedPtrMessage* audio); - virtual srs_error_t on_video(SrsSharedPtrMessage* video); + virtual srs_error_t on_rtp(SrsRtpPacket2 *pkt); virtual void on_unpublish(); -public: - // Setup a new implementation bridger, which might be NULL to free previous one. - void setup(ISrsSourceBridger* impl); +private: + srs_error_t trancode_audio(SrsRtpPacket2 *pkt); + void packet_aac(SrsCommonMessage* audio, char* data, int len, uint32_t pts, bool is_header); + srs_error_t packet_video(SrsRtpPacket2* pkt); + srs_error_t packet_video_key_frame(SrsRtpPacket2* pkt); + srs_error_t packet_video_rtmp(const uint16_t start, const uint16_t end); + int32_t find_next_lost_sn(uint16_t current_sn, uint16_t& end_sn); + void clear_cached_video(); + inline uint16_t cache_index(uint16_t current_sn) { + return current_sn%s_cache_size; + } + bool check_frame_complete(const uint16_t start, const uint16_t end); }; +#endif // TODO: FIXME: Rename it. class SrsCodecPayload diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index e82ca3fbb..d6fe9d630 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -55,6 +55,7 @@ using namespace std; #include #include #include +#include // the timeout in srs_utime_t to wait encoder to republish // if timeout, close the connection. 
@@ -959,23 +960,47 @@ srs_error_t SrsRtmpConn::acquire_publish(SrsSource* source) srs_error_t err = srs_success; SrsRequest* req = info->req; - + + // Check whether RTC stream is busy. +#ifdef SRS_RTC + SrsRtcStream *rtc = NULL; + bool rtc_server_enabled = _srs_config->get_rtc_server_enabled(); + bool rtc_enabled = _srs_config->get_rtc_enabled(req->vhost); + if (rtc_server_enabled && rtc_enabled && !info->edge) { + if ((err = _srs_rtc_sources->fetch_or_create(req, &rtc)) != srs_success) { + return srs_error_wrap(err, "create source"); + } + + if (!rtc->can_publish()) { + return srs_error_new(ERROR_RTC_SOURCE_BUSY, "rtc stream %s busy", req->get_stream_url().c_str()); + } + } +#endif + + // Check whether RTMP stream is busy. if (!source->can_publish(info->edge)) { return srs_error_new(ERROR_SYSTEM_STREAM_BUSY, "rtmp: stream %s is busy", req->get_stream_url().c_str()); } - - // when edge, ignore the publish event, directly proxy it. - if (info->edge) { - if ((err = source->on_edge_start_publish()) != srs_success) { - return srs_error_wrap(err, "rtmp: edge start publish"); - } - } else { - if ((err = source->on_publish()) != srs_success) { - return srs_error_wrap(err, "rtmp: source publish"); + + // Bridge to RTC streaming. +#if defined(SRS_RTC) && defined(SRS_FFMPEG_FIT) + if (rtc) { + SrsRtcFromRtmpBridger *bridger = new SrsRtcFromRtmpBridger(rtc); + if ((err = bridger->initialize(req)) != srs_success) { + srs_freep(bridger); + return srs_error_wrap(err, "bridger init"); } + + source->set_bridger(bridger); + } +#endif + + // Start publisher now. 
+ if (info->edge) { + return source->on_edge_start_publish(); + } else { + return source->on_publish(); } - - return err; } void SrsRtmpConn::release_publish(SrsSource* source) diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index 1a9553d41..ce09e6614 100755 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -1728,19 +1728,6 @@ srs_error_t SrsSourceManager::fetch_or_create(SrsRequest* r, ISrsSourceHandler* // should always not exists for create a source. srs_assert (pool.find(stream_url) == pool.end()); -#ifdef SRS_RTC - bool rtc_server_enabled = _srs_config->get_rtc_server_enabled(); - bool rtc_enabled = _srs_config->get_rtc_enabled(r->vhost); - - // Get the RTC source and bridger. - SrsRtcStream* rtc = NULL; - if (rtc_server_enabled && rtc_enabled) { - if ((err = _srs_rtc_sources->fetch_or_create(r, &rtc)) != srs_success) { - err = srs_error_wrap(err, "init rtc %s", r->get_stream_url().c_str()); - goto failed; - } - } -#endif srs_trace("new source, stream_url=%s", stream_url.c_str()); source = new SrsSource(); @@ -1748,14 +1735,6 @@ srs_error_t SrsSourceManager::fetch_or_create(SrsRequest* r, ISrsSourceHandler* err = srs_error_wrap(err, "init source %s", r->get_stream_url().c_str()); goto failed; } - -#ifdef SRS_RTC - // If rtc enabled, bridge RTMP source to RTC, - // all RTMP packets will be forwarded to RTC source. 
- if (source && rtc) { - source->bridge_to(rtc->bridger()); - } -#endif pool[stream_url] = source; *pps = source; @@ -1883,7 +1862,7 @@ SrsSource::SrsSource() die_at = 0; handler = NULL; - bridger = NULL; + bridger_ = NULL; play_edge = new SrsPlayEdge(); publish_edge = new SrsPublishEdge(); @@ -1915,6 +1894,7 @@ SrsSource::~SrsSource() srs_freep(gop_cache); srs_freep(req); + srs_freep(bridger_); } void SrsSource::dispose() @@ -1990,9 +1970,10 @@ srs_error_t SrsSource::initialize(SrsRequest* r, ISrsSourceHandler* h) return err; } -void SrsSource::bridge_to(ISrsSourceBridger* v) +void SrsSource::set_bridger(ISrsSourceBridger* v) { - bridger = v; + srs_freep(bridger_); + bridger_ = v; } srs_error_t SrsSource::on_reload_vhost_play(string vhost) @@ -2245,7 +2226,7 @@ srs_error_t SrsSource::on_audio_imp(SrsSharedPtrMessage* msg) } // For bridger to consume the message. - if (bridger && (err = bridger->on_audio(msg)) != srs_success) { + if (bridger_ && (err = bridger_->on_audio(msg)) != srs_success) { return srs_error_wrap(err, "bridger consume audio"); } @@ -2375,7 +2356,7 @@ srs_error_t SrsSource::on_video_imp(SrsSharedPtrMessage* msg) } // For bridger to consume the message. - if (bridger && (err = bridger->on_video(msg)) != srs_success) { + if (bridger_ && (err = bridger_->on_video(msg)) != srs_success) { return srs_error_wrap(err, "bridger consume video"); } @@ -2539,7 +2520,7 @@ srs_error_t SrsSource::on_publish() return srs_error_wrap(err, "handle publish"); } - if (bridger && (err = bridger->on_publish()) != srs_success) { + if (bridger_ && (err = bridger_->on_publish()) != srs_success) { return srs_error_wrap(err, "bridger publish"); } @@ -2584,8 +2565,9 @@ void SrsSource::on_unpublish() handler->on_unpublish(this, req); - if (bridger) { - bridger->on_unpublish(); + if (bridger_) { + bridger_->on_unpublish(); + srs_freep(bridger_); } // no consumer, stream is die. 
diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index 8a35b5155..4a5905895 100644 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -487,7 +487,7 @@ public: // Global singleton instance. extern SrsSourceManager* _srs_sources; -// For two sources to bridge with each other. +// For RTMP2RTC, bridge SrsSource to SrsRtcStream class ISrsSourceBridger { public: @@ -500,7 +500,8 @@ public: virtual void on_unpublish() = 0; }; -// live streaming source. +// The live streaming source. +// TODO: FIXME: Rename to SrsLiveStream. class SrsSource : public ISrsReloadHandler { friend class SrsOriginHub; @@ -534,7 +535,7 @@ private: // The event handler. ISrsSourceHandler* handler; // The source bridger for other source. - ISrsSourceBridger* bridger; + ISrsSourceBridger* bridger_; // The edge control service SrsPlayEdge* play_edge; SrsPublishEdge* publish_edge; @@ -562,7 +563,7 @@ public: // Initialize the hls with handlers. virtual srs_error_t initialize(SrsRequest* r, ISrsSourceHandler* h); // Bridge to other source, forward packets to it. - void bridge_to(ISrsSourceBridger* v); + void set_bridger(ISrsSourceBridger* v); // Interface ISrsReloadHandler public: virtual srs_error_t on_reload_vhost_play(std::string vhost); diff --git a/trunk/src/core/srs_core_version4.hpp b/trunk/src/core/srs_core_version4.hpp index 441c0fa7d..64f88dea1 100644 --- a/trunk/src/core/srs_core_version4.hpp +++ b/trunk/src/core/srs_core_version4.hpp @@ -26,6 +26,6 @@ #define VERSION_MAJOR 4 #define VERSION_MINOR 0 -#define VERSION_REVISION 91 +#define VERSION_REVISION 95 #endif diff --git a/trunk/src/kernel/srs_kernel_codec.hpp b/trunk/src/kernel/srs_kernel_codec.hpp index 56b0bf1d5..a4c0b3284 100644 --- a/trunk/src/kernel/srs_kernel_codec.hpp +++ b/trunk/src/kernel/srs_kernel_codec.hpp @@ -650,9 +650,8 @@ public: virtual bool is_avc_codec_ok(); }; -/** - * A frame, consists of a codec and a group of samples. 
- */ +// A frame, consists of a codec and a group of samples. +// TODO: FIXME: Rename to packet to follow names of FFmpeg, which means before decoding or after decoding. class SrsFrame { public: @@ -677,9 +676,8 @@ public: virtual srs_error_t add_sample(char* bytes, int size); }; -/** - * A audio frame, besides a frame, contains the audio frame info, such as frame type. - */ +// A audio frame, besides a frame, contains the audio frame info, such as frame type. +// TODO: FIXME: Rename to packet to follow names of FFmpeg, which means before decoding or after decoding. class SrsAudioFrame : public SrsFrame { public: @@ -691,9 +689,8 @@ public: virtual SrsAudioCodecConfig* acodec(); }; -/** - * A video frame, besides a frame, contains the video frame info, such as frame type. - */ +// A video frame, besides a frame, contains the video frame info, such as frame type. +// TODO: FIXME: Rename to packet to follow names of FFmpeg, which means before decoding or after decoding. class SrsVideoFrame : public SrsFrame { public: diff --git a/trunk/src/kernel/srs_kernel_rtc_rtp.cpp b/trunk/src/kernel/srs_kernel_rtc_rtp.cpp index bcc04d766..5474d9106 100644 --- a/trunk/src/kernel/srs_kernel_rtc_rtp.cpp +++ b/trunk/src/kernel/srs_kernel_rtc_rtp.cpp @@ -1054,6 +1054,33 @@ srs_error_t SrsRtpPacket2::decode(SrsBuffer* buf) return err; } +bool SrsRtpPacket2::is_keyframe() +{ + // False if audio packet + if(SrsFrameTypeAudio == frame_type) { + return false; + } + + // It's normal H264 video rtp packet + if (nalu_type == kStapA) { + SrsRtpSTAPPayload* stap_payload = dynamic_cast(payload_); + if(NULL != stap_payload->get_sps() || NULL != stap_payload->get_pps()) { + return true; + } + } else if (nalu_type == kFuA) { + SrsRtpFUAPayload2* fua_payload = dynamic_cast(payload_); + if(SrsAvcNaluTypeIDR == fua_payload->nalu_type) { + return true; + } + } else { + if((SrsAvcNaluTypeIDR == nalu_type) || (SrsAvcNaluTypeSPS == nalu_type) || (SrsAvcNaluTypePPS == nalu_type)) { + return true; + } + } 
+ + return false; +} + SrsRtpObjectCacheManager* _srs_rtp_cache = new SrsRtpObjectCacheManager(sizeof(SrsRtpPacket2)); SrsRtpObjectCacheManager* _srs_rtp_raw_cache = new SrsRtpObjectCacheManager(sizeof(SrsRtpRawPayload)); SrsRtpObjectCacheManager* _srs_rtp_fua_cache = new SrsRtpObjectCacheManager(sizeof(SrsRtpFUAPayload2)); diff --git a/trunk/src/kernel/srs_kernel_rtc_rtp.hpp b/trunk/src/kernel/srs_kernel_rtc_rtp.hpp index c0da94f21..b00135e85 100644 --- a/trunk/src/kernel/srs_kernel_rtc_rtp.hpp +++ b/trunk/src/kernel/srs_kernel_rtc_rtp.hpp @@ -353,6 +353,8 @@ public: virtual uint64_t nb_bytes(); virtual srs_error_t encode(SrsBuffer* buf); virtual srs_error_t decode(SrsBuffer* buf); +public: + bool is_keyframe(); }; // For object cache manager to stat the object dropped.