1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-02-15 04:42:04 +00:00

Merge branch 'feature/rtc' into develop

This commit is contained in:
winlin 2020-05-05 10:51:51 +08:00
commit 32fd020de8
18 changed files with 571 additions and 353 deletions

View file

@ -509,6 +509,12 @@ vhost rtc.vhost.srs.com {
# default: 1 (For WebRTC, min_latency off)
mw_msgs 0;
}
# For NACK.
nack {
# Whether support NACK.
# default: on
enabled on;
}
}
#############################################################################################

View file

@ -48,6 +48,9 @@
<label></label>
<video id="rtc_media_player" controls autoplay></video>
<label></label>
SessionID: <span id='sessionid'></span>
<footer>
<p></p>
<p><a href="https://github.com/ossrs/srs">SRS Team &copy; 2020</a></p>
@ -115,6 +118,7 @@
if (data.code) {
reject(data); return;
}
$('#sessionid').html(data.sessionid);
resolve(data.sdp);
}).fail(function(reason){
reject(reason);

View file

@ -48,6 +48,9 @@
<label></label>
<video id="rtc_media_player" width="320" autoplay muted></video>
<label></label>
SessionID: <span id='sessionid'></span>
<footer>
<p></p>
<p><a href="https://github.com/ossrs/srs">SRS Team &copy; 2020</a></p>
@ -125,6 +128,7 @@
if (data.code) {
reject(data); return;
}
$('#sessionid').html(data.sessionid);
resolve(data.sdp);
}).fail(function(reason){
reject(reason);

View file

@ -3782,7 +3782,7 @@ srs_error_t SrsConfig::check_normal_config()
&& n != "play" && n != "publish" && n != "cluster"
&& n != "security" && n != "http_remux" && n != "dash"
&& n != "http_static" && n != "hds" && n != "exec"
&& n != "in_ack_size" && n != "out_ack_size" && n != "rtc") {
&& n != "in_ack_size" && n != "out_ack_size" && n != "rtc" && n != "nack") {
return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal vhost.%s", n.c_str());
}
// for each sub directives of vhost.
@ -5083,6 +5083,29 @@ bool SrsConfig::get_rtc_stun_strict_check(string vhost)
return SRS_CONF_PERFER_FALSE(conf->arg0());
}
bool SrsConfig::get_rtc_nack_enabled(string vhost)
{
static bool DEFAULT = true;
SrsConfDirective* conf = get_vhost(vhost);
if (!conf) {
return DEFAULT;
}
conf = conf->get("nack");
if (!conf) {
return DEFAULT;
}
conf = conf->get("enabled");
if (!conf || conf->arg0().empty()) {
return DEFAULT;
}
return SRS_CONF_PERFER_TRUE(conf->arg0());
}
SrsConfDirective* SrsConfig::get_vhost(string vhost, bool try_default_vhost)
{
srs_assert(root);

View file

@ -547,6 +547,7 @@ public:
bool get_rtc_aac_discard(std::string vhost);
srs_utime_t get_rtc_stun_timeout(std::string vhost);
bool get_rtc_stun_strict_check(std::string vhost);
bool get_rtc_nack_enabled(std::string vhost);
// vhost specified section
public:

View file

@ -914,11 +914,6 @@ srs_error_t SrsGoApiRtcPlay::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMe
return srs_error_wrap(err, "remote sdp check failed");
}
SrsSdp local_sdp;
if ((err = exchange_sdp(app, stream_name, remote_sdp, local_sdp)) != srs_success) {
return srs_error_wrap(err, "remote sdp have error or unsupport attributes");
}
SrsRequest request;
request.app = app;
request.stream = stream_name;
@ -930,6 +925,11 @@ srs_error_t SrsGoApiRtcPlay::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMe
request.vhost = parsed_vhost->arg0();
}
SrsSdp local_sdp;
if ((err = exchange_sdp(&request, remote_sdp, local_sdp)) != srs_success) {
return srs_error_wrap(err, "remote sdp have error or unsupport attributes");
}
// Whether enabled.
bool server_enabled = _srs_config->get_rtc_server_enabled();
bool rtc_enabled = _srs_config->get_rtc_enabled(request.vhost);
@ -967,7 +967,7 @@ srs_error_t SrsGoApiRtcPlay::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMe
// TODO: add candidates in response json?
res->set("sdp", SrsJsonAny::str(local_sdp_str.c_str()));
res->set("sessionid", SrsJsonAny::str(session->id().c_str()));
res->set("sessionid", SrsJsonAny::str(session->username().c_str()));
srs_trace("RTC username=%s, offer=%dB, answer=%dB", session->username().c_str(),
remote_sdp_str.length(), local_sdp_str.length());
@ -1006,7 +1006,7 @@ srs_error_t SrsGoApiRtcPlay::check_remote_sdp(const SrsSdp& remote_sdp)
return err;
}
srs_error_t SrsGoApiRtcPlay::exchange_sdp(const std::string& app, const std::string& stream, const SrsSdp& remote_sdp, SrsSdp& local_sdp)
srs_error_t SrsGoApiRtcPlay::exchange_sdp(SrsRequest* req, const SrsSdp& remote_sdp, SrsSdp& local_sdp)
{
srs_error_t err = srs_success;
local_sdp.version_ = "0";
@ -1021,10 +1021,12 @@ srs_error_t SrsGoApiRtcPlay::exchange_sdp(const std::string& app, const std::str
local_sdp.session_name_ = "SRSPlaySession";
local_sdp.msid_semantic_ = "WMS";
local_sdp.msids_.push_back(app + "/" + stream);
local_sdp.msids_.push_back(req->app + "/" + req->stream);
local_sdp.group_policy_ = "BUNDLE";
bool nack_enabled = _srs_config->get_rtc_nack_enabled(req->vhost);
for (size_t i = 0; i < remote_sdp.media_descs_.size(); ++i) {
const SrsMediaDesc& remote_media_desc = remote_sdp.media_descs_[i];
@ -1047,8 +1049,10 @@ srs_error_t SrsGoApiRtcPlay::exchange_sdp(const std::string& app, const std::str
vector<string> rtcp_fb;
payload_type.rtcp_fb_.swap(rtcp_fb);
for (int j = 0; j < (int)rtcp_fb.size(); j++) {
if (rtcp_fb.at(j) == "nack" || rtcp_fb.at(j) == "nack pli") {
payload_type.rtcp_fb_.push_back(rtcp_fb.at(j));
if (nack_enabled) {
if (rtcp_fb.at(j) == "nack" || rtcp_fb.at(j) == "nack pli") {
payload_type.rtcp_fb_.push_back(rtcp_fb.at(j));
}
}
}
@ -1081,8 +1085,10 @@ srs_error_t SrsGoApiRtcPlay::exchange_sdp(const std::string& app, const std::str
vector<string> rtcp_fb;
payload_type.rtcp_fb_.swap(rtcp_fb);
for (int j = 0; j < (int)rtcp_fb.size(); j++) {
if (rtcp_fb.at(j) == "nack" || rtcp_fb.at(j) == "nack pli") {
payload_type.rtcp_fb_.push_back(rtcp_fb.at(j));
if (nack_enabled) {
if (rtcp_fb.at(j) == "nack" || rtcp_fb.at(j) == "nack pli") {
payload_type.rtcp_fb_.push_back(rtcp_fb.at(j));
}
}
}
@ -1261,11 +1267,6 @@ srs_error_t SrsGoApiRtcPublish::do_serve_http(ISrsHttpResponseWriter* w, ISrsHtt
return srs_error_wrap(err, "remote sdp check failed");
}
SrsSdp local_sdp;
if ((err = exchange_sdp(app, stream_name, remote_sdp, local_sdp)) != srs_success) {
return srs_error_wrap(err, "remote sdp have error or unsupport attributes");
}
SrsRequest request;
request.app = app;
request.stream = stream_name;
@ -1277,6 +1278,11 @@ srs_error_t SrsGoApiRtcPublish::do_serve_http(ISrsHttpResponseWriter* w, ISrsHtt
request.vhost = parsed_vhost->arg0();
}
SrsSdp local_sdp;
if ((err = exchange_sdp(&request, remote_sdp, local_sdp)) != srs_success) {
return srs_error_wrap(err, "remote sdp have error or unsupport attributes");
}
// Whether enabled.
bool server_enabled = _srs_config->get_rtc_server_enabled();
bool rtc_enabled = _srs_config->get_rtc_enabled(request.vhost);
@ -1309,7 +1315,7 @@ srs_error_t SrsGoApiRtcPublish::do_serve_http(ISrsHttpResponseWriter* w, ISrsHtt
// TODO: add candidates in response json?
res->set("sdp", SrsJsonAny::str(local_sdp_str.c_str()));
res->set("sessionid", SrsJsonAny::str(session->id().c_str()));
res->set("sessionid", SrsJsonAny::str(session->username().c_str()));
srs_trace("RTC username=%s, offer=%dB, answer=%dB", session->username().c_str(),
remote_sdp_str.length(), local_sdp_str.length());
@ -1348,7 +1354,7 @@ srs_error_t SrsGoApiRtcPublish::check_remote_sdp(const SrsSdp& remote_sdp)
return err;
}
srs_error_t SrsGoApiRtcPublish::exchange_sdp(const std::string& app, const std::string& stream, const SrsSdp& remote_sdp, SrsSdp& local_sdp)
srs_error_t SrsGoApiRtcPublish::exchange_sdp(SrsRequest* req, const SrsSdp& remote_sdp, SrsSdp& local_sdp)
{
srs_error_t err = srs_success;
local_sdp.version_ = "0";
@ -1363,10 +1369,12 @@ srs_error_t SrsGoApiRtcPublish::exchange_sdp(const std::string& app, const std::
local_sdp.session_name_ = "SRSPublishSession";
local_sdp.msid_semantic_ = "WMS";
local_sdp.msids_.push_back(app + "/" + stream);
local_sdp.msids_.push_back(req->app + "/" + req->stream);
local_sdp.group_policy_ = "BUNDLE";
bool nack_enabled = _srs_config->get_rtc_nack_enabled(req->vhost);
for (size_t i = 0; i < remote_sdp.media_descs_.size(); ++i) {
const SrsMediaDesc& remote_media_desc = remote_sdp.media_descs_[i];
@ -1389,8 +1397,10 @@ srs_error_t SrsGoApiRtcPublish::exchange_sdp(const std::string& app, const std::
vector<string> rtcp_fb;
payload_type.rtcp_fb_.swap(rtcp_fb);
for (int j = 0; j < (int)rtcp_fb.size(); j++) {
if (rtcp_fb.at(j) == "nack" || rtcp_fb.at(j) == "nack pli") {
payload_type.rtcp_fb_.push_back(rtcp_fb.at(j));
if (nack_enabled) {
if (rtcp_fb.at(j) == "nack" || rtcp_fb.at(j) == "nack pli") {
payload_type.rtcp_fb_.push_back(rtcp_fb.at(j));
}
}
}
@ -1424,8 +1434,10 @@ srs_error_t SrsGoApiRtcPublish::exchange_sdp(const std::string& app, const std::
vector<string> rtcp_fb;
payload_type.rtcp_fb_.swap(rtcp_fb);
for (int j = 0; j < (int)rtcp_fb.size(); j++) {
if (rtcp_fb.at(j) == "nack" || rtcp_fb.at(j) == "nack pli") {
payload_type.rtcp_fb_.push_back(rtcp_fb.at(j));
if (nack_enabled) {
if (rtcp_fb.at(j) == "nack" || rtcp_fb.at(j) == "nack pli") {
payload_type.rtcp_fb_.push_back(rtcp_fb.at(j));
}
}
}

View file

@ -34,6 +34,7 @@ class SrsServer;
class SrsRtcServer;
class SrsJsonObject;
class SrsSdp;
class SrsRequest;
#include <srs_app_st.hpp>
#include <srs_app_conn.hpp>
@ -181,7 +182,7 @@ public:
virtual srs_error_t serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r);
private:
virtual srs_error_t do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, SrsJsonObject* res);
srs_error_t exchange_sdp(const std::string& app, const std::string& stream, const SrsSdp& remote_sdp, SrsSdp& local_sdp);
srs_error_t exchange_sdp(SrsRequest* req, const SrsSdp& remote_sdp, SrsSdp& local_sdp);
srs_error_t check_remote_sdp(const SrsSdp& remote_sdp);
};
@ -198,7 +199,7 @@ public:
virtual srs_error_t serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r);
private:
virtual srs_error_t do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, SrsJsonObject* res);
srs_error_t exchange_sdp(const std::string& app, const std::string& stream, const SrsSdp& remote_sdp, SrsSdp& local_sdp);
srs_error_t exchange_sdp(SrsRequest* req, const SrsSdp& remote_sdp, SrsSdp& local_sdp);
srs_error_t check_remote_sdp(const SrsSdp& remote_sdp);
};

View file

@ -616,10 +616,11 @@ SrsRtcPlayer::SrsRtcPlayer(SrsRtcSession* s, int parent_cid)
realtime = true;
// TODO: FIXME: Config the capacity?
audio_queue_ = new SrsRtpRingBuffer(100);
video_queue_ = new SrsRtpRingBuffer(1000);
audio_queue_ = new SrsRtpRingBuffer<SrsRtpPacket2*>(100);
video_queue_ = new SrsRtpRingBuffer<SrsRtpPacket2*>(1000);
nn_simulate_nack_drop = 0;
nack_enabled_ = false;
_srs_config->subscribe(this);
}
@ -646,8 +647,10 @@ srs_error_t SrsRtcPlayer::initialize(const uint32_t& vssrc, const uint32_t& assr
gso = _srs_config->get_rtc_server_gso();
merge_nalus = _srs_config->get_rtc_server_merge_nalus();
max_padding = _srs_config->get_rtc_server_padding();
srs_trace("RTC sender video(ssrc=%d, pt=%d), audio(ssrc=%d, pt=%d), package(gso=%d, merge_nalus=%d), padding=%d",
video_ssrc, video_payload_type, audio_ssrc, audio_payload_type, gso, merge_nalus, max_padding);
// TODO: FIXME: Support reload.
nack_enabled_ = _srs_config->get_rtc_nack_enabled(session_->req->vhost);
srs_trace("RTC publisher video(ssrc=%d, pt=%d), audio(ssrc=%d, pt=%d), package(gso=%d, merge_nalus=%d), padding=%d, nack=%d",
video_ssrc, video_payload_type, audio_ssrc, audio_payload_type, gso, merge_nalus, max_padding, nack_enabled_);
return err;
}
@ -1007,7 +1010,7 @@ srs_error_t SrsRtcPlayer::send_packets(SrsRtcOutgoingPackets& packets)
}
// Put final RTP packet to NACK/ARQ queue.
if (true) {
if (nack_enabled_) {
SrsRtpPacket2* nack = new SrsRtpPacket2();
nack->rtp_header = packet->rtp_header;
nack->padding = packet->padding;
@ -1040,11 +1043,7 @@ srs_error_t SrsRtcPlayer::send_packets(SrsRtcOutgoingPackets& packets)
// For NACK simulator, drop packet.
if (nn_simulate_nack_drop) {
SrsRtpHeader* h = &packet->rtp_header;
srs_warn("RTC NACK simulator #%d drop seq=%u, ssrc=%u/%s, ts=%u, %d bytes", nn_simulate_nack_drop,
h->get_sequence(), h->get_ssrc(), (h->get_ssrc()==video_ssrc? "Video":"Audio"), h->get_timestamp(),
(int)iov->iov_len);
nn_simulate_nack_drop--;
simulate_drop_packet(&packet->rtp_header, (int)iov->iov_len);
iov->iov_len = 0;
continue;
}
@ -1184,7 +1183,7 @@ srs_error_t SrsRtcPlayer::send_packets_gso(SrsRtcOutgoingPackets& packets)
}
// Put final RTP packet to NACK/ARQ queue.
if (true) {
if (nack_enabled_) {
SrsRtpPacket2* nack = new SrsRtpPacket2();
nack->rtp_header = packet->rtp_header;
nack->padding = packet->padding;
@ -1542,6 +1541,15 @@ void SrsRtcPlayer::simulate_nack_drop(int nn)
nn_simulate_nack_drop = nn;
}
void SrsRtcPlayer::simulate_drop_packet(SrsRtpHeader* h, int nn_bytes)
{
srs_warn("RTC NACK simulator #%d drop seq=%u, ssrc=%u/%s, ts=%u, %d bytes", nn_simulate_nack_drop,
h->get_sequence(), h->get_ssrc(), (h->get_ssrc()==video_ssrc? "Video":"Audio"), h->get_timestamp(),
nn_bytes);
nn_simulate_nack_drop--;
}
SrsRtcPublisher::SrsRtcPublisher(SrsRtcSession* session)
{
report_timer = new SrsHourGlass(this, 200 * SRS_UTIME_MILLISECONDS);
@ -1553,6 +1561,8 @@ SrsRtcPublisher::SrsRtcPublisher(SrsRtcSession* session)
audio_nack_ = new SrsRtpNackForReceiver(audio_queue_, 100 * 2 / 3);
source = NULL;
nn_simulate_nack_drop = 0;
nack_enabled_ = false;
}
SrsRtcPublisher::~SrsRtcPublisher()
@ -1579,7 +1589,11 @@ srs_error_t SrsRtcPublisher::initialize(uint32_t vssrc, uint32_t assrc, SrsReque
audio_ssrc = assrc;
req = r;
srs_verbose("video_ssrc=%u, audio_ssrc=%u", video_ssrc, audio_ssrc);
// TODO: FIXME: Support reload.
nack_enabled_ = _srs_config->get_rtc_nack_enabled(session_->req->vhost);
srs_trace("RTC player video(ssrc=%u), audio(ssrc=%u), nack=%d",
video_ssrc, audio_ssrc, nack_enabled_);
if ((err = report_timer->tick(0 * SRS_UTIME_MILLISECONDS)) != srs_success) {
return srs_error_wrap(err, "hourglass tick");
@ -1787,6 +1801,12 @@ void SrsRtcPublisher::check_send_nacks(SrsRtpNackForReceiver* nack, uint32_t ssr
stream.write_2bytes(pid);
stream.write_2bytes(blp);
if (session_->blackhole && session_->blackhole_addr && session_->blackhole_stfd) {
// Ignore any error for black-hole.
void* p = stream.data(); int len = stream.pos(); SrsRtcSession* s = session_;
srs_sendto(s->blackhole_stfd, p, len, (sockaddr*)s->blackhole_addr, sizeof(sockaddr_in), SRS_UTIME_NO_TIMEOUT);
}
char protected_buf[kRtpPacketSize];
int nb_protected_buf = stream.pos();
@ -1967,6 +1987,13 @@ srs_error_t SrsRtcPublisher::on_rtp(char* buf, int nb_buf)
return srs_error_wrap(err, "decode rtp packet");
}
// For NACK simulator, drop packet.
if (nn_simulate_nack_drop) {
simulate_drop_packet(&pkt->rtp_header, nb_buf);
srs_freep(pkt);
return err;
}
uint32_t ssrc = pkt->rtp_header.get_ssrc();
if (ssrc == audio_ssrc) {
return on_audio(pkt);
@ -2004,14 +2031,23 @@ srs_error_t SrsRtcPublisher::on_audio(SrsRtpPacket2* pkt)
{
srs_error_t err = srs_success;
// TODO: FIXME: Error check.
audio_queue_->consume(audio_nack_, pkt);
check_send_nacks(audio_nack_, audio_ssrc);
// Collect all audio frames.
std::vector<SrsRtpPacket2*> frames;
audio_queue_->collect_frames(audio_nack_, frames);
if (nack_enabled_) {
// TODO: FIXME: Error check.
audio_queue_->consume(audio_nack_, pkt);
check_send_nacks(audio_nack_, audio_ssrc);
// Collect all audio frames.
audio_queue_->collect_frames(audio_nack_, frames);
} else {
// TODO: FIXME: Error check.
audio_queue_->consume(NULL, pkt);
// Collect all audio frames.
audio_queue_->collect_frames(NULL, frames);
}
for (size_t i = 0; i < frames.size(); ++i) {
SrsRtpPacket2* frame = frames[i];
@ -2063,19 +2099,23 @@ srs_error_t SrsRtcPublisher::on_audio_frame(SrsRtpPacket2* frame)
srs_error_t SrsRtcPublisher::on_video(SrsRtpPacket2* pkt)
{
// TODO: FIXME: Error check.
video_queue_->consume(video_nack_, pkt);
if (video_queue_->should_request_key_frame()) {
// TODO: FIXME: Check error.
send_rtcp_fb_pli(video_ssrc);
}
check_send_nacks(video_nack_, video_ssrc);
// Collect video frames.
std::vector<SrsRtpPacket2*> frames;
video_queue_->collect_frames(video_nack_, frames);
if (nack_enabled_) {
// TODO: FIXME: Error check.
video_queue_->consume(video_nack_, pkt);
check_send_nacks(video_nack_, video_ssrc);
// Collect video frames.
video_queue_->collect_frames(video_nack_, frames);
} else {
// TODO: FIXME: Error check.
video_queue_->consume(NULL, pkt);
// Collect video frames.
video_queue_->collect_frames(NULL, frames);
}
for (size_t i = 0; i < frames.size(); ++i) {
SrsRtpPacket2* frame = frames[i];
@ -2086,6 +2126,11 @@ srs_error_t SrsRtcPublisher::on_video(SrsRtpPacket2* pkt)
srs_freep(frame);
}
if (video_queue_->should_request_key_frame()) {
// TODO: FIXME: Check error.
send_rtcp_fb_pli(video_ssrc);
}
return srs_success;
}
@ -2217,7 +2262,16 @@ srs_error_t SrsRtcPublisher::notify(int type, srs_utime_t interval, srs_utime_t
void SrsRtcPublisher::simulate_nack_drop(int nn)
{
// TODO: FIXME: Implements it.
nn_simulate_nack_drop = nn;
}
void SrsRtcPublisher::simulate_drop_packet(SrsRtpHeader* h, int nn_bytes)
{
srs_warn("RTC NACK simulator #%d drop seq=%u, ssrc=%u/%s, ts=%u, %d bytes", nn_simulate_nack_drop,
h->get_sequence(), h->get_ssrc(), (h->get_ssrc()==video_ssrc? "Video":"Audio"), h->get_timestamp(),
nn_bytes);
nn_simulate_nack_drop--;
}
SrsRtcSession::SrsRtcSession(SrsRtcServer* s)
@ -3359,16 +3413,17 @@ srs_error_t SrsRtcServer::create_session(
return srs_error_new(ERROR_RTC_SOURCE_BUSY, "stream %s busy", req->get_stream_url().c_str());
}
// TODO: FIXME: Seems not random, please check it.
std::string local_pwd = gen_random_str(32);
std::string local_ufrag = "";
// TODO: FIXME: Rename for a better name, it's not an username.
std::string username = "";
while (true) {
local_ufrag = gen_random_str(8);
username = local_ufrag + ":" + remote_sdp.get_ice_ufrag();
if (!map_username_session.count(username))
if (!map_username_session.count(username)) {
break;
}
}
int cid = _srs_context->get_id();

View file

@ -34,6 +34,7 @@
#include <srs_app_sdp.hpp>
#include <srs_app_reload.hpp>
#include <srs_kernel_rtp.hpp>
#include <srs_app_rtp_queue.hpp>
#include <string>
#include <map>
@ -59,7 +60,6 @@ class SrsRtpPacket2;
class ISrsCodec;
class SrsRtpNackForReceiver;
class SrsRtpIncommingVideoFrame;
class SrsRtpRingBuffer;
const uint8_t kSR = 200;
const uint8_t kRR = 201;
@ -214,8 +214,8 @@ private:
uint16_t video_payload_type;
uint32_t video_ssrc;
// NACK ARQ ring buffer.
SrsRtpRingBuffer* audio_queue_;
SrsRtpRingBuffer* video_queue_;
SrsRtpRingBuffer<SrsRtpPacket2*>* audio_queue_;
SrsRtpRingBuffer<SrsRtpPacket2*>* video_queue_;
// Simulators.
int nn_simulate_nack_drop;
private:
@ -227,6 +227,8 @@ private:
srs_utime_t mw_sleep;
int mw_msgs;
bool realtime;
// Whether enabled nack.
bool nack_enabled_;
public:
SrsRtcPlayer(SrsRtcSession* s, int parent_cid);
virtual ~SrsRtcPlayer();
@ -260,6 +262,8 @@ private:
public:
void nack_fetch(std::vector<SrsRtpPacket2*>& pkts, uint32_t ssrc, uint16_t seq);
void simulate_nack_drop(int nn);
private:
void simulate_drop_packet(SrsRtpHeader* h, int nn_bytes);
};
class SrsRtcPublisher : virtual public ISrsHourGlass, virtual public ISrsRtpPacketDecodeHandler
@ -278,6 +282,10 @@ private:
private:
SrsRequest* req;
SrsSource* source;
// Whether enabled nack.
bool nack_enabled_;
// Simulators.
int nn_simulate_nack_drop;
private:
std::map<uint32_t, uint64_t> last_sender_report_sys_time;
std::map<uint32_t, SrsNtp> last_sender_report_ntp;
@ -308,6 +316,8 @@ public:
virtual srs_error_t notify(int type, srs_utime_t interval, srs_utime_t tick);
public:
void simulate_nack_drop(int nn);
private:
void simulate_drop_packet(SrsRtpHeader* h, int nn_bytes);
};
class SrsRtcSession

View file

@ -58,8 +58,7 @@ SrsRtpNackForReceiver::~SrsRtpNackForReceiver()
void SrsRtpNackForReceiver::insert(uint16_t seq)
{
// FIXME: full, drop packet, and request key frame.
SrsRtpNackInfo& nack_info = queue_[seq];
(void)nack_info;
queue_[seq] = SrsRtpNackInfo();
}
void SrsRtpNackForReceiver::remove(uint16_t seq)
@ -88,12 +87,12 @@ void SrsRtpNackForReceiver::check_queue_size()
void SrsRtpNackForReceiver::get_nack_seqs(vector<uint16_t>& seqs)
{
srs_utime_t now = srs_update_system_time();
int interval = now - pre_check_time_;
srs_utime_t interval = now - pre_check_time_;
if (interval < opts_.nack_interval / 2) {
return;
}
pre_check_time_ = now;
std::map<uint16_t, SrsRtpNackInfo>::iterator iter = queue_.begin();
while (iter != queue_.end()) {
const uint16_t& seq = iter->first;
@ -128,115 +127,8 @@ void SrsRtpNackForReceiver::update_rtt(int rtt)
opts_.nack_interval = rtt_;
}
SrsRtpRingBuffer::SrsRtpRingBuffer(int capacity)
SrsRtpQueue::SrsRtpQueue()
{
nn_seq_flip_backs = 0;
begin = end = 0;
capacity_ = (uint16_t)capacity;
initialized_ = false;
queue_ = new SrsRtpPacket2*[capacity_];
memset(queue_, 0, sizeof(SrsRtpPacket2*) * capacity);
}
SrsRtpRingBuffer::~SrsRtpRingBuffer()
{
srs_freepa(queue_);
}
bool SrsRtpRingBuffer::empty()
{
return begin == end;
}
int SrsRtpRingBuffer::size()
{
int size = srs_rtp_seq_distance(begin, end);
srs_assert(size >= 0);
return size;
}
void SrsRtpRingBuffer::advance_to(uint16_t seq)
{
begin = seq;
}
void SrsRtpRingBuffer::set(uint16_t at, SrsRtpPacket2* pkt)
{
SrsRtpPacket2* p = queue_[at % capacity_];
if (p) {
srs_freep(p);
}
queue_[at % capacity_] = pkt;
}
void SrsRtpRingBuffer::remove(uint16_t at)
{
set(at, NULL);
}
void SrsRtpRingBuffer::reset(uint16_t first, uint16_t last)
{
for (uint16_t s = first; s != last; ++s) {
queue_[s % capacity_] = NULL;
}
}
bool SrsRtpRingBuffer::overflow()
{
return srs_rtp_seq_distance(begin, end) >= capacity_;
}
uint32_t SrsRtpRingBuffer::get_extended_highest_sequence()
{
return nn_seq_flip_backs * 65536 + end - 1;
}
void SrsRtpRingBuffer::update(uint16_t seq, uint16_t& nack_first, uint16_t& nack_last)
{
if (!initialized_) {
initialized_ = true;
begin = seq;
end = seq + 1;
return;
}
// Normal sequence, seq follows high_.
if (srs_rtp_seq_distance(end, seq) >= 0) {
nack_first = end + 1;
nack_last = seq + 1;
// When distance(seq,high_)>0 and seq<high_, seq must flip back,
// for example, high_=65535, seq=1, distance(65535,1)>0 and 1<65535.
// TODO: FIXME: The first flip may be dropped.
if (seq < end) {
++nn_seq_flip_backs;
}
end = seq + 1;
return;
}
// Out-of-order sequence, seq before low_.
if (srs_rtp_seq_distance(seq, begin) > 0) {
// When startup, we may receive packets in chaos order.
// Because we don't know the ISN(initiazlie sequence number), the first packet
// we received maybe no the first packet client sent.
// @remark We only log a warning, because it seems ok for publisher.
srs_warn("too old seq %u, range [%u, %u]", seq, begin, end);
}
}
SrsRtpPacket2* SrsRtpRingBuffer::at(uint16_t seq)
{
return queue_[seq % capacity_];
}
SrsRtpQueue::SrsRtpQueue(int capacity)
{
queue_ = new SrsRtpRingBuffer(capacity);
jitter_ = 0;
last_trans_time_ = -1;
@ -249,61 +141,6 @@ SrsRtpQueue::SrsRtpQueue(int capacity)
SrsRtpQueue::~SrsRtpQueue()
{
srs_freep(queue_);
}
srs_error_t SrsRtpQueue::consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt)
{
srs_error_t err = srs_success;
// TODO: FIXME: Update time for each packet, may hurt performance.
srs_utime_t now = srs_update_system_time();
uint16_t seq = pkt->rtp_header.get_sequence();
SrsRtpNackInfo* nack_info = nack->find(seq);
if (nack_info) {
int nack_rtt = nack_info->req_nack_count_ ? ((now - nack_info->pre_req_nack_time_) / SRS_UTIME_MILLISECONDS) : 0;
(void)nack_rtt;
nack->remove(seq);
}
// Calc jitter time, ignore nack packets.
// TODO: FIXME: Covert time to srs_utime_t.
if (last_trans_time_ == -1) {
last_trans_time_ = now / 1000 - pkt->rtp_header.get_timestamp() / 90;
} else if (!nack_info) {
int trans_time = now / 1000 - pkt->rtp_header.get_timestamp() / 90;
int cur_jitter = trans_time - last_trans_time_;
if (cur_jitter < 0) {
cur_jitter = -cur_jitter;
}
last_trans_time_ = trans_time;
jitter_ = (jitter_ * 15.0 / 16.0) + (static_cast<double>(cur_jitter) / 16.0);
}
// OK, we got one new RTP packet, which is not in NACK.
if (!nack_info) {
++num_of_packet_received_;
uint16_t nack_first = 0, nack_last = 0;
queue_->update(seq, nack_first, nack_last);
if (srs_rtp_seq_distance(nack_first, nack_last) > 0) {
srs_trace("update seq=%u, nack range [%u, %u]", seq, nack_first, nack_last);
insert_into_nack_list(nack, nack_first, nack_last);
}
}
// Save packet at the position seq.
queue_->set(seq, pkt);
return err;
}
uint32_t SrsRtpQueue::get_extended_highest_sequence()
{
return queue_->get_extended_highest_sequence();
}
uint8_t SrsRtpQueue::get_fraction_lost()
@ -330,8 +167,57 @@ uint32_t SrsRtpQueue::get_interarrival_jitter()
return static_cast<uint32_t>(jitter_);
}
srs_error_t SrsRtpQueue::on_consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt)
{
srs_error_t err = srs_success;
// TODO: FIXME: Update time for each packet, may hurt performance.
srs_utime_t now = srs_update_system_time();
uint16_t seq = pkt->rtp_header.get_sequence();
SrsRtpNackInfo* nack_info = NULL;
if (nack) {
nack_info = nack->find(seq);
}
if (nack_info) {
int nack_rtt = nack_info->req_nack_count_ ? ((now - nack_info->pre_req_nack_time_) / SRS_UTIME_MILLISECONDS) : 0;
(void)nack_rtt;
nack->remove(seq);
}
// Calc jitter time, ignore nack packets.
// TODO: FIXME: Covert time to srs_utime_t.
if (last_trans_time_ == -1) {
last_trans_time_ = now / 1000 - pkt->rtp_header.get_timestamp() / 90;
} else if (!nack_info) {
int trans_time = now / 1000 - pkt->rtp_header.get_timestamp() / 90;
int cur_jitter = trans_time - last_trans_time_;
if (cur_jitter < 0) {
cur_jitter = -cur_jitter;
}
last_trans_time_ = trans_time;
jitter_ = (jitter_ * 15.0 / 16.0) + (static_cast<double>(cur_jitter) / 16.0);
}
// OK, got new RTP packet.
if (!nack_info) {
++num_of_packet_received_;
}
return err;
}
void SrsRtpQueue::insert_into_nack_list(SrsRtpNackForReceiver* nack, uint16_t first, uint16_t last)
{
if (!nack) {
return;
}
for (uint16_t s = first; s != last; ++s) {
nack->insert(s);
++number_of_packet_lossed_;
@ -340,12 +226,14 @@ void SrsRtpQueue::insert_into_nack_list(SrsRtpNackForReceiver* nack, uint16_t fi
nack->check_queue_size();
}
SrsRtpAudioQueue::SrsRtpAudioQueue(int capacity) : SrsRtpQueue(capacity)
SrsRtpAudioQueue::SrsRtpAudioQueue(int capacity)
{
queue_ = new SrsRtpRingBuffer<SrsRtpPacket2*>(capacity);
}
SrsRtpAudioQueue::~SrsRtpAudioQueue()
{
srs_freep(queue_);
}
void SrsRtpAudioQueue::notify_drop_seq(uint16_t seq)
@ -365,23 +253,74 @@ void SrsRtpAudioQueue::notify_nack_list_full()
queue_->advance_to(queue_->end);
}
uint32_t SrsRtpAudioQueue::get_extended_highest_sequence()
{
return queue_->get_extended_highest_sequence();
}
srs_error_t SrsRtpAudioQueue::consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt)
{
srs_error_t err = srs_success;
uint16_t seq = pkt->rtp_header.get_sequence();
SrsRtpNackInfo* nack_info = NULL;
if (nack) {
nack_info = nack->find(seq);
}
if ((err = SrsRtpQueue::on_consume(nack, pkt)) != srs_success) {
return srs_error_wrap(err, "consume audio");
}
// OK, we got one new RTP packet, which is not in NACK.
if (!nack_info) {
uint16_t nack_first = 0, nack_last = 0;
if (!queue_->update(seq, nack_first, nack_last)) {
srs_warn("too old seq %u, range [%u, %u]", seq, queue_->begin, queue_->end);
}
if (nack && srs_rtp_seq_distance(nack_first, nack_last) > 0) {
srs_trace("update seq=%u, nack range [%u, %u]", seq, nack_first, nack_last);
insert_into_nack_list(nack, nack_first, nack_last);
}
}
// Save packet at the position seq.
queue_->set(seq, pkt);
return err;
}
void SrsRtpAudioQueue::collect_frames(SrsRtpNackForReceiver* nack, vector<SrsRtpPacket2*>& frames)
{
// When done, next point to the next available packet.
uint16_t next = queue_->begin;
for (; next != queue_->end; ++next) {
SrsRtpPacket2* pkt = queue_->at(next);
// TODO: FIXME: Should not wait for NACK packets.
// Not found or in NACK, stop collecting frame.
if (!pkt || nack->find(next) != NULL) {
srs_trace("wait for nack seq=%u", next);
break;
// If nack disabled, we ignore any empty packet.
if (!nack) {
for (; next != queue_->end; ++next) {
SrsRtpPacket2* pkt = queue_->at(next);
if (pkt) {
frames.push_back(pkt);
}
}
} else {
for (; next != queue_->end; ++next) {
SrsRtpPacket2* pkt = queue_->at(next);
frames.push_back(pkt);
// TODO: FIXME: Should not wait for NACK packets.
// Not found or in NACK, stop collecting frame.
if (!pkt || nack->find(next) != NULL) {
srs_trace("wait for nack seq=%u", next);
break;
}
frames.push_back(pkt);
}
}
// Reap packets from begin to next.
if (next != queue_->begin) {
// Reset the range of packets to NULL in buffer.
queue_->reset(queue_->begin, next);
@ -391,18 +330,42 @@ void SrsRtpAudioQueue::collect_frames(SrsRtpNackForReceiver* nack, vector<SrsRtp
}
// For audio, if overflow, clear all packets.
// TODO: FIXME: Should notify nack?
if (queue_->overflow()) {
queue_->advance_to(queue_->end);
}
}
SrsRtpVideoQueue::SrsRtpVideoQueue(int capacity) : SrsRtpQueue(capacity)
SrsRtpVideoPacket::SrsRtpVideoPacket()
{
video_is_first_packet = false;
video_is_last_packet = false;
video_is_idr = false;
pkt = NULL;
}
SrsRtpVideoPacket::~SrsRtpVideoPacket()
{
srs_freep(pkt);
}
SrsRtpPacket2* SrsRtpVideoPacket::detach()
{
SrsRtpPacket2* p = pkt;
pkt = NULL;
return p;
}
SrsRtpVideoQueue::SrsRtpVideoQueue(int capacity)
{
request_key_frame_ = false;
queue_ = new SrsRtpRingBuffer<SrsRtpVideoPacket*>(capacity);
}
SrsRtpVideoQueue::~SrsRtpVideoQueue()
{
srs_freep(queue_);
}
void SrsRtpVideoQueue::notify_drop_seq(uint16_t seq)
@ -423,36 +386,68 @@ void SrsRtpVideoQueue::notify_nack_list_full()
queue_->advance_to(next);
}
uint32_t SrsRtpVideoQueue::get_extended_highest_sequence()
{
return queue_->get_extended_highest_sequence();
}
srs_error_t SrsRtpVideoQueue::consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt)
{
srs_error_t err = srs_success;
SrsRtpVideoPacket* vpkt = new SrsRtpVideoPacket();
vpkt->pkt = pkt;
uint8_t v = (uint8_t)pkt->nalu_type;
if (v == kFuA) {
SrsRtpFUAPayload2* payload = dynamic_cast<SrsRtpFUAPayload2*>(pkt->payload);
if (!payload) {
srs_freep(pkt);
srs_freep(pkt); srs_freep(vpkt);
return srs_error_new(ERROR_RTC_RTP_MUXER, "FU-A payload");
}
pkt->video_is_first_packet = payload->start;
pkt->video_is_last_packet = payload->end;
pkt->video_is_idr = (payload->nalu_type == SrsAvcNaluTypeIDR);
vpkt->video_is_first_packet = payload->start;
vpkt->video_is_last_packet = payload->end;
vpkt->video_is_idr = (payload->nalu_type == SrsAvcNaluTypeIDR);
} else {
pkt->video_is_first_packet = true;
pkt->video_is_last_packet = true;
vpkt->video_is_first_packet = true;
vpkt->video_is_last_packet = true;
if (v == kStapA) {
pkt->video_is_idr = true;
vpkt->video_is_idr = true;
} else {
pkt->video_is_idr = (pkt->nalu_type == SrsAvcNaluTypeIDR);
vpkt->video_is_idr = (pkt->nalu_type == SrsAvcNaluTypeIDR);
}
}
if ((err = SrsRtpQueue::consume(nack, pkt)) != srs_success) {
return srs_error_wrap(err, "video consume");
uint16_t seq = pkt->rtp_header.get_sequence();
SrsRtpNackInfo* nack_info = NULL;
if (nack) {
nack_info = nack->find(seq);
}
if ((err = SrsRtpQueue::on_consume(nack, pkt)) != srs_success) {
srs_freep(pkt); srs_freep(vpkt);
return srs_error_wrap(err, "consume video");
}
// OK, we got one new RTP packet, which is not in NACK.
if (!nack_info) {
uint16_t nack_first = 0, nack_last = 0;
if (!queue_->update(seq, nack_first, nack_last)) {
srs_warn("too old seq %u, range [%u, %u]", seq, queue_->begin, queue_->end);
}
if (nack && srs_rtp_seq_distance(nack_first, nack_last) > 0) {
srs_trace("update seq=%u, nack range [%u, %u]", seq, nack_first, nack_last);
insert_into_nack_list(nack, nack_first, nack_last);
}
}
// Save packet at the position seq.
queue_->set(seq, vpkt);
return err;
}
@ -460,9 +455,7 @@ void SrsRtpVideoQueue::collect_frames(SrsRtpNackForReceiver* nack, std::vector<S
{
while (true) {
SrsRtpPacket2* pkt = NULL;
collect_frame(nack, &pkt);
if (!pkt) {
break;
}
@ -497,7 +490,9 @@ void SrsRtpVideoQueue::on_overflow(SrsRtpNackForReceiver* nack)
srs_trace("on overflow, remove range [%u, %u, %u]", queue_->begin, next, queue_->end);
for (uint16_t s = queue_->begin; s != next; ++s) {
nack->remove(s);
if (nack) {
nack->remove(s);
}
queue_->remove(s);
}
@ -508,34 +503,57 @@ void SrsRtpVideoQueue::on_overflow(SrsRtpNackForReceiver* nack)
void SrsRtpVideoQueue::collect_frame(SrsRtpNackForReceiver* nack, SrsRtpPacket2** ppkt)
{
bool found = false;
vector<SrsRtpPacket2*> frame;
vector<SrsRtpVideoPacket*> frame;
// When done, next point to the next available packet.
uint16_t next = queue_->begin;
for (; next != queue_->end; ++next) {
SrsRtpPacket2* pkt = queue_->at(next);
// TODO: FIXME: Should not wait for NACK packets.
// Not found or in NACK, stop collecting frame.
if (!pkt || nack->find(next) != NULL) {
srs_trace("wait for nack seq=%u", next);
return;
// If nack disabled, we ignore any empty packet.
if (!nack) {
for (; next != queue_->end; ++next) {
SrsRtpVideoPacket* vpkt = queue_->at(next);
if (!vpkt) {
continue;
}
if (frame.empty() && !vpkt->video_is_first_packet) {
continue;
}
frame.push_back(vpkt);
if (vpkt->pkt->rtp_header.get_marker() || vpkt->video_is_last_packet) {
found = true;
next++;
break;
}
}
} else {
for (; next != queue_->end; ++next) {
SrsRtpVideoPacket* vpkt = queue_->at(next);
// Ignore when the first packet not the start.
if (next == queue_->begin && !pkt->video_is_first_packet) {
return;
}
// TODO: FIXME: Should not wait for NACK packets.
// Not found or in NACK, stop collecting frame.
if (!vpkt || nack->find(next) != NULL) {
srs_trace("wait for nack seq=%u", next);
return;
}
// OK, collect packet to frame.
frame.push_back(pkt);
// Ignore when the first packet not the start.
if (frame.empty() && !vpkt->video_is_first_packet) {
return;
}
// Done, we got the last packet of frame.
// @remark Note that the STAP-A is marker false and it's the last packet.
if (pkt->rtp_header.get_marker() || pkt->video_is_last_packet) {
found = true;
next++;
break;
// OK, collect packet to frame.
frame.push_back(vpkt);
// Done, we got the last packet of frame.
// @remark Note that the STAP-A is marker false and it's the last packet.
if (vpkt->pkt->rtp_header.get_marker() || vpkt->video_is_last_packet) {
found = true;
next++;
break;
}
}
}
@ -553,38 +571,39 @@ void SrsRtpVideoQueue::collect_frame(SrsRtpNackForReceiver* nack, SrsRtpPacket2*
// Merge packets to one packet.
covert_frame(frame, ppkt);
for (int i = 0; i < (int)frame.size(); i++) {
SrsRtpVideoPacket* pkt = frame[i];
srs_freep(pkt);
}
return;
}
void SrsRtpVideoQueue::covert_frame(std::vector<SrsRtpPacket2*>& frame, SrsRtpPacket2** ppkt)
void SrsRtpVideoQueue::covert_frame(std::vector<SrsRtpVideoPacket*>& frame, SrsRtpPacket2** ppkt)
{
if (frame.size() == 1) {
*ppkt = frame[0];
*ppkt = frame[0]->detach();
return;
}
// If more than one packet in a frame, it must be FU-A.
SrsRtpPacket2* head = frame.at(0);
SrsRtpPacket2* head = frame.at(0)->pkt;
SrsAvcNaluType nalu_type = head->nalu_type;
// Covert FU-A to one RAW RTP packet.
int nn_nalus = 0;
for (size_t i = 0; i < frame.size(); ++i) {
SrsRtpPacket2* pkt = frame[i];
SrsRtpFUAPayload2* payload = dynamic_cast<SrsRtpFUAPayload2*>(pkt->payload);
SrsRtpVideoPacket* vpkt = frame[i];
SrsRtpFUAPayload2* payload = dynamic_cast<SrsRtpFUAPayload2*>(vpkt->pkt->payload);
if (!payload) {
nn_nalus = 0;
break;
nn_nalus = 0; break;
}
nn_nalus += payload->size;
}
// Invalid packets, ignore.
if (nalu_type != (SrsAvcNaluType)kFuA || !nn_nalus) {
for (int i = 0; i < (int)frame.size(); i++) {
SrsRtpPacket2* pkt = frame[i];
srs_freep(pkt);
}
return;
}
@ -606,8 +625,8 @@ void SrsRtpVideoQueue::covert_frame(std::vector<SrsRtpPacket2*>& frame, SrsRtpPa
buf.write_1bytes(head_payload->nri | head_payload->nalu_type); // NALU header.
for (size_t i = 0; i < frame.size(); ++i) {
SrsRtpPacket2* pkt = frame[i];
SrsRtpFUAPayload2* payload = dynamic_cast<SrsRtpFUAPayload2*>(pkt->payload);
SrsRtpVideoPacket* vpkt = frame[i];
SrsRtpFUAPayload2* payload = dynamic_cast<SrsRtpFUAPayload2*>(vpkt->pkt->payload);
buf.write_bytes(payload->payload, payload->size);
}
@ -622,8 +641,8 @@ uint16_t SrsRtpVideoQueue::next_start_of_frame(uint16_t seq)
}
for (; s != queue_->end; ++s) {
SrsRtpPacket2* pkt = queue_->at(s);
if (pkt && pkt->video_is_first_packet) {
SrsRtpVideoPacket* vpkt = queue_->at(s);
if (vpkt && vpkt->video_is_first_packet) {
return s;
}
}
@ -636,8 +655,8 @@ uint16_t SrsRtpVideoQueue::next_keyframe()
uint16_t s = queue_->begin + 1;
for (; s != queue_->end; ++s) {
SrsRtpPacket2* pkt = queue_->at(s);
if (pkt && pkt->video_is_idr && pkt->video_is_first_packet) {
SrsRtpVideoPacket* vpkt = queue_->at(s);
if (vpkt && vpkt->video_is_idr && vpkt->video_is_first_packet) {
return s;
}
}

View file

@ -113,13 +113,14 @@ public:
// but not an entire video frame right now.
// * seq10: This packet is lost or not received, we put it in the nack list.
// We store the received packets in ring buffer.
template<typename T>
class SrsRtpRingBuffer
{
private:
// Capacity of the ring-buffer.
uint16_t capacity_;
// Ring bufer.
SrsRtpPacket2** queue_;
T* queue_;
// Increase one when uint16 flip back, for get_extended_highest_sequence.
uint64_t nn_seq_flip_backs;
// Whether initialized, because we use uint16 so we can't use -1.
@ -132,28 +133,100 @@ public:
// For example, when got 1 elems, the end is 1.
uint16_t end;
public:
SrsRtpRingBuffer(int capacity);
virtual ~SrsRtpRingBuffer();
SrsRtpRingBuffer(int capacity) {
nn_seq_flip_backs = 0;
begin = end = 0;
capacity_ = (uint16_t)capacity;
initialized_ = false;
queue_ = new T[capacity_];
memset(queue_, 0, sizeof(T) * capacity);
}
virtual ~SrsRtpRingBuffer() {
srs_freepa(queue_);
}
public:
// Whether the ring buffer is empty.
bool empty();
bool empty() {
return begin == end;
}
// Get the count of elems in ring buffer.
int size();
int size() {
int size = srs_rtp_seq_distance(begin, end);
srs_assert(size >= 0);
return size;
}
// Move the low position of buffer to seq.
void advance_to(uint16_t seq);
void advance_to(uint16_t seq) {
begin = seq;
}
// Free the packet at position.
void set(uint16_t at, SrsRtpPacket2* pkt);
void remove(uint16_t at);
void set(uint16_t at, T pkt) {
T p = queue_[at % capacity_];
if (p) {
srs_freep(p);
}
queue_[at % capacity_] = pkt;
}
void remove(uint16_t at) {
set(at, NULL);
}
// Directly reset range [first, last) to NULL.
void reset(uint16_t first, uint16_t last);
void reset(uint16_t first, uint16_t last) {
for (uint16_t s = first; s != last; ++s) {
queue_[s % capacity_] = NULL;
}
}
// Whether queue overflow or heavy(too many packets and need clear).
bool overflow();
bool overflow() {
return srs_rtp_seq_distance(begin, end) >= capacity_;
}
// The highest sequence number, calculate the flip back base.
uint32_t get_extended_highest_sequence();
uint32_t get_extended_highest_sequence() {
return nn_seq_flip_backs * 65536 + end - 1;
}
// Update the sequence, got the nack range by [first, last).
void update(uint16_t seq, uint16_t& nack_first, uint16_t& nack_last);
// @return If false, the seq is too old.
bool update(uint16_t seq, uint16_t& nack_first, uint16_t& nack_last) {
if (!initialized_) {
initialized_ = true;
begin = seq;
end = seq + 1;
return true;
}
// Normal sequence, seq follows high_.
if (srs_rtp_seq_distance(end, seq) >= 0) {
nack_first = end;
nack_last = seq;
// When distance(seq,high_)>0 and seq<high_, seq must flip back,
// for example, high_=65535, seq=1, distance(65535,1)>0 and 1<65535.
// TODO: FIXME: The first flip may be dropped.
if (seq < end) {
++nn_seq_flip_backs;
}
end = seq + 1;
return true;
}
// Out-of-order sequence, seq before low_.
if (srs_rtp_seq_distance(seq, begin) > 0) {
// When startup, we may receive packets in chaos order.
// Because we don't know the ISN(initiazlie sequence number), the first packet
// we received maybe no the first packet client sent.
// @remark We only log a warning, because it seems ok for publisher.
return false;
}
return true;
}
// Get the packet by seq.
SrsRtpPacket2* at(uint16_t seq);
T at(uint16_t seq) {
return queue_[seq % capacity_];
}
};
class SrsRtpQueue
@ -166,45 +239,65 @@ private:
uint64_t pre_number_of_packet_lossed_;
uint64_t num_of_packet_received_;
uint64_t number_of_packet_lossed_;
protected:
SrsRtpRingBuffer* queue_;
public:
SrsRtpQueue(int capacity);
SrsRtpQueue();
virtual ~SrsRtpQueue();
public:
virtual srs_error_t consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt);
virtual void notify_drop_seq(uint16_t seq) = 0;
virtual void notify_nack_list_full() = 0;
public:
uint32_t get_extended_highest_sequence();
virtual uint32_t get_extended_highest_sequence() = 0;
uint8_t get_fraction_lost();
uint32_t get_cumulative_number_of_packets_lost();
uint32_t get_interarrival_jitter();
private:
protected:
srs_error_t on_consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt);
void insert_into_nack_list(SrsRtpNackForReceiver* nack, uint16_t first, uint16_t last);
};
class SrsRtpAudioQueue : public SrsRtpQueue
{
private:
SrsRtpRingBuffer<SrsRtpPacket2*>* queue_;
public:
SrsRtpAudioQueue(int capacity);
virtual ~SrsRtpAudioQueue();
public:
virtual void notify_drop_seq(uint16_t seq);
virtual void notify_nack_list_full();
virtual uint32_t get_extended_highest_sequence();
virtual srs_error_t consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt);
public:
virtual void collect_frames(SrsRtpNackForReceiver* nack, std::vector<SrsRtpPacket2*>& frames);
};
class SrsRtpVideoPacket
{
public:
// Helper information for video decoder only.
bool video_is_first_packet;
bool video_is_last_packet;
bool video_is_idr;
public:
SrsRtpPacket2* pkt;
public:
SrsRtpVideoPacket();
virtual ~SrsRtpVideoPacket();
public:
SrsRtpPacket2* detach();
};
class SrsRtpVideoQueue : public SrsRtpQueue
{
private:
bool request_key_frame_;
SrsRtpRingBuffer<SrsRtpVideoPacket*>* queue_;
public:
SrsRtpVideoQueue(int capacity);
virtual ~SrsRtpVideoQueue();
public:
virtual void notify_drop_seq(uint16_t seq);
virtual void notify_nack_list_full();
virtual uint32_t get_extended_highest_sequence();
virtual srs_error_t consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt);
virtual void collect_frames(SrsRtpNackForReceiver* nack, std::vector<SrsRtpPacket2*>& frame);
bool should_request_key_frame();
@ -212,7 +305,7 @@ public:
private:
virtual void on_overflow(SrsRtpNackForReceiver* nack);
virtual void collect_frame(SrsRtpNackForReceiver* nack, SrsRtpPacket2** ppkt);
virtual void covert_frame(std::vector<SrsRtpPacket2*>& frame, SrsRtpPacket2** ppkt);
virtual void covert_frame(std::vector<SrsRtpVideoPacket*>& frame, SrsRtpPacket2** ppkt);
uint16_t next_start_of_frame(uint16_t seq);
uint16_t next_keyframe();
};

View file

@ -277,17 +277,12 @@ SrsRtpPacket2::SrsRtpPacket2()
payload = NULL;
decode_handler = NULL;
video_is_first_packet = false;
video_is_last_packet = false;
video_is_idr = false;
nalu_type = SrsAvcNaluTypeReserved;
original_bytes = NULL;
cache_raw = new SrsRtpRawPayload();
cache_fua = new SrsRtpFUAPayload2();
cache_payload = 0;
original_bytes = NULL;
nn_original_payload = 0;
}
SrsRtpPacket2::~SrsRtpPacket2()
@ -408,7 +403,6 @@ srs_error_t SrsRtpPacket2::decode(SrsBuffer* buf)
// If user set the decode handler, call it to set the payload.
if (decode_handler) {
decode_handler->on_before_decode_payload(this, buf, &payload);
nn_original_payload = buf->left();
}
// By default, we always use the RAW payload.

View file

@ -114,15 +114,8 @@ public:
int padding;
// Decoder helper.
public:
// TODO: FIXME: Move to video decoder queue SrsRtpVideoQueue.
// Helper information for video decoder only.
bool video_is_first_packet;
bool video_is_last_packet;
bool video_is_idr;
// The first byte as nalu type, for video decoder only.
SrsAvcNaluType nalu_type;
// The original payload bytes length.
int nn_original_payload;
// The original bytes for decoder only, we will free it.
char* original_bytes;
// Fast cache for performance.

View file

@ -85,6 +85,9 @@ srs_error_t do_main(int argc, char** argv)
// TODO: support both little and big endian.
srs_assert(srs_is_little_endian());
// For RTC to generating random ICE username.
::srandom((unsigned long)(srs_update_system_time() | (::getpid()<<13)));
// for gperf gmp or gcp,
// should never enable it when not enabled for performance issue.

View file

@ -1323,7 +1323,7 @@ VOID TEST(ProtocolAMF0Test, InterfacesString)
}
if (true) {
SrsBuffer b;
SrsBuffer b(NULL, 0);
SrsAmf0Any* o = SrsAmf0Any::str();
HELPER_EXPECT_FAILED(o->write(&b));
srs_freep(o);
@ -1354,7 +1354,7 @@ VOID TEST(ProtocolAMF0Test, InterfacesString)
}
if (true) {
SrsBuffer b;
SrsBuffer b(NULL, 0);
SrsAmf0Any* o = SrsAmf0Any::str();
HELPER_EXPECT_FAILED(o->read(&b));
srs_freep(o);
@ -1452,7 +1452,7 @@ VOID TEST(ProtocolAMF0Test, InterfacesBoolean)
}
if (true) {
SrsBuffer b;
SrsBuffer b(NULL, 0);
SrsAmf0Any* o = SrsAmf0Any::boolean();
HELPER_EXPECT_FAILED(o->write(&b));
srs_freep(o);
@ -1467,7 +1467,7 @@ VOID TEST(ProtocolAMF0Test, InterfacesBoolean)
}
if (true) {
SrsBuffer b;
SrsBuffer b(NULL, 0);
SrsAmf0Any* o = SrsAmf0Any::boolean();
HELPER_EXPECT_FAILED(o->read(&b));
srs_freep(o);
@ -1549,7 +1549,7 @@ VOID TEST(ProtocolAMF0Test, InterfacesNumber)
}
if (true) {
SrsBuffer b;
SrsBuffer b(NULL, 0);
SrsAmf0Any* o = SrsAmf0Any::number();
HELPER_EXPECT_FAILED(o->write(&b));
srs_freep(o);
@ -1564,7 +1564,7 @@ VOID TEST(ProtocolAMF0Test, InterfacesNumber)
}
if (true) {
SrsBuffer b;
SrsBuffer b(NULL, 0);
SrsAmf0Any* o = SrsAmf0Any::number();
HELPER_EXPECT_FAILED(o->read(&b));
srs_freep(o);
@ -1673,14 +1673,14 @@ VOID TEST(ProtocolAMF0Test, InterfacesNull)
}
if (true) {
SrsBuffer b;
SrsBuffer b(NULL, 0);
SrsAmf0Any* o = SrsAmf0Any::null();
HELPER_EXPECT_FAILED(o->write(&b));
srs_freep(o);
}
if (true) {
SrsBuffer b;
SrsBuffer b(NULL, 0);
SrsAmf0Any* o = SrsAmf0Any::null();
HELPER_EXPECT_FAILED(o->read(&b));
srs_freep(o);
@ -1739,14 +1739,14 @@ VOID TEST(ProtocolAMF0Test, InterfacesUndefined)
}
if (true) {
SrsBuffer b;
SrsBuffer b(NULL, 0);
SrsAmf0Any* o = SrsAmf0Any::undefined();
HELPER_EXPECT_FAILED(o->write(&b));
srs_freep(o);
}
if (true) {
SrsBuffer b;
SrsBuffer b(NULL, 0);
SrsAmf0Any* o = SrsAmf0Any::undefined();
HELPER_EXPECT_FAILED(o->read(&b));
srs_freep(o);
@ -1811,14 +1811,14 @@ VOID TEST(ProtocolAMF0Test, InterfacesObject)
}
if (true) {
SrsBuffer b;
SrsBuffer b(NULL, 0);
SrsAmf0Object* o = SrsAmf0Any::object();
HELPER_EXPECT_FAILED(o->read(&b));
srs_freep(o);
}
if (true) {
SrsBuffer b;
SrsBuffer b(NULL, 0);
SrsAmf0Object* o = SrsAmf0Any::object();
HELPER_EXPECT_FAILED(o->write(&b));
srs_freep(o);
@ -2014,14 +2014,14 @@ VOID TEST(ProtocolAMF0Test, InterfacesObjectEOF)
}
if (true) {
SrsBuffer b;
SrsBuffer b(NULL, 0);
SrsAmf0ObjectEOF* o = new SrsAmf0ObjectEOF();
HELPER_EXPECT_FAILED(o->read(&b));
srs_freep(o);
}
if (true) {
SrsBuffer b;
SrsBuffer b(NULL, 0);
SrsAmf0ObjectEOF* o = new SrsAmf0ObjectEOF();
HELPER_EXPECT_FAILED(o->write(&b));
srs_freep(o);
@ -2106,14 +2106,14 @@ VOID TEST(ProtocolAMF0Test, InterfacesEcmaArray)
}
if (true) {
SrsBuffer b;
SrsBuffer b(NULL, 0);
SrsAmf0EcmaArray* o = SrsAmf0Any::ecma_array();
HELPER_EXPECT_FAILED(o->read(&b));
srs_freep(o);
}
if (true) {
SrsBuffer b;
SrsBuffer b(NULL, 0);
SrsAmf0EcmaArray* o = SrsAmf0Any::ecma_array();
HELPER_EXPECT_FAILED(o->write(&b));
srs_freep(o);
@ -2233,14 +2233,14 @@ VOID TEST(ProtocolAMF0Test, InterfacesStrictArray)
}
if (true) {
SrsBuffer b;
SrsBuffer b(NULL, 0);
SrsAmf0StrictArray* o = SrsAmf0Any::strict_array();
HELPER_EXPECT_FAILED(o->read(&b));
srs_freep(o);
}
if (true) {
SrsBuffer b;
SrsBuffer b(NULL, 0);
SrsAmf0StrictArray* o = SrsAmf0Any::strict_array();
HELPER_EXPECT_FAILED(o->write(&b));
srs_freep(o);
@ -2346,7 +2346,7 @@ VOID TEST(ProtocolAMF0Test, InterfacesError)
srs_error_t err;
if (true) {
SrsBuffer b;
SrsBuffer b(NULL, 0);
HELPER_EXPECT_FAILED(SrsAmf0Any::discovery(&b, NULL));
}
@ -2490,7 +2490,7 @@ VOID TEST(ProtocolAMF0Test, Amf0Object2)
}
if (true) {
SrsBuffer b;
SrsBuffer b(NULL, 0);
SrsAmf0Any* eof = SrsAmf0Any::object_eof();
HELPER_EXPECT_FAILED(srs_amf0_write_object_eof(&b, (SrsAmf0ObjectEOF*)eof));
srs_freep(eof);

View file

@ -2596,7 +2596,7 @@ VOID TEST(KernelUtility, AnnexbUtils)
if (true) {
EXPECT_TRUE(!srs_avc_startswith_annexb(NULL, NULL));
SrsBuffer buf;
SrsBuffer buf(NULL, 0);
EXPECT_TRUE(!srs_avc_startswith_annexb(&buf, NULL));
}
@ -2654,7 +2654,7 @@ VOID TEST(KernelUtility, AdtsUtils)
if (true) {
EXPECT_TRUE(!srs_aac_startswith_adts(NULL));
SrsBuffer buf;
SrsBuffer buf(NULL, 0);
EXPECT_TRUE(!srs_aac_startswith_adts(&buf));
}
@ -4506,7 +4506,7 @@ VOID TEST(KernelTSTest, CoverContextUtility)
SrsTsMessage m(&c, &p);
m.PES_packet_length = 8;
SrsBuffer b;
SrsBuffer b(NULL, 0);
int nb_bytes = 0;
HELPER_EXPECT_SUCCESS(m.dump(&b, &nb_bytes));
@ -4625,7 +4625,7 @@ VOID TEST(KernelTSTest, CoverContextEncode)
MockTsHandler h;
if (true) {
SrsBuffer b;
SrsBuffer b(NULL, 0);
HELPER_EXPECT_SUCCESS(ctx.decode(&b, &h));
EXPECT_TRUE(NULL == h.msg);
}

View file

@ -220,7 +220,7 @@ VOID TEST(KernelMp4Test, DiscoveryBox)
SrsMp4Box* pbox;
if (true) {
SrsBuffer b;
SrsBuffer b(NULL, 0);
HELPER_ASSERT_FAILED(SrsMp4Box::discovery(&b, &pbox));
}
@ -419,7 +419,7 @@ VOID TEST(KernelMp4Test, UUIDBoxDecode)
}
if (true) {
SrsBuffer b;
SrsBuffer b(NULL, 0);
SrsMp4Box box;
HELPER_ASSERT_FAILED(box.decode(&b));
}

View file

@ -92,7 +92,7 @@ VOID TEST(ProtocolRTMPTest, PacketEncode)
MockPacket pkt;
pkt.size = 1024;
SrsBuffer b;
SrsBuffer b(NULL, 0);
HELPER_EXPECT_FAILED(pkt.decode(&b));
}