1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-02-13 11:51:57 +00:00

RTC: Support more than one publishers or players.

This commit is contained in:
winlin 2020-08-11 11:06:17 +08:00
parent 571f417db4
commit f54bf8d9be
4 changed files with 281 additions and 48 deletions

View file

@ -1406,14 +1406,11 @@ ISrsRtcConnectionHijacker::~ISrsRtcConnectionHijacker()
SrsRtcConnection::SrsRtcConnection(SrsRtcServer* s, SrsContextId context_id)
{
req = NULL;
is_publisher_ = false;
cid = context_id;
stat_ = new SrsRtcConnectionStatistic();
timer_ = new SrsHourGlass(this, 1000 * SRS_UTIME_MILLISECONDS);
hijacker_ = NULL;
publisher_ = NULL;
player_ = NULL;
sendonly_skt = NULL;
server_ = s;
transport_ = new SrsSecurityTransport(this);
@ -1431,11 +1428,10 @@ SrsRtcConnection::SrsRtcConnection(SrsRtcServer* s, SrsContextId context_id)
SrsRtcConnection::~SrsRtcConnection()
{
srs_freep(timer_);
srs_freep(player_);
srs_freep(publisher_);
srs_freep(transport_);
srs_freep(req);
srs_freep(stat_);
srs_freep(pp_address_change);
// Note that we should never delete the sendonly_skt,
// it's just point to the object in peer_addresses_.
@ -1445,7 +1441,21 @@ SrsRtcConnection::~SrsRtcConnection()
srs_freep(addr);
}
srs_freep(pp_address_change);
// Cleanup publishers.
for(map<string, SrsRtcPublishStream*>::iterator it = publishers_.begin(); it != publishers_.end(); ++it) {
SrsRtcPublishStream* publisher = it->second;
srs_freep(publisher);
}
publishers_.clear();
publishers_ssrc_map_.clear();
// Cleanup players.
for(map<string, SrsRtcPlayStream*>::iterator it = players_.begin(); it != players_.end(); ++it) {
SrsRtcPlayStream* player = it->second;
srs_freep(player);
}
players_.clear();
players_ssrc_map_.clear();
}
SrsSdp* SrsRtcConnection::get_local_sdp()
@ -1615,12 +1625,11 @@ srs_error_t SrsRtcConnection::add_player2(SrsRequest* req, SrsSdp& local_sdp)
return err;
}
srs_error_t SrsRtcConnection::initialize(SrsRequest* r, bool is_publisher, bool dtls, bool srtp, string username)
srs_error_t SrsRtcConnection::initialize(SrsRequest* r, bool dtls, bool srtp, string username)
{
srs_error_t err = srs_success;
username_ = username;
is_publisher_ = is_publisher;
req = r->copy();
if (!srtp) {
@ -1725,12 +1734,88 @@ srs_error_t SrsRtcConnection::dispatch_rtcp(SrsRtcpCommon* rtcp)
{
srs_error_t err = srs_success;
if (player_) {
return player_->on_rtcp(rtcp);
}
// TODO: FIXME: Refine the logic code, simple it.
map<uint32_t, SrsRtcPublishStream*>::iterator it_pub;
map<uint32_t, SrsRtcPlayStream*>::iterator it_player;
SrsRtcPlayStream* player = NULL;
SrsRtcPublishStream* publisher = NULL;
if (publisher_) {
return publisher_->on_rtcp(rtcp);
if(SrsRtcpType_sr == rtcp->type()) {
it_pub = publishers_ssrc_map_.find(rtcp->get_ssrc());
if(publishers_ssrc_map_.end() == it_pub) {
srs_warn("SR: ssrc %d is unknown", rtcp->get_ssrc());
return err;
}
publisher = it_pub->second;
if(srs_success != (err = publisher->on_rtcp(rtcp))) {
return srs_error_wrap(err, "handle sr");
}
} else if(SrsRtcpType_rr == rtcp->type()) {
SrsRtcpRR* rr = dynamic_cast<SrsRtcpRR*>(rtcp);
srs_assert(NULL != rr);
if (rr->get_rb_ssrc() == 0) { //for native client
return err;
}
it_player = players_ssrc_map_.find(rr->get_rb_ssrc());
if(players_ssrc_map_.end() == it_player) {
srs_warn("RR: ssrc %d is unknown", rr->get_rb_ssrc());
return err;
}
player = it_player->second;
if(srs_success != (err = player->on_rtcp(rtcp))) {
return srs_error_wrap(err, "handle rr");
}
} else if(SrsRtcpType_rtpfb == rtcp->type()) {
if(1 == rtcp->get_rc()) {
//nack
SrsRtcpNack* nack = dynamic_cast<SrsRtcpNack*>(rtcp);
srs_assert(NULL != nack);
it_player = players_ssrc_map_.find(nack->get_media_ssrc());
if(players_ssrc_map_.end() == it_player) {
srs_warn("ftpfb: ssrc %d is unknown", nack->get_media_ssrc());
return err;
}
player = it_player->second;
if(srs_success != (err = player->on_rtcp(rtcp))) {
return srs_error_wrap(err, "handle nack");
}
} else if(15 == rtcp->get_rc()) {
// twcc
if(srs_success != (err = on_rtcp_feedback(rtcp->data(), rtcp->size()))) {
return srs_error_wrap(err, "handle twcc feedback");
}
}
} else if(SrsRtcpType_psfb == rtcp->type()) {
SrsRtcpPsfbCommon* psfb = dynamic_cast<SrsRtcpPsfbCommon*>(rtcp);
srs_assert(psfb != NULL);
it_player = players_ssrc_map_.find(psfb->get_media_ssrc());
if(players_ssrc_map_.end() == it_player) {
srs_warn("psfb: ssrc %d is unknown", psfb->get_media_ssrc());
return err;
}
player = it_player->second;
if(srs_success != (err = player->on_rtcp(rtcp))) {
return srs_error_wrap(err, "handle nack");
}
} else {
// try to find player to assign rtcp
it_player = players_ssrc_map_.find(rtcp->get_ssrc());
if(players_ssrc_map_.end() != it_player) {
player = it_player->second;
if(srs_success != (err = player->on_rtcp(rtcp))) {
return srs_error_wrap(err, "handle common rtcp");
}
return err;
}
// try to find publisher to assign rtcp
it_pub = publishers_ssrc_map_.find(rtcp->get_ssrc());
if(publishers_ssrc_map_.end() != it_pub) {
publisher = it_pub->second;
if(srs_success != (err = publisher->on_rtcp(rtcp))) {
return srs_error_wrap(err, "handle sr");
}
}
}
return err;
@ -1748,27 +1833,58 @@ void SrsRtcConnection::set_hijacker(ISrsRtcConnectionHijacker* h)
srs_error_t SrsRtcConnection::on_rtp(char* data, int nb_data)
{
if (publisher_ == NULL) {
srs_error_t err = srs_success;
if (publishers_.size() == 0) {
return srs_error_new(ERROR_RTC_RTCP, "no publisher");
}
//TODO: FIXME: add unprotect_rtcp.
return publisher_->on_rtp(data, nb_data);
SrsRtpHeader header;
if (true) {
SrsBuffer* buffer = new SrsBuffer(data, nb_data);
SrsAutoFree(SrsBuffer, buffer);
if(srs_success != (err = header.decode(buffer))) {
return srs_error_wrap(err, "decode rtp header");
}
}
map<uint32_t, SrsRtcPublishStream*>::iterator it = publishers_ssrc_map_.find(header.get_ssrc());
if(it == publishers_ssrc_map_.end()) {
return srs_error_new(ERROR_RTC_NO_PUBLISHER, "no publisher for ssrc:%u", header.get_ssrc());
}
SrsRtcPublishStream* publisher = it->second;
return publisher->on_rtp(data, nb_data);
}
srs_error_t SrsRtcConnection::on_connection_established()
{
srs_error_t err = srs_success;
srs_trace("RTC: session %s, to=%dms connection established", (is_publisher_? "Publisher":"Subscriber"),
srs_trace("RTC: session pub=%u, sub=%u, to=%dms connection established", publishers_.size(), players_.size(),
srsu2msi(session_timeout));
if (is_publisher_) {
if ((err = start_publish()) != srs_success) {
// start all publisher
for(map<string, SrsRtcPublishStream*>::iterator it = publishers_.begin(); it != publishers_.end(); ++it) {
string url = it->first;
SrsRtcPublishStream* publisher = it->second;
srs_trace("RTC: Publisher url=%s established", url.c_str());
if ((err = publisher->start()) != srs_success) {
return srs_error_wrap(err, "start publish");
}
} else {
if ((err = start_play()) != srs_success) {
}
// start all player
for(map<string, SrsRtcPlayStream*>::iterator it = players_.begin(); it != players_.end(); ++it) {
string url = it->first;
SrsRtcPlayStream* player = it->second;
srs_trace("RTC: Subscriber url=%s established", url.c_str());
if ((err = player->start()) != srs_success) {
return srs_error_wrap(err, "start play");
}
}
@ -1782,22 +1898,33 @@ srs_error_t SrsRtcConnection::on_connection_established()
return err;
}
srs_error_t SrsRtcConnection::start_play()
srs_error_t SrsRtcConnection::start_play(string stream_uri)
{
srs_error_t err = srs_success;
if ((err = player_->start()) != srs_success) {
map<string, SrsRtcPlayStream*>::iterator it = players_.find(stream_uri);
if(it == players_.end()) {
return srs_error_new(ERROR_RTC_NO_PLAYER, "not subscribe %s", stream_uri.c_str());
}
SrsRtcPlayStream* player = it->second;
if ((err = player->start()) != srs_success) {
return srs_error_wrap(err, "start");
}
return err;
}
srs_error_t SrsRtcConnection::start_publish()
srs_error_t SrsRtcConnection::start_publish(std::string stream_uri)
{
srs_error_t err = srs_success;
if ((err = publisher_->start()) != srs_success) {
map<string, SrsRtcPublishStream*>::iterator it = publishers_.find(stream_uri);
if(it == publishers_.end()) {
return srs_error_new(ERROR_RTC_NO_PUBLISHER, "no %s publisher", stream_uri.c_str());
}
if ((err = it->second->start()) != srs_success) {
return srs_error_wrap(err, "start");
}
@ -2044,8 +2171,9 @@ srs_error_t SrsRtcConnection::send_rtcp_fb_pli(uint32_t ssrc)
void SrsRtcConnection::simulate_nack_drop(int nn)
{
if (publisher_) {
publisher_->simulate_nack_drop(nn);
for(map<string, SrsRtcPublishStream*>::iterator it = publishers_.begin(); it != publishers_.end(); ++it) {
SrsRtcPublishStream* publisher = it->second;
publisher->simulate_nack_drop(nn);
}
nn_simulate_player_nack_drop = nn;
@ -2118,15 +2246,28 @@ srs_error_t SrsRtcConnection::do_send_packets(const std::vector<SrsRtpPacket2*>&
return err;
}
void SrsRtcConnection::set_all_tracks_status(bool status)
void SrsRtcConnection::set_all_tracks_status(std::string stream_uri, bool is_publish, bool status)
{
if (player_) {
player_->set_all_tracks_status(status);
// For publishers.
if (is_publish) {
map<string, SrsRtcPublishStream*>::iterator it = publishers_.find(stream_uri);
if (publishers_.end() == it) {
return;
}
SrsRtcPublishStream* publisher = it->second;
publisher->set_all_tracks_status(status);
return;
}
if (publisher_) {
publisher_->set_all_tracks_status(status);
// For players.
map<string, SrsRtcPlayStream*>::iterator it = players_.find(stream_uri);
if (players_.end() == it) {
return;
}
SrsRtcPlayStream* player = it->second;
player->set_all_tracks_status(status);
}
#ifdef SRS_OSX
@ -2813,14 +2954,43 @@ srs_error_t SrsRtcConnection::create_player(SrsRequest* req, std::map<uint32_t,
{
srs_error_t err = srs_success;
if (player_) {
// Ignore if exists.
if(players_.end() != players_.find(req->get_stream_url())) {
return err;
}
player_ = new SrsRtcPlayStream(this, _srs_context->get_id());
if ((err = player_->initialize(req, sub_relations)) != srs_success) {
SrsRtcPlayStream* player = new SrsRtcPlayStream(this, _srs_context->get_id());
if ((err = player->initialize(req, sub_relations)) != srs_success) {
srs_freep(player);
return srs_error_wrap(err, "SrsRtcPlayStream init");
}
players_.insert(make_pair(req->get_stream_url(), player));
// make map between ssrc and player for fastly searching
for(map<uint32_t, SrsRtcTrackDescription*>::iterator it = sub_relations.begin(); it != sub_relations.end(); ++it) {
SrsRtcTrackDescription* track_desc = it->second;
if(players_ssrc_map_.end() != players_ssrc_map_.find(track_desc->ssrc_)) {
return srs_error_new(ERROR_RTC_DUPLICATED_SSRC, "duplicate ssrc %d, track id: %s",
track_desc->ssrc_, track_desc->id_.c_str());
}
players_ssrc_map_[track_desc->ssrc_] = player;
if(0 != track_desc->fec_ssrc_) {
if(players_ssrc_map_.end() != players_ssrc_map_.find(track_desc->fec_ssrc_)) {
return srs_error_new(ERROR_RTC_DUPLICATED_SSRC, "duplicate fec ssrc %d, track id: %s",
track_desc->fec_ssrc_, track_desc->id_.c_str());
}
players_ssrc_map_[track_desc->fec_ssrc_] = player;
}
if(0 != track_desc->rtx_ssrc_) {
if(players_ssrc_map_.end() != players_ssrc_map_.find(track_desc->rtx_ssrc_)) {
return srs_error_new(ERROR_RTC_DUPLICATED_SSRC, "duplicate rtx ssrc %d, track id: %s",
track_desc->rtx_ssrc_, track_desc->id_.c_str());
}
players_ssrc_map_[track_desc->rtx_ssrc_] = player;
}
}
// TODO: FIXME: Support reload.
// The TWCC ID is the ext-map ID in local SDP, and we set to enable GCC.
@ -2849,14 +3019,68 @@ srs_error_t SrsRtcConnection::create_publisher(SrsRequest* req, SrsRtcStreamDesc
return srs_error_new(ERROR_RTC_STREAM_DESC, "rtc publisher init");
}
if (publisher_) {
// Ignore if exists.
if(publishers_.end() != publishers_.find(req->get_stream_url())) {
return err;
}
publisher_ = new SrsRtcPublishStream(this);
if ((err = publisher_->initialize(req, stream_desc)) != srs_success) {
SrsRtcPublishStream* publisher = new SrsRtcPublishStream(this);
if ((err = publisher->initialize(req, stream_desc)) != srs_success) {
srs_freep(publisher);
return srs_error_wrap(err, "rtc publisher init");
}
publishers_[req->get_stream_url()] = publisher;
if(NULL != stream_desc->audio_track_desc_) {
if(publishers_ssrc_map_.end() != publishers_ssrc_map_.find(stream_desc->audio_track_desc_->ssrc_)) {
return srs_error_new(ERROR_RTC_DUPLICATED_SSRC, " duplicate ssrc %d, track id: %s",
stream_desc->audio_track_desc_->ssrc_, stream_desc->audio_track_desc_->id_.c_str());
}
publishers_ssrc_map_[stream_desc->audio_track_desc_->ssrc_] = publisher;
if(0 != stream_desc->audio_track_desc_->fec_ssrc_
&& stream_desc->audio_track_desc_->ssrc_ != stream_desc->audio_track_desc_->fec_ssrc_) {
if(publishers_ssrc_map_.end() != publishers_ssrc_map_.find(stream_desc->audio_track_desc_->fec_ssrc_)) {
return srs_error_new(ERROR_RTC_DUPLICATED_SSRC, " duplicate fec ssrc %d, track id: %s",
stream_desc->audio_track_desc_->fec_ssrc_, stream_desc->audio_track_desc_->id_.c_str());
}
publishers_ssrc_map_[stream_desc->audio_track_desc_->fec_ssrc_] = publisher;
}
if(0 != stream_desc->audio_track_desc_->rtx_ssrc_
&& stream_desc->audio_track_desc_->ssrc_ != stream_desc->audio_track_desc_->rtx_ssrc_) {
if(publishers_ssrc_map_.end() != publishers_ssrc_map_.find(stream_desc->audio_track_desc_->rtx_ssrc_)) {
return srs_error_new(ERROR_RTC_DUPLICATED_SSRC, " duplicate rtx ssrc %d, track id: %s",
stream_desc->audio_track_desc_->rtx_ssrc_, stream_desc->audio_track_desc_->id_.c_str());
}
publishers_ssrc_map_[stream_desc->audio_track_desc_->rtx_ssrc_] = publisher;
}
}
for(int i = 0; i < stream_desc->video_track_descs_.size(); ++i) {
SrsRtcTrackDescription* track_desc = stream_desc->video_track_descs_.at(i);
if(publishers_ssrc_map_.end() != publishers_ssrc_map_.find(track_desc->ssrc_)) {
return srs_error_new(ERROR_RTC_DUPLICATED_SSRC, " duplicate ssrc %d, track id: %s",
track_desc->ssrc_, track_desc->id_.c_str());
}
publishers_ssrc_map_[track_desc->ssrc_] = publisher;
if(0 != track_desc->fec_ssrc_ && track_desc->ssrc_ != track_desc->fec_ssrc_) {
if(publishers_ssrc_map_.end() != publishers_ssrc_map_.find(track_desc->fec_ssrc_)) {
return srs_error_new(ERROR_RTC_DUPLICATED_SSRC, " duplicate fec ssrc %d, track id: %s",
track_desc->fec_ssrc_, track_desc->id_.c_str());
}
publishers_ssrc_map_[track_desc->fec_ssrc_] = publisher;
}
if(0 != track_desc->rtx_ssrc_ && track_desc->rtx_ssrc_ != track_desc->fec_ssrc_) {
if(publishers_ssrc_map_.end() != publishers_ssrc_map_.find(track_desc->rtx_ssrc_)) {
return srs_error_new(ERROR_RTC_DUPLICATED_SSRC, " duplicate rtx ssrc %d, track id: %s",
track_desc->rtx_ssrc_, track_desc->id_.c_str());
}
publishers_ssrc_map_[track_desc->rtx_ssrc_] = publisher;
}
}
return err;
}

View file

@ -378,10 +378,16 @@ private:
SrsRtcServer* server_;
SrsRtcConnectionStateType state_;
ISrsRtcTransport* transport_;
SrsRtcPlayStream* player_;
SrsRtcPublishStream* publisher_;
bool is_publisher_;
SrsHourGlass* timer_;
private:
// key: stream id
std::map<std::string, SrsRtcPlayStream*> players_;
//key: player track's ssrc
std::map<uint32_t, SrsRtcPlayStream*> players_ssrc_map_;
// key: stream id
std::map<std::string, SrsRtcPublishStream*> publishers_;
// key: publisher track's ssrc
std::map<uint32_t, SrsRtcPublishStream*> publishers_ssrc_map_;
private:
// The local:remote username, such as m5x0n128:jvOm where local name is m5x0n128.
std::string username_;
@ -433,7 +439,7 @@ public:
srs_error_t add_player2(SrsRequest* request, SrsSdp& local_sdp);
public:
// Before initialize, user must set the local SDP, which is used to inititlize DTLS.
srs_error_t initialize(SrsRequest* r, bool is_publisher, bool dtls, bool srtp, std::string username);
srs_error_t initialize(SrsRequest* r, bool dtls, bool srtp, std::string username);
// The peer address may change, we can identify that by STUN messages.
srs_error_t on_stun(SrsUdpMuxSocket* skt, SrsStunPacket* r);
srs_error_t on_dtls(char* data, int nb_data);
@ -446,8 +452,8 @@ public:
void set_hijacker(ISrsRtcConnectionHijacker* h);
public:
srs_error_t on_connection_established();
srs_error_t start_play();
srs_error_t start_publish();
srs_error_t start_play(std::string stream_uri);
srs_error_t start_publish(std::string stream_uri);
bool is_stun_timeout();
void update_sendonly_socket(SrsUdpMuxSocket* skt);
// interface ISrsHourGlass
@ -466,7 +472,7 @@ public:
void simulate_player_drop_packet(SrsRtpHeader* h, int nn_bytes);
srs_error_t do_send_packets(const std::vector<SrsRtpPacket2*>& pkts, SrsRtcPlayStreamStatistic& info);
// Directly set the status of play track, generally for init to set the default value.
void set_all_tracks_status(bool status);
void set_all_tracks_status(std::string stream_uri, bool is_publish, bool status);
private:
srs_error_t on_binding_request(SrsStunPacket* r);
// publish media capabilitiy negotiate

View file

@ -453,7 +453,7 @@ srs_error_t SrsRtcServer::do_create_session(
}
// All tracks default as inactive, so we must enable them.
session->set_all_tracks_status(true);
session->set_all_tracks_status(req->get_stream_url(), publish, true);
std::string local_pwd = srs_random_str(32);
std::string local_ufrag = "";
@ -504,7 +504,7 @@ srs_error_t SrsRtcServer::do_create_session(
session->set_state(WAITING_STUN);
// Before session initialize, we must setup the local SDP.
if ((err = session->initialize(req, publish, dtls, srtp, username)) != srs_success) {
if ((err = session->initialize(req, dtls, srtp, username)) != srs_success) {
return srs_error_wrap(err, "init");
}
@ -567,7 +567,7 @@ srs_error_t SrsRtcServer::setup_session2(SrsRtcConnection* session, SrsRequest*
// TODO: FIXME: Collision detect.
string username = session->get_local_sdp()->get_ice_ufrag() + ":" + remote_sdp.get_ice_ufrag();
if ((err = session->initialize(req, false, true, true, username)) != srs_success) {
if ((err = session->initialize(req, true, true, username)) != srs_success) {
return srs_error_wrap(err, "init");
}

View file

@ -357,6 +357,9 @@
#define ERROR_RTC_STREAM_DESC 5026
#define ERROR_RTC_TRACK_CODEC 5027
#define ERROR_RTC_NO_PLAYER 5028
#define ERROR_RTC_NO_PUBLISHER 5029
#define ERROR_RTC_DUPLICATED_SSRC 5030
#define ERROR_RTC_NO_TRACK 5031
///////////////////////////////////////////////////////
// GB28181 API error.