1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-02-13 20:01:56 +00:00

RTC: Refine RTCP packets parsing

This commit is contained in:
winlin 2020-08-11 09:40:27 +08:00
parent 1591318792
commit a728e02b93
5 changed files with 131 additions and 464 deletions

View file

@ -498,12 +498,12 @@ srs_error_t SrsRtcPlayStream::cycle()
// Send-out all RTP packets and do cleanup // Send-out all RTP packets and do cleanup
if (true) { if (true) {
err = send_packets(source, pkts, info); if ((err = send_packets(source, pkts, info)) != srs_success) {
if (epp.can_print(err)) {
if (epp.can_print(err)) { srs_warn("play send packets=%u, err: %s", pkts.size(), srs_error_desc(err).c_str());
srs_warn("play send packets=%u, err: %s", pkts.size(), srs_error_desc(err).c_str()); }
srs_freep(err);
} }
srs_freep(err);
for (int i = 0; i < msg_count; i++) { for (int i = 0; i < msg_count; i++) {
SrsRtpPacket2* pkt = pkts[i]; SrsRtpPacket2* pkt = pkts[i];
@ -664,75 +664,27 @@ srs_error_t SrsRtcPlayStream::notify(int type, srs_utime_t interval, srs_utime_t
return err; return err;
} }
srs_error_t SrsRtcPlayStream::on_rtcp(char* data, int nb_data) srs_error_t SrsRtcPlayStream::on_rtcp(SrsRtcpCommon* rtcp)
{ {
srs_error_t err = srs_success; if(SrsRtcpType_rr == rtcp->type()) {
SrsRtcpRR* rr = dynamic_cast<SrsRtcpRR*>(rtcp);
// TODO: Use SrsBuffer to parse it. return on_rtcp_rr(rr);
char* ph = data; } else if(SrsRtcpType_rtpfb == rtcp->type()) {
int nb_left = nb_data; //currently rtpfb of nack will be handle by player. TWCC will be handled by SrsRtcConnection
SrsRtcpNack* nack = dynamic_cast<SrsRtcpNack*>(rtcp);
while (nb_left) { return on_rtcp_nack(nack);
uint8_t payload_type = ph[1]; } else if(SrsRtcpType_psfb == rtcp->type()) {
uint16_t length_4bytes = (((uint16_t)ph[2]) << 8) | ph[3]; SrsRtcpPsfbCommon* psfb = dynamic_cast<SrsRtcpPsfbCommon*>(rtcp);
return on_rtcp_ps_feedback(psfb);
int length = (length_4bytes + 1) * 4; } else if(SrsRtcpType_xr == rtcp->type()) {
SrsRtcpXr* xr = dynamic_cast<SrsRtcpXr*>(rtcp);
if (length > nb_data) { return on_rtcp_xr(xr);
return srs_error_new(ERROR_RTC_RTCP, "invalid length=%u/%u, left=%u, bytes=%s", } else {
length, nb_data, nb_left, srs_string_dumps_hex(ph, nb_left, 8).c_str()); return srs_error_new(ERROR_RTC_RTCP_CHECK, "unknown rtcp type=%u", rtcp->type());
}
srs_verbose("on rtcp, payload_type=%u", payload_type);
switch (payload_type) {
case kSR: {
err = on_rtcp_sr(ph, length);
break;
}
case kRR: {
err = on_rtcp_rr(ph, length);
break;
}
case kSDES: {
break;
}
case kBye: {
break;
}
case kApp: {
break;
}
case kRtpFb: {
err = on_rtcp_feedback(ph, length);
break;
}
case kPsFb: {
err = on_rtcp_ps_feedback(ph, length);
break;
}
case kXR: {
err = on_rtcp_xr(ph, length);
break;
}
default:{
return srs_error_new(ERROR_RTC_RTCP_CHECK, "unknown rtcp type=%u", payload_type);
break;
}
}
if (err != srs_success) {
return srs_error_wrap(err, "rtcp left=%u, bytes=%s", nb_left, srs_string_dumps_hex(ph, nb_left, 8).c_str());
}
ph += length;
nb_left -= length;
} }
return err;
} }
srs_error_t SrsRtcPlayStream::on_rtcp_sr(char* buf, int nb_buf) srs_error_t SrsRtcPlayStream::on_rtcp_rr(SrsRtcpRR* rtcp)
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
@ -743,7 +695,7 @@ srs_error_t SrsRtcPlayStream::on_rtcp_sr(char* buf, int nb_buf)
return err; return err;
} }
srs_error_t SrsRtcPlayStream::on_rtcp_xr(char* buf, int nb_buf) srs_error_t SrsRtcPlayStream::on_rtcp_xr(SrsRtcpXr* rtcp)
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
@ -754,73 +706,24 @@ srs_error_t SrsRtcPlayStream::on_rtcp_xr(char* buf, int nb_buf)
return err; return err;
} }
srs_error_t SrsRtcPlayStream::on_rtcp_feedback(char* buf, int nb_buf) srs_error_t SrsRtcPlayStream::on_rtcp_nack(SrsRtcpNack* rtcp)
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
if (nb_buf < 12) {
return srs_error_new(ERROR_RTC_RTCP_CHECK, "invalid rtp feedback packet, nb_buf=%d", nb_buf);
}
SrsBuffer* stream = new SrsBuffer(buf, nb_buf);
SrsAutoFree(SrsBuffer, stream);
// @see: https://tools.ietf.org/html/rfc4585#section-6.1
/*
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|V=2|P| FMT | PT | length |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| SSRC of packet sender |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| SSRC of media source |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
: Feedback Control Information (FCI) :
: :
*/
uint8_t first = stream->read_1bytes();
//uint8_t version = first & 0xC0;
//uint8_t padding = first & 0x20;
uint8_t fmt = first & 0x1F;
if(15 == fmt) {
return session_->on_rtcp_feedback(buf, nb_buf);
}
/*uint8_t payload_type = */stream->read_1bytes();
/*uint16_t length = */stream->read_2bytes();
/*uint32_t ssrc_of_sender = */stream->read_4bytes();
uint32_t ssrc_of_media_source = stream->read_4bytes();
/*
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| PID | BLP |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
*/
uint16_t pid = stream->read_2bytes();
int blp = stream->read_2bytes();
// TODO: FIXME: Support ARQ.
vector<SrsRtpPacket2*> resend_pkts;
nack_fetch(resend_pkts, ssrc_of_media_source, pid);
// If NACK disabled, print a log. // If NACK disabled, print a log.
if (!nack_enabled_) { if (!nack_enabled_) {
srs_trace("RTC NACK seq=%u, ignored", pid); vector<uint16_t> sns = rtcp->get_lost_sns();
srs_trace("RTC NACK ssrc=%u, seq=%s, ignored", rtcp->get_media_ssrc(), srs_join_vector_string(sns, ",").c_str());
return err; return err;
} }
uint16_t mask = 0x01; // TODO: FIXME: Support ARQ.
for (int i = 1; i < 16 && blp; ++i, mask <<= 1) { vector<SrsRtpPacket2*> resend_pkts;
if (!(blp & mask)) {
continue;
}
uint32_t loss_seq = pid + i; vector<uint16_t> sns = rtcp->get_lost_sns();
nack_fetch(resend_pkts, ssrc_of_media_source, loss_seq); for(int i = 0; i < sns.size(); ++i) {
uint16_t seq = sns.at(i);
nack_fetch(resend_pkts, rtcp->get_media_ssrc(), seq);
} }
for (int i = 0; i < (int)resend_pkts.size(); ++i) { for (int i = 0; i < (int)resend_pkts.size(); ++i) {
@ -840,32 +743,16 @@ srs_error_t SrsRtcPlayStream::on_rtcp_feedback(char* buf, int nb_buf)
return err; return err;
} }
srs_error_t SrsRtcPlayStream::on_rtcp_ps_feedback(char* buf, int nb_buf) srs_error_t SrsRtcPlayStream::on_rtcp_ps_feedback(SrsRtcpPsfbCommon* rtcp)
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
if (nb_buf < 12) { uint8_t fmt = rtcp->get_rc();
return srs_error_new(ERROR_RTC_RTCP_CHECK, "invalid rtp feedback packet, nb_buf=%d", nb_buf);
}
SrsBuffer* stream = new SrsBuffer(buf, nb_buf);
SrsAutoFree(SrsBuffer, stream);
uint8_t first = stream->read_1bytes();
//uint8_t version = first & 0xC0;
//uint8_t padding = first & 0x20;
uint8_t fmt = first & 0x1F;
/*uint8_t payload_type = */stream->read_1bytes();
/*uint16_t length = */stream->read_2bytes();
/*uint32_t ssrc_of_sender = */stream->read_4bytes();
uint32_t ssrc_of_media_source = stream->read_4bytes();
switch (fmt) { switch (fmt) {
case kPLI: { case kPLI: {
ISrsRtcPublishStream* publisher = source_->publish_stream(); ISrsRtcPublishStream* publisher = source_->publish_stream();
if (publisher) { if (publisher) {
uint32_t ssrc = get_video_publish_ssrc(ssrc_of_media_source); uint32_t ssrc = get_video_publish_ssrc(rtcp->get_media_ssrc());
if (ssrc != 0) { if (ssrc != 0) {
publisher->request_keyframe(ssrc); publisher->request_keyframe(ssrc);
srs_trace("RTC request PLI"); srs_trace("RTC request PLI");
@ -895,17 +782,6 @@ srs_error_t SrsRtcPlayStream::on_rtcp_ps_feedback(char* buf, int nb_buf)
return err; return err;
} }
srs_error_t SrsRtcPlayStream::on_rtcp_rr(char* data, int nb_data)
{
srs_error_t err = srs_success;
// TODO: FIXME: Implements it.
session_->stat_->nn_rr++;
return err;
}
uint32_t SrsRtcPlayStream::get_video_publish_ssrc(uint32_t play_ssrc) uint32_t SrsRtcPlayStream::get_video_publish_ssrc(uint32_t play_ssrc)
{ {
std::map<uint32_t, SrsRtcVideoSendTrack*>::iterator it; std::map<uint32_t, SrsRtcVideoSendTrack*>::iterator it;
@ -1279,162 +1155,33 @@ srs_error_t SrsRtcPublishStream::send_periodic_twcc()
return session_->sendonly_skt->sendto(protected_buf, nb_protected_buf, 0); return session_->sendonly_skt->sendto(protected_buf, nb_protected_buf, 0);
} }
srs_error_t SrsRtcPublishStream::on_rtcp(char* data, int nb_data) srs_error_t SrsRtcPublishStream::on_rtcp(SrsRtcpCommon* rtcp)
{ {
srs_error_t err = srs_success; if(SrsRtcpType_sr == rtcp->type()) {
SrsRtcpSR* sr = dynamic_cast<SrsRtcpSR*>(rtcp);
char* ph = data; return on_rtcp_sr(sr);
int nb_left = nb_data; } else if(SrsRtcpType_xr == rtcp->type()) {
while (nb_left) { SrsRtcpXr* xr = dynamic_cast<SrsRtcpXr*>(rtcp);
uint8_t payload_type = ph[1]; return on_rtcp_xr(xr);
uint16_t length_4bytes = (((uint16_t)ph[2]) << 8) | ph[3]; } else {
return srs_error_new(ERROR_RTC_RTCP_CHECK, "unknown rtcp type=%u", rtcp->type());
int length = (length_4bytes + 1) * 4;
if (length > nb_data) {
return srs_error_new(ERROR_RTC_RTCP, "invalid rtcp packet, length=%u", length);
}
srs_verbose("on rtcp, payload_type=%u", payload_type);
switch (payload_type) {
case kSR: {
err = on_rtcp_sr(ph, length);
break;
}
case kRR: {
err = on_rtcp_rr(ph, length);
break;
}
case kSDES: {
break;
}
case kBye: {
break;
}
case kApp: {
break;
}
case kRtpFb: {
err = on_rtcp_feedback(ph, length);
break;
}
case kPsFb: {
err = on_rtcp_ps_feedback(ph, length);
break;
}
case kXR: {
err = on_rtcp_xr(ph, length);
break;
}
default:{
return srs_error_new(ERROR_RTC_RTCP_CHECK, "unknown rtcp type=%u", payload_type);
break;
}
}
if (err != srs_success) {
return srs_error_wrap(err, "rtcp");
}
ph += length;
nb_left -= length;
} }
return err;
} }
srs_error_t SrsRtcPublishStream::on_rtcp_sr(char* buf, int nb_buf) srs_error_t SrsRtcPublishStream::on_rtcp_sr(SrsRtcpSR* rtcp)
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
SrsNtp srs_ntp = SrsNtp::to_time_ms(rtcp->get_ntp());
if (nb_buf < 28) {
return srs_error_new(ERROR_RTC_RTCP_CHECK, "invalid rtp sender report packet, nb_buf=%d", nb_buf);
}
SrsBuffer* stream = new SrsBuffer(buf, nb_buf);
SrsAutoFree(SrsBuffer, stream);
// @see: https://tools.ietf.org/html/rfc3550#section-6.4.1
/*
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
header |V=2|P| RC | PT=SR=200 | length |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| SSRC of sender |
+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
sender | NTP timestamp, most significant word |
info +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| NTP timestamp, least significant word |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| RTP timestamp |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| sender's packet count |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| sender's octet count |
+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
report | SSRC_1 (SSRC of first source) |
block +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
1 | fraction lost | cumulative number of packets lost |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| extended highest sequence number received |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| interarrival jitter |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| last SR (LSR) |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| delay since last SR (DLSR) |
+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
report | SSRC_2 (SSRC of second source) |
block +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
2 : ... :
+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
| profile-specific extensions |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
*/
uint8_t first = stream->read_1bytes();
uint8_t rc = first & 0x1F;
uint8_t payload_type = stream->read_1bytes();
srs_assert(payload_type == kSR);
uint16_t length = stream->read_2bytes();
if (((length + 1) * 4) != (rc * 24 + 28)) {
return srs_error_new(ERROR_RTC_RTCP_CHECK, "invalid rtcp sender report packet, length=%u, rc=%u", length, rc);
}
uint32_t ssrc_of_sender = stream->read_4bytes();
uint64_t ntp = stream->read_8bytes();
SrsNtp srs_ntp = SrsNtp::to_time_ms(ntp);
uint32_t rtp_time = stream->read_4bytes();
uint32_t sender_packet_count = stream->read_4bytes();
uint32_t sender_octec_count = stream->read_4bytes();
(void)sender_packet_count; (void)sender_octec_count; (void)rtp_time;
srs_verbose("sender report, ssrc_of_sender=%u, rtp_time=%u, sender_packet_count=%u, sender_octec_count=%u", srs_verbose("sender report, ssrc_of_sender=%u, rtp_time=%u, sender_packet_count=%u, sender_octec_count=%u",
ssrc_of_sender, rtp_time, sender_packet_count, sender_octec_count); rtcp->get_ssrc(), rtcp->get_rtp_ts(), rtcp->get_rtp_send_packets(), rtcp->get_rtp_send_bytes());
for (int i = 0; i < rc; ++i) { update_send_report_time(rtcp->get_ssrc(), srs_ntp);
uint32_t ssrc = stream->read_4bytes();
uint8_t fraction_lost = stream->read_1bytes();
uint32_t cumulative_number_of_packets_lost = stream->read_3bytes();
uint32_t highest_seq = stream->read_4bytes();
uint32_t jitter = stream->read_4bytes();
uint32_t lst = stream->read_4bytes();
uint32_t dlsr = stream->read_4bytes();
(void)ssrc; (void)fraction_lost; (void)cumulative_number_of_packets_lost; (void)highest_seq; (void)jitter; (void)lst; (void)dlsr;
srs_verbose("sender report, ssrc=%u, fraction_lost=%u, cumulative_number_of_packets_lost=%u, highest_seq=%u, jitter=%u, lst=%u, dlst=%u",
ssrc, fraction_lost, cumulative_number_of_packets_lost, highest_seq, jitter, lst, dlsr);
}
update_send_report_time(ssrc_of_sender, srs_ntp);
return err; return err;
} }
srs_error_t SrsRtcPublishStream::on_rtcp_xr(char* buf, int nb_buf) srs_error_t SrsRtcPublishStream::on_rtcp_xr(SrsRtcpXr* rtcp)
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
@ -1452,15 +1199,15 @@ srs_error_t SrsRtcPublishStream::on_rtcp_xr(char* buf, int nb_buf)
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
*/ */
SrsBuffer stream(buf, nb_buf); SrsBuffer stream(rtcp->data(), rtcp->size());
/*uint8_t first = */stream.read_1bytes(); /*uint8_t first = */stream.read_1bytes();
uint8_t pt = stream.read_1bytes(); uint8_t pt = stream.read_1bytes();
srs_assert(pt == kXR); srs_assert(pt == kXR);
uint16_t length = (stream.read_2bytes() + 1) * 4; uint16_t length = (stream.read_2bytes() + 1) * 4;
/*uint32_t ssrc = */stream.read_4bytes(); /*uint32_t ssrc = */stream.read_4bytes();
if (length != nb_buf) { if (length != rtcp->size()) {
return srs_error_new(ERROR_RTC_RTCP_CHECK, "invalid XR packet, length=%u, nb_buf=%d", length, nb_buf); return srs_error_new(ERROR_RTC_RTCP_CHECK, "invalid XR packet, length=%u, nb_buf=%d", length, rtcp->size());
} }
while (stream.pos() + 4 < length) { while (stream.pos() + 4 < length) {
@ -1468,8 +1215,8 @@ srs_error_t SrsRtcPublishStream::on_rtcp_xr(char* buf, int nb_buf)
stream.skip(1); stream.skip(1);
uint16_t block_length = (stream.read_2bytes() + 1) * 4; uint16_t block_length = (stream.read_2bytes() + 1) * 4;
if (stream.pos() + block_length - 4 > nb_buf) { if (stream.pos() + block_length - 4 > rtcp->size()) {
return srs_error_new(ERROR_RTC_RTCP_CHECK, "invalid XR packet block, block_length=%u, nb_buf=%d", block_length, nb_buf); return srs_error_new(ERROR_RTC_RTCP_CHECK, "invalid XR packet block, block_length=%u, nb_buf=%d", block_length, rtcp->size());
} }
if (bt == 5) { if (bt == 5) {
@ -1494,128 +1241,6 @@ srs_error_t SrsRtcPublishStream::on_rtcp_xr(char* buf, int nb_buf)
return err; return err;
} }
srs_error_t SrsRtcPublishStream::on_rtcp_feedback(char* buf, int nb_buf)
{
srs_error_t err = srs_success;
// TODO: FIXME: Implements it.
return err;
}
srs_error_t SrsRtcPublishStream::on_rtcp_ps_feedback(char* buf, int nb_buf)
{
srs_error_t err = srs_success;
if (nb_buf < 12) {
return srs_error_new(ERROR_RTC_RTCP_CHECK, "invalid rtp feedback packet, nb_buf=%d", nb_buf);
}
SrsBuffer* stream = new SrsBuffer(buf, nb_buf);
SrsAutoFree(SrsBuffer, stream);
uint8_t first = stream->read_1bytes();
//uint8_t version = first & 0xC0;
//uint8_t padding = first & 0x20;
uint8_t fmt = first & 0x1F;
/*uint8_t payload_type = */stream->read_1bytes();
/*uint16_t length = */stream->read_2bytes();
/*uint32_t ssrc_of_sender = */stream->read_4bytes();
/*uint32_t ssrc_of_media_source = */stream->read_4bytes();
switch (fmt) {
case kPLI: {
srs_verbose("pli");
break;
}
case kSLI: {
srs_verbose("sli");
break;
}
case kRPSI: {
srs_verbose("rpsi");
break;
}
case kAFB: {
srs_verbose("afb");
break;
}
default: {
return srs_error_new(ERROR_RTC_RTCP, "unknown payload specific feedback=%u", fmt);
}
}
return err;
}
srs_error_t SrsRtcPublishStream::on_rtcp_rr(char* buf, int nb_buf)
{
srs_error_t err = srs_success;
if (nb_buf < 8) {
return srs_error_new(ERROR_RTC_RTCP_CHECK, "invalid rtp receiver report packet, nb_buf=%d", nb_buf);
}
SrsBuffer* stream = new SrsBuffer(buf, nb_buf);
SrsAutoFree(SrsBuffer, stream);
// @see: https://tools.ietf.org/html/rfc3550#section-6.4.2
/*
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
header |V=2|P| RC | PT=RR=201 | length |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| SSRC of packet sender |
+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
report | SSRC_1 (SSRC of first source) |
block +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
1 | fraction lost | cumulative number of packets lost |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| extended highest sequence number received |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| interarrival jitter |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| last SR (LSR) |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| delay since last SR (DLSR) |
+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
report | SSRC_2 (SSRC of second source) |
block +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
2 : ... :
+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
| profile-specific extensions |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
*/
uint8_t first = stream->read_1bytes();
//uint8_t version = first & 0xC0;
//uint8_t padding = first & 0x20;
uint8_t rc = first & 0x1F;
/*uint8_t payload_type = */stream->read_1bytes();
uint16_t length = stream->read_2bytes();
/*uint32_t ssrc_of_sender = */stream->read_4bytes();
if (((length + 1) * 4) != (rc * 24 + 8)) {
return srs_error_new(ERROR_RTC_RTCP_CHECK, "invalid rtcp receiver packet, length=%u, rc=%u", length, rc);
}
for (int i = 0; i < rc; ++i) {
uint32_t ssrc = stream->read_4bytes();
uint8_t fraction_lost = stream->read_1bytes();
uint32_t cumulative_number_of_packets_lost = stream->read_3bytes();
uint32_t highest_seq = stream->read_4bytes();
uint32_t jitter = stream->read_4bytes();
uint32_t lst = stream->read_4bytes();
uint32_t dlsr = stream->read_4bytes();
(void)ssrc; (void)fraction_lost; (void)cumulative_number_of_packets_lost; (void)highest_seq; (void)jitter; (void)lst; (void)dlsr;
srs_verbose("ssrc=%u, fraction_lost=%u, cumulative_number_of_packets_lost=%u, highest_seq=%u, jitter=%u, lst=%u, dlst=%u",
ssrc, fraction_lost, cumulative_number_of_packets_lost, highest_seq, jitter, lst, dlsr);
}
return err;
}
// TODO: FIXME: Use async request PLI to prevent dup requests. // TODO: FIXME: Use async request PLI to prevent dup requests.
void SrsRtcPublishStream::request_keyframe(uint32_t ssrc) void SrsRtcPublishStream::request_keyframe(uint32_t ssrc)
{ {
@ -2071,17 +1696,59 @@ srs_error_t SrsRtcConnection::on_rtcp(char* data, int nb_data)
_srs_blackhole->sendto(unprotected_buf, nb_unprotected_buf); _srs_blackhole->sendto(unprotected_buf, nb_unprotected_buf);
} }
if (player_) { SrsBuffer* buffer = new SrsBuffer(unprotected_buf, nb_unprotected_buf);
err = player_->on_rtcp(unprotected_buf, nb_unprotected_buf); SrsAutoFree(SrsBuffer, buffer);
SrsRtcpCompound rtcp_compound;
if(srs_success != (err = rtcp_compound.decode(buffer))) {
return srs_error_wrap(err, "decode rtcp plaintext=%u, bytes=%s, at=%s", nb_unprotected_buf,
srs_string_dumps_hex(unprotected_buf, nb_unprotected_buf, 8).c_str(),
srs_string_dumps_hex(buffer->head(), buffer->left(), 8).c_str());
} }
if (publisher_) { SrsRtcpCommon* rtcp = NULL;
err = publisher_->on_rtcp(unprotected_buf, nb_unprotected_buf); while(NULL != (rtcp = rtcp_compound.get_next_rtcp())) {
err = dispatch_rtcp(rtcp);
srs_freep(rtcp);
if(srs_success != err) {
return srs_error_wrap(err, "cipher=%u, plaintext=%u, bytes=%s, rtcp=(%u,%u,%u,%u)", nb_data, nb_unprotected_buf,
srs_string_dumps_hex(unprotected_buf, nb_unprotected_buf, 8).c_str(),
rtcp->get_rc(), rtcp->type(), rtcp->get_ssrc(), rtcp->size());
}
} }
if (err != srs_success) { return err;
return srs_error_wrap(err, "cipher=%u, plaintext=%u, bytes=%s", nb_data, nb_unprotected_buf, }
srs_string_dumps_hex(unprotected_buf, nb_unprotected_buf, 8).c_str());
srs_error_t SrsRtcConnection::dispatch_rtcp(SrsRtcpCommon* rtcp)
{
srs_error_t err = srs_success;
if(SrsRtcpType_sr == rtcp->type()) {
return publisher_->on_rtcp(rtcp);
} else if(SrsRtcpType_rr == rtcp->type()) {
SrsRtcpRR* rr = dynamic_cast<SrsRtcpRR*>(rtcp);
if (rr->get_rb_ssrc()) {
return player_->on_rtcp(rtcp);
}
} else if(SrsRtcpType_rtpfb == rtcp->type()) {
if(1 == rtcp->get_rc()) {
//nack
return player_->on_rtcp(rtcp);
} else if(15 == rtcp->get_rc()) {
// twcc
return on_rtcp_feedback(rtcp->data(), rtcp->size());
}
} else if(SrsRtcpType_psfb == rtcp->type()) {
return player_->on_rtcp(rtcp);
} else {
if (player_) {
return player_->on_rtcp(rtcp);
}
if (publisher_) {
return publisher_->on_rtcp(rtcp);
}
} }
return err; return err;

View file

@ -250,21 +250,20 @@ public:
virtual srs_error_t cycle(); virtual srs_error_t cycle();
private: private:
srs_error_t send_packets(SrsRtcStream* source, const std::vector<SrsRtpPacket2*>& pkts, SrsRtcPlayStreamStatistic& info); srs_error_t send_packets(SrsRtcStream* source, const std::vector<SrsRtpPacket2*>& pkts, SrsRtcPlayStreamStatistic& info);
public:
void nack_fetch(std::vector<SrsRtpPacket2*>& pkts, uint32_t ssrc, uint16_t seq); void nack_fetch(std::vector<SrsRtpPacket2*>& pkts, uint32_t ssrc, uint16_t seq);
public:
// Directly set the status of track, generally for init to set the default value. // Directly set the status of track, generally for init to set the default value.
void set_all_tracks_status(bool status); void set_all_tracks_status(bool status);
// interface ISrsHourGlass // interface ISrsHourGlass
public: public:
virtual srs_error_t notify(int type, srs_utime_t interval, srs_utime_t tick); virtual srs_error_t notify(int type, srs_utime_t interval, srs_utime_t tick);
public: public:
srs_error_t on_rtcp(char* data, int nb_data); srs_error_t on_rtcp(SrsRtcpCommon* rtcp);
private: private:
srs_error_t on_rtcp_sr(char* buf, int nb_buf); srs_error_t on_rtcp_xr(SrsRtcpXr* rtcp);
srs_error_t on_rtcp_xr(char* buf, int nb_buf); srs_error_t on_rtcp_nack(SrsRtcpNack* rtcp);
srs_error_t on_rtcp_feedback(char* data, int nb_data); srs_error_t on_rtcp_ps_feedback(SrsRtcpPsfbCommon* rtcp);
srs_error_t on_rtcp_ps_feedback(char* data, int nb_data); srs_error_t on_rtcp_rr(SrsRtcpRR* rtcp);
srs_error_t on_rtcp_rr(char* data, int nb_data);
uint32_t get_video_publish_ssrc(uint32_t play_ssrc); uint32_t get_video_publish_ssrc(uint32_t play_ssrc);
}; };
@ -316,13 +315,10 @@ public:
private: private:
srs_error_t send_periodic_twcc(); srs_error_t send_periodic_twcc();
public: public:
srs_error_t on_rtcp(char* data, int nb_data); srs_error_t on_rtcp(SrsRtcpCommon* rtcp);
private: private:
srs_error_t on_rtcp_sr(char* buf, int nb_buf); srs_error_t on_rtcp_sr(SrsRtcpSR* rtcp);
srs_error_t on_rtcp_xr(char* buf, int nb_buf); srs_error_t on_rtcp_xr(SrsRtcpXr* rtcp);
srs_error_t on_rtcp_feedback(char* data, int nb_data);
srs_error_t on_rtcp_ps_feedback(char* data, int nb_data);
srs_error_t on_rtcp_rr(char* data, int nb_data);
public: public:
void request_keyframe(uint32_t ssrc); void request_keyframe(uint32_t ssrc);
// interface ISrsHourGlass // interface ISrsHourGlass
@ -443,6 +439,9 @@ public:
srs_error_t on_dtls(char* data, int nb_data); srs_error_t on_dtls(char* data, int nb_data);
srs_error_t on_rtp(char* data, int nb_data); srs_error_t on_rtp(char* data, int nb_data);
srs_error_t on_rtcp(char* data, int nb_data); srs_error_t on_rtcp(char* data, int nb_data);
private:
srs_error_t dispatch_rtcp(SrsRtcpCommon* rtcp);
public:
srs_error_t on_rtcp_feedback(char* buf, int nb_buf); srs_error_t on_rtcp_feedback(char* buf, int nb_buf);
void set_hijacker(ISrsRtcConnectionHijacker* h); void set_hijacker(ISrsRtcConnectionHijacker* h);
public: public:

View file

@ -427,6 +427,7 @@ public:
SrsRtcpCompound(); SrsRtcpCompound();
virtual ~SrsRtcpCompound(); virtual ~SrsRtcpCompound();
// TODO: FIXME: Should rename it to pop(), because it's not a GET method.
SrsRtcpCommon* get_next_rtcp(); SrsRtcpCommon* get_next_rtcp();
srs_error_t add_rtcp(SrsRtcpCommon *rtcp); srs_error_t add_rtcp(SrsRtcpCommon *rtcp);
void clear(); void clear();

View file

@ -379,20 +379,6 @@ srs_error_t srs_write_large_iovs(ISrsProtocolReadWriter* skt, iovec* iovs, int s
return err; return err;
} }
string srs_join_vector_string(vector<string>& vs, string separator)
{
string str = "";
for (int i = 0; i < (int)vs.size(); i++) {
str += vs.at(i);
if (i != (int)vs.size() - 1) {
str += separator;
}
}
return str;
}
bool srs_is_ipv4(string domain) bool srs_is_ipv4(string domain)
{ {
for (int i = 0; i < (int)domain.length(); i++) { for (int i = 0; i < (int)domain.length(); i++) {

View file

@ -34,6 +34,7 @@
#include <string> #include <string>
#include <vector> #include <vector>
#include <map> #include <map>
#include <sstream>
#include <srs_kernel_consts.hpp> #include <srs_kernel_consts.hpp>
@ -108,7 +109,20 @@ extern std::string srs_generate_rtmp_url(std::string server, int port, std::stri
extern srs_error_t srs_write_large_iovs(ISrsProtocolReadWriter* skt, iovec* iovs, int size, ssize_t* pnwrite = NULL); extern srs_error_t srs_write_large_iovs(ISrsProtocolReadWriter* skt, iovec* iovs, int size, ssize_t* pnwrite = NULL);
// join string in vector with indicated separator // join string in vector with indicated separator
extern std::string srs_join_vector_string(std::vector<std::string>& vs, std::string separator); template <typename T>
std::string srs_join_vector_string(std::vector<T>& vs, std::string separator)
{
std::stringstream ss;
for (int i = 0; i < (int)vs.size(); i++) {
ss << vs.at(i);
if (i != (int)vs.size() - 1) {
ss << separator;
}
}
return ss.str();
}
// Whether domain is an IPv4 address. // Whether domain is an IPv4 address.
extern bool srs_is_ipv4(std::string domain); extern bool srs_is_ipv4(std::string domain);