1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-02-13 20:01:56 +00:00

RTC: Refactor code

This commit is contained in:
忘篱 2020-05-15 18:06:25 +08:00
parent d1485c40e4
commit be1b0bf941

View file

@ -40,6 +40,7 @@ srs_error_t SrsRtcpCommon::decode_header(SrsBuffer *buffer)
{
buffer->read_bytes((char*)(&header_), sizeof(SrsRtcpHeader));
header_.length = ntohs(header_.length);
return srs_success;
}
@ -47,19 +48,22 @@ srs_error_t SrsRtcpCommon::encode_header(SrsBuffer *buffer)
{
header_.length = htons(header_.length);
buffer->write_bytes((char*)(&header_), sizeof(SrsRtcpHeader));
return srs_success;
}
srs_error_t SrsRtcpCommon::decode(SrsBuffer *buffer)
{
srs_error_t err = srs_success;
err = decode_header(buffer);
if(srs_success != err) {
return srs_error_wrap(err, "fail to parse rtcp header");
if(srs_success != (err = decode_header(buffer))) {
return srs_error_wrap(err, "decode header");
}
payload_len_ = (header_.length + 1) * 4 - sizeof(SrsRtcpHeader);
buffer->read_bytes((char *)payload_, payload_len_);
return srs_success;
return err;
}
int SrsRtcpCommon::nb_bytes()
@ -99,35 +103,42 @@ const srs_error_t SrsRtcpApp::get_payload(uint8_t*& payload, int& len)
{
len = payload_len_;
payload = payload_;
return srs_success;
}
srs_error_t SrsRtcpApp::set_subtype(uint8_t type)
{
if(31 < type) {
return srs_error_new(ERROR_RTC_RTCP, "subtype is out of range. type:%d", type);
return srs_error_new(ERROR_RTC_RTCP, "invalid type: %d", type);
}
header_.rc = type;
return srs_success;
}
srs_error_t SrsRtcpApp::set_name(std::string name)
{
if(name.length() > 4) {
return srs_error_new(ERROR_RTC_RTCP, "length of name is more than 4. len:%d", name.length());
return srs_error_new(ERROR_RTC_RTCP, "invalid name length %d", name.length());
}
memset(name_, 0, sizeof(name_));
memcpy(name_, name.c_str(), name.length());
return srs_success;
}
srs_error_t SrsRtcpApp::set_payload(uint8_t* payload, int len)
{
if(len > (kRtcpPacketSize - 12)) {
return srs_error_new(ERROR_RTC_RTCP, "length of payload is more than 1488. len:%d", len);
return srs_error_new(ERROR_RTC_RTCP, "invalid payload length %d", len);
}
payload_len_ = len;
memcpy(payload_, payload, len);
return srs_success;
}
@ -139,14 +150,18 @@ void SrsRtcpApp::set_ssrc(uint32_t ssrc)
srs_error_t SrsRtcpApp::decode(SrsBuffer *buffer)
{
srs_error_t err = srs_success;
err = decode_header(buffer);
if(srs_success != err) {
return srs_error_wrap(err, "fail to parse rtcp header");
if(srs_success != (err = decode_header(buffer))) {
return srs_error_wrap(err, "decode header");
}
ssrc_ = buffer->read_4bytes();
buffer->read_bytes((char *)name_, sizeof(name_));
// TODO: FIXME: Should check size?
payload_len_ = (header_.length + 1) * 4 - sizeof(SrsRtcpHeader) - sizeof(name_) - sizeof(ssrc_);
buffer->read_bytes((char *)payload_, payload_len_);
return srs_success;
}
@ -158,14 +173,15 @@ int SrsRtcpApp::nb_bytes()
srs_error_t SrsRtcpApp::encode(SrsBuffer *buffer)
{
srs_error_t err = srs_success;
if(!buffer->require(nb_bytes())) {
return srs_error_new(ERROR_RTC_RTCP,
"the size of buffer is not enough. buffer:%d, required:%d", buffer->left(), nb_bytes());
return srs_error_new(ERROR_RTC_RTCP, "requires %d bytes", nb_bytes());
}
err = encode_header(buffer);
if(srs_success != err) {
return srs_error_wrap(err, "fail to encode rtcp header");
if(srs_success != (err = encode_header(buffer))) {
return srs_error_wrap(err, "encode header");
}
buffer->write_4bytes(ssrc_);
buffer->write_bytes((char*)name_, sizeof(name_));
buffer->write_bytes((char*)payload_, payload_len_);
@ -240,19 +256,22 @@ void SrsRtcpSR::set_rtp_send_bytes(uint32_t bytes)
srs_error_t SrsRtcpSR::decode(SrsBuffer *buffer)
{
srs_error_t err = srs_success;
err = decode_header(buffer);
if(srs_success != err) {
return srs_error_wrap(err, "fail to parse rtcp header");
if(srs_success != (err = decode_header(buffer))) {
return srs_error_wrap(err, "decode header");
}
sender_ssrc_ = buffer->read_4bytes();
ntp_ = buffer->read_8bytes();
rtp_ts_ = buffer->read_4bytes();
send_rtp_packets_ = buffer->read_4bytes();
send_rtp_bytes_ = buffer->read_4bytes();
if(header_.rc > 0) {
char buf[1500];
buffer->read_bytes(buf, header_.rc * 24);
}
return err;
}
@ -264,19 +283,21 @@ int SrsRtcpSR::nb_bytes()
srs_error_t SrsRtcpSR::encode(SrsBuffer *buffer)
{
srs_error_t err = srs_success;
if(!buffer->require(nb_bytes())) {
return srs_error_new(ERROR_RTC_RTCP,
"the size of buffer is not enough. buffer:%d, required:%d", buffer->left(), nb_bytes());
return srs_error_new(ERROR_RTC_RTCP, "requires %d bytes", nb_bytes());
}
err = encode_header(buffer);
if(srs_success != err) {
return srs_error_wrap(err, "fail to encode rtcp header");
if(srs_success != (err = encode_header(buffer))) {
return srs_error_wrap(err, "encode header");
}
buffer->write_4bytes(sender_ssrc_);
buffer->write_8bytes(ntp_);
buffer->write_4bytes(rtp_ts_);
buffer->write_4bytes(send_rtp_packets_);
buffer->write_4bytes(send_rtp_bytes_);
return err;
}
@ -372,13 +393,14 @@ void SrsRtcpRR::set_sender_ntp(uint64_t ntp)
srs_error_t SrsRtcpRR::decode(SrsBuffer *buffer)
{
srs_error_t err = srs_success;
err = decode_header(buffer);
if(srs_success != err) {
return srs_error_wrap(err, "fail to parse rtcp header");
if(srs_success != (err = decode_header(buffer))) {
return srs_error_wrap(err, "decode header");
}
sender_ssrc_ = buffer->read_4bytes();
if(header_.rc < 1) {
return srs_success;
return err;
}
rb_.ssrc = buffer->read_4bytes();
rb_.fraction_lost = buffer->read_1bytes();
@ -404,15 +426,14 @@ int SrsRtcpRR::nb_bytes()
srs_error_t SrsRtcpRR::encode(SrsBuffer *buffer)
{
srs_error_t err = srs_success;
if(!buffer->require(nb_bytes())) {
return srs_error_new(ERROR_RTC_RTCP,
"the size of buffer is not enough. buffer:%d, required:%d", buffer->left(), nb_bytes());
return srs_error_new(ERROR_RTC_RTCP, "requires %d bytes", nb_bytes());
}
header_.rc = 1;
err = encode_header(buffer);
if(srs_success != err) {
return srs_error_wrap(err, "fail to encode rtcp header");
if(srs_success != (err = encode_header(buffer))) {
return srs_error_wrap(err, "encode header");
}
buffer->write_4bytes(sender_ssrc_);
@ -423,6 +444,7 @@ srs_error_t SrsRtcpRR::encode(SrsBuffer *buffer)
buffer->write_4bytes(rb_.jitter);
buffer->write_4bytes(rb_.lsr);
buffer->write_4bytes(rb_.dlsr);
return err;
}
@ -518,10 +540,12 @@ srs_error_t SrsRtcpTWCC::recv_packet(uint16_t sn, srs_utime_t ts)
{
map<uint16_t, srs_utime_t>::iterator it = recv_packes_.find(sn);
if(it != recv_packes_.end()) {
return srs_error_new(ERROR_RTC_RTCP, "twcc: recv duplicated sn:%d", sn);
return srs_error_new(ERROR_RTC_RTCP, "TWCC dup seq: %d", sn);
}
recv_packes_[sn] = ts;
recv_sns_.insert(sn);
return srs_success;
}
@ -553,30 +577,22 @@ srs_utime_t SrsRtcpTWCC::calculate_delta_us(srs_utime_t ts, srs_utime_t last)
bool SrsRtcpTWCC::can_add_to_chunk(SrsRtcpTWCC::srs_rtcp_twcc_chunk_t& chunk, int delta_size)
{
srs_verbose("can_add %d chunk->size %u delta_sizes %d %d %d %d %d %d %d %d %d %d %d %d %d %d"
" all_same %d has_large_delta %d",
delta_size,
chunk.size,
chunk.delta_sizes[0], chunk.delta_sizes[1], chunk.delta_sizes[2],
chunk.delta_sizes[3], chunk.delta_sizes[4], chunk.delta_sizes[5],
chunk.delta_sizes[6], chunk.delta_sizes[7], chunk.delta_sizes[8],
chunk.delta_sizes[9], chunk.delta_sizes[10], chunk.delta_sizes[11],
chunk.delta_sizes[12], chunk.delta_sizes[13],
(int)chunk.all_same,
(int)chunk.has_large_delta
);
srs_verbose("can_add %d chunk->size %u delta_sizes %d %d %d %d %d %d %d %d %d %d %d %d %d %d all_same %d has_large_delta %d",
delta_size, chunk.size, chunk.delta_sizes[0], chunk.delta_sizes[1], chunk.delta_sizes[2], chunk.delta_sizes[3],
chunk.delta_sizes[4], chunk.delta_sizes[5], chunk.delta_sizes[6], chunk.delta_sizes[7], chunk.delta_sizes[8],
chunk.delta_sizes[9], chunk.delta_sizes[10], chunk.delta_sizes[11], chunk.delta_sizes[12], chunk.delta_sizes[13],
(int)chunk.all_same, (int)chunk.has_large_delta);
if (chunk.size < kTwccFbTwoBitElements)
if (chunk.size < kTwccFbTwoBitElements) {
return true;
}
if (chunk.size < kTwccFbOneBitElements && !chunk.has_large_delta && delta_size != kTwccFbLargeRecvDeltaBytes)
if (chunk.size < kTwccFbOneBitElements && !chunk.has_large_delta && delta_size != kTwccFbLargeRecvDeltaBytes) {
return true;
}
if (chunk.size < kTwccFbMaxRunLength && chunk.all_same && chunk.delta_sizes[0] == delta_size) {
srs_verbose("< 8191 && all_same && delta_size[0] %d == %d",
chunk.delta_sizes[0], delta_size);
srs_verbose("< %d && all_same && delta_size[0] %d == %d", kTwccFbMaxRunLength, chunk.delta_sizes[0], delta_size);
return true;
}
@ -585,8 +601,10 @@ bool SrsRtcpTWCC::can_add_to_chunk(SrsRtcpTWCC::srs_rtcp_twcc_chunk_t& chunk, in
void SrsRtcpTWCC::add_to_chunk(SrsRtcpTWCC::srs_rtcp_twcc_chunk_t& chunk, int delta_size)
{
if (chunk.size < kTwccFbMaxBitElements)
if (chunk.size < kTwccFbMaxBitElements) {
chunk.delta_sizes[chunk.size] = delta_size;
}
chunk.size += 1;
chunk.all_same = chunk.all_same && delta_size == chunk.delta_sizes[0];
chunk.has_large_delta = chunk.has_large_delta || delta_size >= kTwccFbLargeRecvDeltaBytes;
@ -594,22 +612,25 @@ void SrsRtcpTWCC::add_to_chunk(SrsRtcpTWCC::srs_rtcp_twcc_chunk_t& chunk, int de
srs_error_t SrsRtcpTWCC::encode_chunk_run_length(SrsRtcpTWCC::srs_rtcp_twcc_chunk_t& chunk)
{
if (!chunk.all_same || chunk.size > kTwccFbMaxRunLength)
return srs_error_new(ERROR_RTC_RTCP, "cannot encode by run length. all_same:%d, size:%d", chunk.all_same, chunk.size);
if (!chunk.all_same || chunk.size > kTwccFbMaxRunLength) {
return srs_error_new(ERROR_RTC_RTCP, "invalid run all_same:%d, size:%d", chunk.all_same, chunk.size);
}
uint16_t encoded_chunk = (chunk.delta_sizes[0] << 13) | chunk.size;
encoded_chucks_.push_back(encoded_chunk);
pkt_len += sizeof(encoded_chunk);
return 0;
return srs_success;
}
srs_error_t SrsRtcpTWCC::encode_chunk_one_bit(SrsRtcpTWCC::srs_rtcp_twcc_chunk_t& chunk)
{
int i = 0;
if (chunk.has_large_delta)
return srs_error_new(ERROR_RTC_RTCP, "it's large delta, cannot encode by one bit moe");
if (chunk.has_large_delta) {
return srs_error_new(ERROR_RTC_RTCP, "invalid large delta");
}
uint16_t encoded_chunk = 0x8000;
for (i = 0; i < chunk.size; ++i) {
encoded_chunk |= (chunk.delta_sizes[i] << (kTwccFbOneBitElements - 1 - i));
@ -662,26 +683,29 @@ srs_error_t SrsRtcpTWCC::encode_chunk(SrsRtcpTWCC::srs_rtcp_twcc_chunk_t& chunk)
{
srs_error_t err = srs_success;
if (can_add_to_chunk(chunk, 0) && can_add_to_chunk(chunk, 1) &&
can_add_to_chunk(chunk, 2))
return srs_error_new(ERROR_RTC_RTCP, "it should be added to chunk, not encode");
if (can_add_to_chunk(chunk, 0) && can_add_to_chunk(chunk, 1) && can_add_to_chunk(chunk, 2)) {
return srs_error_new(ERROR_RTC_RTCP, "TWCC chunk");
}
if (chunk.all_same) {
if ((err = encode_chunk_run_length(chunk)) != srs_success)
return srs_error_wrap(err, "fail to encode chunk by run length mode");
if ((err = encode_chunk_run_length(chunk)) != srs_success) {
return srs_error_wrap(err, "encode run");
}
reset_chunk(chunk);
return err;
}
if (chunk.size == kTwccFbOneBitElements) {
if ((err = encode_chunk_one_bit(chunk)) != srs_success)
return srs_error_wrap(err, "fail to encode chunk by one bit mode");
if ((err = encode_chunk_one_bit(chunk)) != srs_success) {
return srs_error_wrap(err, "encode chunk");
}
reset_chunk(chunk);
return err;
}
if ((err =encode_chunk_two_bit(chunk, kTwccFbTwoBitElements, true)) != srs_success)
return srs_error_wrap(err, "fail to encode chunk by two bit mode");
if ((err = encode_chunk_two_bit(chunk, kTwccFbTwoBitElements, true)) != srs_success) {
return srs_error_wrap(err, "encode chunk");
}
return err;
}
@ -714,7 +738,7 @@ srs_error_t SrsRtcpTWCC::process_pkt_chunk(SrsRtcpTWCC::srs_rtcp_twcc_chunk_t& c
return err;
}
if ((err = encode_chunk(chunk)) != srs_success) {
return srs_error_new(ERROR_RTC_RTCP, "chunk can not be encoded, delta_size %u", delta_size);
return srs_error_new(ERROR_RTC_RTCP, "encode chunk, delta_size %u", delta_size);
}
add_to_chunk(chunk, delta_size);
return err;
@ -723,10 +747,11 @@ srs_error_t SrsRtcpTWCC::process_pkt_chunk(SrsRtcpTWCC::srs_rtcp_twcc_chunk_t& c
srs_error_t SrsRtcpTWCC::encode(SrsBuffer *buffer)
{
srs_error_t err = srs_success;
if(!buffer->require(nb_bytes())) {
return srs_error_new(ERROR_RTC_RTCP,
"the size of buffer is not enough. buffer:%d, required:%d", buffer->left(), nb_bytes());
return srs_error_new(ERROR_RTC_RTCP, "requires %d bytes", nb_bytes());
}
pkt_len = kTwccFbPktHeaderSize;
set<uint16_t, SrsSeqCompareLess>::iterator it_sn = recv_sns_.begin();
base_sn_ = *it_sn;
@ -736,6 +761,7 @@ srs_error_t SrsRtcpTWCC::encode(SrsBuffer *buffer)
srs_utime_t last_ts = (srs_utime_t)(reference_time_) * kTwccFbTimeMultiplier;
uint16_t last_sn = base_sn_;
packet_count_ = recv_packes_.size();
do {
// encode chunk
SrsRtcpTWCC::srs_rtcp_twcc_chunk_t chunk;
@ -755,13 +781,12 @@ srs_error_t SrsRtcpTWCC::encode(SrsBuffer *buffer)
process_pkt_chunk(chunk, 0);
packet_count_++;
}
}
// FIXME 24-bit base receive delta not supported
int recv_delta_size = (delta >= 0 && delta <= 0xff) ? 1 : 2;
if ((err = process_pkt_chunk(chunk, recv_delta_size)) != srs_success) {
return srs_error_new(ERROR_RTC_RTCP, "delta_size %d, failed to append_recv_delta\n", recv_delta_size);
return srs_error_new(ERROR_RTC_RTCP, "delta_size %d, failed to append_recv_delta", recv_delta_size);
}
pkt_deltas_.push_back(delta);
@ -772,7 +797,7 @@ srs_error_t SrsRtcpTWCC::encode(SrsBuffer *buffer)
if(0 < chunk.size) {
if((err = encode_remaining_chunk(chunk)) != srs_success) {
return srs_error_wrap(err, "fail to encode remaining chunk");
return srs_error_wrap(err, "encode chunk");
}
}
@ -784,9 +809,8 @@ srs_error_t SrsRtcpTWCC::encode(SrsBuffer *buffer)
}
header_.length -= 1;
err = encode_header(buffer);
if(srs_success != err) {
err = srs_error_wrap(err, "fail to encode rtcp header");
if(srs_success != (err = encode_header(buffer))) {
err = srs_error_wrap(err, "encode header");
break;
}
buffer->write_4bytes(sender_ssrc_);
@ -860,10 +884,11 @@ void SrsRtcpNack::add_lost_sn(uint16_t sn)
srs_error_t SrsRtcpNack::decode(SrsBuffer *buffer)
{
srs_error_t err = srs_success;
err = decode_header(buffer);
if(srs_success != err) {
return srs_error_wrap(err, "fail to parse rtcp header");
if(srs_success != (err = decode_header(buffer))) {
return srs_error_wrap(err, "decode header");
}
sender_ssrc_ = buffer->read_4bytes();
media_ssrc_ = buffer->read_4bytes();
char bitmask[20];
@ -892,8 +917,7 @@ srs_error_t SrsRtcpNack::encode(SrsBuffer *buffer)
{
srs_error_t err = srs_success;
if(!buffer->require(nb_bytes())) {
return srs_error_new(ERROR_RTC_RTCP,
"the size of buffer is not enough. buffer:%d, required:%d", buffer->left(), nb_bytes());
return srs_error_new(ERROR_RTC_RTCP, "requires %d bytes", nb_bytes());
}
vector<pid_blp_t> chunks;
@ -911,7 +935,7 @@ srs_error_t SrsRtcpNack::encode(SrsBuffer *buffer)
continue;
}
if((sn - pid) < 1) {
srs_info("Skipping PID to NACK (%d already added)...\n", sn);
srs_info("skip seq %d", sn);
} else if( (sn - pid) > 16) {
// add new chunk
chunks.push_back(chunk);
@ -925,9 +949,8 @@ srs_error_t SrsRtcpNack::encode(SrsBuffer *buffer)
}
header_.length = 2 + chunks.size();
err = encode_header(buffer);
if(srs_success != err) {
err = srs_error_wrap(err, "fail to encode rtcp header");
if(srs_success != (err = encode_header(buffer))) {
err = srs_error_wrap(err, "encode header");
break;
}
buffer->write_4bytes(sender_ssrc_);
@ -936,7 +959,6 @@ srs_error_t SrsRtcpNack::encode(SrsBuffer *buffer)
buffer->write_2bytes(it_chunk->pid);
buffer->write_2bytes(it_chunk->blp);
}
} while(0);
return err;
@ -966,7 +988,7 @@ srs_error_t SrsRtcpCompound::add_rtcp(SrsRtcpCommon *rtcp)
{
int new_len = rtcp->nb_bytes();
if((new_len + nb_bytes_) > kRtcpPacketSize) {
return srs_error_new(ERROR_RTC_RTCP, "exceed the rtcp max size. new rtcp: %d, current: %d", new_len, nb_bytes_);
return srs_error_new(ERROR_RTC_RTCP, "overflow, new rtcp: %d, current: %d", new_len, nb_bytes_);
}
nb_bytes_ += new_len;
rtcps_.push_back(rtcp);
@ -978,43 +1000,29 @@ srs_error_t SrsRtcpCompound::decode(SrsBuffer *buffer)
{
srs_error_t err = srs_success;
while(0 != buffer->left()) {
while (buffer->empty()) {
SrsRtcpHeader* header = (SrsRtcpHeader*)(buffer->head());
switch (header->type)
{
case SrsRtcpType_sr:
{
if (header->type == SrsRtcpType_sr) {
SrsRtcpSR *rtcp = new SrsRtcpSR;
err = rtcp->decode(buffer);
if(srs_success != err) {
return srs_error_wrap(err, "fail to decode rtcp sr");
if(srs_success != (err = rtcp->decode(buffer))) {
return srs_error_wrap(err, "decode sr");
}
nb_bytes_ += rtcp->nb_bytes();
rtcps_.push_back(rtcp);
break;
}
case SrsRtcpType_rr:
{
} else if (header->type == SrsRtcpType_rr) {
SrsRtcpRR *rtcp = new SrsRtcpRR;
err = rtcp->decode(buffer);
if(srs_success != err) {
return srs_error_wrap(err, "fail to decode rtcp rr");
if(srs_success != (err = rtcp->decode(buffer))) {
return srs_error_wrap(err, "decode rr");
}
nb_bytes_ += rtcp->nb_bytes();
rtcps_.push_back(rtcp);
break;
}
default:
{
} else {
SrsRtcpCommon *rtcp = new SrsRtcpCommon;
err = rtcp->decode(buffer);
if(srs_success != err) {
return srs_error_wrap(err, "fail to decode rtcp type:%d", header->type);
if(srs_success != (err = rtcp->decode(buffer))) {
return srs_error_wrap(err, "decode type: %#x", header->type);
}
nb_bytes_ += rtcp->nb_bytes();
rtcps_.push_back(rtcp);
break;
}
}
}
@ -1029,17 +1037,15 @@ int SrsRtcpCompound::nb_bytes()
srs_error_t SrsRtcpCompound::encode(SrsBuffer *buffer)
{
srs_error_t err = srs_success;
if(false == buffer->require(nb_bytes_)) {
return srs_error_new(ERROR_RTC_RTCP,
"the left size of buffer is not enough. buffer:%d, required:%d", buffer->left(), nb_bytes_);
if(!buffer->require(nb_bytes_)) {
return srs_error_new(ERROR_RTC_RTCP, "requires %d bytes", nb_bytes_);
}
vector<SrsRtcpCommon*>::iterator it;
for(it = rtcps_.begin(); it != rtcps_.end(); ++it) {
SrsRtcpCommon *rtcp = *it;
err = rtcp->encode(buffer);
if(err != srs_success) {
return srs_error_wrap(err, "fail to encode rtcp compound. type:%d", rtcp->type());
if((err = rtcp->encode(buffer)) != srs_success) {
return srs_error_wrap(err, "encode compound type:%d", rtcp->type());
}
}