1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

Refactor code to keep sample function order

This commit is contained in:
winlin 2020-05-02 09:15:49 +08:00
parent 7b5fa0e391
commit 6f2b78f16a
10 changed files with 987 additions and 593 deletions

File diff suppressed because it is too large Load diff

View file

@ -33,6 +33,7 @@
#include <srs_app_hourglass.hpp>
#include <srs_app_sdp.hpp>
#include <srs_app_reload.hpp>
#include <srs_kernel_rtp.hpp>
#include <string>
#include <map>
@ -54,6 +55,8 @@ class ISrsUdpSender;
class SrsRtpQueue;
class SrsRtpH264Demuxer;
class SrsRtpOpusDemuxer;
class SrsRtpPacket2;
class ISrsCodec;
const uint8_t kSR = 200;
const uint8_t kRR = 201;
@ -248,7 +251,7 @@ private:
srs_error_t package_stap_a(SrsSource* source, SrsSharedPtrMessage* msg, SrsRtcPackets& packets);
};
class SrsRtcPublisher : virtual public ISrsHourGlass
class SrsRtcPublisher : virtual public ISrsHourGlass, virtual public ISrsRtpPacketDecodeHandler
{
private:
SrsHourGlass* report_timer;
@ -258,7 +261,6 @@ private:
uint32_t audio_ssrc;
private:
SrsRtpH264Demuxer* rtp_h264_demuxer;
SrsRtpOpusDemuxer* rtp_opus_demuxer;
SrsRtpQueue* rtp_video_queue;
SrsRtpQueue* rtp_audio_queue;
private:
@ -283,11 +285,14 @@ private:
srs_error_t send_rtcp_fb_pli(uint32_t ssrc);
public:
srs_error_t on_rtp(char* buf, int nb_buf);
virtual void on_before_decode_payload(SrsRtpPacket2* pkt, SrsBuffer* buf, ISrsCodec** ppayload);
private:
srs_error_t on_audio(SrsRtpSharedPacket* pkt);
srs_error_t collect_audio_frame();
srs_error_t on_video(SrsRtpSharedPacket* pkt);
srs_error_t collect_video_frame();
srs_error_t on_audio(SrsRtpPacket2* pkt);
srs_error_t collect_audio_frames();
srs_error_t do_collect_audio_frame(SrsRtpPacket2* packet);
srs_error_t on_video(SrsRtpPacket2* pkt);
srs_error_t collect_video_frames();
srs_error_t do_collect_video_frame(std::vector<SrsRtpPacket2*>& packets);
public:
void request_keyframe();
// interface ISrsHourGlass
@ -334,25 +339,18 @@ public:
SrsRtcSession(SrsRtcServer* s, SrsRequest* r, const std::string& un, int context_id);
virtual ~SrsRtcSession();
public:
SrsSdp* get_local_sdp() { return &local_sdp; }
SrsSdp* get_local_sdp();
void set_local_sdp(const SrsSdp& sdp);
SrsSdp* get_remote_sdp() { return &remote_sdp; }
void set_remote_sdp(const SrsSdp& sdp) { remote_sdp = sdp; }
SrsRtcSessionStateType get_session_state() { return session_state; }
void set_session_state(SrsRtcSessionStateType state) { session_state = state; }
std::string id() const { return peer_id + "_" + username; }
std::string get_peer_id() const { return peer_id; }
void set_peer_id(const std::string& id) { peer_id = id; }
void set_encrypt(bool v) { encrypt = v; }
SrsSdp* get_remote_sdp();
void set_remote_sdp(const SrsSdp& sdp);
SrsRtcSessionStateType get_session_state();
void set_session_state(SrsRtcSessionStateType state);
std::string id() const;
std::string get_peer_id() const;
void set_peer_id(const std::string& id);
void set_encrypt(bool v);
void switch_to_context();
int context_id() { return cid; }
int context_id();
public:
srs_error_t initialize();
// The peer address may change, we can identify that by STUN messages.
@ -361,7 +359,6 @@ public:
srs_error_t on_rtcp(char* data, int nb_data);
srs_error_t on_rtp(char* data, int nb_data);
public:
srs_error_t send_client_hello();
srs_error_t on_connection_established();
srs_error_t start_play();
srs_error_t start_publish();

View file

@ -141,8 +141,8 @@ SrsRtpRingBuffer::SrsRtpRingBuffer(size_t capacity)
capacity_ = capacity;
initialized_ = false;
queue_ = new SrsRtpSharedPacket*[capacity_];
memset(queue_, 0, sizeof(SrsRtpSharedPacket*) * capacity);
queue_ = new SrsRtpPacket2*[capacity_];
memset(queue_, 0, sizeof(SrsRtpPacket2*) * capacity);
}
SrsRtpRingBuffer::~SrsRtpRingBuffer()
@ -150,9 +150,9 @@ SrsRtpRingBuffer::~SrsRtpRingBuffer()
srs_freepa(queue_);
}
void SrsRtpRingBuffer::set(uint16_t at, SrsRtpSharedPacket* pkt)
void SrsRtpRingBuffer::set(uint16_t at, SrsRtpPacket2* pkt)
{
SrsRtpSharedPacket* p = queue_[at % capacity_];
SrsRtpPacket2* p = queue_[at % capacity_];
if (p) {
srs_freep(p);
@ -161,9 +161,11 @@ void SrsRtpRingBuffer::set(uint16_t at, SrsRtpSharedPacket* pkt)
queue_[at % capacity_] = pkt;
}
void SrsRtpRingBuffer::remove(uint16_t at)
void SrsRtpRingBuffer::reset(uint16_t low, uint16_t high)
{
set(at, NULL);
for (uint16_t s = low; s != high; ++s) {
queue_[s % capacity_] = NULL;
}
}
uint16_t SrsRtpRingBuffer::next_start_of_frame()
@ -173,8 +175,8 @@ uint16_t SrsRtpRingBuffer::next_start_of_frame()
}
for (uint16_t s = low_ + 1 ; s != high_; ++s) {
SrsRtpSharedPacket*& pkt = queue_[s % capacity_];
if (pkt && pkt->rtp_payload_header->is_first_packet_of_frame) {
SrsRtpPacket2*& pkt = queue_[s % capacity_];
if (pkt && pkt->is_first_packet_of_frame) {
return s;
}
}
@ -189,8 +191,8 @@ uint16_t SrsRtpRingBuffer::next_keyframe()
}
for (uint16_t s = low_ + 1 ; s != high_; ++s) {
SrsRtpSharedPacket*& pkt = queue_[s % capacity_];
if (pkt && pkt->rtp_payload_header->is_key_frame && pkt->rtp_payload_header->is_first_packet_of_frame) {
SrsRtpPacket2*& pkt = queue_[s % capacity_];
if (pkt && pkt->is_key_frame && pkt->is_first_packet_of_frame) {
return s;
}
}
@ -269,7 +271,7 @@ SrsRtpQueue::~SrsRtpQueue()
srs_freep(nack_);
}
srs_error_t SrsRtpQueue::consume(SrsRtpSharedPacket* pkt)
srs_error_t SrsRtpQueue::consume(SrsRtpPacket2* pkt)
{
srs_error_t err = srs_success;
@ -337,8 +339,8 @@ srs_error_t SrsRtpQueue::consume(SrsRtpSharedPacket* pkt)
queue_->advance_to(next + 1);
}
// TODO: FIXME: Change to ptr of ptr.
queue_->set(seq, pkt->copy());
// Save packet at the position seq.
queue_->set(seq, pkt);
// Collect packets to frame when:
// 1. Marker bit means the last packet of frame received.
@ -351,7 +353,7 @@ srs_error_t SrsRtpQueue::consume(SrsRtpSharedPacket* pkt)
return err;
}
void SrsRtpQueue::collect_frames(std::vector<std::vector<SrsRtpSharedPacket*> >& frames)
void SrsRtpQueue::collect_frames(std::vector<std::vector<SrsRtpPacket2*> >& frames)
{
frames.swap(frames_);
}
@ -446,33 +448,39 @@ void SrsRtpQueue::insert_into_nack_list(uint16_t seq_start, uint16_t seq_end)
void SrsRtpQueue::collect_packet()
{
vector<SrsRtpSharedPacket*> frame;
for (uint16_t s = queue_->low(); s != queue_->high(); ++s) {
SrsRtpSharedPacket* pkt = queue_->at(s);
while (queue_->low() != queue_->high()) {
vector<SrsRtpPacket2*> frame;
for (uint16_t s = queue_->low(); s != queue_->high(); ++s) {
SrsRtpPacket2* pkt = queue_->at(s);
if (nack_->find(s) != NULL) {
srs_verbose("seq=%u, found in nack list when collect frame", s);
break;
}
// In NACK, never collect frame.
if (nack_->find(s) != NULL) {
srs_verbose("seq=%u, found in nack list when collect frame", s);
return;
}
// We must collect frame from first packet to last packet.
if (s == queue_->low() && pkt->rtp_payload_size() != 0 && !pkt->rtp_payload_header->is_first_packet_of_frame) {
break;
}
// Ignore when the first packet not the start.
if (s == queue_->low() && pkt->nn_original_payload && !pkt->is_first_packet_of_frame) {
return;
}
frame.push_back(pkt->copy());
if (pkt->rtp_header.get_marker() || one_packet_per_frame_) {
// OK, collect packet to frame.
frame.push_back(pkt);
// Not the last packet, continue to process next one.
if (pkt->rtp_header.get_marker() || one_packet_per_frame_) {
continue;
}
// Done, we got the last packet of frame.
nn_collected_frames++;
frames_.push_back(frame);
frame.clear();
// Reset the range of packets to NULL in buffer.
queue_->reset(queue_->low(), s);
srs_verbose("head seq=%u, update to %u because collect one full farme", queue_->low(), s + 1);
queue_->advance_to(s + 1);
}
}
// remove the tmp buffer
for (size_t i = 0; i < frame.size(); ++i) {
srs_freep(frame[i]);
}
}

View file

@ -30,7 +30,7 @@
#include <vector>
#include <map>
class SrsRtpSharedPacket;
class SrsRtpPacket2;
class SrsRtpQueue;
struct SrsNackOption
@ -119,7 +119,7 @@ private:
// Capacity of the ring-buffer.
uint16_t capacity_;
// Ring bufer.
SrsRtpSharedPacket** queue_;
SrsRtpPacket2** queue_;
// Increase one when uint16 flip back, for get_extended_highest_sequence.
uint64_t nn_seq_flip_backs;
// Whether initialized, because we use uint16 so we can't use -1.
@ -132,12 +132,17 @@ public:
SrsRtpRingBuffer(size_t capacity);
virtual ~SrsRtpRingBuffer();
public:
// Move the position of buffer.
uint16_t low() { return low_; }
uint16_t high() { return high_; }
void advance_to(uint16_t seq) { low_ = seq; }
void set(uint16_t at, SrsRtpSharedPacket* pkt);
void remove(uint16_t at);
bool overflow() { return low_ + capacity_ < high_; }
// Free the packet at position.
void set(uint16_t at, SrsRtpPacket2* pkt);
void remove(uint16_t at) { set(at, NULL); }
// Directly reset range [low, high] to NULL.
void reset(uint16_t low, uint16_t high);
// Whether queue overflow or heavy(too many packets and need clear).
bool overflow() { return high_ - low_ < capacity_; }
bool is_heavy() { return high_ - low_ >= capacity_ / 2; }
// Get the next start packet of frame.
// @remark If not found, return the low_, which should never be the "next" one,
@ -151,7 +156,7 @@ public:
// Update the sequence, got the nack range by [low, high].
void update(uint16_t seq, bool startup, uint16_t& nack_low, uint16_t& nack_high);
// Get the packet by seq.
SrsRtpSharedPacket* at(uint16_t seq) { return queue_[seq % capacity_]; }
SrsRtpPacket2* at(uint16_t seq) { return queue_[seq % capacity_]; }
};
class SrsRtpQueue
@ -170,14 +175,14 @@ private:
uint64_t num_of_packet_received_;
uint64_t number_of_packet_lossed_;
private:
std::vector<std::vector<SrsRtpSharedPacket*> > frames_;
std::vector<std::vector<SrsRtpPacket2*> > frames_;
bool request_key_frame_;
public:
SrsRtpQueue(size_t capacity = 1024, bool one_packet_per_frame = false);
virtual ~SrsRtpQueue();
public:
srs_error_t consume(SrsRtpSharedPacket* pkt);
void collect_frames(std::vector<std::vector<SrsRtpSharedPacket*> >& frames);
srs_error_t consume(SrsRtpPacket2* pkt);
void collect_frames(std::vector<std::vector<SrsRtpPacket2*> >& frames);
bool should_request_key_frame();
void notify_drop_seq(uint16_t seq);
void notify_nack_list_full();