1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

Refactor RTC publisher queue, rename sender to player

This commit is contained in:
winlin 2020-05-03 13:37:04 +08:00
parent ef64c5e2bd
commit 9c8b7279f0
4 changed files with 65 additions and 110 deletions

View file

@ -595,7 +595,7 @@ SrsRtpPacket2* SrsRtcPackets::at(int index)
return cache + index;
}
SrsRtcSenderThread::SrsRtcSenderThread(SrsRtcSession* s, int parent_cid)
SrsRtcPlayer::SrsRtcPlayer(SrsRtcSession* s, int parent_cid)
{
_parent_cid = parent_cid;
trd = new SrsDummyCoroutine();
@ -618,14 +618,14 @@ SrsRtcSenderThread::SrsRtcSenderThread(SrsRtcSession* s, int parent_cid)
_srs_config->subscribe(this);
}
SrsRtcSenderThread::~SrsRtcSenderThread()
SrsRtcPlayer::~SrsRtcPlayer()
{
_srs_config->unsubscribe(this);
srs_freep(trd);
}
srs_error_t SrsRtcSenderThread::initialize(const uint32_t& vssrc, const uint32_t& assrc, const uint16_t& v_pt, const uint16_t& a_pt)
srs_error_t SrsRtcPlayer::initialize(const uint32_t& vssrc, const uint32_t& assrc, const uint16_t& v_pt, const uint16_t& a_pt)
{
srs_error_t err = srs_success;
@ -644,7 +644,7 @@ srs_error_t SrsRtcSenderThread::initialize(const uint32_t& vssrc, const uint32_t
return err;
}
srs_error_t SrsRtcSenderThread::on_reload_rtc_server()
srs_error_t SrsRtcPlayer::on_reload_rtc_server()
{
gso = _srs_config->get_rtc_server_gso();
merge_nalus = _srs_config->get_rtc_server_merge_nalus();
@ -655,7 +655,7 @@ srs_error_t SrsRtcSenderThread::on_reload_rtc_server()
return srs_success;
}
srs_error_t SrsRtcSenderThread::on_reload_vhost_play(string vhost)
srs_error_t SrsRtcPlayer::on_reload_vhost_play(string vhost)
{
SrsRequest* req = rtc_session->req;
@ -672,17 +672,17 @@ srs_error_t SrsRtcSenderThread::on_reload_vhost_play(string vhost)
return srs_success;
}
srs_error_t SrsRtcSenderThread::on_reload_vhost_realtime(string vhost)
srs_error_t SrsRtcPlayer::on_reload_vhost_realtime(string vhost)
{
return on_reload_vhost_play(vhost);
}
int SrsRtcSenderThread::cid()
int SrsRtcPlayer::cid()
{
return trd->cid();
}
srs_error_t SrsRtcSenderThread::start()
srs_error_t SrsRtcPlayer::start()
{
srs_error_t err = srs_success;
@ -696,17 +696,17 @@ srs_error_t SrsRtcSenderThread::start()
return err;
}
void SrsRtcSenderThread::stop()
void SrsRtcPlayer::stop()
{
trd->stop();
}
void SrsRtcSenderThread::stop_loop()
void SrsRtcPlayer::stop_loop()
{
trd->interrupt();
}
srs_error_t SrsRtcSenderThread::cycle()
srs_error_t SrsRtcPlayer::cycle()
{
srs_error_t err = srs_success;
@ -833,7 +833,7 @@ srs_error_t SrsRtcSenderThread::cycle()
}
}
srs_error_t SrsRtcSenderThread::send_messages(
srs_error_t SrsRtcPlayer::send_messages(
SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcPackets& packets
) {
srs_error_t err = srs_success;
@ -866,7 +866,7 @@ srs_error_t SrsRtcSenderThread::send_messages(
return err;
}
srs_error_t SrsRtcSenderThread::messages_to_packets(
srs_error_t SrsRtcPlayer::messages_to_packets(
SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcPackets& packets
) {
srs_error_t err = srs_success;
@ -951,7 +951,7 @@ srs_error_t SrsRtcSenderThread::messages_to_packets(
return err;
}
srs_error_t SrsRtcSenderThread::send_packets(SrsRtcPackets& packets)
srs_error_t SrsRtcPlayer::send_packets(SrsRtcPackets& packets)
{
srs_error_t err = srs_success;
@ -1019,7 +1019,7 @@ srs_error_t SrsRtcSenderThread::send_packets(SrsRtcPackets& packets)
}
// TODO: FIXME: We can gather and pad audios, because they have similar size.
srs_error_t SrsRtcSenderThread::send_packets_gso(SrsRtcPackets& packets)
srs_error_t SrsRtcPlayer::send_packets_gso(SrsRtcPackets& packets)
{
srs_error_t err = srs_success;
@ -1222,7 +1222,7 @@ srs_error_t SrsRtcSenderThread::send_packets_gso(SrsRtcPackets& packets)
return err;
}
srs_error_t SrsRtcSenderThread::package_nalus(SrsSharedPtrMessage* msg, SrsRtcPackets& packets)
srs_error_t SrsRtcPlayer::package_nalus(SrsSharedPtrMessage* msg, SrsRtcPackets& packets)
{
srs_error_t err = srs_success;
@ -1312,7 +1312,7 @@ srs_error_t SrsRtcSenderThread::package_nalus(SrsSharedPtrMessage* msg, SrsRtcPa
return err;
}
srs_error_t SrsRtcSenderThread::package_opus(SrsSample* sample, SrsRtcPackets& packets, int nn_max_payload)
srs_error_t SrsRtcPlayer::package_opus(SrsSample* sample, SrsRtcPackets& packets, int nn_max_payload)
{
srs_error_t err = srs_success;
@ -1348,7 +1348,7 @@ srs_error_t SrsRtcSenderThread::package_opus(SrsSample* sample, SrsRtcPackets& p
return err;
}
srs_error_t SrsRtcSenderThread::package_fu_a(SrsSharedPtrMessage* msg, SrsSample* sample, int fu_payload_size, SrsRtcPackets& packets)
srs_error_t SrsRtcPlayer::package_fu_a(SrsSharedPtrMessage* msg, SrsSample* sample, int fu_payload_size, SrsRtcPackets& packets)
{
srs_error_t err = srs_success;
@ -1389,7 +1389,7 @@ srs_error_t SrsRtcSenderThread::package_fu_a(SrsSharedPtrMessage* msg, SrsSample
}
// Single NAL Unit Packet @see https://tools.ietf.org/html/rfc6184#section-5.6
srs_error_t SrsRtcSenderThread::package_single_nalu(SrsSharedPtrMessage* msg, SrsSample* sample, SrsRtcPackets& packets)
srs_error_t SrsRtcPlayer::package_single_nalu(SrsSharedPtrMessage* msg, SrsSample* sample, SrsRtcPackets& packets)
{
srs_error_t err = srs_success;
@ -1409,7 +1409,7 @@ srs_error_t SrsRtcSenderThread::package_single_nalu(SrsSharedPtrMessage* msg, Sr
return err;
}
srs_error_t SrsRtcSenderThread::package_stap_a(SrsSource* source, SrsSharedPtrMessage* msg, SrsRtcPackets& packets)
srs_error_t SrsRtcPlayer::package_stap_a(SrsSource* source, SrsSharedPtrMessage* msg, SrsRtcPackets& packets)
{
srs_error_t err = srs_success;
@ -1937,7 +1937,7 @@ srs_error_t SrsRtcPublisher::collect_audio_frames()
srs_error_t err = srs_success;
std::vector<std::vector<SrsRtpPacket2*> > frames;
audio_queue_->collect_frames(frames);
audio_queue_->collect_frames(audio_nack_, frames);
for (size_t i = 0; i < frames.size(); ++i) {
vector<SrsRtpPacket2*>& packets = frames[i];
@ -2031,7 +2031,7 @@ srs_error_t SrsRtcPublisher::on_video(SrsRtpPacket2* pkt)
srs_error_t SrsRtcPublisher::collect_video_frames()
{
std::vector<std::vector<SrsRtpPacket2*> > frames;
video_queue_->collect_frames(frames);
video_queue_->collect_frames(video_nack_, frames);
for (size_t i = 0; i < frames.size(); ++i) {
vector<SrsRtpPacket2*>& packets = frames[i];
@ -2252,7 +2252,7 @@ SrsRtcSession::SrsRtcSession(SrsRtcServer* s)
source_ = NULL;
publisher = NULL;
sender = NULL;
player = NULL;
sendonly_skt = NULL;
rtc_server = s;
dtls_session = new SrsDtlsSession(this);
@ -2268,7 +2268,7 @@ SrsRtcSession::SrsRtcSession(SrsRtcServer* s)
SrsRtcSession::~SrsRtcSession()
{
srs_freep(sender);
srs_freep(player);
srs_freep(publisher);
srs_freep(dtls_session);
srs_freep(req);
@ -2551,8 +2551,8 @@ srs_error_t SrsRtcSession::start_play()
{
srs_error_t err = srs_success;
srs_freep(sender);
sender = new SrsRtcSenderThread(this, _srs_context->get_id());
srs_freep(player);
player = new SrsRtcPlayer(this, _srs_context->get_id());
uint32_t video_ssrc = 0;
uint32_t audio_ssrc = 0;
@ -2569,12 +2569,12 @@ srs_error_t SrsRtcSession::start_play()
}
}
if ((err = sender->initialize(video_ssrc, audio_ssrc, video_payload_type, audio_payload_type)) != srs_success) {
return srs_error_wrap(err, "SrsRtcSenderThread init");
if ((err = player->initialize(video_ssrc, audio_ssrc, video_payload_type, audio_payload_type)) != srs_success) {
return srs_error_wrap(err, "SrsRtcPlayer init");
}
if ((err = sender->start()) != srs_success) {
return srs_error_wrap(err, "start SrsRtcSenderThread");
if ((err = player->start()) != srs_success) {
return srs_error_wrap(err, "start SrsRtcPlayer");
}
return err;

View file

@ -194,8 +194,7 @@ public:
SrsRtpPacket2* at(int index);
};
// TODO: FIXME: Rename to RTC player or subscriber.
class SrsRtcSenderThread : virtual public ISrsCoroutineHandler, virtual public ISrsReloadHandler
class SrsRtcPlayer : virtual public ISrsCoroutineHandler, virtual public ISrsReloadHandler
{
protected:
SrsCoroutine* trd;
@ -221,8 +220,8 @@ private:
int mw_msgs;
bool realtime;
public:
SrsRtcSenderThread(SrsRtcSession* s, int parent_cid);
virtual ~SrsRtcSenderThread();
SrsRtcPlayer(SrsRtcSession* s, int parent_cid);
virtual ~SrsRtcPlayer();
public:
srs_error_t initialize(const uint32_t& vssrc, const uint32_t& assrc, const uint16_t& v_pt, const uint16_t& a_pt);
// interface ISrsReloadHandler
@ -303,13 +302,13 @@ public:
class SrsRtcSession
{
friend class SrsDtlsSession;
friend class SrsRtcSenderThread;
friend class SrsRtcPlayer;
friend class SrsRtcPublisher;
private:
SrsRtcServer* rtc_server;
SrsRtcSessionStateType session_state;
SrsDtlsSession* dtls_session;
SrsRtcSenderThread* sender;
SrsRtcPlayer* player;
SrsRtcPublisher* publisher;
bool is_publisher_;
private:

View file

@ -187,11 +187,6 @@ bool SrsRtpRingBuffer::overflow()
return high_ - low_ >= capacity_;
}
bool SrsRtpRingBuffer::is_heavy()
{
return high_ - low_ >= capacity_ / 2;
}
uint16_t SrsRtpRingBuffer::next_start_of_frame()
{
if (low_ == high_) {
@ -269,7 +264,7 @@ SrsRtpPacket2* SrsRtpRingBuffer::at(uint16_t seq)
return queue_[seq % capacity_];
}
SrsRtpQueue::SrsRtpQueue(const char* tag, int capacity)
SrsRtpQueue::SrsRtpQueue(int capacity)
{
nn_collected_frames = 0;
queue_ = new SrsRtpRingBuffer(capacity);
@ -282,8 +277,6 @@ SrsRtpQueue::SrsRtpQueue(const char* tag, int capacity)
num_of_packet_received_ = 0;
number_of_packet_lossed_ = 0;
tag_ = tag;
}
SrsRtpQueue::~SrsRtpQueue()
@ -329,27 +322,17 @@ srs_error_t SrsRtpQueue::consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt
uint16_t nack_low = 0, nack_high = 0;
queue_->update(seq, !nn_collected_frames, nack_low, nack_high);
if (srs_rtp_seq_distance(nack_low, nack_high)) {
srs_trace("%s update nack seq=%u, startup=%d, range [%u, %u]", tag_, seq, !nn_collected_frames, nack_low, nack_high);
srs_trace("update nack seq=%u, startup=%d, range [%u, %u]", seq, !nn_collected_frames, nack_low, nack_high);
insert_into_nack_list(nack, nack_low, nack_high);
}
}
// When packets overflow, collect frame and move head to next frame start.
if (queue_->overflow()) {
on_overflow(nack);
}
// Save packet at the position seq.
queue_->set(seq, pkt);
return err;
}
void SrsRtpQueue::collect_frames(std::vector<std::vector<SrsRtpPacket2*> >& frames)
{
frames.swap(frames_);
}
void SrsRtpQueue::notify_drop_seq(uint16_t seq)
{
uint16_t next = queue_->next_start_of_frame();
@ -360,7 +343,7 @@ void SrsRtpQueue::notify_drop_seq(uint16_t seq)
}
// When NACK is timeout, move to the next start of frame.
srs_trace("%s nack drop seq=%u, drop range [%u, %u]", tag_, seq, queue_->low(), next + 1);
srs_trace("nack drop seq=%u, drop range [%u, %u]", seq, queue_->low(), next + 1);
queue_->advance_to(next + 1);
}
@ -374,7 +357,7 @@ void SrsRtpQueue::notify_nack_list_full()
}
// When NACK is overflow, move to the next keyframe.
srs_trace("%s nack overflow drop range [%u, %u]", tag_, queue_->low(), next + 1);
srs_trace("nack overflow drop range [%u, %u]", queue_->low(), next + 1);
queue_->advance_to(next + 1);
}
@ -417,7 +400,7 @@ void SrsRtpQueue::insert_into_nack_list(SrsRtpNackForReceiver* nack, uint16_t se
nack->check_queue_size();
}
SrsRtpAudioQueue::SrsRtpAudioQueue(int capacity) : SrsRtpQueue("audio", capacity)
SrsRtpAudioQueue::SrsRtpAudioQueue(int capacity) : SrsRtpQueue(capacity)
{
}
@ -425,27 +408,21 @@ SrsRtpAudioQueue::~SrsRtpAudioQueue()
{
}
srs_error_t SrsRtpAudioQueue::consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt)
void SrsRtpAudioQueue::collect_frames(SrsRtpNackForReceiver* nack, std::vector<std::vector<SrsRtpPacket2*> >& frames)
{
srs_error_t err = srs_success;
collect_packet(frames, nack);
if ((err = SrsRtpQueue::consume(nack, pkt)) != srs_success) {
return srs_error_wrap(err, "audio queue");
if (queue_->overflow()) {
on_overflow(nack);
}
// For audio, always try to collect frame, because each packet is a frame.
collect_packet(nack);
return err;
}
void SrsRtpAudioQueue::on_overflow(SrsRtpNackForReceiver* nack)
{
collect_packet(nack);
queue_->advance_to(queue_->high());
}
void SrsRtpAudioQueue::collect_packet(SrsRtpNackForReceiver* nack)
void SrsRtpAudioQueue::collect_packet(vector<vector<SrsRtpPacket2*> >& frames, SrsRtpNackForReceiver* nack)
{
// When done, s point to the next available packet.
uint16_t next = queue_->low();
@ -455,7 +432,7 @@ void SrsRtpAudioQueue::collect_packet(SrsRtpNackForReceiver* nack)
// Not found or in NACK, stop collecting frame.
if (!pkt || nack->find(next) != NULL) {
srs_trace("%s wait for nack seq=%u", tag_, next);
srs_trace("wait for nack seq=%u", next);
break;
}
@ -465,19 +442,19 @@ void SrsRtpAudioQueue::collect_packet(SrsRtpNackForReceiver* nack)
// Done, we got the last packet of frame.
nn_collected_frames++;
frames_.push_back(frame);
frames.push_back(frame);
}
if (queue_->low() != next) {
// Reset the range of packets to NULL in buffer.
queue_->reset(queue_->low(), next);
srs_verbose("%s collect on frame, update head seq=%u t %u", tag_, queue_->low(), next);
srs_verbose("collect on frame, update head seq=%u t %u", queue_->low(), next);
queue_->advance_to(next);
}
}
SrsRtpVideoQueue::SrsRtpVideoQueue(int capacity) : SrsRtpQueue("video", capacity)
SrsRtpVideoQueue::SrsRtpVideoQueue(int capacity) : SrsRtpQueue(capacity)
{
request_key_frame_ = false;
}
@ -486,23 +463,13 @@ SrsRtpVideoQueue::~SrsRtpVideoQueue()
{
}
srs_error_t SrsRtpVideoQueue::consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt)
void SrsRtpVideoQueue::collect_frames(SrsRtpNackForReceiver* nack, std::vector<std::vector<SrsRtpPacket2*> >& frames)
{
srs_error_t err = srs_success;
collect_packet(frames, nack);
if ((err = SrsRtpQueue::consume(nack, pkt)) != srs_success) {
return srs_error_wrap(err, "video queue");
if (queue_->overflow()) {
on_overflow(nack);
}
// Collect packets to frame when:
// 1. Marker bit means the last packet of frame received.
// 2. Queue has lots of packets, the load is heavy.
// TODO: FIMXE: For real-time, we should collect each frame ASAP.
if (pkt->rtp_header.get_marker() || queue_->is_heavy()) {
collect_packet(nack);
}
return err;
}
bool SrsRtpVideoQueue::should_request_key_frame()
@ -522,26 +489,24 @@ void SrsRtpVideoQueue::request_keyframe()
void SrsRtpVideoQueue::on_overflow(SrsRtpNackForReceiver* nack)
{
collect_packet(nack);
uint16_t next = queue_->next_start_of_frame();
// Note that low_ mean not found, clear queue util one packet.
if (next == queue_->low()) {
next = queue_->high() - 1;
}
srs_trace("%s seq out of range [%u, %u]", tag_, queue_->low(), next);
srs_trace("seq out of range [%u, %u]", queue_->low(), next);
for (uint16_t s = queue_->low(); s != next; ++s) {
nack->remove(s);
queue_->remove(s);
}
srs_trace("%s force update seq %u to %u", tag_, queue_->low(), next + 1);
srs_trace("force update seq %u to %u", queue_->low(), next + 1);
queue_->advance_to(next + 1);
}
void SrsRtpVideoQueue::collect_packet(SrsRtpNackForReceiver* nack)
void SrsRtpVideoQueue::collect_packet(vector<vector<SrsRtpPacket2*> >& frames, SrsRtpNackForReceiver* nack)
{
while (queue_->low() != queue_->high()) {
vector<SrsRtpPacket2*> frame;
@ -553,7 +518,7 @@ void SrsRtpVideoQueue::collect_packet(SrsRtpNackForReceiver* nack)
}
nn_collected_frames++;
frames_.push_back(frame);
frames.push_back(frame);
}
}
@ -569,7 +534,7 @@ void SrsRtpVideoQueue::do_collect_packet(SrsRtpNackForReceiver* nack, vector<Srs
// Not found or in NACK, stop collecting frame.
if (!pkt || nack->find(next) != NULL) {
srs_trace("%s wait for nack seq=%u", tag_, next);
srs_trace("wait for nack seq=%u", next);
break;
}
@ -599,7 +564,7 @@ void SrsRtpVideoQueue::do_collect_packet(SrsRtpNackForReceiver* nack, vector<Srs
// Reset the range of packets to NULL in buffer.
queue_->reset(queue_->low(), next);
srs_verbose("%s collect on frame, update head seq=%u t %u", tag_, queue_->low(), next);
srs_verbose("collect on frame, update head seq=%u t %u", queue_->low(), next);
queue_->advance_to(next);
}
}

View file

@ -143,7 +143,6 @@ public:
void reset(uint16_t low, uint16_t high);
// Whether queue overflow or heavy(too many packets and need clear).
bool overflow();
bool is_heavy();
// For video, get the next start packet of frame.
// @remark If not found, return the low_, which should never be the "next" one,
// because it MAY or NOT current start packet of frame but never be the next.
@ -172,15 +171,11 @@ private:
protected:
SrsRtpRingBuffer* queue_;
uint64_t nn_collected_frames;
std::vector<std::vector<SrsRtpPacket2*> > frames_;
const char* tag_;
public:
SrsRtpQueue(const char* tag, int capacity);
SrsRtpQueue(int capacity);
virtual ~SrsRtpQueue();
public:
virtual srs_error_t consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt);
// TODO: FIXME: Should merge FU-A to RAW, then we can return RAW payloads.
void collect_frames(std::vector<std::vector<SrsRtpPacket2*> >& frames);
void notify_drop_seq(uint16_t seq);
void notify_nack_list_full();
public:
@ -190,8 +185,6 @@ public:
uint32_t get_interarrival_jitter();
private:
void insert_into_nack_list(SrsRtpNackForReceiver* nack, uint16_t seq_start, uint16_t seq_end);
protected:
virtual void on_overflow(SrsRtpNackForReceiver* nack) = 0;
};
class SrsRtpAudioQueue : public SrsRtpQueue
@ -200,11 +193,10 @@ public:
SrsRtpAudioQueue(int capacity);
virtual ~SrsRtpAudioQueue();
public:
virtual srs_error_t consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt);
virtual void collect_frames(SrsRtpNackForReceiver* nack, std::vector<std::vector<SrsRtpPacket2*> >& frames);
private:
virtual void on_overflow(SrsRtpNackForReceiver* nack);
protected:
virtual void collect_packet(SrsRtpNackForReceiver* nack);
virtual void collect_packet(std::vector<std::vector<SrsRtpPacket2*> >& frames, SrsRtpNackForReceiver* nack);
};
class SrsRtpVideoQueue : public SrsRtpQueue
@ -215,13 +207,12 @@ public:
SrsRtpVideoQueue(int capacity);
virtual ~SrsRtpVideoQueue();
public:
virtual srs_error_t consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt);
virtual void collect_frames(SrsRtpNackForReceiver* nack, std::vector<std::vector<SrsRtpPacket2*> >& frames);
bool should_request_key_frame();
void request_keyframe();
protected:
virtual void on_overflow(SrsRtpNackForReceiver* nack);
private:
virtual void collect_packet(SrsRtpNackForReceiver* nack);
virtual void on_overflow(SrsRtpNackForReceiver* nack);
virtual void collect_packet(std::vector<std::vector<SrsRtpPacket2*> >& frames, SrsRtpNackForReceiver* nack);
virtual void do_collect_packet(SrsRtpNackForReceiver* nack, std::vector<SrsRtpPacket2*>& frame);
};