From be1b0bf941c938688fe2726fce3888379971d401 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=98=E7=AF=B1?= Date: Fri, 15 May 2020 18:06:25 +0800 Subject: [PATCH] RTC: Refactor code --- trunk/src/kernel/srs_kernel_rtc_rtcp.cpp | 250 ++++++++++++----------- 1 file changed, 128 insertions(+), 122 deletions(-) diff --git a/trunk/src/kernel/srs_kernel_rtc_rtcp.cpp b/trunk/src/kernel/srs_kernel_rtc_rtcp.cpp index 5a5d982bf..4b3ad81a7 100644 --- a/trunk/src/kernel/srs_kernel_rtc_rtcp.cpp +++ b/trunk/src/kernel/srs_kernel_rtc_rtcp.cpp @@ -40,6 +40,7 @@ srs_error_t SrsRtcpCommon::decode_header(SrsBuffer *buffer) { buffer->read_bytes((char*)(&header_), sizeof(SrsRtcpHeader)); header_.length = ntohs(header_.length); + return srs_success; } @@ -47,19 +48,22 @@ srs_error_t SrsRtcpCommon::encode_header(SrsBuffer *buffer) { header_.length = htons(header_.length); buffer->write_bytes((char*)(&header_), sizeof(SrsRtcpHeader)); + return srs_success; } srs_error_t SrsRtcpCommon::decode(SrsBuffer *buffer) { srs_error_t err = srs_success; - err = decode_header(buffer); - if(srs_success != err) { - return srs_error_wrap(err, "fail to parse rtcp header"); + + if(srs_success != (err = decode_header(buffer))) { + return srs_error_wrap(err, "decode header"); } + payload_len_ = (header_.length + 1) * 4 - sizeof(SrsRtcpHeader); buffer->read_bytes((char *)payload_, payload_len_); - return srs_success; + + return err; } int SrsRtcpCommon::nb_bytes() @@ -99,35 +103,42 @@ const srs_error_t SrsRtcpApp::get_payload(uint8_t*& payload, int& len) { len = payload_len_; payload = payload_; + return srs_success; } srs_error_t SrsRtcpApp::set_subtype(uint8_t type) { if(31 < type) { - return srs_error_new(ERROR_RTC_RTCP, "subtype is out of range. type:%d", type); + return srs_error_new(ERROR_RTC_RTCP, "invalid type: %d", type); } + header_.rc = type; + return srs_success; } srs_error_t SrsRtcpApp::set_name(std::string name) { if(name.length() > 4) { - return srs_error_new(ERROR_RTC_RTCP, "length of name is more than 4. len:%d", name.length()); + return srs_error_new(ERROR_RTC_RTCP, "invalid name length %d", name.length()); } + memset(name_, 0, sizeof(name_)); memcpy(name_, name.c_str(), name.length()); + return srs_success; } srs_error_t SrsRtcpApp::set_payload(uint8_t* payload, int len) { if(len > (kRtcpPacketSize - 12)) { - return srs_error_new(ERROR_RTC_RTCP, "length of payload is more than 1488. len:%d", len); + return srs_error_new(ERROR_RTC_RTCP, "invalid payload length %d", len); } + payload_len_ = len; memcpy(payload_, payload, len); + return srs_success; } @@ -139,14 +150,18 @@ void SrsRtcpApp::set_ssrc(uint32_t ssrc) srs_error_t SrsRtcpApp::decode(SrsBuffer *buffer) { srs_error_t err = srs_success; - err = decode_header(buffer); - if(srs_success != err) { - return srs_error_wrap(err, "fail to parse rtcp header"); + + if(srs_success != (err = decode_header(buffer))) { + return srs_error_wrap(err, "decode header"); } + ssrc_ = buffer->read_4bytes(); buffer->read_bytes((char *)name_, sizeof(name_)); + + // TODO: FIXME: Should check size? payload_len_ = (header_.length + 1) * 4 - sizeof(SrsRtcpHeader) - sizeof(name_) - sizeof(ssrc_); buffer->read_bytes((char *)payload_, payload_len_); + return srs_success; } @@ -158,14 +173,15 @@ int SrsRtcpApp::nb_bytes() srs_error_t SrsRtcpApp::encode(SrsBuffer *buffer) { srs_error_t err = srs_success; - if(! buffer->require(nb_bytes())) { - return srs_error_new(ERROR_RTC_RTCP, - "the size of buffer is not enough. buffer:%d, required:%d", buffer->left(), nb_bytes()); + + if(!buffer->require(nb_bytes())) { + return srs_error_new(ERROR_RTC_RTCP, "requires %d bytes", nb_bytes()); } - err = encode_header(buffer); - if(srs_success != err) { - return srs_error_wrap(err, "fail to encode rtcp header"); + + if(srs_success != (err = encode_header(buffer))) { + return srs_error_wrap(err, "encode header"); } + buffer->write_4bytes(ssrc_); buffer->write_bytes((char*)name_, sizeof(name_)); buffer->write_bytes((char*)payload_, payload_len_); @@ -240,19 +256,22 @@ void SrsRtcpSR::set_rtp_send_bytes(uint32_t bytes) srs_error_t SrsRtcpSR::decode(SrsBuffer *buffer) { srs_error_t err = srs_success; - err = decode_header(buffer); - if(srs_success != err) { - return srs_error_wrap(err, "fail to parse rtcp header"); + + if(srs_success != (err = decode_header(buffer))) { + return srs_error_wrap(err, "decode header"); } + sender_ssrc_ = buffer->read_4bytes(); ntp_ = buffer->read_8bytes(); rtp_ts_ = buffer->read_4bytes(); send_rtp_packets_ = buffer->read_4bytes(); send_rtp_bytes_ = buffer->read_4bytes(); + if(header_.rc > 0) { char buf[1500]; buffer->read_bytes(buf, header_.rc * 24); } + return err; } @@ -264,19 +283,21 @@ int SrsRtcpSR::nb_bytes() srs_error_t SrsRtcpSR::encode(SrsBuffer *buffer) { srs_error_t err = srs_success; - if(! buffer->require(nb_bytes())) { - return srs_error_new(ERROR_RTC_RTCP, - "the size of buffer is not enough. buffer:%d, required:%d", buffer->left(), nb_bytes()); + + if(!buffer->require(nb_bytes())) { + return srs_error_new(ERROR_RTC_RTCP, "requires %d bytes", nb_bytes()); } - err = encode_header(buffer); - if(srs_success != err) { - return srs_error_wrap(err, "fail to encode rtcp header"); + + if(srs_success != (err = encode_header(buffer))) { + return srs_error_wrap(err, "encode header"); } + buffer->write_4bytes(sender_ssrc_); buffer->write_8bytes(ntp_); buffer->write_4bytes(rtp_ts_); buffer->write_4bytes(send_rtp_packets_); buffer->write_4bytes(send_rtp_bytes_); + return err; } @@ -372,13 +393,14 @@ void SrsRtcpRR::set_sender_ntp(uint64_t ntp) srs_error_t SrsRtcpRR::decode(SrsBuffer *buffer) { srs_error_t err = srs_success; - err = decode_header(buffer); - if(srs_success != err) { - return srs_error_wrap(err, "fail to parse rtcp header"); + + if(srs_success != (err = decode_header(buffer))) { + return srs_error_wrap(err, "decode header"); } + sender_ssrc_ = buffer->read_4bytes(); if(header_.rc < 1) { - return srs_success; + return err; } rb_.ssrc = buffer->read_4bytes(); rb_.fraction_lost = buffer->read_1bytes(); @@ -404,15 +426,14 @@ int SrsRtcpRR::nb_bytes() srs_error_t SrsRtcpRR::encode(SrsBuffer *buffer) { srs_error_t err = srs_success; - if(! buffer->require(nb_bytes())) { - return srs_error_new(ERROR_RTC_RTCP, - "the size of buffer is not enough. buffer:%d, required:%d", buffer->left(), nb_bytes()); + + if(!buffer->require(nb_bytes())) { + return srs_error_new(ERROR_RTC_RTCP, "requires %d bytes", nb_bytes()); } header_.rc = 1; - err = encode_header(buffer); - if(srs_success != err) { - return srs_error_wrap(err, "fail to encode rtcp header"); + if(srs_success != (err = encode_header(buffer))) { + return srs_error_wrap(err, "encode header"); } buffer->write_4bytes(sender_ssrc_); @@ -423,6 +444,7 @@ srs_error_t SrsRtcpRR::encode(SrsBuffer *buffer) buffer->write_4bytes(rb_.jitter); buffer->write_4bytes(rb_.lsr); buffer->write_4bytes(rb_.dlsr); + return err; } @@ -518,10 +540,12 @@ srs_error_t SrsRtcpTWCC::recv_packet(uint16_t sn, srs_utime_t ts) { map::iterator it = recv_packes_.find(sn); if(it != recv_packes_.end()) { - return srs_error_new(ERROR_RTC_RTCP, "twcc: recv duplicated sn:%d", sn); + return srs_error_new(ERROR_RTC_RTCP, "TWCC dup seq: %d", sn); } + recv_packes_[sn] = ts; recv_sns_.insert(sn); + return srs_success; } @@ -553,30 +577,22 @@ srs_utime_t SrsRtcpTWCC::calculate_delta_us(srs_utime_t ts, srs_utime_t last) bool SrsRtcpTWCC::can_add_to_chunk(SrsRtcpTWCC::srs_rtcp_twcc_chunk_t& chunk, int delta_size) { - srs_verbose("can_add %d chunk->size %u delta_sizes %d %d %d %d %d %d %d %d %d %d %d %d %d %d" - " all_same %d has_large_delta %d", - delta_size, - chunk.size, - chunk.delta_sizes[0], chunk.delta_sizes[1], chunk.delta_sizes[2], - chunk.delta_sizes[3], chunk.delta_sizes[4], chunk.delta_sizes[5], - chunk.delta_sizes[6], chunk.delta_sizes[7], chunk.delta_sizes[8], - chunk.delta_sizes[9], chunk.delta_sizes[10], chunk.delta_sizes[11], - chunk.delta_sizes[12], chunk.delta_sizes[13], - (int)chunk.all_same, - (int)chunk.has_large_delta - ); + srs_verbose("can_add %d chunk->size %u delta_sizes %d %d %d %d %d %d %d %d %d %d %d %d %d %d all_same %d has_large_delta %d", + delta_size, chunk.size, chunk.delta_sizes[0], chunk.delta_sizes[1], chunk.delta_sizes[2], chunk.delta_sizes[3], + chunk.delta_sizes[4], chunk.delta_sizes[5], chunk.delta_sizes[6], chunk.delta_sizes[7], chunk.delta_sizes[8], + chunk.delta_sizes[9], chunk.delta_sizes[10], chunk.delta_sizes[11], chunk.delta_sizes[12], chunk.delta_sizes[13], + (int)chunk.all_same, (int)chunk.has_large_delta); - if (chunk.size < kTwccFbTwoBitElements) + if (chunk.size < kTwccFbTwoBitElements) { return true; + } - - if (chunk.size < kTwccFbOneBitElements && !chunk.has_large_delta && delta_size != kTwccFbLargeRecvDeltaBytes) + if (chunk.size < kTwccFbOneBitElements && !chunk.has_large_delta && delta_size != kTwccFbLargeRecvDeltaBytes) { return true; - + } if (chunk.size < kTwccFbMaxRunLength && chunk.all_same && chunk.delta_sizes[0] == delta_size) { - srs_verbose("< 8191 && all_same && delta_size[0] %d == %d", - chunk.delta_sizes[0], delta_size); + srs_verbose("< %d && all_same && delta_size[0] %d == %d", kTwccFbMaxRunLength, chunk.delta_sizes[0], delta_size); return true; } @@ -585,8 +601,10 @@ bool SrsRtcpTWCC::can_add_to_chunk(SrsRtcpTWCC::srs_rtcp_twcc_chunk_t& chunk, in void SrsRtcpTWCC::add_to_chunk(SrsRtcpTWCC::srs_rtcp_twcc_chunk_t& chunk, int delta_size) { - if (chunk.size < kTwccFbMaxBitElements) + if (chunk.size < kTwccFbMaxBitElements) { chunk.delta_sizes[chunk.size] = delta_size; + } + chunk.size += 1; chunk.all_same = chunk.all_same && delta_size == chunk.delta_sizes[0]; chunk.has_large_delta = chunk.has_large_delta || delta_size >= kTwccFbLargeRecvDeltaBytes; @@ -594,22 +612,25 @@ void SrsRtcpTWCC::add_to_chunk(SrsRtcpTWCC::srs_rtcp_twcc_chunk_t& chunk, int de srs_error_t SrsRtcpTWCC::encode_chunk_run_length(SrsRtcpTWCC::srs_rtcp_twcc_chunk_t& chunk) { - if (!chunk.all_same || chunk.size > kTwccFbMaxRunLength) - return srs_error_new(ERROR_RTC_RTCP, "cannot encode by run length. all_same:%d, size:%d", chunk.all_same, chunk.size); + if (!chunk.all_same || chunk.size > kTwccFbMaxRunLength) { + return srs_error_new(ERROR_RTC_RTCP, "invalid run all_same:%d, size:%d", chunk.all_same, chunk.size); + } uint16_t encoded_chunk = (chunk.delta_sizes[0] << 13) | chunk.size; encoded_chucks_.push_back(encoded_chunk); pkt_len += sizeof(encoded_chunk); - return 0; + return srs_success; } srs_error_t SrsRtcpTWCC::encode_chunk_one_bit(SrsRtcpTWCC::srs_rtcp_twcc_chunk_t& chunk) { int i = 0; - if (chunk.has_large_delta) - return srs_error_new(ERROR_RTC_RTCP, "it's large delta, cannot encode by one bit moe"); + if (chunk.has_large_delta) { + return srs_error_new(ERROR_RTC_RTCP, "invalid large delta"); + } + uint16_t encoded_chunk = 0x8000; for (i = 0; i < chunk.size; ++i) { encoded_chunk |= (chunk.delta_sizes[i] << (kTwccFbOneBitElements - 1 - i)); @@ -662,26 +683,29 @@ srs_error_t SrsRtcpTWCC::encode_chunk(SrsRtcpTWCC::srs_rtcp_twcc_chunk_t& chunk) { srs_error_t err = srs_success; - if (can_add_to_chunk(chunk, 0) && can_add_to_chunk(chunk, 1) && - can_add_to_chunk(chunk, 2)) - return srs_error_new(ERROR_RTC_RTCP, "it should be added to chunk, not encode"); + if (can_add_to_chunk(chunk, 0) && can_add_to_chunk(chunk, 1) && can_add_to_chunk(chunk, 2)) { + return srs_error_new(ERROR_RTC_RTCP, "TWCC chunk"); + } if (chunk.all_same) { - if ((err = encode_chunk_run_length(chunk)) != srs_success) - return srs_error_wrap(err, "fail to encode chunk by run length mode"); + if ((err = encode_chunk_run_length(chunk)) != srs_success) { + return srs_error_wrap(err, "encode run"); + } reset_chunk(chunk); return err; } if (chunk.size == kTwccFbOneBitElements) { - if ((err = encode_chunk_one_bit(chunk)) != srs_success) - return srs_error_wrap(err, "fail to encode chunk by one bit mode"); + if ((err = encode_chunk_one_bit(chunk)) != srs_success) { + return srs_error_wrap(err, "encode chunk"); + } reset_chunk(chunk); return err; } - if ((err =encode_chunk_two_bit(chunk, kTwccFbTwoBitElements, true)) != srs_success) - return srs_error_wrap(err, "fail to encode chunk by two bit mode"); + if ((err = encode_chunk_two_bit(chunk, kTwccFbTwoBitElements, true)) != srs_success) { + return srs_error_wrap(err, "encode chunk"); + } return err; } @@ -714,7 +738,7 @@ srs_error_t SrsRtcpTWCC::process_pkt_chunk(SrsRtcpTWCC::srs_rtcp_twcc_chunk_t& c return err; } if ((err = encode_chunk(chunk)) != srs_success) { - return srs_error_new(ERROR_RTC_RTCP, "chunk can not be encoded, delta_size %u", delta_size); + return srs_error_new(ERROR_RTC_RTCP, "encode chunk, delta_size %u", delta_size); } add_to_chunk(chunk, delta_size); return err; @@ -723,10 +747,11 @@ srs_error_t SrsRtcpTWCC::process_pkt_chunk(SrsRtcpTWCC::srs_rtcp_twcc_chunk_t& c srs_error_t SrsRtcpTWCC::encode(SrsBuffer *buffer) { srs_error_t err = srs_success; - if(! buffer->require(nb_bytes())) { - return srs_error_new(ERROR_RTC_RTCP, - "the size of buffer is not enough. buffer:%d, required:%d", buffer->left(), nb_bytes()); + + if(!buffer->require(nb_bytes())) { + return srs_error_new(ERROR_RTC_RTCP, "requires %d bytes", nb_bytes()); } + pkt_len = kTwccFbPktHeaderSize; set::iterator it_sn = recv_sns_.begin(); base_sn_ = *it_sn; @@ -736,6 +761,7 @@ srs_error_t SrsRtcpTWCC::encode(SrsBuffer *buffer) srs_utime_t last_ts = (srs_utime_t)(reference_time_) * kTwccFbTimeMultiplier; uint16_t last_sn = base_sn_; packet_count_ = recv_packes_.size(); + do { // encode chunk SrsRtcpTWCC::srs_rtcp_twcc_chunk_t chunk; @@ -755,13 +781,12 @@ srs_error_t SrsRtcpTWCC::encode(SrsBuffer *buffer) process_pkt_chunk(chunk, 0); packet_count_++; } - } // FIXME 24-bit base receive delta not supported int recv_delta_size = (delta >= 0 && delta <= 0xff) ? 1 : 2; if ((err = process_pkt_chunk(chunk, recv_delta_size)) != srs_success) { - return srs_error_new(ERROR_RTC_RTCP, "delta_size %d, failed to append_recv_delta\n", recv_delta_size); + return srs_error_new(ERROR_RTC_RTCP, "delta_size %d, failed to append_recv_delta", recv_delta_size); } pkt_deltas_.push_back(delta); @@ -772,7 +797,7 @@ srs_error_t SrsRtcpTWCC::encode(SrsBuffer *buffer) if(0 < chunk.size) { if((err = encode_remaining_chunk(chunk)) != srs_success) { - return srs_error_wrap(err, "fail to encode remaining chunk"); + return srs_error_wrap(err, "encode chunk"); } } @@ -784,9 +809,8 @@ srs_error_t SrsRtcpTWCC::encode(SrsBuffer *buffer) } header_.length -= 1; - err = encode_header(buffer); - if(srs_success != err) { - err = srs_error_wrap(err, "fail to encode rtcp header"); + if(srs_success != (err = encode_header(buffer))) { + err = srs_error_wrap(err, "encode header"); break; } buffer->write_4bytes(sender_ssrc_); @@ -860,10 +884,11 @@ void SrsRtcpNack::add_lost_sn(uint16_t sn) srs_error_t SrsRtcpNack::decode(SrsBuffer *buffer) { srs_error_t err = srs_success; - err = decode_header(buffer); - if(srs_success != err) { - return srs_error_wrap(err, "fail to parse rtcp header"); + + if(srs_success != (err = decode_header(buffer))) { + return srs_error_wrap(err, "decode header"); } + sender_ssrc_ = buffer->read_4bytes(); media_ssrc_ = buffer->read_4bytes(); char bitmask[20]; @@ -891,9 +916,8 @@ int SrsRtcpNack::nb_bytes() srs_error_t SrsRtcpNack::encode(SrsBuffer *buffer) { srs_error_t err = srs_success; - if(! buffer->require(nb_bytes())) { - return srs_error_new(ERROR_RTC_RTCP, - "the size of buffer is not enough. buffer:%d, required:%d", buffer->left(), nb_bytes()); + if(!buffer->require(nb_bytes())) { + return srs_error_new(ERROR_RTC_RTCP, "requires %d bytes", nb_bytes()); } vector chunks; @@ -911,7 +935,7 @@ srs_error_t SrsRtcpNack::encode(SrsBuffer *buffer) continue; } if((sn - pid) < 1) { - srs_info("Skipping PID to NACK (%d already added)...\n", sn); + srs_info("skip seq %d", sn); } else if( (sn - pid) > 16) { // add new chunk chunks.push_back(chunk); @@ -925,9 +949,8 @@ srs_error_t SrsRtcpNack::encode(SrsBuffer *buffer) } header_.length = 2 + chunks.size(); - err = encode_header(buffer); - if(srs_success != err) { - err = srs_error_wrap(err, "fail to encode rtcp header"); + if(srs_success != (err = encode_header(buffer))) { + err = srs_error_wrap(err, "encode header"); break; } buffer->write_4bytes(sender_ssrc_); @@ -936,7 +959,6 @@ srs_error_t SrsRtcpNack::encode(SrsBuffer *buffer) buffer->write_2bytes(it_chunk->pid); buffer->write_2bytes(it_chunk->blp); } - } while(0); return err; @@ -966,7 +988,7 @@ srs_error_t SrsRtcpCompound::add_rtcp(SrsRtcpCommon *rtcp) { int new_len = rtcp->nb_bytes(); if((new_len + nb_bytes_) > kRtcpPacketSize) { - return srs_error_new(ERROR_RTC_RTCP, "exceed the rtcp max size. new rtcp: %d, current: %d", new_len, nb_bytes_); + return srs_error_new(ERROR_RTC_RTCP, "overflow, new rtcp: %d, current: %d", new_len, nb_bytes_); } nb_bytes_ += new_len; rtcps_.push_back(rtcp); @@ -978,43 +1000,29 @@ srs_error_t SrsRtcpCompound::decode(SrsBuffer *buffer) { srs_error_t err = srs_success; - while(0 != buffer->left()) { - SrsRtcpHeader* header = (SrsRtcpHeader *)(buffer->head()); - switch (header->type) - { - case SrsRtcpType_sr: - { + while (buffer->empty()) { + SrsRtcpHeader* header = (SrsRtcpHeader*)(buffer->head()); + if (header->type == SrsRtcpType_sr) { SrsRtcpSR *rtcp = new SrsRtcpSR; - err = rtcp->decode(buffer); - if(srs_success != err) { - return srs_error_wrap(err, "fail to decode rtcp sr"); + if(srs_success != (err = rtcp->decode(buffer))) { + return srs_error_wrap(err, "decode sr"); } nb_bytes_ += rtcp->nb_bytes(); rtcps_.push_back(rtcp); - break; - } - case SrsRtcpType_rr: - { + } else if (header->type == SrsRtcpType_rr) { SrsRtcpRR *rtcp = new SrsRtcpRR; - err = rtcp->decode(buffer); - if(srs_success != err) { - return srs_error_wrap(err, "fail to decode rtcp rr"); + if(srs_success != (err = rtcp->decode(buffer))) { + return srs_error_wrap(err, "decode rr"); } nb_bytes_ += rtcp->nb_bytes(); rtcps_.push_back(rtcp); - break; - } - default: - { + } else { SrsRtcpCommon *rtcp = new SrsRtcpCommon; - err = rtcp->decode(buffer); - if(srs_success != err) { - return srs_error_wrap(err, "fail to decode rtcp type:%d", header->type); + if(srs_success != (err = rtcp->decode(buffer))) { + return srs_error_wrap(err, "decode type: %#x", header->type); } nb_bytes_ += rtcp->nb_bytes(); rtcps_.push_back(rtcp); - break; - } } } @@ -1029,17 +1037,15 @@ int SrsRtcpCompound::nb_bytes() srs_error_t SrsRtcpCompound::encode(SrsBuffer *buffer) { srs_error_t err = srs_success; - if(false == buffer->require(nb_bytes_)) { - return srs_error_new(ERROR_RTC_RTCP, - "the left size of buffer is not enough. buffer:%d, required:%d", buffer->left(), nb_bytes_); + if(!buffer->require(nb_bytes_)) { + return srs_error_new(ERROR_RTC_RTCP, "requires %d bytes", nb_bytes_); } vector::iterator it; for(it = rtcps_.begin(); it != rtcps_.end(); ++it) { SrsRtcpCommon *rtcp = *it; - err = rtcp->encode(buffer); - if(err != srs_success) { - return srs_error_wrap(err, "fail to encode rtcp compound. type:%d", rtcp->type()); + if((err = rtcp->encode(buffer)) != srs_success) { + return srs_error_wrap(err, "encode compound type:%d", rtcp->type()); } }