1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-02-15 04:42:04 +00:00

SquashSRS4: Support WebRTC re-publish stream.

This commit is contained in:
winlin 2021-03-26 14:59:25 +08:00
parent aab54b2cf4
commit 4692e8b8ad
6 changed files with 95 additions and 20 deletions

View file

@ -195,6 +195,7 @@ Other documents:
## V4 changes ## V4 changes
* v4.0, 2021-03-24, RTC: Support WebRTC re-publish stream. 4.0.87
* v4.0, 2021-03-24, RTC: Use fast parse TWCCID, ignore in packet parsing. 4.0.86 * v4.0, 2021-03-24, RTC: Use fast parse TWCCID, ignore in packet parsing. 4.0.86
* v4.0, 2021-03-09, DTLS: Fix ARQ bug, use openssl timeout. 4.0.84 * v4.0, 2021-03-09, DTLS: Fix ARQ bug, use openssl timeout. 4.0.84
* v4.0, 2021-03-08, DTLS: Fix dead loop by duplicated Alert message. 4.0.83 * v4.0, 2021-03-08, DTLS: Fix dead loop by duplicated Alert message. 4.0.83

View file

@ -466,6 +466,30 @@ srs_error_t SrsRtcPlayStream::initialize(SrsRequest* req, std::map<uint32_t, Srs
return err; return err;
} }
void SrsRtcPlayStream::on_stream_change(SrsRtcStreamDescription* desc)
{
// Refresh the relation for audio.
// TODO: FIMXE: Match by label?
if (desc->audio_track_desc_ && audio_tracks_.size() == 1) {
uint32_t ssrc = desc->audio_track_desc_->ssrc_;
SrsRtcAudioSendTrack* track = audio_tracks_.begin()->second;
audio_tracks_.clear();
audio_tracks_.insert(make_pair(ssrc, track));
}
// Refresh the relation for video.
// TODO: FIMXE: Match by label?
if (desc->video_track_descs_.size() == 1 && desc->video_track_descs_.size() == 1) {
SrsRtcTrackDescription* vdesc = desc->video_track_descs_.at(0);
uint32_t ssrc = vdesc->ssrc_;
SrsRtcVideoSendTrack* track = video_tracks_.begin()->second;
video_tracks_.clear();
video_tracks_.insert(make_pair(ssrc, track));
}
}
srs_error_t SrsRtcPlayStream::on_reload_vhost_play(string vhost) srs_error_t SrsRtcPlayStream::on_reload_vhost_play(string vhost)
{ {
if (req_->vhost != vhost) { if (req_->vhost != vhost) {
@ -546,6 +570,9 @@ srs_error_t SrsRtcPlayStream::cycle()
return srs_error_wrap(err, "create consumer, source=%s", req_->get_stream_url().c_str()); return srs_error_wrap(err, "create consumer, source=%s", req_->get_stream_url().c_str());
} }
srs_assert(consumer);
consumer->set_handler(this);
// TODO: FIXME: Dumps the SPS/PPS from gop cache, without other frames. // TODO: FIXME: Dumps the SPS/PPS from gop cache, without other frames.
if ((err = source->consumer_dumps(consumer)) != srs_success) { if ((err = source->consumer_dumps(consumer)) != srs_success) {
return srs_error_wrap(err, "dumps consumer, url=%s", req_->get_stream_url().c_str()); return srs_error_wrap(err, "dumps consumer, url=%s", req_->get_stream_url().c_str());

View file

@ -211,7 +211,7 @@ public:
// A RTC play stream, client pull and play stream from SRS. // A RTC play stream, client pull and play stream from SRS.
class SrsRtcPlayStream : virtual public ISrsCoroutineHandler, virtual public ISrsReloadHandler class SrsRtcPlayStream : virtual public ISrsCoroutineHandler, virtual public ISrsReloadHandler
, virtual public ISrsHourGlass, virtual public ISrsRtcPLIWorkerHandler , virtual public ISrsHourGlass, virtual public ISrsRtcPLIWorkerHandler, public ISrsRtcStreamChangeCallback
{ {
private: private:
SrsContextId cid_; SrsContextId cid_;
@ -235,13 +235,16 @@ private:
bool nack_enabled_; bool nack_enabled_;
bool nack_no_copy_; bool nack_no_copy_;
private: private:
// Whether palyer started. // Whether player started.
bool is_started; bool is_started;
public: public:
SrsRtcPlayStream(SrsRtcConnection* s, const SrsContextId& cid); SrsRtcPlayStream(SrsRtcConnection* s, const SrsContextId& cid);
virtual ~SrsRtcPlayStream(); virtual ~SrsRtcPlayStream();
public: public:
srs_error_t initialize(SrsRequest* request, std::map<uint32_t, SrsRtcTrackDescription*> sub_relations); srs_error_t initialize(SrsRequest* request, std::map<uint32_t, SrsRtcTrackDescription*> sub_relations);
// Interface ISrsRtcStreamChangeCallback
public:
void on_stream_change(SrsRtcStreamDescription* desc);
// interface ISrsReloadHandler // interface ISrsReloadHandler
public: public:
virtual srs_error_t on_reload_vhost_play(std::string vhost); virtual srs_error_t on_reload_vhost_play(std::string vhost);
@ -268,7 +271,7 @@ private:
srs_error_t on_rtcp_ps_feedback(SrsRtcpPsfbCommon* rtcp); srs_error_t on_rtcp_ps_feedback(SrsRtcpPsfbCommon* rtcp);
srs_error_t on_rtcp_rr(SrsRtcpRR* rtcp); srs_error_t on_rtcp_rr(SrsRtcpRR* rtcp);
uint32_t get_video_publish_ssrc(uint32_t play_ssrc); uint32_t get_video_publish_ssrc(uint32_t play_ssrc);
// inteface ISrsRtcPLIWorkerHandler // Interface ISrsRtcPLIWorkerHandler
public: public:
virtual srs_error_t do_request_keyframe(uint32_t ssrc, SrsContextId cid); virtual srs_error_t do_request_keyframe(uint32_t ssrc, SrsContextId cid);
}; };

View file

@ -152,10 +152,19 @@ SrsNtp SrsNtp::to_time_ms(uint64_t ntp)
return srs_ntp; return srs_ntp;
} }
ISrsRtcStreamChangeCallback::ISrsRtcStreamChangeCallback()
{
}
ISrsRtcStreamChangeCallback::~ISrsRtcStreamChangeCallback()
{
}
SrsRtcConsumer::SrsRtcConsumer(SrsRtcStream* s) SrsRtcConsumer::SrsRtcConsumer(SrsRtcStream* s)
{ {
source = s; source = s;
should_update_source_id = false; should_update_source_id = false;
handler_ = NULL;
mw_wait = srs_cond_new(); mw_wait = srs_cond_new();
mw_min_msgs = 0; mw_min_msgs = 0;
@ -231,6 +240,13 @@ void SrsRtcConsumer::wait(int nb_msgs)
srs_cond_wait(mw_wait); srs_cond_wait(mw_wait);
} }
void SrsRtcConsumer::on_stream_change(SrsRtcStreamDescription* desc)
{
if (handler_) {
handler_->on_stream_change(desc);
}
}
SrsRtcStreamManager::SrsRtcStreamManager() SrsRtcStreamManager::SrsRtcStreamManager()
{ {
lock = NULL; lock = NULL;
@ -354,26 +370,36 @@ void SrsRtcStream::update_auth(SrsRequest* r)
req->update_auth(r); req->update_auth(r);
} }
srs_error_t SrsRtcStream::on_source_id_changed(SrsContextId id) srs_error_t SrsRtcStream::on_source_changed()
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
if (!_source_id.compare(id)) { // Update context id if changed.
return err; bool id_changed = false;
} const SrsContextId& id = _srs_context->get_id();
if (_source_id.compare(id)) {
id_changed = true;
if (_pre_source_id.empty()) { if (_pre_source_id.empty()) {
_pre_source_id = id; _pre_source_id = id;
} }
_source_id = id; _source_id = id;
}
// notice all consumer // Notify all consumers.
std::vector<SrsRtcConsumer*>::iterator it; std::vector<SrsRtcConsumer*>::iterator it;
for (it = consumers.begin(); it != consumers.end(); ++it) { for (it = consumers.begin(); it != consumers.end(); ++it) {
SrsRtcConsumer* consumer = *it; SrsRtcConsumer* consumer = *it;
// Notify if context id changed.
if (id_changed) {
consumer->update_source_id(); consumer->update_source_id();
} }
// Notify about stream description.
consumer->on_stream_change(stream_desc_);
}
return err; return err;
} }
@ -456,9 +482,8 @@ srs_error_t SrsRtcStream::on_publish()
is_created_ = true; is_created_ = true;
is_delivering_packets_ = true; is_delivering_packets_ = true;
// whatever, the publish thread is the source or edge source, // Notify the consumers about stream change event.
// save its id to srouce id. if ((err = on_source_changed()) != srs_success) {
if ((err = on_source_id_changed(_srs_context->get_id())) != srs_success) {
return srs_error_wrap(err, "source id change"); return srs_error_wrap(err, "source id change");
} }

View file

@ -75,6 +75,17 @@ public:
static uint64_t kMagicNtpFractionalUnit; static uint64_t kMagicNtpFractionalUnit;
}; };
// When RTC stream publish and re-publish.
class ISrsRtcStreamChangeCallback
{
public:
ISrsRtcStreamChangeCallback();
virtual ~ISrsRtcStreamChangeCallback();
public:
virtual void on_stream_change(SrsRtcStreamDescription* desc) = 0;
};
// The RTC stream consumer, consume packets from RTC stream source.
class SrsRtcConsumer class SrsRtcConsumer
{ {
private: private:
@ -87,6 +98,9 @@ private:
srs_cond_t mw_wait; srs_cond_t mw_wait;
bool mw_waiting; bool mw_waiting;
int mw_min_msgs; int mw_min_msgs;
private:
// The callback for stream change event.
ISrsRtcStreamChangeCallback* handler_;
public: public:
SrsRtcConsumer(SrsRtcStream* s); SrsRtcConsumer(SrsRtcStream* s);
virtual ~SrsRtcConsumer(); virtual ~SrsRtcConsumer();
@ -100,6 +114,9 @@ public:
virtual srs_error_t dump_packet(SrsRtpPacket2** ppkt); virtual srs_error_t dump_packet(SrsRtpPacket2** ppkt);
// Wait for at-least some messages incoming in queue. // Wait for at-least some messages incoming in queue.
virtual void wait(int nb_msgs); virtual void wait(int nb_msgs);
public:
void set_handler(ISrsRtcStreamChangeCallback* h) { handler_ = h; } // SrsRtcConsumer::set_handler()
void on_stream_change(SrsRtcStreamDescription* desc);
}; };
class SrsRtcStreamManager class SrsRtcStreamManager
@ -154,7 +171,7 @@ private:
// For publish, it's the publish client id. // For publish, it's the publish client id.
// For edge, it's the edge ingest id. // For edge, it's the edge ingest id.
// when source id changed, for example, the edge reconnect, // when source id changed, for example, the edge reconnect,
// invoke the on_source_id_changed() to let all clients know. // invoke the on_source_changed() to let all clients know.
SrsContextId _source_id; SrsContextId _source_id;
// previous source id. // previous source id.
SrsContextId _pre_source_id; SrsContextId _pre_source_id;
@ -180,8 +197,10 @@ public:
virtual srs_error_t initialize(SrsRequest* r); virtual srs_error_t initialize(SrsRequest* r);
// Update the authentication information in request. // Update the authentication information in request.
virtual void update_auth(SrsRequest* r); virtual void update_auth(SrsRequest* r);
// The source id changed. private:
virtual srs_error_t on_source_id_changed(SrsContextId id); // The stream source changed.
virtual srs_error_t on_source_changed();
public:
// Get current source id. // Get current source id.
virtual SrsContextId source_id(); virtual SrsContextId source_id();
virtual SrsContextId pre_source_id(); virtual SrsContextId pre_source_id();

View file

@ -26,6 +26,6 @@
#define VERSION_MAJOR 4 #define VERSION_MAJOR 4
#define VERSION_MINOR 0 #define VERSION_MINOR 0
#define VERSION_REVISION 86 #define VERSION_REVISION 87
#endif #endif