1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-02-15 04:42:04 +00:00

RTC: Refine RTP packet api, keep shared message to private

This commit is contained in:
winlin 2021-02-26 12:25:35 +08:00
parent 2ae7e22469
commit 58d71c23bc
7 changed files with 50 additions and 243 deletions

View file

@ -1273,226 +1273,6 @@ SrsRtpFrameBufferEnum SrsRtpJitterBuffer::InsertPacket(uint16_t seq, uint32_t ts
return buffer_state;
}
// Insert one parsed RTP packet (SrsRtpPacket2) into the jitter buffer.
// The H.264 payload is located inside the packet's shared message, wrapped
// into a VCMPacket, and inserted into the frame owning its timestamp.
// @param pkt           the parsed RTP packet; pkt.shared_msg holds the raw bytes.
// @param retransmitted output flag; NOTE(review): never written in this version —
//                      the code that set it is commented out below. Confirm callers
//                      do not read it.
// @return a SrsRtpFrameBufferEnum describing how the packet was consumed.
SrsRtpFrameBufferEnum SrsRtpJitterBuffer::InsertPacket2(const SrsRtpPacket2 &pkt,
bool* retransmitted)
{
bool singlenual = false;
H264PacketizationTypes packetyType = kH264SingleNalu;
//char *buf = pkt.shared_msg->payload; //rtp packet data
//int size = pkt.shared_msg->size; //rtp size
//rtp header: total size - rtp payload size
int rtp_header_size = pkt.shared_msg->size - pkt.payload->nb_bytes();
char *rtp_payload_buf = pkt.shared_msg->payload + rtp_header_size; //rtp payload data
int rtp_payload_size = pkt.shared_msg->size - rtp_header_size; //rtp payload size;
bool is_first_packet_in_frame = false;
// Empty payload: a padding-only packet (or nothing to insert at all).
if (rtp_payload_size == 0){
//B0(1011):pad+ext, A0(1010):pad
if ((pkt.shared_msg->payload[0] & 0xFF) == 0xB0 || (pkt.shared_msg->payload[0] & 0xFF) == 0xA0){
//uint8_t padding_length = (uint8_t)(pkt.shared_msg->payload[pkt.shared_msg->size-1] & 0xFF);
// NOTE(review): padding_length is referenced below but its declaration
// above is commented out; this only builds if srs_info() compiles to a
// no-op in this configuration — confirm build flags.
srs_info("RTP: jitbuffer padding packet ts:%u, seq:%u, payload size:%d, padding size:%d",
pkt.header.get_timestamp(),pkt.header.get_sequence(),
rtp_payload_size, padding_length);
}
return kNoError;
}
SrsAvcNaluType nal_unit_type = SrsAvcNaluTypeReserved;
FrameType frameType = kVideoFrameDelta;
// Dispatch on the RTP NALU type: STAP-A (aggregation), FU-A (fragmentation),
// anything else is treated as a single-NALU packet.
int8_t v = (uint8_t)pkt.nalu_type;
if (v == kStapA) {
singlenual = false;
packetyType = kH264StapA;
is_first_packet_in_frame = true;
SrsRtpSTAPPayload *payload = (SrsRtpSTAPPayload*)pkt.payload;
// A STAP-A that carries an SPS marks the start of a keyframe.
if (payload->get_sps() != NULL){
nal_unit_type = SrsAvcNaluTypeSPS;
frameType = kVideoFrameKey;
}
} else if (v == kFuA) {
SrsRtpFUAPayload2 *payload = (SrsRtpFUAPayload2*)pkt.payload;
// Rebuild the original NALU header byte from the FU indicator (NRI bits)
// and the FU header (NALU type bits).
int8_t nalu_byte0 = ((int8_t)payload->nri & 0xE0) | ((int8_t)payload->nalu_type & 0x1F);
nal_unit_type = (SrsAvcNaluType)(nalu_byte0 & 0x1f);
if (nal_unit_type == SrsAvcNaluTypeIDR){
frameType = kVideoFrameKey;
}
if (payload->start){
// First fragment: overwrite the FU header in place with the
// reconstructed NALU byte, then skip the FU indicator byte.
//xx xx ....
//xx nalu ....
rtp_payload_buf[1] = nalu_byte0;
//nalu ....
rtp_payload_buf = rtp_payload_buf + 1;
rtp_payload_size = rtp_payload_size - 1;
is_first_packet_in_frame = true;
}else {
// Continuation fragment: skip both FU indicator and FU header.
//xx xx ....
//....
rtp_payload_buf = rtp_payload_buf + 2;
rtp_payload_size = rtp_payload_size - 2;
}
singlenual = false;
packetyType = kH264FuA;
} else {
// Single NALU packet: payload is the NALU itself.
singlenual = true;
packetyType = kH264SingleNalu;
is_first_packet_in_frame = true;
}
// Wrap the located payload into a VCMPacket for the frame buffer layer.
const VCMPacket packet((const uint8_t*)rtp_payload_buf, rtp_payload_size,
pkt.header.get_sequence(), pkt.header.get_timestamp(), pkt.header.get_marker(),
packetyType, kRtpVideoH264, singlenual, is_first_packet_in_frame, frameType);
++num_packets_;
if (num_packets_ == 1) {
time_first_packet_ms_ = srs_update_system_time();
}
//Does this packet belong to an old frame?
// if (last_decoded_state_.IsOldPacket(&packet)) {
// //return kOldPacket;
// }
//num_consecutive_old_packets_ = 0;
// Find (or allocate) the frame this packet belongs to, keyed by timestamp.
SrsRtpFrameBuffer* frame;
FrameList* frame_list;
const SrsRtpFrameBufferEnum error = GetFrameByRtpPacket(packet, &frame, &frame_list);
if (error != kNoError) {
return error;
}
//srs_utime_t now_ms = srs_update_system_time();
FrameData frame_data;
frame_data.rtt_ms = 0; //rtt_ms_;
frame_data.rolling_average_packets_per_frame = 25;//average_packets_per_frame_;
SrsRtpFrameBufferEnum buffer_state = frame->InsertPacket(packet, frame_data);
// On success (positive state), update bitrate stats and NACK bookkeeping.
if (buffer_state > 0) {
incoming_bit_count_ += packet.sizeBytes << 3;
if (first_packet_since_reset_) {
latest_received_sequence_number_ = packet.seqNum;
first_packet_since_reset_ = false;
} else {
// if (IsPacketRetransmitted(packet)) {
// frame->IncrementNackCount();
// }
UpdateNackList(packet.seqNum);
latest_received_sequence_number_ = LatestSequenceNumber(
latest_received_sequence_number_, packet.seqNum);
}
}
// Is the frame already in the decodable list?
bool continuous = IsContinuous(*frame);
// Route the frame to a list according to its new state.
switch (buffer_state) {
case kGeneralError:
case kTimeStampError:
case kSizeError: {
// Fatal per-frame errors: recycle the frame.
free_frames_.push_back(frame);
break;
}
case kCompleteSession: {
//CountFrame(*frame);
// if (previous_state != kStateDecodable &&
// previous_state != kStateComplete) {
// /*CountFrame(*frame);*/ // (unreadable legacy comment) by ylr
// if (continuous) {
// // Signal that we have a complete session.
// frame_event_->Set();
// }
// }
}
// Note: There is no break here - continuing to kDecodableSession.
case kDecodableSession: {
// *retransmitted = (frame->GetNackCount() > 0);
// NOTE(review): the continuity check is disabled ("true ||"), so every
// decodable frame is inserted as continuous — confirm this is intended.
if (true || continuous) {
decodable_frames_.InsertFrame(frame);
FindAndInsertContinuousFrames(*frame);
} else {
incomplete_frames_.InsertFrame(frame);
// If NACKs are enabled, keyframes are triggered by |GetNackList|.
// if (nack_mode_ == kNoNack && NonContinuousOrIncompleteDuration() >
// 90 * kMaxDiscontinuousFramesTime) {
// return kFlushIndicator;
// }
}
break;
}
case kIncomplete: {
// Empty frames that merely advance the decode state are recycled here.
if (frame->GetState() == kStateEmpty &&
last_decoded_state_.UpdateEmptyFrame(frame)) {
free_frames_.push_back(frame);
return kNoError;
} else {
incomplete_frames_.InsertFrame(frame);
// If NACKs are enabled, keyframes are triggered by |GetNackList|.
// if (nack_mode_ == kNoNack && NonContinuousOrIncompleteDuration() >
// 90 * kMaxDiscontinuousFramesTime) {
// return kFlushIndicator;
// }
}
break;
}
case kNoError:
case kOutOfBoundsPacket:
case kDuplicatePacket: {
// Put back the frame where it came from.
if (frame_list != NULL) {
frame_list->InsertFrame(frame);
} else {
free_frames_.push_back(frame);
}
++num_duplicated_packets_;
break;
}
case kFlushIndicator:{
free_frames_.push_back(frame);
}
return kFlushIndicator;
default:
assert(false);
}
return buffer_state;
}
// Gets frame to use for this timestamp. If no match, get empty frame.
SrsRtpFrameBufferEnum SrsRtpJitterBuffer::GetFrameByRtpPacket(const VCMPacket& packet,
SrsRtpFrameBuffer** frame,

View file

@ -413,7 +413,6 @@ public:
public:
srs_error_t start();
void Reset();
SrsRtpFrameBufferEnum InsertPacket2(const SrsRtpPacket2 &pkt, bool* retransmitted);
SrsRtpFrameBufferEnum InsertPacket(uint16_t seq, uint32_t ts, bool maker, char *buf, int size,
bool* retransmitted);
void ReleaseFrame(SrsRtpFrameBuffer* frame);

View file

@ -818,12 +818,8 @@ srs_error_t SrsRtcFromRtmpBridger::package_opus(char* data, int size, SrsRtpPack
SrsRtpRawPayload* raw = new SrsRtpRawPayload();
pkt->payload = raw;
raw->payload = new char[size];
raw->payload = pkt->wrap(data, size);
raw->nn_payload = size;
memcpy(raw->payload, data, size);
pkt->shared_msg = new SrsSharedPtrMessage();
pkt->shared_msg->wrap(raw->payload, size);
*ppkt = pkt;
@ -964,9 +960,7 @@ srs_error_t SrsRtcFromRtmpBridger::package_stap_a(SrsRtcStream* source, SrsShare
// Copy the SPS/PPS bytes, because it may change.
int size = (int)(sps.size() + pps.size());
char* payload = new char[size];
pkt->shared_msg = new SrsSharedPtrMessage();
pkt->shared_msg->wrap(payload, size);
char* payload = pkt->wrap(size);
if (true) {
SrsSample* sample = new SrsSample();
@ -1038,7 +1032,7 @@ srs_error_t SrsRtcFromRtmpBridger::package_nalus(SrsSharedPtrMessage* msg, const
pkt->header.set_sequence(video_sequence++);
pkt->header.set_timestamp(msg->timestamp * 90);
pkt->payload = raw;
pkt->shared_msg = msg->copy();
pkt->wrap(msg);
pkts.push_back(pkt);
} else {
// We must free it, should never use RTP packets to free it,
@ -1077,7 +1071,7 @@ srs_error_t SrsRtcFromRtmpBridger::package_nalus(SrsSharedPtrMessage* msg, const
fua->end = bool(i == num_of_packet - 1);
pkt->payload = fua;
pkt->shared_msg = msg->copy();
pkt->wrap(msg);
pkts.push_back(pkt);
nb_left -= packet_size;
@ -1105,7 +1099,8 @@ srs_error_t SrsRtcFromRtmpBridger::package_single_nalu(SrsSharedPtrMessage* msg,
raw->payload = sample->bytes;
raw->nn_payload = sample->size;
pkt->shared_msg = msg->copy();
pkt->wrap(msg);
pkts.push_back(pkt);
return err;
@ -1142,7 +1137,8 @@ srs_error_t SrsRtcFromRtmpBridger::package_fu_a(SrsSharedPtrMessage* msg, SrsSam
fua->payload = p;
fua->size = packet_size;
pkt->shared_msg = msg->copy();
pkt->wrap(msg);
pkts.push_back(pkt);
p += packet_size;

View file

@ -49,15 +49,19 @@ SrsBuffer::SrsBuffer(char* b, int nn)
{
p = bytes = b;
nb_bytes = nn;
// TODO: support both little and big endian.
srs_assert(srs_is_little_endian());
}
// Destructor. SrsBuffer is a non-owning view over externally managed bytes,
// so there is nothing to free here.
SrsBuffer::~SrsBuffer()
{
}
// Clone this buffer view. The clone shares the same underlying bytes
// (the data itself is not duplicated) and keeps the current position,
// so reads on the clone resume exactly where this buffer stands.
SrsBuffer* SrsBuffer::copy()
{
    SrsBuffer* cloned = new SrsBuffer(bytes, nb_bytes);
    // Advance the clone's cursor to match ours.
    cloned->p = bytes + (p - bytes);
    return cloned;
}
char* SrsBuffer::data()
{
return bytes;

View file

@ -109,6 +109,8 @@ public:
SrsBuffer(char* b, int nn);
~SrsBuffer();
public:
// Copy the object, keep position of buffer.
SrsBuffer* copy();
// Get the data and head of buffer.
// current-bytes = head() = data() + pos()
char* data();

View file

@ -782,20 +782,39 @@ SrsRtpPacket2::~SrsRtpPacket2()
srs_freep(cache_buffer_);
}
void SrsRtpPacket2::wrap(char* data, int size)
char* SrsRtpPacket2::wrap(int size)
{
srs_freep(shared_msg);
shared_msg = new SrsSharedPtrMessage();
char* buf = new char[size];
memcpy(buf, data, size);
shared_msg->wrap(buf, size);
srs_freep(cache_buffer_);
cache_buffer_ = new SrsBuffer(buf, size);
return buf;
}
SrsBuffer* SrsRtpPacket2::cache_buffer()
// Allocate a size-byte managed buffer (see wrap(int)) and fill it with a
// copy of data. Returns the internal buffer, owned by the shared message.
char* SrsRtpPacket2::wrap(char* data, int size)
{
    char* dst = wrap(size);
    memcpy(dst, data, size);
    return dst;
}
// Wrap an existing shared message: take a reference-counted copy of msg and
// point the cache buffer straight at its payload (zero-copy).
// @return msg's payload pointer; lifetime follows the shared message.
char* SrsRtpPacket2::wrap(SrsSharedPtrMessage* msg)
{
    // Drop whatever we wrapped before re-binding.
    srs_freep(cache_buffer_);
    srs_freep(shared_msg);

    shared_msg = msg->copy();
    cache_buffer_ = new SrsBuffer(msg->payload, msg->size);

    return msg->payload;
}
// Get the cache buffer bound to the wrapped shared message by wrap().
// May be NULL if the packet has not been wrapped yet.
SrsBuffer* SrsRtpPacket2::cache_buffer() const
{
return cache_buffer_;
}
@ -835,6 +854,7 @@ SrsRtpPacket2* SrsRtpPacket2::copy()
cp->nalu_type = nalu_type;
cp->shared_msg = shared_msg? shared_msg->copy():NULL;
cp->cache_buffer_ = cache_buffer_? cache_buffer_->copy():NULL;
cp->frame_type = frame_type;
cp->cached_payload_size = cached_payload_size;

View file

@ -268,13 +268,15 @@ class SrsRtpPacket2
public:
SrsRtpHeader header;
ISrsRtpPayloader* payload;
private:
// The buffer bind to the shared message.
SrsBuffer* cache_buffer_;
// The original shared message, all RTP packets can refer to its data.
SrsSharedPtrMessage* shared_msg;
// Helper fields.
public:
// The first byte as nalu type, for video decoder only.
SrsAvcNaluType nalu_type;
// The original shared message, all RTP packets can refer to its data.
SrsSharedPtrMessage* shared_msg;
// The frame type, for RTMP bridger or SFU source.
SrsFrameType frame_type;
// Fast cache for performance.
@ -288,8 +290,12 @@ public:
virtual ~SrsRtpPacket2();
public:
// Wrap buffer to shared_message, which is managed by us.
void wrap(char* data, int size);
SrsBuffer* cache_buffer();
char* wrap(int size);
char* wrap(char* data, int size);
// Wrap the shared message, we copy it.
char* wrap(SrsSharedPtrMessage* msg);
// Get the cache buffer which binds to the shared message.
SrsBuffer* cache_buffer() const;
public:
// Set the padding of RTP packet.
void set_padding(int size);