1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-02-13 11:51:57 +00:00

Support disable NACK in config

This commit is contained in:
winlin 2020-05-04 20:42:30 +08:00
parent d125116317
commit 8c4b6d3166
9 changed files with 212 additions and 69 deletions

View file

@ -509,6 +509,12 @@ vhost rtc.vhost.srs.com {
# default: 1 (For WebRTC, min_latency off) # default: 1 (For WebRTC, min_latency off)
mw_msgs 0; mw_msgs 0;
} }
# For NACK.
nack {
# Whether support NACK.
# default: on
enabled on;
}
} }
############################################################################################# #############################################################################################

View file

@ -3782,7 +3782,7 @@ srs_error_t SrsConfig::check_normal_config()
&& n != "play" && n != "publish" && n != "cluster" && n != "play" && n != "publish" && n != "cluster"
&& n != "security" && n != "http_remux" && n != "dash" && n != "security" && n != "http_remux" && n != "dash"
&& n != "http_static" && n != "hds" && n != "exec" && n != "http_static" && n != "hds" && n != "exec"
&& n != "in_ack_size" && n != "out_ack_size" && n != "rtc") { && n != "in_ack_size" && n != "out_ack_size" && n != "rtc" && n != "nack") {
return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal vhost.%s", n.c_str()); return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal vhost.%s", n.c_str());
} }
// for each sub directives of vhost. // for each sub directives of vhost.
@ -5083,6 +5083,29 @@ bool SrsConfig::get_rtc_stun_strict_check(string vhost)
return SRS_CONF_PERFER_FALSE(conf->arg0()); return SRS_CONF_PERFER_FALSE(conf->arg0());
} }
bool SrsConfig::get_rtc_nack_enabled(string vhost)
{
static bool DEFAULT = true;
SrsConfDirective* conf = get_vhost(vhost);
if (!conf) {
return DEFAULT;
}
conf = conf->get("nack");
if (!conf) {
return DEFAULT;
}
conf = conf->get("enabled");
if (!conf || conf->arg0().empty()) {
return DEFAULT;
}
return SRS_CONF_PERFER_TRUE(conf->arg0());
}
SrsConfDirective* SrsConfig::get_vhost(string vhost, bool try_default_vhost) SrsConfDirective* SrsConfig::get_vhost(string vhost, bool try_default_vhost)
{ {
srs_assert(root); srs_assert(root);

View file

@ -547,6 +547,7 @@ public:
bool get_rtc_aac_discard(std::string vhost); bool get_rtc_aac_discard(std::string vhost);
srs_utime_t get_rtc_stun_timeout(std::string vhost); srs_utime_t get_rtc_stun_timeout(std::string vhost);
bool get_rtc_stun_strict_check(std::string vhost); bool get_rtc_stun_strict_check(std::string vhost);
bool get_rtc_nack_enabled(std::string vhost);
// vhost specified section // vhost specified section
public: public:

View file

@ -914,11 +914,6 @@ srs_error_t SrsGoApiRtcPlay::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMe
return srs_error_wrap(err, "remote sdp check failed"); return srs_error_wrap(err, "remote sdp check failed");
} }
SrsSdp local_sdp;
if ((err = exchange_sdp(app, stream_name, remote_sdp, local_sdp)) != srs_success) {
return srs_error_wrap(err, "remote sdp have error or unsupport attributes");
}
SrsRequest request; SrsRequest request;
request.app = app; request.app = app;
request.stream = stream_name; request.stream = stream_name;
@ -930,6 +925,11 @@ srs_error_t SrsGoApiRtcPlay::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMe
request.vhost = parsed_vhost->arg0(); request.vhost = parsed_vhost->arg0();
} }
SrsSdp local_sdp;
if ((err = exchange_sdp(&request, remote_sdp, local_sdp)) != srs_success) {
return srs_error_wrap(err, "remote sdp have error or unsupport attributes");
}
// Whether enabled. // Whether enabled.
bool server_enabled = _srs_config->get_rtc_server_enabled(); bool server_enabled = _srs_config->get_rtc_server_enabled();
bool rtc_enabled = _srs_config->get_rtc_enabled(request.vhost); bool rtc_enabled = _srs_config->get_rtc_enabled(request.vhost);
@ -1006,7 +1006,7 @@ srs_error_t SrsGoApiRtcPlay::check_remote_sdp(const SrsSdp& remote_sdp)
return err; return err;
} }
srs_error_t SrsGoApiRtcPlay::exchange_sdp(const std::string& app, const std::string& stream, const SrsSdp& remote_sdp, SrsSdp& local_sdp) srs_error_t SrsGoApiRtcPlay::exchange_sdp(SrsRequest* req, const SrsSdp& remote_sdp, SrsSdp& local_sdp)
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
local_sdp.version_ = "0"; local_sdp.version_ = "0";
@ -1021,10 +1021,12 @@ srs_error_t SrsGoApiRtcPlay::exchange_sdp(const std::string& app, const std::str
local_sdp.session_name_ = "SRSPlaySession"; local_sdp.session_name_ = "SRSPlaySession";
local_sdp.msid_semantic_ = "WMS"; local_sdp.msid_semantic_ = "WMS";
local_sdp.msids_.push_back(app + "/" + stream); local_sdp.msids_.push_back(req->app + "/" + req->stream);
local_sdp.group_policy_ = "BUNDLE"; local_sdp.group_policy_ = "BUNDLE";
bool nack_enabled = _srs_config->get_rtc_nack_enabled(req->vhost);
for (size_t i = 0; i < remote_sdp.media_descs_.size(); ++i) { for (size_t i = 0; i < remote_sdp.media_descs_.size(); ++i) {
const SrsMediaDesc& remote_media_desc = remote_sdp.media_descs_[i]; const SrsMediaDesc& remote_media_desc = remote_sdp.media_descs_[i];
@ -1047,8 +1049,10 @@ srs_error_t SrsGoApiRtcPlay::exchange_sdp(const std::string& app, const std::str
vector<string> rtcp_fb; vector<string> rtcp_fb;
payload_type.rtcp_fb_.swap(rtcp_fb); payload_type.rtcp_fb_.swap(rtcp_fb);
for (int j = 0; j < (int)rtcp_fb.size(); j++) { for (int j = 0; j < (int)rtcp_fb.size(); j++) {
if (rtcp_fb.at(j) == "nack" || rtcp_fb.at(j) == "nack pli") { if (nack_enabled) {
payload_type.rtcp_fb_.push_back(rtcp_fb.at(j)); if (rtcp_fb.at(j) == "nack" || rtcp_fb.at(j) == "nack pli") {
payload_type.rtcp_fb_.push_back(rtcp_fb.at(j));
}
} }
} }
@ -1081,8 +1085,10 @@ srs_error_t SrsGoApiRtcPlay::exchange_sdp(const std::string& app, const std::str
vector<string> rtcp_fb; vector<string> rtcp_fb;
payload_type.rtcp_fb_.swap(rtcp_fb); payload_type.rtcp_fb_.swap(rtcp_fb);
for (int j = 0; j < (int)rtcp_fb.size(); j++) { for (int j = 0; j < (int)rtcp_fb.size(); j++) {
if (rtcp_fb.at(j) == "nack" || rtcp_fb.at(j) == "nack pli") { if (nack_enabled) {
payload_type.rtcp_fb_.push_back(rtcp_fb.at(j)); if (rtcp_fb.at(j) == "nack" || rtcp_fb.at(j) == "nack pli") {
payload_type.rtcp_fb_.push_back(rtcp_fb.at(j));
}
} }
} }
@ -1261,11 +1267,6 @@ srs_error_t SrsGoApiRtcPublish::do_serve_http(ISrsHttpResponseWriter* w, ISrsHtt
return srs_error_wrap(err, "remote sdp check failed"); return srs_error_wrap(err, "remote sdp check failed");
} }
SrsSdp local_sdp;
if ((err = exchange_sdp(app, stream_name, remote_sdp, local_sdp)) != srs_success) {
return srs_error_wrap(err, "remote sdp have error or unsupport attributes");
}
SrsRequest request; SrsRequest request;
request.app = app; request.app = app;
request.stream = stream_name; request.stream = stream_name;
@ -1277,6 +1278,11 @@ srs_error_t SrsGoApiRtcPublish::do_serve_http(ISrsHttpResponseWriter* w, ISrsHtt
request.vhost = parsed_vhost->arg0(); request.vhost = parsed_vhost->arg0();
} }
SrsSdp local_sdp;
if ((err = exchange_sdp(&request, remote_sdp, local_sdp)) != srs_success) {
return srs_error_wrap(err, "remote sdp have error or unsupport attributes");
}
// Whether enabled. // Whether enabled.
bool server_enabled = _srs_config->get_rtc_server_enabled(); bool server_enabled = _srs_config->get_rtc_server_enabled();
bool rtc_enabled = _srs_config->get_rtc_enabled(request.vhost); bool rtc_enabled = _srs_config->get_rtc_enabled(request.vhost);
@ -1348,7 +1354,7 @@ srs_error_t SrsGoApiRtcPublish::check_remote_sdp(const SrsSdp& remote_sdp)
return err; return err;
} }
srs_error_t SrsGoApiRtcPublish::exchange_sdp(const std::string& app, const std::string& stream, const SrsSdp& remote_sdp, SrsSdp& local_sdp) srs_error_t SrsGoApiRtcPublish::exchange_sdp(SrsRequest* req, const SrsSdp& remote_sdp, SrsSdp& local_sdp)
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
local_sdp.version_ = "0"; local_sdp.version_ = "0";
@ -1363,10 +1369,12 @@ srs_error_t SrsGoApiRtcPublish::exchange_sdp(const std::string& app, const std::
local_sdp.session_name_ = "SRSPublishSession"; local_sdp.session_name_ = "SRSPublishSession";
local_sdp.msid_semantic_ = "WMS"; local_sdp.msid_semantic_ = "WMS";
local_sdp.msids_.push_back(app + "/" + stream); local_sdp.msids_.push_back(req->app + "/" + req->stream);
local_sdp.group_policy_ = "BUNDLE"; local_sdp.group_policy_ = "BUNDLE";
bool nack_enabled = _srs_config->get_rtc_nack_enabled(req->vhost);
for (size_t i = 0; i < remote_sdp.media_descs_.size(); ++i) { for (size_t i = 0; i < remote_sdp.media_descs_.size(); ++i) {
const SrsMediaDesc& remote_media_desc = remote_sdp.media_descs_[i]; const SrsMediaDesc& remote_media_desc = remote_sdp.media_descs_[i];
@ -1389,8 +1397,10 @@ srs_error_t SrsGoApiRtcPublish::exchange_sdp(const std::string& app, const std::
vector<string> rtcp_fb; vector<string> rtcp_fb;
payload_type.rtcp_fb_.swap(rtcp_fb); payload_type.rtcp_fb_.swap(rtcp_fb);
for (int j = 0; j < (int)rtcp_fb.size(); j++) { for (int j = 0; j < (int)rtcp_fb.size(); j++) {
if (rtcp_fb.at(j) == "nack" || rtcp_fb.at(j) == "nack pli") { if (nack_enabled) {
payload_type.rtcp_fb_.push_back(rtcp_fb.at(j)); if (rtcp_fb.at(j) == "nack" || rtcp_fb.at(j) == "nack pli") {
payload_type.rtcp_fb_.push_back(rtcp_fb.at(j));
}
} }
} }
@ -1424,8 +1434,10 @@ srs_error_t SrsGoApiRtcPublish::exchange_sdp(const std::string& app, const std::
vector<string> rtcp_fb; vector<string> rtcp_fb;
payload_type.rtcp_fb_.swap(rtcp_fb); payload_type.rtcp_fb_.swap(rtcp_fb);
for (int j = 0; j < (int)rtcp_fb.size(); j++) { for (int j = 0; j < (int)rtcp_fb.size(); j++) {
if (rtcp_fb.at(j) == "nack" || rtcp_fb.at(j) == "nack pli") { if (nack_enabled) {
payload_type.rtcp_fb_.push_back(rtcp_fb.at(j)); if (rtcp_fb.at(j) == "nack" || rtcp_fb.at(j) == "nack pli") {
payload_type.rtcp_fb_.push_back(rtcp_fb.at(j));
}
} }
} }

View file

@ -34,6 +34,7 @@ class SrsServer;
class SrsRtcServer; class SrsRtcServer;
class SrsJsonObject; class SrsJsonObject;
class SrsSdp; class SrsSdp;
class SrsRequest;
#include <srs_app_st.hpp> #include <srs_app_st.hpp>
#include <srs_app_conn.hpp> #include <srs_app_conn.hpp>
@ -181,7 +182,7 @@ public:
virtual srs_error_t serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r); virtual srs_error_t serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r);
private: private:
virtual srs_error_t do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, SrsJsonObject* res); virtual srs_error_t do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, SrsJsonObject* res);
srs_error_t exchange_sdp(const std::string& app, const std::string& stream, const SrsSdp& remote_sdp, SrsSdp& local_sdp); srs_error_t exchange_sdp(SrsRequest* req, const SrsSdp& remote_sdp, SrsSdp& local_sdp);
srs_error_t check_remote_sdp(const SrsSdp& remote_sdp); srs_error_t check_remote_sdp(const SrsSdp& remote_sdp);
}; };
@ -198,7 +199,7 @@ public:
virtual srs_error_t serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r); virtual srs_error_t serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r);
private: private:
virtual srs_error_t do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, SrsJsonObject* res); virtual srs_error_t do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, SrsJsonObject* res);
srs_error_t exchange_sdp(const std::string& app, const std::string& stream, const SrsSdp& remote_sdp, SrsSdp& local_sdp); srs_error_t exchange_sdp(SrsRequest* req, const SrsSdp& remote_sdp, SrsSdp& local_sdp);
srs_error_t check_remote_sdp(const SrsSdp& remote_sdp); srs_error_t check_remote_sdp(const SrsSdp& remote_sdp);
}; };

View file

@ -620,6 +620,7 @@ SrsRtcPlayer::SrsRtcPlayer(SrsRtcSession* s, int parent_cid)
video_queue_ = new SrsRtpRingBuffer(1000); video_queue_ = new SrsRtpRingBuffer(1000);
nn_simulate_nack_drop = 0; nn_simulate_nack_drop = 0;
nack_enabled_ = false;
_srs_config->subscribe(this); _srs_config->subscribe(this);
} }
@ -646,8 +647,10 @@ srs_error_t SrsRtcPlayer::initialize(const uint32_t& vssrc, const uint32_t& assr
gso = _srs_config->get_rtc_server_gso(); gso = _srs_config->get_rtc_server_gso();
merge_nalus = _srs_config->get_rtc_server_merge_nalus(); merge_nalus = _srs_config->get_rtc_server_merge_nalus();
max_padding = _srs_config->get_rtc_server_padding(); max_padding = _srs_config->get_rtc_server_padding();
srs_trace("RTC sender video(ssrc=%d, pt=%d), audio(ssrc=%d, pt=%d), package(gso=%d, merge_nalus=%d), padding=%d", // TODO: FIXME: Support reload.
video_ssrc, video_payload_type, audio_ssrc, audio_payload_type, gso, merge_nalus, max_padding); nack_enabled_ = _srs_config->get_rtc_nack_enabled(session_->req->vhost);
srs_trace("RTC publisher video(ssrc=%d, pt=%d), audio(ssrc=%d, pt=%d), package(gso=%d, merge_nalus=%d), padding=%d, nack=%d",
video_ssrc, video_payload_type, audio_ssrc, audio_payload_type, gso, merge_nalus, max_padding, nack_enabled_);
return err; return err;
} }
@ -1007,7 +1010,7 @@ srs_error_t SrsRtcPlayer::send_packets(SrsRtcOutgoingPackets& packets)
} }
// Put final RTP packet to NACK/ARQ queue. // Put final RTP packet to NACK/ARQ queue.
if (true) { if (nack_enabled_) {
SrsRtpPacket2* nack = new SrsRtpPacket2(); SrsRtpPacket2* nack = new SrsRtpPacket2();
nack->rtp_header = packet->rtp_header; nack->rtp_header = packet->rtp_header;
nack->padding = packet->padding; nack->padding = packet->padding;
@ -1040,11 +1043,7 @@ srs_error_t SrsRtcPlayer::send_packets(SrsRtcOutgoingPackets& packets)
// For NACK simulator, drop packet. // For NACK simulator, drop packet.
if (nn_simulate_nack_drop) { if (nn_simulate_nack_drop) {
SrsRtpHeader* h = &packet->rtp_header; simulate_drop_packet(&packet->rtp_header, (int)iov->iov_len);
srs_warn("RTC NACK simulator #%d drop seq=%u, ssrc=%u/%s, ts=%u, %d bytes", nn_simulate_nack_drop,
h->get_sequence(), h->get_ssrc(), (h->get_ssrc()==video_ssrc? "Video":"Audio"), h->get_timestamp(),
(int)iov->iov_len);
nn_simulate_nack_drop--;
iov->iov_len = 0; iov->iov_len = 0;
continue; continue;
} }
@ -1184,7 +1183,7 @@ srs_error_t SrsRtcPlayer::send_packets_gso(SrsRtcOutgoingPackets& packets)
} }
// Put final RTP packet to NACK/ARQ queue. // Put final RTP packet to NACK/ARQ queue.
if (true) { if (nack_enabled_) {
SrsRtpPacket2* nack = new SrsRtpPacket2(); SrsRtpPacket2* nack = new SrsRtpPacket2();
nack->rtp_header = packet->rtp_header; nack->rtp_header = packet->rtp_header;
nack->padding = packet->padding; nack->padding = packet->padding;
@ -1542,6 +1541,15 @@ void SrsRtcPlayer::simulate_nack_drop(int nn)
nn_simulate_nack_drop = nn; nn_simulate_nack_drop = nn;
} }
void SrsRtcPlayer::simulate_drop_packet(SrsRtpHeader* h, int nn_bytes)
{
srs_warn("RTC NACK simulator #%d drop seq=%u, ssrc=%u/%s, ts=%u, %d bytes", nn_simulate_nack_drop,
h->get_sequence(), h->get_ssrc(), (h->get_ssrc()==video_ssrc? "Video":"Audio"), h->get_timestamp(),
nn_bytes);
nn_simulate_nack_drop--;
}
SrsRtcPublisher::SrsRtcPublisher(SrsRtcSession* session) SrsRtcPublisher::SrsRtcPublisher(SrsRtcSession* session)
{ {
report_timer = new SrsHourGlass(this, 200 * SRS_UTIME_MILLISECONDS); report_timer = new SrsHourGlass(this, 200 * SRS_UTIME_MILLISECONDS);
@ -1554,6 +1562,7 @@ SrsRtcPublisher::SrsRtcPublisher(SrsRtcSession* session)
source = NULL; source = NULL;
nn_simulate_nack_drop = 0; nn_simulate_nack_drop = 0;
nack_enabled_ = false;
} }
SrsRtcPublisher::~SrsRtcPublisher() SrsRtcPublisher::~SrsRtcPublisher()
@ -1580,7 +1589,11 @@ srs_error_t SrsRtcPublisher::initialize(uint32_t vssrc, uint32_t assrc, SrsReque
audio_ssrc = assrc; audio_ssrc = assrc;
req = r; req = r;
srs_verbose("video_ssrc=%u, audio_ssrc=%u", video_ssrc, audio_ssrc); // TODO: FIXME: Support reload.
nack_enabled_ = _srs_config->get_rtc_nack_enabled(session_->req->vhost);
srs_trace("RTC player video(ssrc=%u), audio(ssrc=%u), nack=%d",
video_ssrc, audio_ssrc, nack_enabled_);
if ((err = report_timer->tick(0 * SRS_UTIME_MILLISECONDS)) != srs_success) { if ((err = report_timer->tick(0 * SRS_UTIME_MILLISECONDS)) != srs_success) {
return srs_error_wrap(err, "hourglass tick"); return srs_error_wrap(err, "hourglass tick");
@ -1788,6 +1801,12 @@ void SrsRtcPublisher::check_send_nacks(SrsRtpNackForReceiver* nack, uint32_t ssr
stream.write_2bytes(pid); stream.write_2bytes(pid);
stream.write_2bytes(blp); stream.write_2bytes(blp);
if (session_->blackhole && session_->blackhole_addr && session_->blackhole_stfd) {
// Ignore any error for black-hole.
void* p = stream.data(); int len = stream.pos(); SrsRtcSession* s = session_;
srs_sendto(s->blackhole_stfd, p, len, (sockaddr*)s->blackhole_addr, sizeof(sockaddr_in), SRS_UTIME_NO_TIMEOUT);
}
char protected_buf[kRtpPacketSize]; char protected_buf[kRtpPacketSize];
int nb_protected_buf = stream.pos(); int nb_protected_buf = stream.pos();
@ -1970,11 +1989,7 @@ srs_error_t SrsRtcPublisher::on_rtp(char* buf, int nb_buf)
// For NACK simulator, drop packet. // For NACK simulator, drop packet.
if (nn_simulate_nack_drop) { if (nn_simulate_nack_drop) {
SrsRtpHeader* h = &pkt->rtp_header; simulate_drop_packet(&pkt->rtp_header, nb_buf);
srs_warn("RTC NACK simulator #%d drop seq=%u, ssrc=%u/%s, ts=%u, %d bytes", nn_simulate_nack_drop,
h->get_sequence(), h->get_ssrc(), (h->get_ssrc()==video_ssrc? "Video":"Audio"), h->get_timestamp(),
(int)nb_buf);
nn_simulate_nack_drop--;
srs_freep(pkt); srs_freep(pkt);
return err; return err;
} }
@ -2016,14 +2031,23 @@ srs_error_t SrsRtcPublisher::on_audio(SrsRtpPacket2* pkt)
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
// TODO: FIXME: Error check.
audio_queue_->consume(audio_nack_, pkt);
check_send_nacks(audio_nack_, audio_ssrc);
// Collect all audio frames.
std::vector<SrsRtpPacket2*> frames; std::vector<SrsRtpPacket2*> frames;
audio_queue_->collect_frames(audio_nack_, frames);
if (nack_enabled_) {
// TODO: FIXME: Error check.
audio_queue_->consume(audio_nack_, pkt);
check_send_nacks(audio_nack_, audio_ssrc);
// Collect all audio frames.
audio_queue_->collect_frames(audio_nack_, frames);
} else {
// TODO: FIXME: Error check.
audio_queue_->consume(NULL, pkt);
// Collect all audio frames.
audio_queue_->collect_frames(NULL, frames);
}
for (size_t i = 0; i < frames.size(); ++i) { for (size_t i = 0; i < frames.size(); ++i) {
SrsRtpPacket2* frame = frames[i]; SrsRtpPacket2* frame = frames[i];
@ -2075,19 +2099,23 @@ srs_error_t SrsRtcPublisher::on_audio_frame(SrsRtpPacket2* frame)
srs_error_t SrsRtcPublisher::on_video(SrsRtpPacket2* pkt) srs_error_t SrsRtcPublisher::on_video(SrsRtpPacket2* pkt)
{ {
// TODO: FIXME: Error check.
video_queue_->consume(video_nack_, pkt);
if (video_queue_->should_request_key_frame()) {
// TODO: FIXME: Check error.
send_rtcp_fb_pli(video_ssrc);
}
check_send_nacks(video_nack_, video_ssrc);
// Collect video frames.
std::vector<SrsRtpPacket2*> frames; std::vector<SrsRtpPacket2*> frames;
video_queue_->collect_frames(video_nack_, frames);
if (nack_enabled_) {
// TODO: FIXME: Error check.
video_queue_->consume(video_nack_, pkt);
check_send_nacks(video_nack_, video_ssrc);
// Collect video frames.
video_queue_->collect_frames(video_nack_, frames);
} else {
// TODO: FIXME: Error check.
video_queue_->consume(NULL, pkt);
// Collect video frames.
video_queue_->collect_frames(NULL, frames);
}
for (size_t i = 0; i < frames.size(); ++i) { for (size_t i = 0; i < frames.size(); ++i) {
SrsRtpPacket2* frame = frames[i]; SrsRtpPacket2* frame = frames[i];
@ -2098,6 +2126,11 @@ srs_error_t SrsRtcPublisher::on_video(SrsRtpPacket2* pkt)
srs_freep(frame); srs_freep(frame);
} }
if (video_queue_->should_request_key_frame()) {
// TODO: FIXME: Check error.
send_rtcp_fb_pli(video_ssrc);
}
return srs_success; return srs_success;
} }
@ -2232,6 +2265,15 @@ void SrsRtcPublisher::simulate_nack_drop(int nn)
nn_simulate_nack_drop = nn; nn_simulate_nack_drop = nn;
} }
void SrsRtcPublisher::simulate_drop_packet(SrsRtpHeader* h, int nn_bytes)
{
srs_warn("RTC NACK simulator #%d drop seq=%u, ssrc=%u/%s, ts=%u, %d bytes", nn_simulate_nack_drop,
h->get_sequence(), h->get_ssrc(), (h->get_ssrc()==video_ssrc? "Video":"Audio"), h->get_timestamp(),
nn_bytes);
nn_simulate_nack_drop--;
}
SrsRtcSession::SrsRtcSession(SrsRtcServer* s) SrsRtcSession::SrsRtcSession(SrsRtcServer* s)
{ {
req = NULL; req = NULL;

View file

@ -227,6 +227,8 @@ private:
srs_utime_t mw_sleep; srs_utime_t mw_sleep;
int mw_msgs; int mw_msgs;
bool realtime; bool realtime;
// Whether enabled nack.
bool nack_enabled_;
public: public:
SrsRtcPlayer(SrsRtcSession* s, int parent_cid); SrsRtcPlayer(SrsRtcSession* s, int parent_cid);
virtual ~SrsRtcPlayer(); virtual ~SrsRtcPlayer();
@ -260,6 +262,8 @@ private:
public: public:
void nack_fetch(std::vector<SrsRtpPacket2*>& pkts, uint32_t ssrc, uint16_t seq); void nack_fetch(std::vector<SrsRtpPacket2*>& pkts, uint32_t ssrc, uint16_t seq);
void simulate_nack_drop(int nn); void simulate_nack_drop(int nn);
private:
void simulate_drop_packet(SrsRtpHeader* h, int nn_bytes);
}; };
class SrsRtcPublisher : virtual public ISrsHourGlass, virtual public ISrsRtpPacketDecodeHandler class SrsRtcPublisher : virtual public ISrsHourGlass, virtual public ISrsRtpPacketDecodeHandler
@ -278,6 +282,8 @@ private:
private: private:
SrsRequest* req; SrsRequest* req;
SrsSource* source; SrsSource* source;
// Whether enabled nack.
bool nack_enabled_;
// Simulators. // Simulators.
int nn_simulate_nack_drop; int nn_simulate_nack_drop;
private: private:
@ -310,6 +316,8 @@ public:
virtual srs_error_t notify(int type, srs_utime_t interval, srs_utime_t tick); virtual srs_error_t notify(int type, srs_utime_t interval, srs_utime_t tick);
public: public:
void simulate_nack_drop(int nn); void simulate_nack_drop(int nn);
private:
void simulate_drop_packet(SrsRtpHeader* h, int nn_bytes);
}; };
class SrsRtcSession class SrsRtcSession

View file

@ -58,8 +58,7 @@ SrsRtpNackForReceiver::~SrsRtpNackForReceiver()
void SrsRtpNackForReceiver::insert(uint16_t seq) void SrsRtpNackForReceiver::insert(uint16_t seq)
{ {
// FIXME: full, drop packet, and request key frame. // FIXME: full, drop packet, and request key frame.
SrsRtpNackInfo& nack_info = queue_[seq]; queue_[seq] = SrsRtpNackInfo();
(void)nack_info;
} }
void SrsRtpNackForReceiver::remove(uint16_t seq) void SrsRtpNackForReceiver::remove(uint16_t seq)
@ -88,12 +87,12 @@ void SrsRtpNackForReceiver::check_queue_size()
void SrsRtpNackForReceiver::get_nack_seqs(vector<uint16_t>& seqs) void SrsRtpNackForReceiver::get_nack_seqs(vector<uint16_t>& seqs)
{ {
srs_utime_t now = srs_update_system_time(); srs_utime_t now = srs_update_system_time();
int interval = now - pre_check_time_; srs_utime_t interval = now - pre_check_time_;
if (interval < opts_.nack_interval / 2) { if (interval < opts_.nack_interval / 2) {
return; return;
} }
pre_check_time_ = now; pre_check_time_ = now;
std::map<uint16_t, SrsRtpNackInfo>::iterator iter = queue_.begin(); std::map<uint16_t, SrsRtpNackInfo>::iterator iter = queue_.begin();
while (iter != queue_.end()) { while (iter != queue_.end()) {
const uint16_t& seq = iter->first; const uint16_t& seq = iter->first;
@ -205,8 +204,8 @@ void SrsRtpRingBuffer::update(uint16_t seq, uint16_t& nack_first, uint16_t& nack
// Normal sequence, seq follows high_. // Normal sequence, seq follows high_.
if (srs_rtp_seq_distance(end, seq) >= 0) { if (srs_rtp_seq_distance(end, seq) >= 0) {
nack_first = end + 1; nack_first = end;
nack_last = seq + 1; nack_last = seq;
// When distance(seq,high_)>0 and seq<high_, seq must flip back, // When distance(seq,high_)>0 and seq<high_, seq must flip back,
// for example, high_=65535, seq=1, distance(65535,1)>0 and 1<65535. // for example, high_=65535, seq=1, distance(65535,1)>0 and 1<65535.
@ -260,7 +259,14 @@ srs_error_t SrsRtpQueue::consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt
srs_utime_t now = srs_update_system_time(); srs_utime_t now = srs_update_system_time();
uint16_t seq = pkt->rtp_header.get_sequence(); uint16_t seq = pkt->rtp_header.get_sequence();
SrsRtpNackInfo* nack_info = nack->find(seq);
SrsRtpNackInfo* nack_info = NULL;
// If no NACK, disable nack.
if (nack) {
nack_info = nack->find(seq);
}
if (nack_info) { if (nack_info) {
int nack_rtt = nack_info->req_nack_count_ ? ((now - nack_info->pre_req_nack_time_) / SRS_UTIME_MILLISECONDS) : 0; int nack_rtt = nack_info->req_nack_count_ ? ((now - nack_info->pre_req_nack_time_) / SRS_UTIME_MILLISECONDS) : 0;
(void)nack_rtt; (void)nack_rtt;
@ -289,7 +295,7 @@ srs_error_t SrsRtpQueue::consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt
++num_of_packet_received_; ++num_of_packet_received_;
uint16_t nack_first = 0, nack_last = 0; uint16_t nack_first = 0, nack_last = 0;
queue_->update(seq, nack_first, nack_last); queue_->update(seq, nack_first, nack_last);
if (srs_rtp_seq_distance(nack_first, nack_last) > 0) { if (nack && srs_rtp_seq_distance(nack_first, nack_last) > 0) {
srs_trace("update seq=%u, nack range [%u, %u]", seq, nack_first, nack_last); srs_trace("update seq=%u, nack range [%u, %u]", seq, nack_first, nack_last);
insert_into_nack_list(nack, nack_first, nack_last); insert_into_nack_list(nack, nack_first, nack_last);
} }
@ -332,6 +338,10 @@ uint32_t SrsRtpQueue::get_interarrival_jitter()
void SrsRtpQueue::insert_into_nack_list(SrsRtpNackForReceiver* nack, uint16_t first, uint16_t last) void SrsRtpQueue::insert_into_nack_list(SrsRtpNackForReceiver* nack, uint16_t first, uint16_t last)
{ {
if (!nack) {
return;
}
for (uint16_t s = first; s != last; ++s) { for (uint16_t s = first; s != last; ++s) {
nack->insert(s); nack->insert(s);
++number_of_packet_lossed_; ++number_of_packet_lossed_;
@ -365,6 +375,11 @@ void SrsRtpAudioQueue::notify_nack_list_full()
queue_->advance_to(queue_->end); queue_->advance_to(queue_->end);
} }
srs_error_t SrsRtpAudioQueue::consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt)
{
return SrsRtpQueue::consume(nack, pkt);
}
void SrsRtpAudioQueue::collect_frames(SrsRtpNackForReceiver* nack, vector<SrsRtpPacket2*>& frames) void SrsRtpAudioQueue::collect_frames(SrsRtpNackForReceiver* nack, vector<SrsRtpPacket2*>& frames)
{ {
// When done, next point to the next available packet. // When done, next point to the next available packet.
@ -372,6 +387,16 @@ void SrsRtpAudioQueue::collect_frames(SrsRtpNackForReceiver* nack, vector<SrsRtp
for (; next != queue_->end; ++next) { for (; next != queue_->end; ++next) {
SrsRtpPacket2* pkt = queue_->at(next); SrsRtpPacket2* pkt = queue_->at(next);
// If nack disabled, we ignore any empty packet.
if (!nack) {
if (!pkt) {
continue;
}
frames.push_back(pkt);
continue;
}
// TODO: FIXME: Should not wait for NACK packets. // TODO: FIXME: Should not wait for NACK packets.
// Not found or in NACK, stop collecting frame. // Not found or in NACK, stop collecting frame.
if (!pkt || nack->find(next) != NULL) { if (!pkt || nack->find(next) != NULL) {
@ -391,6 +416,7 @@ void SrsRtpAudioQueue::collect_frames(SrsRtpNackForReceiver* nack, vector<SrsRtp
} }
// For audio, if overflow, clear all packets. // For audio, if overflow, clear all packets.
// TODO: FIXME: Should notify nack?
if (queue_->overflow()) { if (queue_->overflow()) {
queue_->advance_to(queue_->end); queue_->advance_to(queue_->end);
} }
@ -497,7 +523,9 @@ void SrsRtpVideoQueue::on_overflow(SrsRtpNackForReceiver* nack)
srs_trace("on overflow, remove range [%u, %u, %u]", queue_->begin, next, queue_->end); srs_trace("on overflow, remove range [%u, %u, %u]", queue_->begin, next, queue_->end);
for (uint16_t s = queue_->begin; s != next; ++s) { for (uint16_t s = queue_->begin; s != next; ++s) {
nack->remove(s); if (nack) {
nack->remove(s);
}
queue_->remove(s); queue_->remove(s);
} }
@ -515,6 +543,27 @@ void SrsRtpVideoQueue::collect_frame(SrsRtpNackForReceiver* nack, SrsRtpPacket2*
for (; next != queue_->end; ++next) { for (; next != queue_->end; ++next) {
SrsRtpPacket2* pkt = queue_->at(next); SrsRtpPacket2* pkt = queue_->at(next);
// TODO: FIXME: We should skip whole packet.
// If nack disabled, we ignore any empty packet.
if (!nack) {
if (!pkt) {
continue;
}
if (frame.empty() && !pkt->video_is_first_packet) {
continue;
}
frame.push_back(pkt);
if (pkt->rtp_header.get_marker() || pkt->video_is_last_packet) {
found = true;
next++;
break;
}
continue;
}
// TODO: FIXME: Should not wait for NACK packets. // TODO: FIXME: Should not wait for NACK packets.
// Not found or in NACK, stop collecting frame. // Not found or in NACK, stop collecting frame.
if (!pkt || nack->find(next) != NULL) { if (!pkt || nack->find(next) != NULL) {
@ -523,7 +572,7 @@ void SrsRtpVideoQueue::collect_frame(SrsRtpNackForReceiver* nack, SrsRtpPacket2*
} }
// Ignore when the first packet not the start. // Ignore when the first packet not the start.
if (next == queue_->begin && !pkt->video_is_first_packet) { if (frame.empty() && !pkt->video_is_first_packet) {
return; return;
} }

View file

@ -192,6 +192,7 @@ public:
public: public:
virtual void notify_drop_seq(uint16_t seq); virtual void notify_drop_seq(uint16_t seq);
virtual void notify_nack_list_full(); virtual void notify_nack_list_full();
virtual srs_error_t consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt);
virtual void collect_frames(SrsRtpNackForReceiver* nack, std::vector<SrsRtpPacket2*>& frames); virtual void collect_frames(SrsRtpNackForReceiver* nack, std::vector<SrsRtpPacket2*>& frames);
}; };