1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

SmartPtr: Support shared ptr for live source. v6.0.129 (#4089)

Detail change log:

1. [Simple,Refactor] Remove member fields of http entry, etc.
e34b3d3aa4
2. [Ignore] Rename source to live_source.
846f95ec96
3. [Ignore] Use directly ptr in consumer.
d38af021ad
4. [Complex, Important] Use shared ptr for live source.
88f922413a

The object relationship:

![live-source](1adb59af-6e7a-40f3-9a4a-1cc849d7dae1)

---

Co-authored-by: Jacob Su <suzp1984@gmail.com>
This commit is contained in:
Winlin 2024-06-15 07:54:56 +08:00 committed by GitHub
parent 908c2f2a30
commit e7069788e9
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
30 changed files with 256 additions and 235 deletions

View file

@ -7,6 +7,7 @@ The changelog for SRS.
<a name="v6-changes"></a>
## SRS 6.0 Changelog
* v6.0, 2024-06-15, Merge [#4089](https://github.com/ossrs/srs/pull/4089): SmartPtr: Support shared ptr for live source. v6.0.129 (#4089)
* v6.0, 2024-06-14, Merge [#4085](https://github.com/ossrs/srs/pull/4085): SmartPtr: Support shared ptr for RTC source. v6.0.128 (#4085)
* v6.0, 2024-06-13, Merge [#4083](https://github.com/ossrs/srs/pull/4083): SmartPtr: Use shared ptr in RTC TCP connection. v6.0.127 (#4083)
* v6.0, 2024-06-12, Merge [#4080](https://github.com/ossrs/srs/pull/4080): SmartPtr: Use shared ptr to manage GB objects. v6.0.126 (#4080)

View file

@ -122,7 +122,7 @@ SrsRequest* SrsCoWorkers::find_stream_info(string vhost, string app, string stre
return it->second;
}
srs_error_t SrsCoWorkers::on_publish(SrsLiveSource* s, SrsRequest* r)
srs_error_t SrsCoWorkers::on_publish(SrsRequest* r)
{
srs_error_t err = srs_success;
@ -140,7 +140,7 @@ srs_error_t SrsCoWorkers::on_publish(SrsLiveSource* s, SrsRequest* r)
return err;
}
void SrsCoWorkers::on_unpublish(SrsLiveSource* s, SrsRequest* r)
void SrsCoWorkers::on_unpublish(SrsRequest* r)
{
string url = r->get_stream_url();

View file

@ -33,8 +33,8 @@ public:
private:
virtual SrsRequest* find_stream_info(std::string vhost, std::string app, std::string stream);
public:
virtual srs_error_t on_publish(SrsLiveSource* s, SrsRequest* r);
virtual void on_unpublish(SrsLiveSource* s, SrsRequest* r);
virtual srs_error_t on_publish(SrsRequest* r);
virtual void on_unpublish(SrsRequest* r);
};
#endif

View file

@ -391,7 +391,7 @@ void SrsEdgeFlvUpstream::kbps_sample(const char* label, srs_utime_t age)
SrsEdgeIngester::SrsEdgeIngester()
{
source = NULL;
source_ = NULL;
edge = NULL;
req = NULL;
#ifdef SRS_APM
@ -415,9 +415,11 @@ SrsEdgeIngester::~SrsEdgeIngester()
srs_freep(trd);
}
srs_error_t SrsEdgeIngester::initialize(SrsLiveSource* s, SrsPlayEdge* e, SrsRequest* r)
srs_error_t SrsEdgeIngester::initialize(SrsSharedPtr<SrsLiveSource> s, SrsPlayEdge* e, SrsRequest* r)
{
source = s;
// Because source references to this object, so we should directly use the source ptr.
source_ = s.get();
edge = e;
req = r;
@ -435,7 +437,7 @@ srs_error_t SrsEdgeIngester::start()
{
srs_error_t err = srs_success;
if ((err = source->on_publish()) != srs_success) {
if ((err = source_->on_publish()) != srs_success) {
return srs_error_wrap(err, "notify source");
}
@ -455,8 +457,8 @@ void SrsEdgeIngester::stop()
upstream->close();
// Notify source to un-publish if exists.
if (source) {
source->on_unpublish();
if (source_) {
source_->on_unpublish();
}
}
@ -549,7 +551,7 @@ srs_error_t SrsEdgeIngester::do_cycle()
upstream = new SrsEdgeRtmpUpstream(redirect);
}
if ((err = source->on_source_id_changed(_srs_context->get_id())) != srs_success) {
if ((err = source_->on_source_id_changed(_srs_context->get_id())) != srs_success) {
return srs_error_wrap(err, "on source id changed");
}
@ -635,21 +637,21 @@ srs_error_t SrsEdgeIngester::process_publish_message(SrsCommonMessage* msg, stri
// process audio packet
if (msg->header.is_audio()) {
if ((err = source->on_audio(msg)) != srs_success) {
if ((err = source_->on_audio(msg)) != srs_success) {
return srs_error_wrap(err, "source consume audio");
}
}
// process video packet
if (msg->header.is_video()) {
if ((err = source->on_video(msg)) != srs_success) {
if ((err = source_->on_video(msg)) != srs_success) {
return srs_error_wrap(err, "source consume video");
}
}
// process aggregate packet
if (msg->header.is_aggregate()) {
if ((err = source->on_aggregate(msg)) != srs_success) {
if ((err = source_->on_aggregate(msg)) != srs_success) {
return srs_error_wrap(err, "source consume aggregate");
}
return err;
@ -665,7 +667,7 @@ srs_error_t SrsEdgeIngester::process_publish_message(SrsCommonMessage* msg, stri
if (dynamic_cast<SrsOnMetaDataPacket*>(pkt)) {
SrsOnMetaDataPacket* metadata = dynamic_cast<SrsOnMetaDataPacket*>(pkt);
if ((err = source->on_meta_data(msg, metadata)) != srs_success) {
if ((err = source_->on_meta_data(msg, metadata)) != srs_success) {
return srs_error_wrap(err, "source consume metadata");
}
return err;
@ -725,7 +727,7 @@ SrsEdgeForwarder::SrsEdgeForwarder()
edge = NULL;
req = NULL;
send_error_code = ERROR_SUCCESS;
source = NULL;
source_ = NULL;
sdk = NULL;
lb = new SrsLbRoundRobin();
@ -747,9 +749,11 @@ void SrsEdgeForwarder::set_queue_size(srs_utime_t queue_size)
return queue->set_queue_size(queue_size);
}
srs_error_t SrsEdgeForwarder::initialize(SrsLiveSource* s, SrsPublishEdge* e, SrsRequest* r)
srs_error_t SrsEdgeForwarder::initialize(SrsSharedPtr<SrsLiveSource> s, SrsPublishEdge* e, SrsRequest* r)
{
source = s;
// Because source references to this object, so we should directly use the source ptr.
source_ = s.get();
edge = e;
req = r;
@ -956,7 +960,7 @@ SrsPlayEdge::~SrsPlayEdge()
srs_freep(ingester);
}
srs_error_t SrsPlayEdge::initialize(SrsLiveSource* source, SrsRequest* req)
srs_error_t SrsPlayEdge::initialize(SrsSharedPtr<SrsLiveSource> source, SrsRequest* req)
{
srs_error_t err = srs_success;
@ -1048,7 +1052,7 @@ void SrsPublishEdge::set_queue_size(srs_utime_t queue_size)
return forwarder->set_queue_size(queue_size);
}
srs_error_t SrsPublishEdge::initialize(SrsLiveSource* source, SrsRequest* req)
srs_error_t SrsPublishEdge::initialize(SrsSharedPtr<SrsLiveSource> source, SrsRequest* req)
{
srs_error_t err = srs_success;

View file

@ -10,6 +10,7 @@
#include <srs_core.hpp>
#include <srs_app_st.hpp>
#include <srs_core_autofree.hpp>
#include <string>
@ -137,7 +138,9 @@ public:
class SrsEdgeIngester : public ISrsCoroutineHandler
{
private:
SrsLiveSource* source;
// Because source references to this object, so we should directly use the source ptr.
SrsLiveSource* source_;
private:
SrsPlayEdge* edge;
SrsRequest* req;
SrsCoroutine* trd;
@ -150,7 +153,7 @@ public:
SrsEdgeIngester();
virtual ~SrsEdgeIngester();
public:
virtual srs_error_t initialize(SrsLiveSource* s, SrsPlayEdge* e, SrsRequest* r);
virtual srs_error_t initialize(SrsSharedPtr<SrsLiveSource> s, SrsPlayEdge* e, SrsRequest* r);
virtual srs_error_t start();
virtual void stop();
virtual std::string get_curr_origin();
@ -172,7 +175,9 @@ private:
class SrsEdgeForwarder : public ISrsCoroutineHandler
{
private:
SrsLiveSource* source;
// Because source references to this object, so we should directly use the source ptr.
SrsLiveSource* source_;
private:
SrsPublishEdge* edge;
SrsRequest* req;
SrsCoroutine* trd;
@ -191,7 +196,7 @@ public:
public:
virtual void set_queue_size(srs_utime_t queue_size);
public:
virtual srs_error_t initialize(SrsLiveSource* s, SrsPublishEdge* e, SrsRequest* r);
virtual srs_error_t initialize(SrsSharedPtr<SrsLiveSource> s, SrsPublishEdge* e, SrsRequest* r);
virtual srs_error_t start();
virtual void stop();
// Interface ISrsReusableThread2Handler
@ -216,7 +221,7 @@ public:
// Always use the req of source,
// For we assume all client to edge is invalid,
// if auth open, edge must valid it from origin, then service it.
virtual srs_error_t initialize(SrsLiveSource* source, SrsRequest* req);
virtual srs_error_t initialize(SrsSharedPtr<SrsLiveSource> source, SrsRequest* req);
// When client play stream on edge.
virtual srs_error_t on_client_play();
// When all client stopped play, disconnect to origin.
@ -239,7 +244,7 @@ public:
public:
virtual void set_queue_size(srs_utime_t queue_size);
public:
virtual srs_error_t initialize(SrsLiveSource* source, SrsRequest* req);
virtual srs_error_t initialize(SrsSharedPtr<SrsLiveSource> source, SrsRequest* req);
virtual bool can_publish();
// When client publish stream on edge.
virtual srs_error_t on_client_publish();

View file

@ -547,13 +547,13 @@ srs_error_t SrsHttpServer::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage
return http_static->mux.serve_http(w, r);
}
srs_error_t SrsHttpServer::http_mount(SrsLiveSource* s, SrsRequest* r)
srs_error_t SrsHttpServer::http_mount(SrsRequest* r)
{
return http_stream->http_mount(s, r);
return http_stream->http_mount(r);
}
void SrsHttpServer::http_unmount(SrsLiveSource* s, SrsRequest* r)
void SrsHttpServer::http_unmount(SrsRequest* r)
{
http_stream->http_unmount(s, r);
http_stream->http_unmount(r);
}

View file

@ -187,8 +187,8 @@ public:
public:
virtual srs_error_t serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r);
public:
virtual srs_error_t http_mount(SrsLiveSource* s, SrsRequest* r);
virtual void http_unmount(SrsLiveSource* s, SrsRequest* r);
virtual srs_error_t http_mount(SrsRequest* r);
virtual void http_unmount(SrsRequest* r);
};
#endif

View file

@ -40,10 +40,9 @@ using namespace std;
#include <srs_app_recv_thread.hpp>
#include <srs_app_http_hooks.hpp>
SrsBufferCache::SrsBufferCache(SrsLiveSource* s, SrsRequest* r)
SrsBufferCache::SrsBufferCache(SrsRequest* r)
{
req = r->copy()->as_http();
source = s;
queue = new SrsMessageQueue(true);
trd = new SrsSTCoroutine("http-stream", this);
@ -59,11 +58,10 @@ SrsBufferCache::~SrsBufferCache()
srs_freep(req);
}
srs_error_t SrsBufferCache::update_auth(SrsLiveSource* s, SrsRequest* r)
srs_error_t SrsBufferCache::update_auth(SrsRequest* r)
{
srs_freep(req);
req = r->copy();
source = s;
return srs_success;
}
@ -108,14 +106,19 @@ srs_error_t SrsBufferCache::cycle()
return err;
}
SrsSharedPtr<SrsLiveSource> live_source = _srs_sources->fetch(req);
if (!live_source.get()) {
return srs_error_new(ERROR_NO_SOURCE, "no source for %s", req->get_stream_url().c_str());
}
// the stream cache will create consumer to cache stream,
// which will trigger to fetch stream from origin for edge.
SrsLiveConsumer* consumer = NULL;
SrsAutoFree(SrsLiveConsumer, consumer);
if ((err = source->create_consumer(consumer)) != srs_success) {
if ((err = live_source->create_consumer(consumer)) != srs_success) {
return srs_error_wrap(err, "create consumer");
}
if ((err = source->consumer_dumps(consumer, false, false, true)) != srs_success) {
if ((err = live_source->consumer_dumps(consumer, false, false, true)) != srs_success) {
return srs_error_wrap(err, "dumps consumer");
}
@ -553,9 +556,8 @@ srs_error_t SrsBufferWriter::writev(const iovec* iov, int iovcnt, ssize_t* pnwri
return writer->writev(iov, iovcnt, pnwrite);
}
SrsLiveStream::SrsLiveStream(SrsLiveSource* s, SrsRequest* r, SrsBufferCache* c)
SrsLiveStream::SrsLiveStream(SrsRequest* r, SrsBufferCache* c)
{
source = s;
cache = c;
req = r->copy()->as_http();
security_ = new SrsSecurity();
@ -567,10 +569,8 @@ SrsLiveStream::~SrsLiveStream()
srs_freep(security_);
}
srs_error_t SrsLiveStream::update_auth(SrsLiveSource* s, SrsRequest* r)
srs_error_t SrsLiveStream::update_auth(SrsRequest* r)
{
source = s;
srs_freep(req);
req = r->copy()->as_http();
@ -661,13 +661,18 @@ srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess
// Enter chunked mode, because we didn't set the content-length.
w->write_header(SRS_CONSTS_HTTP_OK);
SrsSharedPtr<SrsLiveSource> live_source = _srs_sources->fetch(req);
if (!live_source.get()) {
return srs_error_new(ERROR_NO_SOURCE, "no source for %s", req->get_stream_url().c_str());
}
// create consumer of souce, ignore gop cache, use the audio gop cache.
SrsLiveConsumer* consumer = NULL;
SrsAutoFree(SrsLiveConsumer, consumer);
if ((err = source->create_consumer(consumer)) != srs_success) {
if ((err = live_source->create_consumer(consumer)) != srs_success) {
return srs_error_wrap(err, "create consumer");
}
if ((err = source->consumer_dumps(consumer, true, true, !enc->has_cache())) != srs_success) {
if ((err = live_source->consumer_dumps(consumer, true, true, !enc->has_cache())) != srs_success) {
return srs_error_wrap(err, "dumps consumer");
}
@ -689,7 +694,7 @@ srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess
// if gop cache enabled for encoder, dump to consumer.
if (enc->has_cache()) {
if ((err = enc->dump_cache(consumer, source->jitter())) != srs_success) {
if ((err = enc->dump_cache(consumer, live_source->jitter())) != srs_success) {
return srs_error_wrap(err, "encoder dump cache");
}
}
@ -876,7 +881,6 @@ SrsLiveEntry::SrsLiveEntry(std::string m)
cache = NULL;
req = NULL;
source = NULL;
std::string ext = srs_path_filext(m);
_is_flv = (ext == ".flv");
@ -954,7 +958,7 @@ srs_error_t SrsHttpStreamServer::initialize()
}
// TODO: FIXME: rename for HTTP FLV mount.
srs_error_t SrsHttpStreamServer::http_mount(SrsLiveSource* s, SrsRequest* r)
srs_error_t SrsHttpStreamServer::http_mount(SrsRequest* r)
{
srs_error_t err = srs_success;
@ -982,10 +986,9 @@ srs_error_t SrsHttpStreamServer::http_mount(SrsLiveSource* s, SrsRequest* r)
entry = new SrsLiveEntry(mount);
entry->source = s;
entry->req = r->copy()->as_http();
entry->cache = new SrsBufferCache(s, r);
entry->stream = new SrsLiveStream(s, r, entry->cache);
entry->cache = new SrsBufferCache(r);
entry->stream = new SrsLiveStream(r, entry->cache);
// TODO: FIXME: maybe refine the logic of http remux service.
// if user push streams followed:
@ -995,7 +998,6 @@ srs_error_t SrsHttpStreamServer::http_mount(SrsLiveSource* s, SrsRequest* r)
// so, need to free last request object, otherwise, it will cause memory leak.
srs_freep(tmpl->req);
tmpl->source = s;
tmpl->req = r->copy()->as_http();
sflvs[sid] = entry;
@ -1015,8 +1017,8 @@ srs_error_t SrsHttpStreamServer::http_mount(SrsLiveSource* s, SrsRequest* r)
} else {
// The entry exists, we reuse it and update the request of stream and cache.
entry = sflvs[sid];
entry->stream->update_auth(s, r);
entry->cache->update_auth(s, r);
entry->stream->update_auth(r);
entry->cache->update_auth(r);
}
if (entry->stream) {
@ -1027,7 +1029,7 @@ srs_error_t SrsHttpStreamServer::http_mount(SrsLiveSource* s, SrsRequest* r)
return err;
}
void SrsHttpStreamServer::http_unmount(SrsLiveSource* s, SrsRequest* r)
void SrsHttpStreamServer::http_unmount(SrsRequest* r)
{
std::string sid = r->get_stream_url();
@ -1134,19 +1136,19 @@ srs_error_t SrsHttpStreamServer::hijack(ISrsHttpMessage* request, ISrsHttpHandle
}
}
SrsLiveSource* s = NULL;
if ((err = _srs_sources->fetch_or_create(r, server, &s)) != srs_success) {
SrsSharedPtr<SrsLiveSource> live_source;
if ((err = _srs_sources->fetch_or_create(r, server, live_source)) != srs_success) {
return srs_error_wrap(err, "source create");
}
srs_assert(s != NULL);
srs_assert(live_source.get() != NULL);
bool enabled_cache = _srs_config->get_gop_cache(r->vhost);
int gcmf = _srs_config->get_gop_cache_max_frames(r->vhost);
s->set_cache(enabled_cache);
s->set_gop_cache_max_frames(gcmf);
live_source->set_cache(enabled_cache);
live_source->set_gop_cache_max_frames(gcmf);
// create http streaming handler.
if ((err = http_mount(s, r)) != srs_success) {
if ((err = http_mount(r)) != srs_success) {
return srs_error_wrap(err, "http mount");
}
@ -1161,7 +1163,7 @@ srs_error_t SrsHttpStreamServer::hijack(ISrsHttpMessage* request, ISrsHttpHandle
// trigger edge to fetch from origin.
bool vhost_is_edge = _srs_config->get_vhost_is_edge(r->vhost);
srs_trace("flv: source url=%s, is_edge=%d, source_id=%s/%s",
r->get_stream_url().c_str(), vhost_is_edge, s->source_id().c_str(), s->pre_source_id().c_str());
r->get_stream_url().c_str(), vhost_is_edge, live_source->source_id().c_str(), live_source->pre_source_id().c_str());
return err;
}

View file

@ -23,13 +23,12 @@ private:
srs_utime_t fast_cache;
private:
SrsMessageQueue* queue;
SrsLiveSource* source;
SrsRequest* req;
SrsCoroutine* trd;
public:
SrsBufferCache(SrsLiveSource* s, SrsRequest* r);
SrsBufferCache(SrsRequest* r);
virtual ~SrsBufferCache();
virtual srs_error_t update_auth(SrsLiveSource* s, SrsRequest* r);
virtual srs_error_t update_auth(SrsRequest* r);
public:
virtual srs_error_t start();
virtual srs_error_t dump_cache(SrsLiveConsumer* consumer, SrsRtmpJitterAlgorithm jitter);
@ -178,13 +177,12 @@ class SrsLiveStream : public ISrsHttpHandler
{
private:
SrsRequest* req;
SrsLiveSource* source;
SrsBufferCache* cache;
SrsSecurity* security_;
public:
SrsLiveStream(SrsLiveSource* s, SrsRequest* r, SrsBufferCache* c);
SrsLiveStream(SrsRequest* r, SrsBufferCache* c);
virtual ~SrsLiveStream();
virtual srs_error_t update_auth(SrsLiveSource* s, SrsRequest* r);
virtual srs_error_t update_auth(SrsRequest* r);
public:
virtual srs_error_t serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r);
private:
@ -205,8 +203,6 @@ private:
public:
// We will free the request.
SrsRequest* req;
// Shared source.
SrsLiveSource* source;
public:
// For template, the mount contains variables.
// For concrete stream, the mount is url to access.
@ -244,8 +240,8 @@ public:
virtual srs_error_t initialize();
public:
// HTTP flv/ts/mp3/aac stream
virtual srs_error_t http_mount(SrsLiveSource* s, SrsRequest* r);
virtual void http_unmount(SrsLiveSource* s, SrsRequest* r);
virtual srs_error_t http_mount(SrsRequest* r);
virtual void http_unmount(SrsRequest* r);
// Interface ISrsHttpMatchHijacker
public:
virtual srs_error_t hijack(ISrsHttpMessage* request, ISrsHttpHandler** ph);

View file

@ -259,13 +259,13 @@ void SrsQueueRecvThread::on_stop()
}
SrsPublishRecvThread::SrsPublishRecvThread(SrsRtmpServer* rtmp_sdk, SrsRequest* _req,
int mr_sock_fd, srs_utime_t tm, SrsRtmpConn* conn, SrsLiveSource* source, SrsContextId parent_cid)
int mr_sock_fd, srs_utime_t tm, SrsRtmpConn* conn, SrsSharedPtr<SrsLiveSource> source, SrsContextId parent_cid)
: trd(this, rtmp_sdk, tm, parent_cid)
{
rtmp = rtmp_sdk;
_conn = conn;
_source = source;
source_ = source;
nn_msgs_for_yield_ = 0;
recv_error = srs_success;
@ -370,7 +370,7 @@ srs_error_t SrsPublishRecvThread::consume(SrsCommonMessage* msg)
srs_update_system_time(), msg->header.timestamp, msg->size);
// the rtmp connection will handle this message
err = _conn->handle_publish_message(_source, msg);
err = _conn->handle_publish_message(source_, msg);
// must always free it,
// the source will copy it if need to use.

View file

@ -16,6 +16,7 @@
#include <srs_protocol_stream.hpp>
#include <srs_core_performance.hpp>
#include <srs_app_reload.hpp>
#include <srs_core_autofree.hpp>
class SrsRtmpServer;
class SrsCommonMessage;
@ -146,7 +147,7 @@ private:
srs_error_t recv_error;
SrsRtmpConn* _conn;
// The params for conn callback.
SrsLiveSource* _source;
SrsSharedPtr<SrsLiveSource> source_;
// The error timeout cond
srs_cond_t error;
// The merged context id.
@ -154,7 +155,7 @@ private:
SrsContextId ncid;
public:
SrsPublishRecvThread(SrsRtmpServer* rtmp_sdk, SrsRequest* _req,
int mr_sock_fd, srs_utime_t tm, SrsRtmpConn* conn, SrsLiveSource* source, SrsContextId parent_cid);
int mr_sock_fd, srs_utime_t tm, SrsRtmpConn* conn, SrsSharedPtr<SrsLiveSource> source, SrsContextId parent_cid);
virtual ~SrsPublishRecvThread();
public:
// Wait for error for some timeout.

View file

@ -224,8 +224,8 @@ srs_error_t SrsGoApiRtcPlay::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessa
// For RTMP to RTC, fail if disabled and RTMP is active, see https://github.com/ossrs/srs/issues/2728
if (!is_rtc_stream_active && !_srs_config->get_rtc_from_rtmp(ruc->req_->vhost)) {
SrsLiveSource* rtmp = _srs_sources->fetch(ruc->req_);
if (rtmp && !rtmp->inactive()) {
SrsSharedPtr<SrsLiveSource> live_source = _srs_sources->fetch(ruc->req_);
if (live_source.get() && !live_source->inactive()) {
return srs_error_new(ERROR_RTC_DISABLED, "Disabled rtmp_to_rtc of %s, see #2728", ruc->req_->vhost.c_str());
}
}

View file

@ -646,7 +646,7 @@ srs_error_t SrsRtcPlayStream::cycle()
SrsRtcConsumer* consumer = NULL;
SrsAutoFree(SrsRtcConsumer, consumer);
if ((err = source->create_consumer(source_, consumer)) != srs_success) {
if ((err = source->create_consumer(consumer)) != srs_success) {
return srs_error_wrap(err, "create consumer, source=%s", req_->get_stream_url().c_str());
}
@ -1202,8 +1202,8 @@ srs_error_t SrsRtcPublishStream::initialize(SrsRequest* r, SrsRtcSourceDescripti
source_->set_publish_stream(this);
// TODO: FIMXE: Check it in SrsRtcConnection::add_publisher?
SrsLiveSource *rtmp = _srs_sources->fetch(r);
if (rtmp && !rtmp->can_publish(false)) {
SrsSharedPtr<SrsLiveSource> live_source = _srs_sources->fetch(r);
if (live_source.get() && !live_source->can_publish(false)) {
return srs_error_new(ERROR_SYSTEM_STREAM_BUSY, "rtmp stream %s busy", r->get_stream_url().c_str());
}
@ -1227,16 +1227,16 @@ srs_error_t SrsRtcPublishStream::initialize(SrsRequest* r, SrsRtcSourceDescripti
#if defined(SRS_RTC) && defined(SRS_FFMPEG_FIT)
bool rtc_to_rtmp = _srs_config->get_rtc_to_rtmp(req_->vhost);
if (rtc_to_rtmp) {
if ((err = _srs_sources->fetch_or_create(r, _srs_hybrid->srs()->instance(), &rtmp)) != srs_success) {
if ((err = _srs_sources->fetch_or_create(r, _srs_hybrid->srs()->instance(), live_source)) != srs_success) {
return srs_error_wrap(err, "create source");
}
// Disable GOP cache for RTC2RTMP bridge, to keep the streams in sync,
// especially for stream merging.
rtmp->set_cache(false);
live_source->set_cache(false);
SrsCompositeBridge* bridge = new SrsCompositeBridge();
bridge->append(new SrsFrameToRtmpBridge(rtmp));
bridge->append(new SrsFrameToRtmpBridge(live_source));
if ((err = bridge->initialize(r)) != srs_success) {
srs_freep(bridge);

View file

@ -154,7 +154,7 @@ ISrsRtcSourceChangeCallback::~ISrsRtcSourceChangeCallback()
{
}
SrsRtcConsumer::SrsRtcConsumer(SrsSharedPtr<SrsRtcSource> s)
SrsRtcConsumer::SrsRtcConsumer(SrsRtcSource* s)
{
source_ = s;
should_update_source_id = false;
@ -486,11 +486,11 @@ void SrsRtcSource::set_bridge(ISrsStreamBridge* bridge)
#endif
}
srs_error_t SrsRtcSource::create_consumer(SrsSharedPtr<SrsRtcSource> source, SrsRtcConsumer*& consumer)
srs_error_t SrsRtcSource::create_consumer(SrsRtcConsumer*& consumer)
{
srs_error_t err = srs_success;
consumer = new SrsRtcConsumer(source);
consumer = new SrsRtcConsumer(this);
consumers.push_back(consumer);
// TODO: FIXME: Implements edge cluster.

View file

@ -80,7 +80,9 @@ public:
class SrsRtcConsumer
{
private:
SrsSharedPtr<SrsRtcSource> source_;
// Because source references to this object, so we should directly use the source ptr.
SrsRtcSource* source_;
private:
std::vector<SrsRtpPacket*> queue;
// when source id changed, notice all consumers
bool should_update_source_id;
@ -92,7 +94,7 @@ private:
// The callback for stream change event.
ISrsRtcSourceChangeCallback* handler_;
public:
SrsRtcConsumer(SrsSharedPtr<SrsRtcSource> s);
SrsRtcConsumer(SrsRtcSource* s);
virtual ~SrsRtcConsumer();
public:
// When source id changed, notice client to print.
@ -215,7 +217,7 @@ public:
public:
// Create consumer
// @param consumer, output the create consumer.
virtual srs_error_t create_consumer(SrsSharedPtr<SrsRtcSource> source, SrsRtcConsumer*& consumer);
virtual srs_error_t create_consumer(SrsRtcConsumer*& consumer);
// Dumps packets in cache to consumer.
// @param ds, whether dumps the sequence header.
// @param dm, whether dumps the metadata.

View file

@ -571,19 +571,19 @@ srs_error_t SrsRtmpConn::stream_service_cycle()
rtmp->set_send_timeout(SRS_CONSTS_RTMP_TIMEOUT);
// find a source to serve.
SrsLiveSource* source = NULL;
if ((err = _srs_sources->fetch_or_create(req, server, &source)) != srs_success) {
SrsSharedPtr<SrsLiveSource> live_source;
if ((err = _srs_sources->fetch_or_create(req, server, live_source)) != srs_success) {
return srs_error_wrap(err, "rtmp: fetch source");
}
srs_assert(source != NULL);
srs_assert(live_source.get() != NULL);
bool enabled_cache = _srs_config->get_gop_cache(req->vhost);
int gcmf = _srs_config->get_gop_cache_max_frames(req->vhost);
srs_trace("source url=%s, ip=%s, cache=%d/%d, is_edge=%d, source_id=%s/%s",
req->get_stream_url().c_str(), ip.c_str(), enabled_cache, gcmf, info->edge, source->source_id().c_str(),
source->pre_source_id().c_str());
source->set_cache(enabled_cache);
source->set_gop_cache_max_frames(gcmf);
req->get_stream_url().c_str(), ip.c_str(), enabled_cache, gcmf, info->edge, live_source->source_id().c_str(),
live_source->pre_source_id().c_str());
live_source->set_cache(enabled_cache);
live_source->set_gop_cache_max_frames(gcmf);
switch (info->type) {
case SrsRtmpConnPlay: {
@ -610,7 +610,7 @@ srs_error_t SrsRtmpConn::stream_service_cycle()
span_main_->end();
#endif
err = playing(source);
err = playing(live_source);
http_hooks_on_stop();
return err;
@ -627,7 +627,7 @@ srs_error_t SrsRtmpConn::stream_service_cycle()
span_main_->end();
#endif
return publishing(source);
return publishing(live_source);
}
case SrsRtmpConnHaivisionPublish: {
if ((err = rtmp->start_haivision_publish(info->res->stream_id)) != srs_success) {
@ -641,7 +641,7 @@ srs_error_t SrsRtmpConn::stream_service_cycle()
span_main_->end();
#endif
return publishing(source);
return publishing(live_source);
}
case SrsRtmpConnFlashPublish: {
if ((err = rtmp->start_flash_publish(info->res->stream_id)) != srs_success) {
@ -655,7 +655,7 @@ srs_error_t SrsRtmpConn::stream_service_cycle()
span_main_->end();
#endif
return publishing(source);
return publishing(live_source);
}
default: {
return srs_error_new(ERROR_SYSTEM_CLIENT_INVALID, "rtmp: unknown client type=%d", info->type);
@ -699,7 +699,7 @@ srs_error_t SrsRtmpConn::check_vhost(bool try_default_vhost)
return err;
}
srs_error_t SrsRtmpConn::playing(SrsLiveSource* source)
srs_error_t SrsRtmpConn::playing(SrsSharedPtr<SrsLiveSource> source)
{
srs_error_t err = srs_success;
@ -786,7 +786,7 @@ srs_error_t SrsRtmpConn::playing(SrsLiveSource* source)
return err;
}
srs_error_t SrsRtmpConn::do_playing(SrsLiveSource* source, SrsLiveConsumer* consumer, SrsQueueRecvThread* rtrd)
srs_error_t SrsRtmpConn::do_playing(SrsSharedPtr<SrsLiveSource> source, SrsLiveConsumer* consumer, SrsQueueRecvThread* rtrd)
{
srs_error_t err = srs_success;
@ -923,7 +923,7 @@ srs_error_t SrsRtmpConn::do_playing(SrsLiveSource* source, SrsLiveConsumer* cons
return err;
}
srs_error_t SrsRtmpConn::publishing(SrsLiveSource* source)
srs_error_t SrsRtmpConn::publishing(SrsSharedPtr<SrsLiveSource> source)
{
srs_error_t err = srs_success;
@ -969,7 +969,7 @@ srs_error_t SrsRtmpConn::publishing(SrsLiveSource* source)
return err;
}
srs_error_t SrsRtmpConn::do_publishing(SrsLiveSource* source, SrsPublishRecvThread* rtrd)
srs_error_t SrsRtmpConn::do_publishing(SrsSharedPtr<SrsLiveSource> source, SrsPublishRecvThread* rtrd)
{
srs_error_t err = srs_success;
@ -1073,7 +1073,7 @@ srs_error_t SrsRtmpConn::do_publishing(SrsLiveSource* source, SrsPublishRecvThre
return err;
}
srs_error_t SrsRtmpConn::acquire_publish(SrsLiveSource* source)
srs_error_t SrsRtmpConn::acquire_publish(SrsSharedPtr<SrsLiveSource> source)
{
srs_error_t err = srs_success;
@ -1141,7 +1141,7 @@ srs_error_t SrsRtmpConn::acquire_publish(SrsLiveSource* source)
return err;
}
void SrsRtmpConn::release_publish(SrsLiveSource* source)
void SrsRtmpConn::release_publish(SrsSharedPtr<SrsLiveSource> source)
{
// when edge, notice edge to change state.
// when origin, notice all service to unpublish.
@ -1152,7 +1152,7 @@ void SrsRtmpConn::release_publish(SrsLiveSource* source)
}
}
srs_error_t SrsRtmpConn::handle_publish_message(SrsLiveSource* source, SrsCommonMessage* msg)
srs_error_t SrsRtmpConn::handle_publish_message(SrsSharedPtr<SrsLiveSource>& source, SrsCommonMessage* msg)
{
srs_error_t err = srs_success;
@ -1193,7 +1193,7 @@ srs_error_t SrsRtmpConn::handle_publish_message(SrsLiveSource* source, SrsCommon
return err;
}
srs_error_t SrsRtmpConn::process_publish_message(SrsLiveSource* source, SrsCommonMessage* msg)
srs_error_t SrsRtmpConn::process_publish_message(SrsSharedPtr<SrsLiveSource>& source, SrsCommonMessage* msg)
{
srs_error_t err = srs_success;

View file

@ -16,6 +16,7 @@
#include <srs_app_reload.hpp>
#include <srs_protocol_rtmp_stack.hpp>
#include <srs_protocol_rtmp_conn.hpp>
#include <srs_core_autofree.hpp>
class SrsServer;
class SrsRtmpServer;
@ -145,14 +146,14 @@ private:
// The stream(play/publish) service cycle, identify client first.
virtual srs_error_t stream_service_cycle();
virtual srs_error_t check_vhost(bool try_default_vhost);
virtual srs_error_t playing(SrsLiveSource* source);
virtual srs_error_t do_playing(SrsLiveSource* source, SrsLiveConsumer* consumer, SrsQueueRecvThread* trd);
virtual srs_error_t publishing(SrsLiveSource* source);
virtual srs_error_t do_publishing(SrsLiveSource* source, SrsPublishRecvThread* trd);
virtual srs_error_t acquire_publish(SrsLiveSource* source);
virtual void release_publish(SrsLiveSource* source);
virtual srs_error_t handle_publish_message(SrsLiveSource* source, SrsCommonMessage* msg);
virtual srs_error_t process_publish_message(SrsLiveSource* source, SrsCommonMessage* msg);
virtual srs_error_t playing(SrsSharedPtr<SrsLiveSource> source);
virtual srs_error_t do_playing(SrsSharedPtr<SrsLiveSource> source, SrsLiveConsumer* consumer, SrsQueueRecvThread* trd);
virtual srs_error_t publishing(SrsSharedPtr<SrsLiveSource> source);
virtual srs_error_t do_publishing(SrsSharedPtr<SrsLiveSource> source, SrsPublishRecvThread* trd);
virtual srs_error_t acquire_publish(SrsSharedPtr<SrsLiveSource> source);
virtual void release_publish(SrsSharedPtr<SrsLiveSource> source);
virtual srs_error_t handle_publish_message(SrsSharedPtr<SrsLiveSource>& source, SrsCommonMessage* msg);
virtual srs_error_t process_publish_message(SrsSharedPtr<SrsLiveSource>& source, SrsCommonMessage* msg);
virtual srs_error_t process_play_control_msg(SrsLiveConsumer* consumer, SrsCommonMessage* msg);
virtual void set_sock_options();
private:

View file

@ -1302,28 +1302,28 @@ srs_error_t SrsServer::on_reload_listen()
return err;
}
srs_error_t SrsServer::on_publish(SrsLiveSource* s, SrsRequest* r)
srs_error_t SrsServer::on_publish(SrsRequest* r)
{
srs_error_t err = srs_success;
if ((err = http_server->http_mount(s, r)) != srs_success) {
if ((err = http_server->http_mount(r)) != srs_success) {
return srs_error_wrap(err, "http mount");
}
SrsCoWorkers* coworkers = SrsCoWorkers::instance();
if ((err = coworkers->on_publish(s, r)) != srs_success) {
if ((err = coworkers->on_publish(r)) != srs_success) {
return srs_error_wrap(err, "coworkers");
}
return err;
}
void SrsServer::on_unpublish(SrsLiveSource* s, SrsRequest* r)
void SrsServer::on_unpublish(SrsRequest* r)
{
http_server->http_unmount(s, r);
http_server->http_unmount(r);
SrsCoWorkers* coworkers = SrsCoWorkers::instance();
coworkers->on_unpublish(s, r);
coworkers->on_unpublish(r);
}
SrsServerAdapter::SrsServerAdapter()

View file

@ -234,8 +234,8 @@ public:
virtual srs_error_t on_reload_listen();
// Interface ISrsLiveSourceHandler
public:
virtual srs_error_t on_publish(SrsLiveSource* s, SrsRequest* r);
virtual void on_unpublish(SrsLiveSource* s, SrsRequest* r);
virtual srs_error_t on_publish(SrsRequest* r);
virtual void on_unpublish(SrsRequest* r);
};
// The SRS server adapter, the master server.

View file

@ -407,7 +407,7 @@ ISrsWakable::~ISrsWakable()
SrsLiveConsumer::SrsLiveConsumer(SrsLiveSource* s)
{
source = s;
source_ = s;
paused = false;
jitter = new SrsRtmpJitter();
queue = new SrsMessageQueue();
@ -423,7 +423,7 @@ SrsLiveConsumer::SrsLiveConsumer(SrsLiveSource* s)
SrsLiveConsumer::~SrsLiveConsumer()
{
source->on_consumer_destroy(this);
source_->on_consumer_destroy(this);
srs_freep(jitter);
srs_freep(queue);
@ -506,7 +506,7 @@ srs_error_t SrsLiveConsumer::dump_packets(SrsMessageArray* msgs, int& count)
count = 0;
if (should_update_source_id) {
srs_trace("update source_id=%s/%s", source->source_id().c_str(), source->pre_source_id().c_str());
srs_trace("update source_id=%s/%s", source_->source_id().c_str(), source_->pre_source_id().c_str());
should_update_source_id = false;
}
@ -822,7 +822,7 @@ SrsSharedPtrMessage* SrsMixQueue::pop()
SrsOriginHub::SrsOriginHub()
{
source = NULL;
source_ = NULL;
req_ = NULL;
is_active = false;
@ -861,12 +861,13 @@ SrsOriginHub::~SrsOriginHub()
#endif
}
srs_error_t SrsOriginHub::initialize(SrsLiveSource* s, SrsRequest* r)
srs_error_t SrsOriginHub::initialize(SrsSharedPtr<SrsLiveSource> s, SrsRequest* r)
{
srs_error_t err = srs_success;
req_ = r;
source = s;
// Because source references to this object, so we should directly use the source ptr.
source_ = s.get();
if ((err = hls->initialize(this, req_)) != srs_success) {
return srs_error_wrap(err, "hls initialize");
@ -936,7 +937,7 @@ srs_error_t SrsOriginHub::on_audio(SrsSharedPtrMessage* shared_audio)
srs_error_t err = srs_success;
SrsSharedPtrMessage* msg = shared_audio;
SrsRtmpFormat* format = source->format_;
SrsRtmpFormat* format = source_->format_;
// Handle the metadata when got sequence header.
if (format->is_aac_sequence_header() || format->is_mp3_sequence_header()) {
@ -973,7 +974,7 @@ srs_error_t SrsOriginHub::on_audio(SrsSharedPtrMessage* shared_audio)
hls->on_unpublish();
srs_error_reset(err);
} else if (srs_config_hls_is_on_error_continue(hls_error_strategy)) {
if (srs_hls_can_continue(srs_error_code(err), source->meta->ash(), msg)) {
if (srs_hls_can_continue(srs_error_code(err), source_->meta->ash(), msg)) {
srs_error_reset(err);
} else {
return srs_error_wrap(err, "hls: audio");
@ -1022,7 +1023,7 @@ srs_error_t SrsOriginHub::on_video(SrsSharedPtrMessage* shared_video, bool is_se
srs_error_t err = srs_success;
SrsSharedPtrMessage* msg = shared_video;
SrsRtmpFormat* format = source->format_;
SrsRtmpFormat* format = source_->format_;
// cache the sequence header if h264
// donot cache the sequence header to gop_cache, return here.
@ -1066,7 +1067,7 @@ srs_error_t SrsOriginHub::on_video(SrsSharedPtrMessage* shared_video, bool is_se
hls->on_unpublish();
srs_error_reset(err);
} else if (srs_config_hls_is_on_error_continue(hls_error_strategy)) {
if (srs_hls_can_continue(srs_error_code(err), source->meta->vsh(), msg)) {
if (srs_hls_can_continue(srs_error_code(err), source_->meta->vsh(), msg)) {
srs_error_reset(err);
} else {
return srs_error_wrap(err, "hls: video");
@ -1177,9 +1178,9 @@ srs_error_t SrsOriginHub::on_forwarder_start(SrsForwarder* forwarder)
{
srs_error_t err = srs_success;
SrsSharedPtrMessage* cache_metadata = source->meta->data();
SrsSharedPtrMessage* cache_sh_video = source->meta->vsh();
SrsSharedPtrMessage* cache_sh_audio = source->meta->ash();
SrsSharedPtrMessage* cache_metadata = source_->meta->data();
SrsSharedPtrMessage* cache_sh_video = source_->meta->vsh();
SrsSharedPtrMessage* cache_sh_audio = source_->meta->ash();
// feed the forwarder the metadata/sequence header,
// when reload to enable the forwarder.
@ -1200,9 +1201,9 @@ srs_error_t SrsOriginHub::on_dvr_request_sh()
{
srs_error_t err = srs_success;
SrsSharedPtrMessage* cache_metadata = source->meta->data();
SrsSharedPtrMessage* cache_sh_video = source->meta->vsh();
SrsSharedPtrMessage* cache_sh_audio = source->meta->ash();
SrsSharedPtrMessage* cache_metadata = source_->meta->data();
SrsSharedPtrMessage* cache_sh_video = source_->meta->vsh();
SrsSharedPtrMessage* cache_sh_audio = source_->meta->ash();
// feed the dvr the metadata/sequence header,
// when reload to start dvr, dvr will never get the sequence header in stream,
@ -1212,13 +1213,13 @@ srs_error_t SrsOriginHub::on_dvr_request_sh()
}
if (cache_sh_video) {
if ((err = dvr->on_video(cache_sh_video, source->meta->vsh_format())) != srs_success) {
if ((err = dvr->on_video(cache_sh_video, source_->meta->vsh_format())) != srs_success) {
return srs_error_wrap(err, "dvr video");
}
}
if (cache_sh_audio) {
if ((err = dvr->on_audio(cache_sh_audio, source->meta->ash_format())) != srs_success) {
if ((err = dvr->on_audio(cache_sh_audio, source_->meta->ash_format())) != srs_success) {
return srs_error_wrap(err, "dvr audio");
}
}
@ -1230,16 +1231,16 @@ srs_error_t SrsOriginHub::on_hls_request_sh()
{
srs_error_t err = srs_success;
SrsSharedPtrMessage* cache_sh_video = source->meta->vsh();
SrsSharedPtrMessage* cache_sh_video = source_->meta->vsh();
if (cache_sh_video) {
if ((err = hls->on_video(cache_sh_video, source->meta->vsh_format())) != srs_success) {
if ((err = hls->on_video(cache_sh_video, source_->meta->vsh_format())) != srs_success) {
return srs_error_wrap(err, "hls video");
}
}
SrsSharedPtrMessage* cache_sh_audio = source->meta->ash();
SrsSharedPtrMessage* cache_sh_audio = source_->meta->ash();
if (cache_sh_audio) {
if ((err = hls->on_audio(cache_sh_audio, source->meta->ash_format())) != srs_success) {
if ((err = hls->on_audio(cache_sh_audio, source_->meta->ash_format())) != srs_success) {
return srs_error_wrap(err, "hls audio");
}
}
@ -1295,9 +1296,9 @@ srs_error_t SrsOriginHub::on_reload_vhost_dash(string vhost)
return srs_error_wrap(err, "dash start publish");
}
SrsRtmpFormat* format = source->format_;
SrsRtmpFormat* format = source_->format_;
SrsSharedPtrMessage* cache_sh_video = source->meta->vsh();
SrsSharedPtrMessage* cache_sh_video = source_->meta->vsh();
if (cache_sh_video) {
if ((err = format->on_video(cache_sh_video)) != srs_success) {
return srs_error_wrap(err, "format on_video");
@ -1307,7 +1308,7 @@ srs_error_t SrsOriginHub::on_reload_vhost_dash(string vhost)
}
}
SrsSharedPtrMessage* cache_sh_audio = source->meta->ash();
SrsSharedPtrMessage* cache_sh_audio = source_->meta->ash();
if (cache_sh_audio) {
if ((err = format->on_audio(cache_sh_audio)) != srs_success) {
return srs_error_wrap(err, "format on_audio");
@ -1759,67 +1760,64 @@ srs_error_t SrsLiveSourceManager::initialize()
return setup_ticks();
}
srs_error_t SrsLiveSourceManager::fetch_or_create(SrsRequest* r, ISrsLiveSourceHandler* h, SrsLiveSource** pps)
srs_error_t SrsLiveSourceManager::fetch_or_create(SrsRequest* r, ISrsLiveSourceHandler* h, SrsSharedPtr<SrsLiveSource>& pps)
{
srs_error_t err = srs_success;
// Use lock to protect coroutine switch.
// @bug https://github.com/ossrs/srs/issues/1230
// TODO: FIXME: Use smaller lock.
// TODO: FIXME: Use smaller scope lock.
SrsLocker(lock);
SrsLiveSource* source = NULL;
if ((source = fetch(r)) != NULL) {
string stream_url = r->get_stream_url();
std::map< std::string, SrsSharedPtr<SrsLiveSource> >::iterator it = pool.find(stream_url);
if (it != pool.end()) {
SrsSharedPtr<SrsLiveSource>& source = it->second;
// we always update the request of resource,
// for origin auth is on, the token in request maybe invalid,
// and we only need to update the token of request, it's simple.
source->update_auth(r);
*pps = source;
pps = source;
return err;
}
string stream_url = r->get_stream_url();
string vhost = r->vhost;
// should always not exists for create a source.
srs_assert (pool.find(stream_url) == pool.end());
SrsSharedPtr<SrsLiveSource> source = new SrsLiveSource();
srs_trace("new live source, stream_url=%s", stream_url.c_str());
source = new SrsLiveSource();
if ((err = source->initialize(r, h)) != srs_success) {
err = srs_error_wrap(err, "init source %s", r->get_stream_url().c_str());
goto failed;
if ((err = source->initialize(source, r, h)) != srs_success) {
return srs_error_wrap(err, "init source %s", r->get_stream_url().c_str());
}
pool[stream_url] = source;
*pps = source;
return err;
failed:
srs_freep(source);
pps = source;
return err;
}
SrsLiveSource* SrsLiveSourceManager::fetch(SrsRequest* r)
SrsSharedPtr<SrsLiveSource> SrsLiveSourceManager::fetch(SrsRequest* r)
{
SrsLiveSource* source = NULL;
// Use lock to protect coroutine switch.
// @bug https://github.com/ossrs/srs/issues/1230
// TODO: FIXME: Use smaller scope lock.
SrsLocker(lock);
string stream_url = r->get_stream_url();
if (pool.find(stream_url) == pool.end()) {
return NULL;
std::map< std::string, SrsSharedPtr<SrsLiveSource> >::iterator it = pool.find(stream_url);
if (it == pool.end()) {
return SrsSharedPtr<SrsLiveSource>(NULL);
}
source = pool[stream_url];
SrsSharedPtr<SrsLiveSource>& source = it->second;
return source;
}
void SrsLiveSourceManager::dispose()
{
std::map<std::string, SrsLiveSource*>::iterator it;
std::map< std::string, SrsSharedPtr<SrsLiveSource> >::iterator it;
for (it = pool.begin(); it != pool.end(); ++it) {
SrsLiveSource* source = it->second;
SrsSharedPtr<SrsLiveSource>& source = it->second;
source->dispose();
}
return;
@ -1844,9 +1842,9 @@ srs_error_t SrsLiveSourceManager::notify(int event, srs_utime_t interval, srs_ut
{
srs_error_t err = srs_success;
std::map<std::string, SrsLiveSource*>::iterator it;
std::map< std::string, SrsSharedPtr<SrsLiveSource> >::iterator it;
for (it = pool.begin(); it != pool.end();) {
SrsLiveSource* source = it->second;
SrsSharedPtr<SrsLiveSource>& source = it->second;
// Do cycle source to cleanup components, such as hls dispose.
if ((err = source->cycle()) != srs_success) {
@ -1856,19 +1854,11 @@ srs_error_t SrsLiveSourceManager::notify(int event, srs_utime_t interval, srs_ut
// See SrsSrtSource::on_consumer_destroy
// TODO: FIXME: support source cleanup.
// @see https://github.com/ossrs/srs/issues/713
#if 0
#if 1
// When source expired, remove it.
if (source->stream_is_dead()) {
int cid = source->source_id();
if (cid == -1 && source->pre_source_id() > 0) {
cid = source->pre_source_id();
}
if (cid > 0) {
_srs_context->set_id(cid);
}
srs_trace("cleanup die source, total=%d", (int)pool.size());
srs_freep(source);
const SrsContextId& cid = source->source_id();
srs_trace("cleanup die source, id=[%s], total=%d", cid.c_str(), (int)pool.size());
pool.erase(it++);
} else {
++it;
@ -1883,11 +1873,6 @@ srs_error_t SrsLiveSourceManager::notify(int event, srs_utime_t interval, srs_ut
void SrsLiveSourceManager::destroy()
{
std::map<std::string, SrsLiveSource*>::iterator it;
for (it = pool.begin(); it != pool.end(); ++it) {
SrsLiveSource* source = it->second;
srs_freep(source);
}
pool.clear();
}
@ -1995,7 +1980,7 @@ bool SrsLiveSource::publisher_is_idle_for(srs_utime_t timeout)
return false;
}
srs_error_t SrsLiveSource::initialize(SrsRequest* r, ISrsLiveSourceHandler* h)
srs_error_t SrsLiveSource::initialize(SrsSharedPtr<SrsLiveSource> wrapper, SrsRequest* r, ISrsLiveSourceHandler* h)
{
srs_error_t err = srs_success;
@ -2013,14 +1998,14 @@ srs_error_t SrsLiveSource::initialize(SrsRequest* r, ISrsLiveSourceHandler* h)
// Setup the SPS/PPS parsing strategy.
format_->try_annexb_first = _srs_config->try_annexb_first(r->vhost);
if ((err = hub->initialize(this, req)) != srs_success) {
if ((err = hub->initialize(wrapper, req)) != srs_success) {
return srs_error_wrap(err, "hub");
}
if ((err = play_edge->initialize(this, req)) != srs_success) {
if ((err = play_edge->initialize(wrapper, req)) != srs_success) {
return srs_error_wrap(err, "edge(play)");
}
if ((err = publish_edge->initialize(this, req)) != srs_success) {
if ((err = publish_edge->initialize(wrapper, req)) != srs_success) {
return srs_error_wrap(err, "edge(publish)");
}
@ -2600,7 +2585,7 @@ srs_error_t SrsLiveSource::on_publish()
// notify the handler.
srs_assert(handler);
if ((err = handler->on_publish(this, req)) != srs_success) {
if ((err = handler->on_publish(req)) != srs_success) {
return srs_error_wrap(err, "handle publish");
}
@ -2652,7 +2637,7 @@ void SrsLiveSource::on_unpublish()
SrsStatistic* stat = SrsStatistic::instance();
stat->on_stream_close(req);
handler->on_unpublish(this, req);
handler->on_unpublish(req);
if (bridge_) {
bridge_->on_unpublish();

View file

@ -19,6 +19,7 @@
#include <srs_protocol_st.hpp>
#include <srs_app_hourglass.hpp>
#include <srs_app_stream_bridge.hpp>
#include <srs_core_autofree.hpp>
class SrsFormat;
class SrsRtmpFormat;
@ -168,9 +169,11 @@ public:
// The consumer for SrsLiveSource, that is a play client.
class SrsLiveConsumer : public ISrsWakable
{
private:
// Because source references to this object, so we should directly use the source ptr.
SrsLiveSource* source_;
private:
SrsRtmpJitter* jitter;
SrsLiveSource* source;
SrsMessageQueue* queue;
bool paused;
// when source id changed, notice all consumers
@ -288,9 +291,9 @@ public:
virtual ~ISrsLiveSourceHandler();
public:
// when stream start publish, mount stream.
virtual srs_error_t on_publish(SrsLiveSource* s, SrsRequest* r) = 0;
virtual srs_error_t on_publish(SrsRequest* r) = 0;
// when stream stop publish, unmount stream.
virtual void on_unpublish(SrsLiveSource* s, SrsRequest* r) = 0;
virtual void on_unpublish(SrsRequest* r) = 0;
};
// The mix queue to correct the timestamp for mix_correct algorithm.
@ -315,7 +318,9 @@ public:
class SrsOriginHub : public ISrsReloadHandler
{
private:
SrsLiveSource* source;
// Because source references to this object, so we should directly use the source ptr.
SrsLiveSource* source_;
private:
SrsRequest* req_;
bool is_active;
private:
@ -341,7 +346,7 @@ public:
public:
// Initialize the hub with source and request.
// @param r The request object, managed by source.
virtual srs_error_t initialize(SrsLiveSource* s, SrsRequest* r);
virtual srs_error_t initialize(SrsSharedPtr<SrsLiveSource> s, SrsRequest* r);
// Dispose the hub, release utilities resource,
// For example, delete all HLS pieces.
virtual void dispose();
@ -443,7 +448,7 @@ class SrsLiveSourceManager : public ISrsHourGlass
{
private:
srs_mutex_t lock;
std::map<std::string, SrsLiveSource*> pool;
std::map< std::string, SrsSharedPtr<SrsLiveSource> > pool;
SrsHourGlass* timer_;
public:
SrsLiveSourceManager();
@ -454,10 +459,10 @@ public:
// @param r the client request.
// @param h the event handler for source.
// @param pps the matched source, if success never be NULL.
virtual srs_error_t fetch_or_create(SrsRequest* r, ISrsLiveSourceHandler* h, SrsLiveSource** pps);
virtual srs_error_t fetch_or_create(SrsRequest* r, ISrsLiveSourceHandler* h, SrsSharedPtr<SrsLiveSource>& pps);
public:
// Get the exists source, NULL when not exists.
virtual SrsLiveSource* fetch(SrsRequest* r);
virtual SrsSharedPtr<SrsLiveSource> fetch(SrsRequest* r);
public:
// dispose and cycle all sources.
virtual void dispose();
@ -539,7 +544,7 @@ public:
bool publisher_is_idle_for(srs_utime_t timeout);
public:
// Initialize the hls with handlers.
virtual srs_error_t initialize(SrsRequest* r, ISrsLiveSourceHandler* h);
virtual srs_error_t initialize(SrsSharedPtr<SrsLiveSource> wrapper, SrsRequest* r, ISrsLiveSourceHandler* h);
// Bridge to other source, forward packets to it.
void set_bridge(ISrsStreamBridge* v);
// Interface ISrsReloadHandler

View file

@ -368,16 +368,16 @@ srs_error_t SrsMpegtsSrtConn::acquire_publish()
}
// Check rtmp stream is busy.
SrsLiveSource *live_source = _srs_sources->fetch(req_);
if (live_source && !live_source->can_publish(false)) {
SrsSharedPtr<SrsLiveSource> live_source = _srs_sources->fetch(req_);
if (live_source.get() && !live_source->can_publish(false)) {
return srs_error_new(ERROR_SYSTEM_STREAM_BUSY, "live_source stream %s busy", req_->get_stream_url().c_str());
}
if ((err = _srs_sources->fetch_or_create(req_, _srs_hybrid->srs()->instance(), &live_source)) != srs_success) {
if ((err = _srs_sources->fetch_or_create(req_, _srs_hybrid->srs()->instance(), live_source)) != srs_success) {
return srs_error_wrap(err, "create source");
}
srs_assert(live_source != NULL);
srs_assert(live_source.get() != NULL);
bool enabled_cache = _srs_config->get_gop_cache(req_->vhost);
int gcmf = _srs_config->get_gop_cache_max_frames(req_->vhost);
@ -489,7 +489,7 @@ srs_error_t SrsMpegtsSrtConn::do_playing()
SrsSrtConsumer* consumer = NULL;
SrsAutoFree(SrsSrtConsumer, consumer);
if ((err = srt_source_->create_consumer(srt_source_, consumer)) != srs_success) {
if ((err = srt_source_->create_consumer(consumer)) != srs_success) {
return srs_error_wrap(err, "create consumer, ts source=%s", req_->get_stream_url().c_str());
}
srs_assert(consumer);

View file

@ -152,7 +152,7 @@ void SrsSrtSourceManager::eliminate(SrsRequest* r)
SrsSrtSourceManager* _srs_srt_sources = NULL;
SrsSrtConsumer::SrsSrtConsumer(SrsSharedPtr<SrsSrtSource> s)
SrsSrtConsumer::SrsSrtConsumer(SrsSrtSource* s)
{
source_ = s;
should_update_source_id = false;
@ -942,11 +942,11 @@ void SrsSrtSource::set_bridge(ISrsStreamBridge* bridge)
frame_builder_ = new SrsSrtFrameBuilder(bridge);
}
srs_error_t SrsSrtSource::create_consumer(SrsSharedPtr<SrsSrtSource> source, SrsSrtConsumer*& consumer)
srs_error_t SrsSrtSource::create_consumer(SrsSrtConsumer*& consumer)
{
srs_error_t err = srs_success;
consumer = new SrsSrtConsumer(source);
consumer = new SrsSrtConsumer(this);
consumers.push_back(consumer);
return err;

View file

@ -70,10 +70,12 @@ extern SrsSrtSourceManager* _srs_srt_sources;
class SrsSrtConsumer
{
public:
SrsSrtConsumer(SrsSharedPtr<SrsSrtSource> source);
SrsSrtConsumer(SrsSrtSource* source);
virtual ~SrsSrtConsumer();
private:
SrsSharedPtr<SrsSrtSource> source_;
// Because source references to this object, so we should directly use the source ptr.
SrsSrtSource* source_;
private:
std::vector<SrsSrtPacket*> queue;
// when source id changed, notice all consumers
bool should_update_source_id;
@ -167,7 +169,7 @@ public:
public:
// Create consumer
// @param consumer, output the create consumer.
virtual srs_error_t create_consumer(SrsSharedPtr<SrsSrtSource> source, SrsSrtConsumer*& consumer);
virtual srs_error_t create_consumer(SrsSrtConsumer*& consumer);
// Dumps packets in cache to consumer.
virtual srs_error_t consumer_dumps(SrsSrtConsumer* consumer);
virtual void on_consumer_destroy(SrsSrtConsumer* consumer);

View file

@ -25,9 +25,9 @@ ISrsStreamBridge::~ISrsStreamBridge()
{
}
SrsFrameToRtmpBridge::SrsFrameToRtmpBridge(SrsLiveSource *src)
SrsFrameToRtmpBridge::SrsFrameToRtmpBridge(SrsSharedPtr<SrsLiveSource> source)
{
source_ = src;
source_ = source;
}
SrsFrameToRtmpBridge::~SrsFrameToRtmpBridge()

View file

@ -42,9 +42,9 @@ public:
class SrsFrameToRtmpBridge : public ISrsStreamBridge
{
private:
SrsLiveSource *source_;
SrsSharedPtr<SrsLiveSource> source_;
public:
SrsFrameToRtmpBridge(SrsLiveSource *src);
SrsFrameToRtmpBridge(SrsSharedPtr<SrsLiveSource> source);
virtual ~SrsFrameToRtmpBridge();
public:
srs_error_t initialize(SrsRequest* r);

View file

@ -11,6 +11,9 @@
#include <stdlib.h>
// The auto free helper, which is actually the unique ptr, without the move feature,
// see https://github.com/ossrs/srs/discussions/3667#discussioncomment-8969107
//
// To free the instance in the current scope, for instance, MyClass* ptr,
// which is a ptr and this class will:
// 1. free the ptr.
@ -81,7 +84,9 @@ public:
}
};
// Shared ptr smart pointer, see https://github.com/ossrs/srs/discussions/3667#discussioncomment-8969107
// Shared ptr smart pointer, only support shared ptr, no weak ptr, no shared from this, no inheritance,
// no comparing, see https://github.com/ossrs/srs/discussions/3667#discussioncomment-8969107
//
// Usage:
// SrsSharedPtr<MyClass> ptr(new MyClass());
// ptr->do_something();

View file

@ -9,6 +9,6 @@
#define VERSION_MAJOR 6
#define VERSION_MINOR 0
#define VERSION_REVISION 128
#define VERSION_REVISION 129
#endif

View file

@ -107,6 +107,7 @@
XX(ERROR_BACKTRACE_ADDR2LINE , 1094, "BacktraceAddr2Line", "Backtrace addr2line failed") \
XX(ERROR_SYSTEM_FILE_NOT_OPEN , 1095, "FileNotOpen", "File is not opened") \
XX(ERROR_SYSTEM_FILE_SETVBUF , 1096, "FileSetVBuf", "Failed to set file vbuf") \
XX(ERROR_NO_SOURCE , 1097, "NoSource", "No source found")
/**************************************************/
/* RTMP protocol error. */
@ -334,7 +335,7 @@
XX(ERROR_STREAM_CASTER_HEVC_FORMAT , 4057, "CasterTsHevcFormat", "Invalid ts HEVC Format for stream caster") \
XX(ERROR_HTTP_JSONP , 4058, "HttpJsonp", "Invalid callback for JSONP") \
XX(ERROR_HEVC_NALU_UEV , 4059, "HevcNaluUev", "Failed to read UEV for HEVC NALU") \
XX(ERROR_HEVC_NALU_SEV , 4060, "HevcNaluSev", "Failed to read SEV for HEVC NALU") \
XX(ERROR_HEVC_NALU_SEV , 4060, "HevcNaluSev", "Failed to read SEV for HEVC NALU")
/**************************************************/

View file

@ -139,6 +139,17 @@ VOID TEST(CoreLogger, SharedPtrReset)
}
}
SrsSharedPtr<int> mock_create_from_ptr(SrsSharedPtr<int> p) {
return p;
}
VOID TEST(CoreLogger, SharedPtrContructor)
{
int* p = new int(100);
SrsSharedPtr<int> q = mock_create_from_ptr(p);
EXPECT_EQ(100, *q);
}
VOID TEST(CoreLogger, SharedPtrObject)
{
SrsSharedPtr<MyNormalObject> p(new MyNormalObject(100));