mirror of
https://github.com/ossrs/srs.git
synced 2025-03-09 15:49:59 +00:00
SmartPtr: Support shared ptr for SRT source. (#4084)
--- Co-authored-by: Haibo Chen <495810242@qq.com>
This commit is contained in:
parent
6834ec208d
commit
7b9c52b283
9 changed files with 42 additions and 36 deletions
|
@ -148,7 +148,7 @@ class SrsSharedResource : public ISrsResource
|
|||
private:
|
||||
SrsSharedPtr<T> ptr_;
|
||||
public:
|
||||
SrsSharedResource(T* ptr) : ptr_(ptr) {
|
||||
SrsSharedResource(T* ptr = NULL) : ptr_(ptr) {
|
||||
}
|
||||
SrsSharedResource(const SrsSharedResource<T>& cp) : ptr_(cp.ptr_) {
|
||||
}
|
||||
|
|
|
@ -1216,11 +1216,11 @@ srs_error_t SrsRtcPublishStream::initialize(SrsRequest* r, SrsRtcSourceDescripti
|
|||
|
||||
// Check whether SRT stream is busy.
|
||||
#ifdef SRS_SRT
|
||||
SrsSrtSource* srt = NULL;
|
||||
bool srt_server_enabled = _srs_config->get_srt_enabled();
|
||||
bool srt_enabled = _srs_config->get_srt_enabled(r->vhost);
|
||||
if (srt_server_enabled && srt_enabled) {
|
||||
if ((err = _srs_srt_sources->fetch_or_create(r, &srt)) != srs_success) {
|
||||
SrsSharedPtr<SrsSrtSource> srt;
|
||||
if ((err = _srs_srt_sources->fetch_or_create(r, srt)) != srs_success) {
|
||||
return srs_error_wrap(err, "create source");
|
||||
}
|
||||
|
||||
|
|
|
@ -1102,11 +1102,11 @@ srs_error_t SrsRtmpConn::acquire_publish(SrsLiveSource* source)
|
|||
|
||||
// Check whether SRT stream is busy.
|
||||
#ifdef SRS_SRT
|
||||
SrsSrtSource* srt = NULL;
|
||||
bool srt_server_enabled = _srs_config->get_srt_enabled();
|
||||
bool srt_enabled = _srs_config->get_srt_enabled(req->vhost);
|
||||
if (srt_server_enabled && srt_enabled && !info->edge) {
|
||||
if ((err = _srs_srt_sources->fetch_or_create(req, &srt)) != srs_success) {
|
||||
SrsSharedPtr<SrsSrtSource> srt;
|
||||
if ((err = _srs_srt_sources->fetch_or_create(req, srt)) != srs_success) {
|
||||
return srs_error_wrap(err, "create source");
|
||||
}
|
||||
|
||||
|
|
|
@ -1853,6 +1853,7 @@ srs_error_t SrsLiveSourceManager::notify(int event, srs_utime_t interval, srs_ut
|
|||
return srs_error_wrap(err, "source=%s/%s cycle", source->source_id().c_str(), source->pre_source_id().c_str());
|
||||
}
|
||||
|
||||
// See SrsSrtSource::on_consumer_destroy
|
||||
// TODO: FIXME: support source cleanup.
|
||||
// @see https://github.com/ossrs/srs/issues/713
|
||||
#if 0
|
||||
|
|
|
@ -152,7 +152,7 @@ srs_error_t SrsSrtRecvThread::get_recv_err()
|
|||
return srs_error_copy(recv_err_);
|
||||
}
|
||||
|
||||
SrsMpegtsSrtConn::SrsMpegtsSrtConn(SrsSrtServer* srt_server, srs_srt_t srt_fd, std::string ip, int port)
|
||||
SrsMpegtsSrtConn::SrsMpegtsSrtConn(SrsSrtServer* srt_server, srs_srt_t srt_fd, std::string ip, int port) : srt_source_(new SrsSrtSource())
|
||||
{
|
||||
// Create a identify for this client.
|
||||
_srs_context->set_id(_srs_context->generate_id());
|
||||
|
@ -171,7 +171,6 @@ SrsMpegtsSrtConn::SrsMpegtsSrtConn(SrsSrtServer* srt_server, srs_srt_t srt_fd, s
|
|||
|
||||
trd_ = new SrsSTCoroutine("ts-srt", this, _srs_context->get_id());
|
||||
|
||||
srt_source_ = NULL;
|
||||
req_ = new SrsRequest();
|
||||
req_->ip = ip;
|
||||
|
||||
|
@ -285,7 +284,7 @@ srs_error_t SrsMpegtsSrtConn::do_cycle()
|
|||
srs_trace("@srt, streamid=%s, stream_url=%s, vhost=%s, app=%s, stream=%s, param=%s",
|
||||
streamid.c_str(), req_->get_stream_url().c_str(), req_->vhost.c_str(), req_->app.c_str(), req_->stream.c_str(), req_->param.c_str());
|
||||
|
||||
if ((err = _srs_srt_sources->fetch_or_create(req_, &srt_source_)) != srs_success) {
|
||||
if ((err = _srs_srt_sources->fetch_or_create(req_, srt_source_)) != srs_success) {
|
||||
return srs_error_wrap(err, "fetch srt source");
|
||||
}
|
||||
|
||||
|
|
|
@ -123,7 +123,7 @@ private:
|
|||
SrsCoroutine* trd_;
|
||||
|
||||
SrsRequest* req_;
|
||||
SrsSrtSource* srt_source_;
|
||||
SrsSharedPtr<SrsSrtSource> srt_source_;
|
||||
SrsSecurity* security_;
|
||||
};
|
||||
|
||||
|
|
|
@ -102,7 +102,7 @@ SrsSrtSourceManager::~SrsSrtSourceManager()
|
|||
srs_mutex_destroy(lock);
|
||||
}
|
||||
|
||||
srs_error_t SrsSrtSourceManager::fetch_or_create(SrsRequest* r, SrsSrtSource** pps)
|
||||
srs_error_t SrsSrtSourceManager::fetch_or_create(SrsRequest* r, SrsSharedPtr<SrsSrtSource>& pps)
|
||||
{
|
||||
srs_error_t err = srs_success;
|
||||
|
||||
|
@ -110,48 +110,44 @@ srs_error_t SrsSrtSourceManager::fetch_or_create(SrsRequest* r, SrsSrtSource** p
|
|||
// @bug https://github.com/ossrs/srs/issues/1230
|
||||
SrsLocker(lock);
|
||||
|
||||
SrsSrtSource* source = NULL;
|
||||
if ((source = fetch(r)) != NULL) {
|
||||
string stream_url = r->get_stream_url();
|
||||
std::map< std::string, SrsSharedPtr<SrsSrtSource> >::iterator it = pool.find(stream_url);
|
||||
if (it != pool.end()) {
|
||||
SrsSharedPtr<SrsSrtSource> source = it->second;
|
||||
|
||||
// we always update the request of resource,
|
||||
// for origin auth is on, the token in request maybe invalid,
|
||||
// and we only need to update the token of request, it's simple.
|
||||
source->update_auth(r);
|
||||
*pps = source;
|
||||
pps = source;
|
||||
|
||||
return err;
|
||||
}
|
||||
|
||||
string stream_url = r->get_stream_url();
|
||||
string vhost = r->vhost;
|
||||
|
||||
// should always not exists for create a source.
|
||||
srs_assert (pool.find(stream_url) == pool.end());
|
||||
|
||||
SrsSharedPtr<SrsSrtSource> source(new SrsSrtSource());
|
||||
srs_trace("new srt source, stream_url=%s", stream_url.c_str());
|
||||
|
||||
source = new SrsSrtSource();
|
||||
if ((err = source->initialize(r)) != srs_success) {
|
||||
return srs_error_wrap(err, "init source %s", r->get_stream_url().c_str());
|
||||
}
|
||||
|
||||
pool[stream_url] = source;
|
||||
|
||||
*pps = source;
|
||||
pps = source;
|
||||
|
||||
return err;
|
||||
}
|
||||
|
||||
SrsSrtSource* SrsSrtSourceManager::fetch(SrsRequest* r)
|
||||
void SrsSrtSourceManager::eliminate(SrsRequest* r)
|
||||
{
|
||||
SrsSrtSource* source = NULL;
|
||||
// Use lock to protect coroutine switch.
|
||||
// @bug https://github.com/ossrs/srs/issues/1230
|
||||
SrsLocker(lock);
|
||||
|
||||
string stream_url = r->get_stream_url();
|
||||
if (pool.find(stream_url) == pool.end()) {
|
||||
return NULL;
|
||||
std::map< std::string, SrsSharedPtr<SrsSrtSource> >::iterator it = pool.find(stream_url);
|
||||
if (it != pool.end()) {
|
||||
pool.erase(it);
|
||||
}
|
||||
|
||||
source = pool[stream_url];
|
||||
|
||||
return source;
|
||||
}
|
||||
|
||||
SrsSrtSourceManager* _srs_srt_sources = NULL;
|
||||
|
@ -973,6 +969,11 @@ void SrsSrtSource::on_consumer_destroy(SrsSrtConsumer* consumer)
|
|||
if (it != consumers.end()) {
|
||||
it = consumers.erase(it);
|
||||
}
|
||||
|
||||
// Destroy and cleanup source when no publishers and consumers.
|
||||
if (can_publish_ && consumers.empty()) {
|
||||
_srs_srt_sources->eliminate(req);
|
||||
}
|
||||
}
|
||||
|
||||
bool SrsSrtSource::can_publish()
|
||||
|
@ -1026,6 +1027,11 @@ void SrsSrtSource::on_unpublish()
|
|||
bridge_->on_unpublish();
|
||||
srs_freep(bridge_);
|
||||
}
|
||||
|
||||
// Destroy and cleanup source when no publishers and consumers.
|
||||
if (can_publish_ && consumers.empty()) {
|
||||
_srs_srt_sources->eliminate(req);
|
||||
}
|
||||
}
|
||||
|
||||
srs_error_t SrsSrtSource::on_packet(SrsSrtPacket* packet)
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
#include <srs_kernel_ts.hpp>
|
||||
#include <srs_protocol_st.hpp>
|
||||
#include <srs_app_stream_bridge.hpp>
|
||||
#include <srs_core_autofree.hpp>
|
||||
|
||||
class SrsSharedPtrMessage;
|
||||
class SrsRequest;
|
||||
|
@ -50,7 +51,7 @@ class SrsSrtSourceManager
|
|||
{
|
||||
private:
|
||||
srs_mutex_t lock;
|
||||
std::map<std::string, SrsSrtSource*> pool;
|
||||
std::map< std::string, SrsSharedPtr<SrsSrtSource> > pool;
|
||||
public:
|
||||
SrsSrtSourceManager();
|
||||
virtual ~SrsSrtSourceManager();
|
||||
|
@ -58,10 +59,9 @@ public:
|
|||
// create source when fetch from cache failed.
|
||||
// @param r the client request.
|
||||
// @param pps the matched source, if success never be NULL.
|
||||
virtual srs_error_t fetch_or_create(SrsRequest* r, SrsSrtSource** pps);
|
||||
public:
|
||||
// Get the exists source, NULL when not exists.
|
||||
virtual SrsSrtSource* fetch(SrsRequest* r);
|
||||
virtual srs_error_t fetch_or_create(SrsRequest* r, SrsSharedPtr<SrsSrtSource>& pps);
|
||||
// Dispose and destroy the source.
|
||||
virtual void eliminate(SrsRequest* r);
|
||||
};
|
||||
|
||||
// Global singleton instance.
|
||||
|
|
|
@ -98,7 +98,7 @@ private:
|
|||
uint32_t* ref_count_;
|
||||
public:
|
||||
// Create a shared ptr with the object.
|
||||
SrsSharedPtr(T* ptr) {
|
||||
SrsSharedPtr(T* ptr = NULL) {
|
||||
ptr_ = ptr;
|
||||
ref_count_ = new uint32_t(1);
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue