1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

RTC: Refine PLI request as worker

This commit is contained in:
winlin 2020-09-10 17:45:19 +08:00
parent 4c459a004e
commit e19631a2b7
4 changed files with 158 additions and 44 deletions

View file

@ -282,6 +282,75 @@ srs_error_t SrsPlaintextTransport::unprotect_rtcp(const char* cipher, char* plai
return srs_success; return srs_success;
} }
ISrsRtcPLIWorkerHandler::ISrsRtcPLIWorkerHandler()
{
}
ISrsRtcPLIWorkerHandler::~ISrsRtcPLIWorkerHandler()
{
}
ISrsRtcPLIWorker::ISrsRtcPLIWorker(ISrsRtcPLIWorkerHandler* h)
{
handler_ = h;
wait_ = srs_cond_new();
trd_ = new SrsSTCoroutine("pli", this, _srs_context->get_id());
}
ISrsRtcPLIWorker::~ISrsRtcPLIWorker()
{
srs_cond_signal(wait_);
trd_->stop();
srs_freep(trd_);
srs_cond_destroy(wait_);
}
srs_error_t ISrsRtcPLIWorker::start()
{
srs_error_t err = srs_success;
if ((err = trd_->start()) != srs_success) {
return srs_error_wrap(err, "start pli worker");
}
return err;
}
void ISrsRtcPLIWorker::request_keyframe(uint32_t ssrc, SrsContextId cid)
{
plis_.insert(make_pair(ssrc, cid));
srs_cond_signal(wait_);
}
srs_error_t ISrsRtcPLIWorker::cycle()
{
srs_error_t err = srs_success;
while (true) {
if ((err = trd_->pull()) != srs_success) {
return srs_error_wrap(err, "quit");
}
std::map<uint32_t, SrsContextId> plis;
plis.swap(plis_);
for (map<uint32_t, SrsContextId>::iterator it = plis.begin(); it != plis.end(); ++it) {
uint32_t ssrc = it->first;
SrsContextId cid = it->second;
if ((err = handler_->do_request_keyframe(ssrc, cid)) != srs_success) {
srs_warn("PLI error, %s", srs_error_desc(err).c_str());
srs_error_reset(err);
}
}
srs_cond_wait(wait_);
}
return err;
}
SrsRtcPlayStreamStatistic::SrsRtcPlayStreamStatistic() SrsRtcPlayStreamStatistic::SrsRtcPlayStreamStatistic()
{ {
nn_rtp_pkts = 0; nn_rtp_pkts = 0;
@ -314,13 +383,14 @@ SrsRtcPlayStream::SrsRtcPlayStream(SrsRtcConnection* s, const SrsContextId& cid)
_srs_config->subscribe(this); _srs_config->subscribe(this);
timer_ = new SrsHourGlass(this, 1000 * SRS_UTIME_MILLISECONDS); timer_ = new SrsHourGlass(this, 1000 * SRS_UTIME_MILLISECONDS);
nack_epp = new SrsErrorPithyPrint(); pli_worker_ = new ISrsRtcPLIWorker(this);
} }
SrsRtcPlayStream::~SrsRtcPlayStream() SrsRtcPlayStream::~SrsRtcPlayStream()
{ {
_srs_config->unsubscribe(this); _srs_config->unsubscribe(this);
srs_freep(pli_worker_);
srs_freep(trd); srs_freep(trd);
srs_freep(timer_); srs_freep(timer_);
srs_freep(req_); srs_freep(req_);
@ -421,6 +491,10 @@ srs_error_t SrsRtcPlayStream::start()
return srs_error_wrap(err, "start timer"); return srs_error_wrap(err, "start timer");
} }
if ((err = pli_worker_->start()) != srs_success) {
return srs_error_wrap(err, "start pli worker");
}
if (_srs_rtc_hijacker) { if (_srs_rtc_hijacker) {
if ((err = _srs_rtc_hijacker->on_start_play(session_, this, req_)) != srs_success) { if ((err = _srs_rtc_hijacker->on_start_play(session_, this, req_)) != srs_success) {
return srs_error_wrap(err, "on start play"); return srs_error_wrap(err, "on start play");
@ -760,13 +834,9 @@ srs_error_t SrsRtcPlayStream::on_rtcp_ps_feedback(SrsRtcpPsfbCommon* rtcp)
uint8_t fmt = rtcp->get_rc(); uint8_t fmt = rtcp->get_rc();
switch (fmt) { switch (fmt) {
case kPLI: { case kPLI: {
ISrsRtcPublishStream* publisher = source_->publish_stream(); uint32_t ssrc = get_video_publish_ssrc(rtcp->get_media_ssrc());
if (publisher) { if (ssrc) {
uint32_t ssrc = get_video_publish_ssrc(rtcp->get_media_ssrc()); pli_worker_->request_keyframe(ssrc, cid_);
if (ssrc != 0) {
publisher->request_keyframe(ssrc);
srs_info("RTC request PLI");
}
} }
session_->stat_->nn_pli++; session_->stat_->nn_pli++;
@ -804,6 +874,23 @@ uint32_t SrsRtcPlayStream::get_video_publish_ssrc(uint32_t play_ssrc)
return 0; return 0;
} }
srs_error_t SrsRtcPlayStream::do_request_keyframe(uint32_t ssrc, SrsContextId cid)
{
srs_error_t err = srs_success;
// The source MUST exists, when PLI thread is running.
srs_assert(source_);
ISrsRtcPublishStream* publisher = source_->publish_stream();
if (!publisher) {
return err;
}
publisher->request_keyframe(ssrc);
return err;
}
SrsRtcPublishStream::SrsRtcPublishStream(SrsRtcConnection* session, const SrsContextId& cid) SrsRtcPublishStream::SrsRtcPublishStream(SrsRtcConnection* session, const SrsContextId& cid)
{ {
timer_ = new SrsHourGlass(this, 200 * SRS_UTIME_MILLISECONDS); timer_ = new SrsHourGlass(this, 200 * SRS_UTIME_MILLISECONDS);
@ -823,6 +910,8 @@ SrsRtcPublishStream::SrsRtcPublishStream(SrsRtcConnection* session, const SrsCon
nn_audio_frames = 0; nn_audio_frames = 0;
twcc_id_ = 0; twcc_id_ = 0;
twcc_fb_count_ = 0; twcc_fb_count_ = 0;
pli_worker_ = new ISrsRtcPLIWorker(this);
} }
SrsRtcPublishStream::~SrsRtcPublishStream() SrsRtcPublishStream::~SrsRtcPublishStream()
@ -831,16 +920,16 @@ SrsRtcPublishStream::~SrsRtcPublishStream()
_srs_rtc_hijacker->on_stop_publish(session_, this, req); _srs_rtc_hijacker->on_stop_publish(session_, this, req);
} }
// TODO: FIXME: Should remove and delete source.
if (source) { if (source) {
source->set_publish_stream(NULL); source->set_publish_stream(NULL);
source->on_unpublish(); source->on_unpublish();
} }
// TODO: FIXME: Should remove and delete source. srs_freep(timer_);
srs_freep(pli_worker_);
srs_freep(pli_epp); srs_freep(pli_epp);
srs_freep(req); srs_freep(req);
srs_freep(timer_);
} }
srs_error_t SrsRtcPublishStream::initialize(SrsRequest* r, SrsRtcStreamDescription* stream_desc) srs_error_t SrsRtcPublishStream::initialize(SrsRequest* r, SrsRtcStreamDescription* stream_desc)
@ -908,6 +997,10 @@ srs_error_t SrsRtcPublishStream::start()
return srs_error_wrap(err, "on publish"); return srs_error_wrap(err, "on publish");
} }
if ((err = pli_worker_->start()) != srs_success) {
return srs_error_wrap(err, "start pli worker");
}
if (_srs_rtc_hijacker) { if (_srs_rtc_hijacker) {
if ((err = _srs_rtc_hijacker->on_start_publish(session_, this, req)) != srs_success) { if ((err = _srs_rtc_hijacker->on_start_publish(session_, this, req)) != srs_success) {
return srs_error_wrap(err, "on start publish"); return srs_error_wrap(err, "on start publish");
@ -1302,23 +1395,30 @@ srs_error_t SrsRtcPublishStream::on_rtcp_xr(SrsRtcpXr* rtcp)
return err; return err;
} }
// TODO: FIXME: Use async request PLI to prevent dup requests.
void SrsRtcPublishStream::request_keyframe(uint32_t ssrc) void SrsRtcPublishStream::request_keyframe(uint32_t ssrc)
{ {
pli_worker_->request_keyframe(ssrc, _srs_context->get_id());
}
srs_error_t SrsRtcPublishStream::do_request_keyframe(uint32_t ssrc, SrsContextId sub_cid)
{
srs_error_t err = srs_success;
uint32_t nn = 0; uint32_t nn = 0;
if (pli_epp->can_print(ssrc, &nn)) { if (pli_epp->can_print(ssrc, &nn)) {
// The player(subscriber) cid, which requires PLI. // The player(subscriber) cid, which requires PLI.
const SrsContextId& sub_cid = _srs_context->get_id();
srs_trace("RTC: Need PLI ssrc=%u, play=[%s], publish=[%s], count=%u/%u", ssrc, sub_cid.c_str(), srs_trace("RTC: Need PLI ssrc=%u, play=[%s], publish=[%s], count=%u/%u", ssrc, sub_cid.c_str(),
cid_.c_str(), nn, pli_epp->nn_count); cid_.c_str(), nn, pli_epp->nn_count);
} }
SrsRtcVideoRecvTrack* video_track = get_video_track(ssrc); if ((err = session_->send_rtcp_fb_pli(ssrc, sub_cid)) != srs_success) {
if (video_track) { srs_warn("PLI err %s", srs_error_desc(err).c_str());
video_track->request_keyframe(); srs_freep(err);
} }
session_->stat_->nn_pli++; session_->stat_->nn_pli++;
return err;
} }
void SrsRtcPublishStream::on_consumers_finished() void SrsRtcPublishStream::on_consumers_finished()

View file

@ -175,6 +175,37 @@ public:
virtual srs_error_t unprotect_rtcp(const char* cipher, char* plaintext, int& nb_plaintext); virtual srs_error_t unprotect_rtcp(const char* cipher, char* plaintext, int& nb_plaintext);
}; };
// The handler for PLI worker coroutine.
class ISrsRtcPLIWorkerHandler
{
public:
ISrsRtcPLIWorkerHandler();
virtual ~ISrsRtcPLIWorkerHandler();
public:
virtual srs_error_t do_request_keyframe(uint32_t ssrc, SrsContextId cid) = 0;
};
// A worker coroutine to request the PLI.
class ISrsRtcPLIWorker : virtual public ISrsCoroutineHandler
{
private:
SrsCoroutine* trd_;
srs_cond_t wait_;
ISrsRtcPLIWorkerHandler* handler_;
private:
// Key is SSRC, value is the CID of subscriber which requests PLI.
std::map<uint32_t, SrsContextId> plis_;
public:
ISrsRtcPLIWorker(ISrsRtcPLIWorkerHandler* h);
virtual ~ISrsRtcPLIWorker();
public:
virtual srs_error_t start();
virtual void request_keyframe(uint32_t ssrc, SrsContextId cid);
// interface ISrsCoroutineHandler
public:
virtual srs_error_t cycle();
};
// A group of RTP packets for outgoing(send to players). // A group of RTP packets for outgoing(send to players).
class SrsRtcPlayStreamStatistic class SrsRtcPlayStreamStatistic
{ {
@ -208,12 +239,14 @@ public:
}; };
// A RTC play stream, client pull and play stream from SRS. // A RTC play stream, client pull and play stream from SRS.
class SrsRtcPlayStream : virtual public ISrsCoroutineHandler, virtual public ISrsReloadHandler, virtual public ISrsHourGlass class SrsRtcPlayStream : virtual public ISrsCoroutineHandler, virtual public ISrsReloadHandler
, virtual public ISrsHourGlass, virtual public ISrsRtcPLIWorkerHandler
{ {
private: private:
SrsContextId cid_; SrsContextId cid_;
SrsCoroutine* trd; SrsCoroutine* trd;
SrsRtcConnection* session_; SrsRtcConnection* session_;
ISrsRtcPLIWorker* pli_worker_;
private: private:
SrsRequest* req_; SrsRequest* req_;
SrsRtcStream* source_; SrsRtcStream* source_;
@ -266,15 +299,20 @@ private:
srs_error_t on_rtcp_ps_feedback(SrsRtcpPsfbCommon* rtcp); srs_error_t on_rtcp_ps_feedback(SrsRtcpPsfbCommon* rtcp);
srs_error_t on_rtcp_rr(SrsRtcpRR* rtcp); srs_error_t on_rtcp_rr(SrsRtcpRR* rtcp);
uint32_t get_video_publish_ssrc(uint32_t play_ssrc); uint32_t get_video_publish_ssrc(uint32_t play_ssrc);
// inteface ISrsRtcPLIWorkerHandler
public:
virtual srs_error_t do_request_keyframe(uint32_t ssrc, SrsContextId cid);
}; };
// A RTC publish stream, client push and publish stream to SRS. // A RTC publish stream, client push and publish stream to SRS.
class SrsRtcPublishStream : virtual public ISrsHourGlass, virtual public ISrsRtpPacketDecodeHandler, virtual public ISrsRtcPublishStream class SrsRtcPublishStream : virtual public ISrsHourGlass, virtual public ISrsRtpPacketDecodeHandler
, virtual public ISrsRtcPublishStream, virtual public ISrsRtcPLIWorkerHandler
{ {
private: private:
SrsContextId cid_; SrsContextId cid_;
SrsHourGlass* timer_; SrsHourGlass* timer_;
uint64_t nn_audio_frames; uint64_t nn_audio_frames;
ISrsRtcPLIWorker* pli_worker_;
private: private:
SrsRtcConnection* session_; SrsRtcConnection* session_;
uint16_t pt_to_drop_; uint16_t pt_to_drop_;
@ -326,6 +364,7 @@ private:
srs_error_t on_rtcp_xr(SrsRtcpXr* rtcp); srs_error_t on_rtcp_xr(SrsRtcpXr* rtcp);
public: public:
void request_keyframe(uint32_t ssrc); void request_keyframe(uint32_t ssrc);
virtual srs_error_t do_request_keyframe(uint32_t ssrc, SrsContextId cid);
void on_consumers_finished(); void on_consumers_finished();
// interface ISrsHourGlass // interface ISrsHourGlass
public: public:

View file

@ -1768,7 +1768,6 @@ srs_error_t SrsRtcAudioRecvTrack::check_send_nacks()
SrsRtcVideoRecvTrack::SrsRtcVideoRecvTrack(SrsRtcConnection* session, SrsRtcTrackDescription* track_desc) SrsRtcVideoRecvTrack::SrsRtcVideoRecvTrack(SrsRtcConnection* session, SrsRtcTrackDescription* track_desc)
: SrsRtcRecvTrack(session, track_desc, false) : SrsRtcRecvTrack(session, track_desc, false)
{ {
request_key_frame_ = false;
} }
SrsRtcVideoRecvTrack::~SrsRtcVideoRecvTrack() SrsRtcVideoRecvTrack::~SrsRtcVideoRecvTrack()
@ -1792,17 +1791,6 @@ srs_error_t SrsRtcVideoRecvTrack::on_rtp(SrsRtcStream* source, SrsRtpPacket2* pk
return srs_error_wrap(err, "source on rtp"); return srs_error_wrap(err, "source on rtp");
} }
// TODO: FIXME: add rtp process
if (request_key_frame_) {
// TODO: FIXME: add coroutine to request key frame.
request_key_frame_ = false;
if ((err = session_->send_rtcp_fb_pli(track_desc_->ssrc_, cid_of_subscriber_)) != srs_success) {
srs_warn("PLI err %s", srs_error_desc(err).c_str());
srs_freep(err);
}
}
// For NACK to handle packet. // For NACK to handle packet.
if ((err = on_nack(pkt)) != srs_success) { if ((err = on_nack(pkt)) != srs_success) {
return srs_error_wrap(err, "on nack"); return srs_error_wrap(err, "on nack");
@ -1821,7 +1809,7 @@ srs_error_t SrsRtcVideoRecvTrack::check_send_nacks()
} }
// If NACK timeout, start PLI if not requesting. // If NACK timeout, start PLI if not requesting.
if (timeout_nacks == 0 || request_key_frame_) { if (timeout_nacks == 0) {
return err; return err;
} }
@ -1831,13 +1819,6 @@ srs_error_t SrsRtcVideoRecvTrack::check_send_nacks()
return err; return err;
} }
void SrsRtcVideoRecvTrack::request_keyframe()
{
cid_of_subscriber_ = _srs_context->get_id();
request_key_frame_ = true;
}
SrsRtcSendTrack::SrsRtcSendTrack(SrsRtcConnection* session, SrsRtcTrackDescription* track_desc, bool is_audio) SrsRtcSendTrack::SrsRtcSendTrack(SrsRtcConnection* session, SrsRtcTrackDescription* track_desc, bool is_audio)
{ {
session_ = session; session_ = session;

View file

@ -518,18 +518,12 @@ public:
class SrsRtcVideoRecvTrack : public SrsRtcRecvTrack class SrsRtcVideoRecvTrack : public SrsRtcRecvTrack
{ {
private:
bool request_key_frame_;
// The player(subscriber) cid, which requires PLI.
SrsContextId cid_of_subscriber_;
public: public:
SrsRtcVideoRecvTrack(SrsRtcConnection* session, SrsRtcTrackDescription* stream_descs); SrsRtcVideoRecvTrack(SrsRtcConnection* session, SrsRtcTrackDescription* stream_descs);
virtual ~SrsRtcVideoRecvTrack(); virtual ~SrsRtcVideoRecvTrack();
public: public:
virtual srs_error_t on_rtp(SrsRtcStream* source, SrsRtpPacket2* pkt); virtual srs_error_t on_rtp(SrsRtcStream* source, SrsRtpPacket2* pkt);
virtual srs_error_t check_send_nacks(); virtual srs_error_t check_send_nacks();
public:
void request_keyframe();
}; };
class SrsRtcSendTrack class SrsRtcSendTrack