diff --git a/trunk/doc/CHANGELOG.md b/trunk/doc/CHANGELOG.md
index f58ab269f..3ad0c3217 100644
--- a/trunk/doc/CHANGELOG.md
+++ b/trunk/doc/CHANGELOG.md
@@ -7,6 +7,7 @@ The changelog for SRS.
## SRS 6.0 Changelog
+* v6.0, 2024-06-12, Merge [#4080](https://github.com/ossrs/srs/pull/4080): SmartPtr: Use shared ptr to manage GB objects. v6.0.126 (#4080)
* v6.0, 2024-06-03, Merge [#4057](https://github.com/ossrs/srs/pull/4057): RTC: Support dropping h.264 SEI from NALUs. v6.0.125 (#4057)
* v6.0, 2024-04-26, Merge [#4044](https://github.com/ossrs/srs/pull/4044): fix: correct SRS_ERRNO_MAP_HTTP duplicate error code. v6.0.124 (#4044)
* v6.0, 2024-04-23, Merge [#4038](https://github.com/ossrs/srs/pull/4038): RTMP: Do not response publish start message if hooks fail. v6.0.123 (#4038)
diff --git a/trunk/src/app/srs_app_conn.cpp b/trunk/src/app/srs_app_conn.cpp
index 7652be15b..90d818ece 100644
--- a/trunk/src/app/srs_app_conn.cpp
+++ b/trunk/src/app/srs_app_conn.cpp
@@ -413,28 +413,6 @@ void SrsResourceManager::dispose(ISrsResource* c)
}
}
-SrsLazySweepGc::SrsLazySweepGc()
-{
-}
-
-SrsLazySweepGc::~SrsLazySweepGc()
-{
-}
-
-srs_error_t SrsLazySweepGc::start()
-{
- srs_error_t err = srs_success;
- return err;
-}
-
-void SrsLazySweepGc::remove(SrsLazyObject* c)
-{
- // TODO: FIXME: MUST lazy sweep.
- srs_freep(c);
-}
-
-ISrsLazyGc* _srs_gc = NULL;
-
ISrsExpire::ISrsExpire()
{
}
diff --git a/trunk/src/app/srs_app_conn.hpp b/trunk/src/app/srs_app_conn.hpp
index b5aeb48da..c0e965525 100644
--- a/trunk/src/app/srs_app_conn.hpp
+++ b/trunk/src/app/srs_app_conn.hpp
@@ -20,6 +20,7 @@
#include
#include
#include
+#include
class SrsWallClock;
class SrsBuffer;
@@ -125,98 +126,66 @@ private:
void dispose(ISrsResource* c);
};
-// A simple lazy-sweep GC, just wait for a long time to delete the disposable resources.
-class SrsLazySweepGc : public ISrsLazyGc
-{
-public:
- SrsLazySweepGc();
- virtual ~SrsLazySweepGc();
-public:
- virtual srs_error_t start();
- virtual void remove(SrsLazyObject* c);
-};
-
-extern ISrsLazyGc* _srs_gc;
-
-// A wrapper template for lazy-sweep resource.
-// See https://github.com/ossrs/srs/issues/3176#lazy-sweep
+// This class implements the ISrsResource interface using a smart pointer, allowing the Manager to delete this
+// smart pointer resource, such as by implementing delayed release.
//
-// Usage for resource which manages itself in coroutine cycle, see SrsLazyGbSession:
-// class Resource {
-// private:
-// SrsLazyObjectWrapper* wrapper_;
-// private:
-// friend class SrsLazyObjectWrapper;
-// Resource(SrsLazyObjectWrapper* wrapper) { wrapper_ = wrapper; }
-// public:
-// srs_error_t Resource::cycle() {
-// srs_error_t err = do_cycle();
-// _srs_gb_manager->remove(wrapper_);
-// return err;
-// }
-// };
-// SrsLazyObjectWrapper* obj = new SrsLazyObjectWrapper*();
-// _srs_gb_manager->add(obj); // Add wrapper to resource manager.
-// Start a coroutine to do obj->resource()->cycle().
+// It embeds an SrsSharedPtr to provide the same interface, but it is not an inheritance relationship. Its usage
+// is identical to SrsSharedPtr, but they cannot replace each other. They are not related and cannot be converted
+// to one another.
//
-// Usage for resource managed by other object:
-// class Resource {
-// private:
-// friend class SrsLazyObjectWrapper;
-// Resource(SrsLazyObjectWrapper* /*wrapper*/) {
-// }
-// };
-// class Manager {
-// private:
-// SrsLazyObjectWrapper* wrapper_;
-// public:
-// Manager() { wrapper_ = new SrsLazyObjectWrapper(); }
-// ~Manager() { srs_freep(wrapper_); }
-// };
-// Manager* manager = new Manager();
-// srs_freep(manager);
+// Note that we don't need to implement the move constructor and move assignment operator, because we directly
+// use SrsSharedPtr as instance member, so we can only copy it.
//
-// Note that under-layer resource are destroyed by _srs_gc, which is literally equal to srs_freep. However, the root
-// wrapper might be managed by other resource manager, such as _srs_gb_manager for SrsLazyGbSession. Furthermore, other
-// copied out wrappers might be freed by srs_freep. All are ok, because all wrapper and resources are simply normal
-// object, so if you added to manager then you should use manager to remove it, and you can also directly delete it.
+// Usage:
+// SrsSharedResource* ptr = new SrsSharedResource(new MyClass());
+// (*ptr)->do_something();
+//
+// ISrsResourceManager* manager = ...;
+// manager->remove(ptr);
template
-class SrsLazyObjectWrapper : public ISrsResource
+class SrsSharedResource : public ISrsResource
{
private:
- T* resource_;
+ SrsSharedPtr ptr_;
public:
- SrsLazyObjectWrapper() {
- init(new T(this));
+ SrsSharedResource(T* ptr) : ptr_(ptr) {
}
- virtual ~SrsLazyObjectWrapper() {
- resource_->gc_dispose();
- if (resource_->gc_ref() == 0) {
- _srs_gc->remove(resource_);
+ SrsSharedResource(const SrsSharedResource& cp) : ptr_(cp.ptr_) {
+ }
+ virtual ~SrsSharedResource() {
+ }
+public:
+ // Get the object.
+ T* get() {
+ return ptr_.get();
+ }
+ // Overload the -> operator.
+ T* operator->() {
+ return ptr_.operator->();
+ }
+ // The assign operator.
+ SrsSharedResource& operator=(const SrsSharedResource& cp) {
+ if (this != &cp) {
+ ptr_ = cp.ptr_;
}
+ return *this;
}
private:
- SrsLazyObjectWrapper(T* resource) {
- init(resource);
+ // Overload the * operator.
+ T& operator*() {
+ return ptr_.operator*();
}
- void init(T* resource) {
- resource_ = resource;
- resource_->gc_use();
- }
-public:
- SrsLazyObjectWrapper* copy() {
- return new SrsLazyObjectWrapper(resource_);
- }
- T* resource() {
- return resource_;
+ // Overload the bool operator.
+ operator bool() const {
+ return ptr_.operator bool();
}
// Interface ISrsResource
public:
virtual const SrsContextId& get_id() {
- return resource_->get_id();
+ return ptr_->get_id();
}
virtual std::string desc() {
- return resource_->desc();
+ return ptr_->desc();
}
};
diff --git a/trunk/src/app/srs_app_gb28181.cpp b/trunk/src/app/srs_app_gb28181.cpp
index 99c9dbf0c..98bfb0d2b 100644
--- a/trunk/src/app/srs_app_gb28181.cpp
+++ b/trunk/src/app/srs_app_gb28181.cpp
@@ -70,11 +70,12 @@ std::string srs_sip_state(SrsGbSipState ostate, SrsGbSipState state)
return srs_fmt("%s->%s", srs_gb_sip_state(ostate).c_str(), srs_gb_sip_state(state).c_str());
}
-SrsLazyGbSession::SrsLazyGbSession(SrsLazyObjectWrapper* wrapper_root)
+SrsGbSession::SrsGbSession() : sip_(new SrsGbSipTcpConn()), media_(new SrsGbMediaTcpConn())
{
- wrapper_root_ = wrapper_root;
- sip_ = new SrsLazyObjectWrapper();
- media_ = new SrsLazyObjectWrapper();
+ wrapper_ = NULL;
+ owner_coroutine_ = NULL;
+ owner_cid_ = NULL;
+
muxer_ = new SrsGbMuxer(this);
state_ = SrsGbSessionStateInit;
@@ -102,41 +103,43 @@ SrsLazyGbSession::SrsLazyGbSession(SrsLazyObjectWrapper* wrapp
cid_ = _srs_context->generate_id();
_srs_context->set_id(cid_); // Also change current coroutine cid as session's.
- trd_ = new SrsSTCoroutine("GBS", this, cid_);
}
-SrsLazyGbSession::~SrsLazyGbSession()
+SrsGbSession::~SrsGbSession()
{
- srs_freep(trd_);
- srs_freep(sip_);
- srs_freep(media_);
srs_freep(muxer_);
srs_freep(ppp_);
}
-srs_error_t SrsLazyGbSession::initialize(SrsConfDirective* conf)
+void SrsGbSession::setup(SrsConfDirective* conf)
{
- srs_error_t err = srs_success;
-
pip_ = candidate_ = _srs_config->get_stream_caster_sip_candidate(conf);
if (candidate_ == "*") {
pip_ = srs_get_public_internet_address(true);
}
std::string output = _srs_config->get_stream_caster_output(conf);
- if ((err = muxer_->initialize(output)) != srs_success) {
- return srs_error_wrap(err, "muxer");
- }
+ muxer_->setup(output);
connecting_timeout_ = _srs_config->get_stream_caster_sip_timeout(conf);
reinvite_wait_ = _srs_config->get_stream_caster_sip_reinvite(conf);
srs_trace("Session: Start timeout=%dms, reinvite=%dms, candidate=%s, pip=%s, output=%s", srsu2msi(connecting_timeout_),
srsu2msi(reinvite_wait_), candidate_.c_str(), pip_.c_str(), output.c_str());
-
- return err;
}
-void SrsLazyGbSession::on_ps_pack(SrsPackContext* ctx, SrsPsPacket* ps, const std::vector& msgs)
+void SrsGbSession::setup_owner(SrsSharedResource* wrapper, ISrsInterruptable* owner_coroutine, ISrsContextIdSetter* owner_cid)
+{
+ wrapper_ = wrapper;
+ owner_coroutine_ = owner_coroutine;
+ owner_cid_ = owner_cid;
+}
+
+void SrsGbSession::on_executor_done(ISrsInterruptable* executor)
+{
+ owner_coroutine_ = NULL;
+}
+
+void SrsGbSession::on_ps_pack(SrsPackContext* ctx, SrsPsPacket* ps, const std::vector& msgs)
{
// Got a new context, that is new media transport.
if (media_id_ != ctx->media_id_) {
@@ -195,57 +198,47 @@ void SrsLazyGbSession::on_ps_pack(SrsPackContext* ctx, SrsPsPacket* ps, const st
}
}
-void SrsLazyGbSession::on_sip_transport(SrsLazyObjectWrapper* sip)
+void SrsGbSession::on_sip_transport(SrsSharedResource sip)
{
- srs_freep(sip_);
- sip_ = sip->copy();
-
+ sip_ = sip;
// Change id of SIP and all its child coroutines.
- sip_->resource()->set_cid(cid_);
+ sip_->set_cid(cid_);
}
-SrsLazyObjectWrapper* SrsLazyGbSession::sip_transport()
+SrsSharedResource SrsGbSession::sip_transport()
{
return sip_;
}
-void SrsLazyGbSession::on_media_transport(SrsLazyObjectWrapper* media)
+void SrsGbSession::on_media_transport(SrsSharedResource media)
{
- srs_freep(media_);
- media_ = media->copy();
+ media_ = media;
// Change id of SIP and all its child coroutines.
- media_->resource()->set_cid(cid_);
+ media_->set_cid(cid_);
}
-std::string SrsLazyGbSession::pip()
+std::string SrsGbSession::pip()
{
return pip_;
}
-srs_error_t SrsLazyGbSession::start()
+srs_error_t SrsGbSession::cycle()
{
srs_error_t err = srs_success;
- if ((err = trd_->start()) != srs_success) {
- return srs_error_wrap(err, "coroutine");
- }
+ // Update all context id to cid of session.
+ _srs_context->set_id(cid_);
+ owner_cid_->set_cid(cid_);
+ sip_->set_cid(cid_);
+ media_->set_cid(cid_);
- return err;
-}
-
-srs_error_t SrsLazyGbSession::cycle()
-{
- srs_error_t err = do_cycle();
+ // Drive the session cycle.
+ err = do_cycle();
// Interrupt the SIP and media transport when session terminated.
- sip_->resource()->interrupt();
- media_->resource()->interrupt();
-
- // Note that we added wrapper to manager, so we must free the wrapper, not this connection.
- SrsLazyObjectWrapper* wrapper = wrapper_root_;
- srs_assert(wrapper); // The creator wrapper MUST never be null, because we created it.
- _srs_gb_manager->remove(wrapper);
+ sip_->interrupt();
+ media_->interrupt();
// success.
if (err == srs_success) {
@@ -274,12 +267,13 @@ srs_error_t SrsLazyGbSession::cycle()
return srs_success;
}
-srs_error_t SrsLazyGbSession::do_cycle()
+srs_error_t SrsGbSession::do_cycle()
{
srs_error_t err = srs_success;
while (true) {
- if ((err = trd_->pull()) != srs_success) {
+ if (!owner_coroutine_) return err;
+ if ((err = owner_coroutine_->pull()) != srs_success) {
return srs_error_wrap(err, "pull");
}
@@ -287,7 +281,7 @@ srs_error_t SrsLazyGbSession::do_cycle()
srs_usleep(SRS_GB_SESSION_DRIVE_INTERVAL);
// Client send bye, we should dispose the session.
- if (sip_->resource()->is_bye()) {
+ if (sip_->is_bye()) {
return err;
}
@@ -310,35 +304,33 @@ srs_error_t SrsLazyGbSession::do_cycle()
return err;
}
-srs_error_t SrsLazyGbSession::drive_state()
+srs_error_t SrsGbSession::drive_state()
{
srs_error_t err = srs_success;
#define SRS_GB_CHANGE_STATE_TO(state) { \
SrsGbSessionState ostate = set_state(state); \
- srs_trace("Session: Change device=%s, state=%s", sip_->resource()->device_id().c_str(), \
+ srs_trace("Session: Change device=%s, state=%s", sip_->device_id().c_str(), \
srs_gb_state(ostate, state_).c_str()); \
}
if (state_ == SrsGbSessionStateInit) {
// Set to connecting, whatever media is connected or not, because the connecting state will handle it if media
// is connected, so we don't need to handle it here.
- if (sip_->resource()->is_registered()) {
+ if (sip_->is_registered()) {
SRS_GB_CHANGE_STATE_TO(SrsGbSessionStateConnecting);
connecting_starttime_ = srs_update_system_time();
}
// Invite if media is not connected.
- if (sip_->resource()->is_registered() && !media_->resource()->is_connected()) {
+ if (sip_->is_registered() && !media_->is_connected()) {
uint32_t ssrc = 0;
- if ((err = sip_->resource()->invite_request(&ssrc)) != srs_success) {
+ if ((err = sip_->invite_request(&ssrc)) != srs_success) {
return srs_error_wrap(err, "invite");
}
// Now, we're able to query session by ssrc, for media packets.
- SrsLazyObjectWrapper* wrapper = wrapper_root_;
- srs_assert(wrapper); // It MUST never be NULL, because this method is in the cycle of coroutine.
- _srs_gb_manager->add_with_fast_id(ssrc, wrapper);
+ _srs_gb_manager->add_with_fast_id(ssrc, wrapper_);
}
}
@@ -349,32 +341,32 @@ srs_error_t SrsLazyGbSession::drive_state()
}
srs_trace("Session: Connecting timeout, nn=%d, state=%s, sip=%s, media=%d", nn_timeout_, srs_gb_session_state(state_).c_str(),
- srs_gb_sip_state(sip_->resource()->state()).c_str(), media_->resource()->is_connected());
- sip_->resource()->reset_to_register();
+ srs_gb_sip_state(sip_->state()).c_str(), media_->is_connected());
+ sip_->reset_to_register();
SRS_GB_CHANGE_STATE_TO(SrsGbSessionStateInit);
}
- if (sip_->resource()->is_stable() && media_->resource()->is_connected()) {
+ if (sip_->is_stable() && media_->is_connected()) {
SRS_GB_CHANGE_STATE_TO(SrsGbSessionStateEstablished);
}
}
if (state_ == SrsGbSessionStateEstablished) {
- if (sip_->resource()->is_bye()) {
+ if (sip_->is_bye()) {
srs_trace("Session: Dispose for client bye");
return err;
}
// When media disconnected, we wait for a while then reinvite.
- if (!media_->resource()->is_connected()) {
+ if (!media_->is_connected()) {
if (!reinviting_starttime_) {
reinviting_starttime_ = srs_update_system_time();
}
if (srs_get_system_time() - reinviting_starttime_ > reinvite_wait_) {
reinviting_starttime_ = 0;
srs_trace("Session: Re-invite for disconnect, state=%s, sip=%s, media=%d", srs_gb_session_state(state_).c_str(),
- srs_gb_sip_state(sip_->resource()->state()).c_str(), media_->resource()->is_connected());
- sip_->resource()->reset_to_register();
+ srs_gb_sip_state(sip_->state()).c_str(), media_->is_connected());
+ sip_->reset_to_register();
SRS_GB_CHANGE_STATE_TO(SrsGbSessionStateInit);
}
}
@@ -383,19 +375,19 @@ srs_error_t SrsLazyGbSession::drive_state()
return err;
}
-SrsGbSessionState SrsLazyGbSession::set_state(SrsGbSessionState v)
+SrsGbSessionState SrsGbSession::set_state(SrsGbSessionState v)
{
SrsGbSessionState state = state_;
state_ = v;
return state;
}
-const SrsContextId& SrsLazyGbSession::get_id()
+const SrsContextId& SrsGbSession::get_id()
{
return cid_;
}
-std::string SrsLazyGbSession::desc()
+std::string SrsGbSession::desc()
{
return "GBS";
}
@@ -463,27 +455,33 @@ srs_error_t SrsGbListener::on_tcp_client(ISrsListener* listener, srs_netfd_t stf
// Handle TCP connections.
if (listener == sip_listener_) {
- SrsLazyObjectWrapper* conn = new SrsLazyObjectWrapper();
- SrsLazyGbSipTcpConn* resource = dynamic_cast(conn->resource());
- resource->setup(conf_, sip_listener_, media_listener_, stfd);
+ SrsGbSipTcpConn* raw_conn = new SrsGbSipTcpConn();
+ raw_conn->setup(conf_, sip_listener_, media_listener_, stfd);
- if ((err = resource->start()) != srs_success) {
- srs_freep(conn);
+ SrsSharedResource* conn = new SrsSharedResource(raw_conn);
+ _srs_gb_manager->add(conn, NULL);
+
+ SrsExecutorCoroutine* executor = new SrsExecutorCoroutine(_srs_gb_manager, conn, raw_conn, raw_conn);
+ raw_conn->setup_owner(conn, executor, executor);
+
+ if ((err = executor->start()) != srs_success) {
+ srs_freep(executor);
return srs_error_wrap(err, "gb sip");
}
-
- _srs_gb_manager->add(conn, NULL);
} else if (listener == media_listener_) {
- SrsLazyObjectWrapper* conn = new SrsLazyObjectWrapper();
- SrsLazyGbMediaTcpConn* resource = dynamic_cast(conn->resource());
- resource->setup(stfd);
+ SrsGbMediaTcpConn* raw_conn = new SrsGbMediaTcpConn();
+ raw_conn->setup(stfd);
- if ((err = resource->start()) != srs_success) {
- srs_freep(conn);
+ SrsSharedResource* conn = new SrsSharedResource(raw_conn);
+ _srs_gb_manager->add(conn, NULL);
+
+ SrsExecutorCoroutine* executor = new SrsExecutorCoroutine(_srs_gb_manager, conn, raw_conn, raw_conn);
+ raw_conn->setup_owner(conn, executor, executor);
+
+ if ((err = executor->start()) != srs_success) {
+ srs_freep(executor);
return srs_error_wrap(err, "gb media");
}
-
- _srs_gb_manager->add(conn, NULL);
} else {
srs_warn("GB: Ignore TCP client");
srs_close_stfd(stfd);
@@ -492,9 +490,13 @@ srs_error_t SrsGbListener::on_tcp_client(ISrsListener* listener, srs_netfd_t stf
return err;
}
-SrsLazyGbSipTcpConn::SrsLazyGbSipTcpConn(SrsLazyObjectWrapper* wrapper_root)
+SrsGbSipTcpConn::SrsGbSipTcpConn()
{
- wrapper_root_ = wrapper_root;
+ wrapper_ = NULL;
+ owner_coroutine_ = NULL;
+ owner_cid_ = NULL;
+ cid_ = _srs_context->get_id();
+
session_ = NULL;
state_ = SrsGbSipStateInit;
register_ = new SrsSipMessage();
@@ -507,54 +509,62 @@ SrsLazyGbSipTcpConn::SrsLazyGbSipTcpConn(SrsLazyObjectWrappercopy();
- session_ = NULL;
sip_listener_ = sip;
media_listener_ = media;
conn_ = new SrsTcpConnection(stfd);
- receiver_ = new SrsLazyGbSipTcpReceiver(this, conn_);
- sender_ = new SrsLazyGbSipTcpSender(conn_);
+ receiver_ = new SrsGbSipTcpReceiver(this, conn_);
+ sender_ = new SrsGbSipTcpSender(conn_);
}
-std::string SrsLazyGbSipTcpConn::device_id()
+void SrsGbSipTcpConn::setup_owner(SrsSharedResource* wrapper, ISrsInterruptable* owner_coroutine, ISrsContextIdSetter* owner_cid)
+{
+ wrapper_ = wrapper;
+ owner_coroutine_ = owner_coroutine;
+ owner_cid_ = owner_cid;
+}
+
+void SrsGbSipTcpConn::on_executor_done(ISrsInterruptable* executor)
+{
+ owner_coroutine_ = NULL;
+}
+
+std::string SrsGbSipTcpConn::device_id()
{
return register_->device_id();
}
-void SrsLazyGbSipTcpConn::set_cid(const SrsContextId& cid)
+void SrsGbSipTcpConn::set_cid(const SrsContextId& cid)
{
- trd_->set_cid(cid);
+ if (owner_cid_) owner_cid_->set_cid(cid);
receiver_->set_cid(cid);
sender_->set_cid(cid);
+ cid_ = cid;
}
-void SrsLazyGbSipTcpConn::query_ports(int* sip, int* media)
+void SrsGbSipTcpConn::query_ports(int* sip, int* media)
{
if (sip) *sip = sip_listener_->port();
if (media) *media = media_listener_->port();
}
-srs_error_t SrsLazyGbSipTcpConn::on_sip_message(SrsSipMessage* msg)
+srs_error_t SrsGbSipTcpConn::on_sip_message(SrsSipMessage* msg)
{
srs_error_t err = srs_success;
@@ -603,7 +613,7 @@ srs_error_t SrsLazyGbSipTcpConn::on_sip_message(SrsSipMessage* msg)
return err;
}
-void SrsLazyGbSipTcpConn::enqueue_sip_message(SrsSipMessage* msg)
+void SrsGbSipTcpConn::enqueue_sip_message(SrsSipMessage* msg)
{
// Drive state machine when enqueue message.
drive_state(msg);
@@ -612,7 +622,7 @@ void SrsLazyGbSipTcpConn::enqueue_sip_message(SrsSipMessage* msg)
sender_->enqueue(msg);
}
-void SrsLazyGbSipTcpConn::drive_state(SrsSipMessage* msg)
+void SrsGbSipTcpConn::drive_state(SrsSipMessage* msg)
{
srs_error_t err = srs_success;
@@ -669,7 +679,7 @@ void SrsLazyGbSipTcpConn::drive_state(SrsSipMessage* msg)
}
}
-void SrsLazyGbSipTcpConn::register_response(SrsSipMessage* msg)
+void SrsGbSipTcpConn::register_response(SrsSipMessage* msg)
{
SrsSipMessage* res = new SrsSipMessage();
@@ -686,7 +696,7 @@ void SrsLazyGbSipTcpConn::register_response(SrsSipMessage* msg)
enqueue_sip_message(res);
}
-void SrsLazyGbSipTcpConn::message_response(SrsSipMessage* msg, http_status status)
+void SrsGbSipTcpConn::message_response(SrsSipMessage* msg, http_status status)
{
SrsSipMessage* res = new SrsSipMessage();
@@ -701,9 +711,9 @@ void SrsLazyGbSipTcpConn::message_response(SrsSipMessage* msg, http_status statu
enqueue_sip_message(res);
}
-void SrsLazyGbSipTcpConn::invite_ack(SrsSipMessage* msg)
+void SrsGbSipTcpConn::invite_ack(SrsSipMessage* msg)
{
- string pip = session_->resource()->pip(); // Parse from CANDIDATE
+ string pip = session_->pip(); // Parse from CANDIDATE
int sip_port; query_ports(&sip_port, NULL);
string gb_device_id = srs_fmt("sip:%s@%s", msg->to_address_user_.c_str(), msg->to_address_host_.c_str());
string branch = srs_random_str(6);
@@ -722,7 +732,7 @@ void SrsLazyGbSipTcpConn::invite_ack(SrsSipMessage* msg)
enqueue_sip_message(req);
}
-void SrsLazyGbSipTcpConn::bye_response(SrsSipMessage* msg)
+void SrsGbSipTcpConn::bye_response(SrsSipMessage* msg)
{
SrsSipMessage* res = new SrsSipMessage();
@@ -737,7 +747,7 @@ void SrsLazyGbSipTcpConn::bye_response(SrsSipMessage* msg)
enqueue_sip_message(res);
}
-srs_error_t SrsLazyGbSipTcpConn::invite_request(uint32_t* pssrc)
+srs_error_t SrsGbSipTcpConn::invite_request(uint32_t* pssrc)
{
srs_error_t err = srs_success;
@@ -765,7 +775,7 @@ srs_error_t SrsLazyGbSipTcpConn::invite_request(uint32_t* pssrc)
if (pssrc) *pssrc = ssrc_v_;
}
- string pip = session_->resource()->pip(); // Parse from CANDIDATE
+ string pip = session_->pip(); // Parse from CANDIDATE
int sip_port, media_port; query_ports(&sip_port, &media_port);
string srs_device_id = srs_fmt("sip:%s@%s", register_->request_uri_user_.c_str(), register_->request_uri_host_.c_str());
string gb_device_id = srs_fmt("sip:%s@%s", register_->from_address_user_.c_str(), register_->from_address_host_.c_str());
@@ -834,63 +844,59 @@ srs_error_t SrsLazyGbSipTcpConn::invite_request(uint32_t* pssrc)
return err;
}
-void SrsLazyGbSipTcpConn::interrupt()
+void SrsGbSipTcpConn::interrupt()
{
receiver_->interrupt();
sender_->interrupt();
- trd_->interrupt();
+ if (owner_coroutine_) owner_coroutine_->interrupt();
}
-SrsGbSipState SrsLazyGbSipTcpConn::state()
+SrsGbSipState SrsGbSipTcpConn::state()
{
return state_;
}
-void SrsLazyGbSipTcpConn::reset_to_register()
+void SrsGbSipTcpConn::reset_to_register()
{
state_ = SrsGbSipStateRegistered;
}
-bool SrsLazyGbSipTcpConn::is_registered()
+bool SrsGbSipTcpConn::is_registered()
{
return state_ >= SrsGbSipStateRegistered && state_ <= SrsGbSipStateStable;
}
-bool SrsLazyGbSipTcpConn::is_stable()
+bool SrsGbSipTcpConn::is_stable()
{
return state_ == SrsGbSipStateStable;
}
-bool SrsLazyGbSipTcpConn::is_bye()
+bool SrsGbSipTcpConn::is_bye()
{
return state_ == SrsGbSipStateBye;
}
-SrsGbSipState SrsLazyGbSipTcpConn::set_state(SrsGbSipState v)
+SrsGbSipState SrsGbSipTcpConn::set_state(SrsGbSipState v)
{
SrsGbSipState state = state_;
state_ = v;
return state;
}
-const SrsContextId& SrsLazyGbSipTcpConn::get_id()
+const SrsContextId& SrsGbSipTcpConn::get_id()
{
- return trd_->cid();
+ return cid_;
}
-std::string SrsLazyGbSipTcpConn::desc()
+std::string SrsGbSipTcpConn::desc()
{
return "GB-SIP-TCP";
}
-srs_error_t SrsLazyGbSipTcpConn::start()
+srs_error_t SrsGbSipTcpConn::cycle()
{
srs_error_t err = srs_success;
- if ((err = trd_->start()) != srs_success) {
- return srs_error_wrap(err, "sip");
- }
-
if ((err = receiver_->start()) != srs_success) {
return srs_error_wrap(err, "receiver");
}
@@ -899,22 +905,13 @@ srs_error_t SrsLazyGbSipTcpConn::start()
return srs_error_wrap(err, "sender");
}
- return err;
-}
-
-srs_error_t SrsLazyGbSipTcpConn::cycle()
-{
- srs_error_t err = do_cycle();
+ // Wait for the SIP connection to be terminated.
+ err = do_cycle();
// Interrupt the receiver and sender coroutine.
receiver_->interrupt();
sender_->interrupt();
- // Note that we added wrapper to manager, so we must free the wrapper, not this connection.
- SrsLazyObjectWrapper* wrapper = wrapper_root_;
- srs_assert(wrapper); // The creator wrapper MUST never be null, because we created it.
- _srs_gb_manager->remove(wrapper);
-
// success.
if (err == srs_success) {
srs_trace("client finished.");
@@ -942,23 +939,23 @@ srs_error_t SrsLazyGbSipTcpConn::cycle()
return srs_success;
}
-srs_error_t SrsLazyGbSipTcpConn::do_cycle()
+srs_error_t SrsGbSipTcpConn::do_cycle()
{
srs_error_t err = srs_success;
while (true) {
- if ((err = trd_->pull()) != srs_success) {
+ if (!owner_coroutine_) return err;
+ if ((err = owner_coroutine_->pull()) != srs_success) {
return srs_error_wrap(err, "pull");
}
- // TODO: Handle other messages.
srs_usleep(SRS_UTIME_NO_TIMEOUT);
}
return err;
}
-srs_error_t SrsLazyGbSipTcpConn::bind_session(SrsSipMessage* msg, SrsLazyObjectWrapper** psession)
+srs_error_t SrsGbSipTcpConn::bind_session(SrsSipMessage* msg, SrsGbSession** psession)
{
srs_error_t err = srs_success;
@@ -968,32 +965,29 @@ srs_error_t SrsLazyGbSipTcpConn::bind_session(SrsSipMessage* msg, SrsLazyObjectW
// Only create session for REGISTER request.
if (msg->type_ != HTTP_REQUEST || msg->method_ != HTTP_REGISTER) return err;
- // The lazy-sweep wrapper for this resource.
- SrsLazyObjectWrapper* wrapper = wrapper_root_;
- srs_assert(wrapper); // It MUST never be NULL, because this method is in the cycle of coroutine of receiver.
-
// Find exists session for register, might be created by another object and still alive.
- SrsLazyObjectWrapper* session = dynamic_cast*>(_srs_gb_manager->find_by_id(device));
+ SrsSharedResource* session = dynamic_cast*>(_srs_gb_manager->find_by_id(device));
+ SrsGbSession* raw_session = session ? (*session).get() : NULL;
if (!session) {
// Create new GB session.
- session = new SrsLazyObjectWrapper();
-
- if ((err = session->resource()->initialize(conf_)) != srs_success) {
- srs_freep(session);
- return srs_error_wrap(err, "initialize");
- }
-
- if ((err = session->resource()->start()) != srs_success) {
- srs_freep(session);
- return srs_error_wrap(err, "start");
- }
+ raw_session = new SrsGbSession();
+ raw_session->setup(conf_);
+ session = new SrsSharedResource(raw_session);
_srs_gb_manager->add_with_id(device, session);
+
+ SrsExecutorCoroutine* executor = new SrsExecutorCoroutine(_srs_gb_manager, session, raw_session, raw_session);
+ raw_session->setup_owner(session, executor, executor);
+
+ if ((err = executor->start()) != srs_success) {
+ srs_freep(executor);
+ return srs_error_wrap(err, "gb session");
+ }
}
// Try to load state from previous SIP connection.
- SrsLazyGbSipTcpConn* pre = dynamic_cast(session->resource()->sip_transport()->resource());
- if (pre) {
+ SrsSharedResource pre = raw_session->sip_transport();
+ if (pre.get() && pre.get() != this) {
state_ = pre->state_;
ssrc_str_ = pre->ssrc_str_;
ssrc_v_ = pre->ssrc_v_;
@@ -1001,36 +995,36 @@ srs_error_t SrsLazyGbSipTcpConn::bind_session(SrsSipMessage* msg, SrsLazyObjectW
srs_freep(invite_ok_); invite_ok_ = pre->invite_ok_->copy();
}
- // Notice SIP session to use current SIP connection.
- session->resource()->on_sip_transport(wrapper);
- *psession = session->copy();
+ // Notice session to use current SIP connection.
+ raw_session->on_sip_transport(*wrapper_);
+ *psession = raw_session;
return err;
}
-SrsLazyGbSipTcpReceiver::SrsLazyGbSipTcpReceiver(SrsLazyGbSipTcpConn* sip, SrsTcpConnection* conn)
+SrsGbSipTcpReceiver::SrsGbSipTcpReceiver(SrsGbSipTcpConn* sip, SrsTcpConnection* conn)
{
sip_ = sip;
conn_ = conn;
trd_ = new SrsSTCoroutine("sip-receiver", this);
}
-SrsLazyGbSipTcpReceiver::~SrsLazyGbSipTcpReceiver()
+SrsGbSipTcpReceiver::~SrsGbSipTcpReceiver()
{
srs_freep(trd_);
}
-void SrsLazyGbSipTcpReceiver::interrupt()
+void SrsGbSipTcpReceiver::interrupt()
{
trd_->interrupt();
}
-void SrsLazyGbSipTcpReceiver::set_cid(const SrsContextId& cid)
+void SrsGbSipTcpReceiver::set_cid(const SrsContextId& cid)
{
trd_->set_cid(cid);
}
-srs_error_t SrsLazyGbSipTcpReceiver::start()
+srs_error_t SrsGbSipTcpReceiver::start()
{
srs_error_t err = srs_success;
@@ -1041,7 +1035,7 @@ srs_error_t SrsLazyGbSipTcpReceiver::start()
return err;
}
-srs_error_t SrsLazyGbSipTcpReceiver::cycle()
+srs_error_t SrsGbSipTcpReceiver::cycle()
{
srs_error_t err = do_cycle();
@@ -1053,7 +1047,7 @@ srs_error_t SrsLazyGbSipTcpReceiver::cycle()
return err;
}
-srs_error_t SrsLazyGbSipTcpReceiver::do_cycle()
+srs_error_t SrsGbSipTcpReceiver::do_cycle()
{
srs_error_t err = srs_success;
@@ -1092,14 +1086,14 @@ srs_error_t SrsLazyGbSipTcpReceiver::do_cycle()
return err;
}
-SrsLazyGbSipTcpSender::SrsLazyGbSipTcpSender(SrsTcpConnection* conn)
+SrsGbSipTcpSender::SrsGbSipTcpSender(SrsTcpConnection* conn)
{
conn_ = conn;
wait_ = srs_cond_new();
trd_ = new SrsSTCoroutine("sip-sender", this);
}
-SrsLazyGbSipTcpSender::~SrsLazyGbSipTcpSender()
+SrsGbSipTcpSender::~SrsGbSipTcpSender()
{
srs_freep(trd_);
srs_cond_destroy(wait_);
@@ -1110,23 +1104,23 @@ SrsLazyGbSipTcpSender::~SrsLazyGbSipTcpSender()
}
}
-void SrsLazyGbSipTcpSender::enqueue(SrsSipMessage* msg)
+void SrsGbSipTcpSender::enqueue(SrsSipMessage* msg)
{
msgs_.push_back(msg);
srs_cond_signal(wait_);
}
-void SrsLazyGbSipTcpSender::interrupt()
+void SrsGbSipTcpSender::interrupt()
{
trd_->interrupt();
}
-void SrsLazyGbSipTcpSender::set_cid(const SrsContextId& cid)
+void SrsGbSipTcpSender::set_cid(const SrsContextId& cid)
{
trd_->set_cid(cid);
}
-srs_error_t SrsLazyGbSipTcpSender::start()
+srs_error_t SrsGbSipTcpSender::start()
{
srs_error_t err = srs_success;
@@ -1137,7 +1131,7 @@ srs_error_t SrsLazyGbSipTcpSender::start()
return err;
}
-srs_error_t SrsLazyGbSipTcpSender::cycle()
+srs_error_t SrsGbSipTcpSender::cycle()
{
srs_error_t err = do_cycle();
@@ -1149,7 +1143,7 @@ srs_error_t SrsLazyGbSipTcpSender::cycle()
return err;
}
-srs_error_t SrsLazyGbSipTcpSender::do_cycle()
+srs_error_t SrsGbSipTcpSender::do_cycle()
{
srs_error_t err = srs_success;
@@ -1219,71 +1213,74 @@ ISrsPsPackHandler::~ISrsPsPackHandler()
{
}
-SrsLazyGbMediaTcpConn::SrsLazyGbMediaTcpConn(SrsLazyObjectWrapper* wrapper_root)
+SrsGbMediaTcpConn::SrsGbMediaTcpConn()
{
- wrapper_root_ = wrapper_root;
pack_ = new SrsPackContext(this);
- trd_ = new SrsSTCoroutine("media", this);
buffer_ = new uint8_t[65535];
conn_ = NULL;
+ wrapper_ = NULL;
+ owner_coroutine_ = NULL;
+ owner_cid_ = NULL;
+ cid_ = _srs_context->get_id();
+
session_ = NULL;
connected_ = false;
nn_rtcp_ = 0;
}
-SrsLazyGbMediaTcpConn::~SrsLazyGbMediaTcpConn()
+SrsGbMediaTcpConn::~SrsGbMediaTcpConn()
{
- srs_freep(trd_);
srs_freep(conn_);
srs_freepa(buffer_);
srs_freep(pack_);
- srs_freep(session_);
}
-void SrsLazyGbMediaTcpConn::setup(srs_netfd_t stfd)
+void SrsGbMediaTcpConn::setup(srs_netfd_t stfd)
{
srs_freep(conn_);
conn_ = new SrsTcpConnection(stfd);
}
-bool SrsLazyGbMediaTcpConn::is_connected()
+void SrsGbMediaTcpConn::setup_owner(SrsSharedResource* wrapper, ISrsInterruptable* owner_coroutine, ISrsContextIdSetter* owner_cid)
+{
+ wrapper_ = wrapper;
+ owner_coroutine_ = owner_coroutine;
+ owner_cid_ = owner_cid;
+}
+
+void SrsGbMediaTcpConn::on_executor_done(ISrsInterruptable* executor)
+{
+ owner_coroutine_ = NULL;
+}
+
+bool SrsGbMediaTcpConn::is_connected()
{
return connected_;
}
-void SrsLazyGbMediaTcpConn::interrupt()
+void SrsGbMediaTcpConn::interrupt()
{
- trd_->interrupt();
+ if (owner_coroutine_) owner_coroutine_->interrupt();
}
-void SrsLazyGbMediaTcpConn::set_cid(const SrsContextId& cid)
+void SrsGbMediaTcpConn::set_cid(const SrsContextId& cid)
{
- trd_->set_cid(cid);
+ if (owner_cid_) owner_cid_->set_cid(cid);
+ cid_ = cid;
}
-const SrsContextId& SrsLazyGbMediaTcpConn::get_id()
+const SrsContextId& SrsGbMediaTcpConn::get_id()
{
- return _srs_context->get_id();
+ return cid_;
}
-std::string SrsLazyGbMediaTcpConn::desc()
+std::string SrsGbMediaTcpConn::desc()
{
return "GB-Media-TCP";
}
-srs_error_t SrsLazyGbMediaTcpConn::start()
-{
- srs_error_t err = srs_success;
-
- if ((err = trd_->start()) != srs_success) {
- return srs_error_wrap(err, "coroutine");
- }
-
- return err;
-}
-
-srs_error_t SrsLazyGbMediaTcpConn::cycle()
+srs_error_t SrsGbMediaTcpConn::cycle()
{
srs_error_t err = do_cycle();
@@ -1295,11 +1292,6 @@ srs_error_t SrsLazyGbMediaTcpConn::cycle()
connected_ = false;
srs_trace("PS: Media disconnect, code=%d", srs_error_code(err));
- // Note that we added wrapper to manager, so we must free the wrapper, not this connection.
- SrsLazyObjectWrapper* wrapper = wrapper_root_;
- srs_assert(wrapper); // The creator wrapper MUST never be null, because we created it.
- _srs_gb_manager->remove(wrapper);
-
// success.
if (err == srs_success) {
srs_trace("client finished.");
@@ -1327,7 +1319,7 @@ srs_error_t SrsLazyGbMediaTcpConn::cycle()
return srs_success;
}
-srs_error_t SrsLazyGbMediaTcpConn::do_cycle()
+srs_error_t SrsGbMediaTcpConn::do_cycle()
{
srs_error_t err = srs_success;
@@ -1341,7 +1333,8 @@ srs_error_t SrsLazyGbMediaTcpConn::do_cycle()
uint32_t reserved = 0;
for (;;) {
- if ((err = trd_->pull()) != srs_success) {
+ if (!owner_coroutine_) return err;
+ if ((err = owner_coroutine_->pull()) != srs_success) {
return srs_error_wrap(err, "pull");
}
@@ -1426,7 +1419,7 @@ srs_error_t SrsLazyGbMediaTcpConn::do_cycle()
return err;
}
-srs_error_t SrsLazyGbMediaTcpConn::on_ps_pack(SrsPsPacket* ps, const std::vector& msgs)
+srs_error_t SrsGbMediaTcpConn::on_ps_pack(SrsPsPacket* ps, const std::vector& msgs)
{
srs_error_t err = srs_success;
@@ -1437,7 +1430,7 @@ srs_error_t SrsLazyGbMediaTcpConn::on_ps_pack(SrsPsPacket* ps, const std::vector
}
// Notify session about the media pack.
- session_->resource()->on_ps_pack(pack_, ps, msgs);
+ session_->on_ps_pack(pack_, ps, msgs);
//for (vector::const_iterator it = msgs.begin(); it != msgs.end(); ++it) {
// SrsTsMessage* msg = *it;
@@ -1450,23 +1443,22 @@ srs_error_t SrsLazyGbMediaTcpConn::on_ps_pack(SrsPsPacket* ps, const std::vector
return err;
}
-srs_error_t SrsLazyGbMediaTcpConn::bind_session(uint32_t ssrc, SrsLazyObjectWrapper** psession)
+srs_error_t SrsGbMediaTcpConn::bind_session(uint32_t ssrc, SrsGbSession** psession)
{
srs_error_t err = srs_success;
if (!ssrc) return err;
- // The lazy-sweep wrapper for this resource.
- SrsLazyObjectWrapper* wrapper = wrapper_root_;
- srs_assert(wrapper); // It MUST never be NULL, because this method is in the cycle of coroutine.
-
// Find exists session for register, might be created by another object and still alive.
- SrsLazyObjectWrapper* session = dynamic_cast*>(_srs_gb_manager->find_by_fast_id(ssrc));
+ SrsSharedResource* session = dynamic_cast*>(_srs_gb_manager->find_by_fast_id(ssrc));
if (!session) return err;
- _srs_gb_manager->add_with_fast_id(ssrc, session);
- session->resource()->on_media_transport(wrapper);
- *psession = session->copy();
+ SrsGbSession* raw_session = (*session).get();
+ srs_assert(raw_session);
+
+ // Notice session to use current media connection.
+ raw_session->on_media_transport(*wrapper_);
+ *psession = raw_session;
return err;
}
@@ -1545,7 +1537,7 @@ SrsSharedPtrMessage* SrsMpegpsQueue::dequeue()
return NULL;
}
-SrsGbMuxer::SrsGbMuxer(SrsLazyGbSession* session)
+SrsGbMuxer::SrsGbMuxer(SrsGbSession* session)
{
sdk_ = NULL;
session_ = session;
@@ -1580,13 +1572,9 @@ SrsGbMuxer::~SrsGbMuxer()
srs_freep(pprint_);
}
-srs_error_t SrsGbMuxer::initialize(std::string output)
+void SrsGbMuxer::setup(std::string output)
{
- srs_error_t err = srs_success;
-
output_ = output;
-
- return err;
}
srs_error_t SrsGbMuxer::on_ts_message(SrsTsMessage* msg)
@@ -2095,7 +2083,7 @@ srs_error_t SrsGbMuxer::connect()
// Cleanup the data before connect again.
close();
- string url = srs_string_replace(output_, "[stream]", session_->sip_transport()->resource()->device_id());
+ string url = srs_string_replace(output_, "[stream]", session_->sip_transport()->device_id());
srs_trace("Muxer: Convert GB to RTMP %s", url.c_str());
srs_utime_t cto = SRS_CONSTS_RTMP_TIMEOUT;
diff --git a/trunk/src/app/srs_app_gb28181.hpp b/trunk/src/app/srs_app_gb28181.hpp
index 783842505..e44b618c6 100644
--- a/trunk/src/app/srs_app_gb28181.hpp
+++ b/trunk/src/app/srs_app_gb28181.hpp
@@ -26,11 +26,11 @@ class SrsCoroutine;
class SrsPackContext;
class SrsBuffer;
class SrsSipMessage;
-class SrsLazyGbSession;
-class SrsLazyGbSipTcpConn;
-class SrsLazyGbMediaTcpConn;
-class SrsLazyGbSipTcpReceiver;
-class SrsLazyGbSipTcpSender;
+class SrsGbSession;
+class SrsGbSipTcpConn;
+class SrsGbMediaTcpConn;
+class SrsGbSipTcpReceiver;
+class SrsGbSipTcpSender;
class SrsAlonePithyPrint;
class SrsGbMuxer;
class SrsSimpleRtmpClient;
@@ -51,7 +51,7 @@ class SrsRawAacStream;
// established:
// init: media is not connected.
// dispose session: sip is bye.
-// Please see SrsLazyGbSession::drive_state for detail.
+// Please see SrsGbSession::drive_state for detail.
enum SrsGbSessionState
{
SrsGbSessionStateInit = 0,
@@ -76,7 +76,7 @@ std::string srs_gb_session_state(SrsGbSessionState state);
// to bye: Got bye SIP message from device.
// re-inviting:
// to inviting: Got bye OK response from deivce.
-// Please see SrsLazyGbSipTcpConn::drive_state for detail.
+// Please see SrsGbSipTcpConn::drive_state for detail.
enum SrsGbSipState
{
SrsGbSipStateInit = 0,
@@ -90,16 +90,23 @@ enum SrsGbSipState
std::string srs_gb_sip_state(SrsGbSipState state);
// The main logic object for GB, the session.
-class SrsLazyGbSession : public SrsLazyObject, public ISrsResource, public ISrsStartable, public ISrsCoroutineHandler
+// Each session contains a SIP object and a media object, that are managed by session. This means session always
+// lives longer than SIP and media, and session will dispose SIP and media when session disposed. In another word,
+// SIP and media objects use directly pointer to session, while session use shared ptr.
+class SrsGbSession : public ISrsResource, public ISrsCoroutineHandler, public ISrsExecutorHandler
{
private:
- SrsCoroutine* trd_;
SrsContextId cid_;
+private:
+ // The shared resource which own this object, we should never free it because it's managed by shared ptr.
+ SrsSharedResource* wrapper_;
+ // The owner coroutine, allow user to interrupt the loop.
+ ISrsInterruptable* owner_coroutine_;
+ ISrsContextIdSetter* owner_cid_;
private:
SrsGbSessionState state_;
- SrsLazyObjectWrapper* wrapper_root_;
- SrsLazyObjectWrapper* sip_;
- SrsLazyObjectWrapper* media_;
+ SrsSharedResource sip_;
+ SrsSharedResource media_;
SrsGbMuxer* muxer_;
private:
// The candidate for SDP in configuration.
@@ -132,26 +139,27 @@ private:
uint64_t media_recovered_;
uint64_t media_msgs_dropped_;
uint64_t media_reserved_;
-private:
- friend class SrsLazyObjectWrapper;
- SrsLazyGbSession(SrsLazyObjectWrapper* wrapper_root);
public:
- virtual ~SrsLazyGbSession();
+ SrsGbSession();
+ virtual ~SrsGbSession();
public:
// Initialize the GB session.
- srs_error_t initialize(SrsConfDirective* conf);
+ void setup(SrsConfDirective* conf);
+ // Setup the owner, the wrapper is the shared ptr, the interruptable object is the coroutine, and the cid is the context id.
+ void setup_owner(SrsSharedResource* wrapper, ISrsInterruptable* owner_coroutine, ISrsContextIdSetter* owner_cid);
+// Interface ISrsExecutorHandler
+public:
+ virtual void on_executor_done(ISrsInterruptable* executor);
+public:
// When got a pack of messages.
void on_ps_pack(SrsPackContext* ctx, SrsPsPacket* ps, const std::vector& msgs);
// When got available SIP transport.
- void on_sip_transport(SrsLazyObjectWrapper* sip);
- SrsLazyObjectWrapper* sip_transport();
+ void on_sip_transport(SrsSharedResource sip);
+ SrsSharedResource sip_transport();
// When got available media transport.
- void on_media_transport(SrsLazyObjectWrapper* media);
+ void on_media_transport(SrsSharedResource media);
// Get the candidate for SDP generation, the public IP address for device to connect to.
std::string pip();
-// Interface ISrsStartable
-public:
- virtual srs_error_t start();
// Interface ISrsOneCycleThreadHandler
public:
virtual srs_error_t cycle();
@@ -186,12 +194,12 @@ public:
};
// A GB28181 TCP SIP connection.
-class SrsLazyGbSipTcpConn : public SrsLazyObject, public ISrsResource, public ISrsStartable, public ISrsCoroutineHandler
+class SrsGbSipTcpConn : public ISrsResource, public ISrsCoroutineHandler, public ISrsExecutorHandler
{
private:
SrsGbSipState state_;
- SrsLazyObjectWrapper* wrapper_root_;
- SrsLazyObjectWrapper* session_;
+ // The owner session object, note that we use the raw pointer and should never free it.
+ SrsGbSession* session_;
SrsSipMessage* register_;
SrsSipMessage* invite_ok_;
private:
@@ -202,18 +210,28 @@ private:
SrsTcpListener* sip_listener_;
SrsTcpListener* media_listener_;
private:
- SrsTcpConnection* conn_;
- SrsLazyGbSipTcpReceiver* receiver_;
- SrsLazyGbSipTcpSender* sender_;
- SrsCoroutine* trd_;
+ // The shared resource which own this object, we should never free it because it's managed by shared ptr.
+ SrsSharedResource* wrapper_;
+ // The owner coroutine, allow user to interrupt the loop.
+ ISrsInterruptable* owner_coroutine_;
+ ISrsContextIdSetter* owner_cid_;
+ SrsContextId cid_;
private:
- friend class SrsLazyObjectWrapper;
- SrsLazyGbSipTcpConn(SrsLazyObjectWrapper* wrapper_root);
+ SrsTcpConnection* conn_;
+ SrsGbSipTcpReceiver* receiver_;
+ SrsGbSipTcpSender* sender_;
public:
- virtual ~SrsLazyGbSipTcpConn();
+ SrsGbSipTcpConn();
+ virtual ~SrsGbSipTcpConn();
public:
// Setup object, to keep empty constructor.
void setup(SrsConfDirective* conf, SrsTcpListener* sip, SrsTcpListener* media, srs_netfd_t stfd);
+ // Setup the owner, the wrapper is the shared ptr, the interruptable object is the coroutine, and the cid is the context id.
+ void setup_owner(SrsSharedResource* wrapper, ISrsInterruptable* owner_coroutine, ISrsContextIdSetter* owner_cid);
+// Interface ISrsExecutorHandler
+public:
+ virtual void on_executor_done(ISrsInterruptable* executor);
+public:
// Get the SIP device id.
std::string device_id();
// Set the cid of all coroutines.
@@ -253,29 +271,26 @@ private:
public:
virtual const SrsContextId& get_id();
virtual std::string desc();
-// Interface ISrsStartable
-public:
- virtual srs_error_t start();
// Interface ISrsOneCycleThreadHandler
public:
virtual srs_error_t cycle();
private:
- virtual srs_error_t do_cycle();
+ srs_error_t do_cycle();
private:
// Create session if no one, or bind to an existed session.
- srs_error_t bind_session(SrsSipMessage* msg, SrsLazyObjectWrapper** psession);
+ srs_error_t bind_session(SrsSipMessage* msg, SrsGbSession** psession);
};
// Start a coroutine to receive SIP messages.
-class SrsLazyGbSipTcpReceiver : public ISrsStartable, public ISrsCoroutineHandler
+class SrsGbSipTcpReceiver : public ISrsStartable, public ISrsCoroutineHandler
{
private:
SrsCoroutine* trd_;
SrsTcpConnection* conn_;
- SrsLazyGbSipTcpConn* sip_;
+ SrsGbSipTcpConn* sip_;
public:
- SrsLazyGbSipTcpReceiver(SrsLazyGbSipTcpConn* sip, SrsTcpConnection* conn);
- virtual ~SrsLazyGbSipTcpReceiver();
+ SrsGbSipTcpReceiver(SrsGbSipTcpConn* sip, SrsTcpConnection* conn);
+ virtual ~SrsGbSipTcpReceiver();
public:
// Interrupt the receiver coroutine.
void interrupt();
@@ -292,7 +307,7 @@ private:
};
// Start a coroutine to send out SIP messages.
-class SrsLazyGbSipTcpSender : public ISrsStartable, public ISrsCoroutineHandler
+class SrsGbSipTcpSender : public ISrsStartable, public ISrsCoroutineHandler
{
private:
SrsCoroutine* trd_;
@@ -301,8 +316,8 @@ private:
std::vector msgs_;
srs_cond_t wait_;
public:
- SrsLazyGbSipTcpSender(SrsTcpConnection* conn);
- virtual ~SrsLazyGbSipTcpSender();
+ SrsGbSipTcpSender(SrsTcpConnection* conn);
+ virtual ~SrsGbSipTcpSender();
public:
// Push message to queue, and sender will send out in dedicate coroutine.
void enqueue(SrsSipMessage* msg);
@@ -333,27 +348,36 @@ public:
};
// A GB28181 TCP media connection, for PS stream.
-class SrsLazyGbMediaTcpConn : public SrsLazyObject, public ISrsResource, public ISrsStartable, public ISrsCoroutineHandler
- , public ISrsPsPackHandler
+class SrsGbMediaTcpConn : public ISrsResource, public ISrsCoroutineHandler, public ISrsPsPackHandler, public ISrsExecutorHandler
{
private:
bool connected_;
- SrsLazyObjectWrapper* wrapper_root_;
- SrsLazyObjectWrapper* session_;
+ // The owner session object, note that we use the raw pointer and should never free it.
+ SrsGbSession* session_;
uint32_t nn_rtcp_;
+private:
+ // The shared resource which own this object, we should never free it because it's managed by shared ptr.
+ SrsSharedResource* wrapper_;
+ // The owner coroutine, allow user to interrupt the loop.
+ ISrsInterruptable* owner_coroutine_;
+ ISrsContextIdSetter* owner_cid_;
+ SrsContextId cid_;
private:
SrsPackContext* pack_;
SrsTcpConnection* conn_;
- SrsCoroutine* trd_;
uint8_t* buffer_;
-private:
- friend class SrsLazyObjectWrapper;
- SrsLazyGbMediaTcpConn(SrsLazyObjectWrapper* wrapper_root);
public:
- virtual ~SrsLazyGbMediaTcpConn();
+ SrsGbMediaTcpConn();
+ virtual ~SrsGbMediaTcpConn();
public:
// Setup object, to keep empty constructor.
void setup(srs_netfd_t stfd);
+ // Setup the owner, the wrapper is the shared ptr, the interruptable object is the coroutine, and the cid is the context id.
+ void setup_owner(SrsSharedResource* wrapper, ISrsInterruptable* owner_coroutine, ISrsContextIdSetter* owner_cid);
+// Interface ISrsExecutorHandler
+public:
+ virtual void on_executor_done(ISrsInterruptable* executor);
+public:
// Whether media is connected.
bool is_connected();
// Interrupt transport by session.
@@ -364,9 +388,6 @@ public:
public:
virtual const SrsContextId& get_id();
virtual std::string desc();
-// Interface ISrsStartable
-public:
- virtual srs_error_t start();
// Interface ISrsOneCycleThreadHandler
public:
virtual srs_error_t cycle();
@@ -377,7 +398,7 @@ public:
virtual srs_error_t on_ps_pack(SrsPsPacket* ps, const std::vector& msgs);
private:
// Create session if no one, or bind to an existed session.
- srs_error_t bind_session(uint32_t ssrc, SrsLazyObjectWrapper** psession);
+ srs_error_t bind_session(uint32_t ssrc, SrsGbSession** psession);
};
// The queue for mpegts over udp to send packets.
@@ -402,7 +423,8 @@ public:
class SrsGbMuxer
{
private:
- SrsLazyGbSession* session_;
+ // The owner session object, note that we use the raw pointer and should never free it.
+ SrsGbSession* session_;
std::string output_;
SrsSimpleRtmpClient* sdk_;
private:
@@ -428,10 +450,10 @@ private:
SrsMpegpsQueue* queue_;
SrsPithyPrint* pprint_;
public:
- SrsGbMuxer(SrsLazyGbSession* session);
+ SrsGbMuxer(SrsGbSession* session);
virtual ~SrsGbMuxer();
public:
- srs_error_t initialize(std::string output);
+ void setup(std::string output);
srs_error_t on_ts_message(SrsTsMessage* msg);
private:
virtual srs_error_t on_ts_video(SrsTsMessage* msg, SrsBuffer* avs);
diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp
index d213f3f09..c5f650075 100644
--- a/trunk/src/app/srs_app_server.cpp
+++ b/trunk/src/app/srs_app_server.cpp
@@ -1369,11 +1369,6 @@ srs_error_t SrsServerAdapter::run(SrsWaitGroup* wg)
}
#endif
- SrsLazySweepGc* gc = dynamic_cast(_srs_gc);
- if ((err = gc->start()) != srs_success) {
- return srs_error_wrap(err, "start gc");
- }
-
return err;
}
diff --git a/trunk/src/app/srs_app_st.cpp b/trunk/src/app/srs_app_st.cpp
index 77f581a64..3e21e468c 100755
--- a/trunk/src/app/srs_app_st.cpp
+++ b/trunk/src/app/srs_app_st.cpp
@@ -30,6 +30,30 @@ ISrsStartable::~ISrsStartable()
{
}
+ISrsInterruptable::ISrsInterruptable()
+{
+}
+
+ISrsInterruptable::~ISrsInterruptable()
+{
+}
+
+ISrsContextIdSetter::ISrsContextIdSetter()
+{
+}
+
+ISrsContextIdSetter::~ISrsContextIdSetter()
+{
+}
+
+ISrsContextIdGetter::ISrsContextIdGetter()
+{
+}
+
+ISrsContextIdGetter::~ISrsContextIdGetter()
+{
+}
+
SrsCoroutine::SrsCoroutine()
{
}
@@ -342,3 +366,69 @@ void SrsWaitGroup::wait()
}
}
+ISrsExecutorHandler::ISrsExecutorHandler()
+{
+}
+
+ISrsExecutorHandler::~ISrsExecutorHandler()
+{
+}
+
+SrsExecutorCoroutine::SrsExecutorCoroutine(ISrsResourceManager* m, ISrsResource* r, ISrsCoroutineHandler* h, ISrsExecutorHandler* cb)
+{
+ resource_ = r;
+ handler_ = h;
+ manager_ = m;
+ callback_ = cb;
+ trd_ = new SrsSTCoroutine("ar", this, resource_->get_id());
+}
+
+SrsExecutorCoroutine::~SrsExecutorCoroutine()
+{
+ manager_->remove(resource_);
+ srs_freep(trd_);
+}
+
+srs_error_t SrsExecutorCoroutine::start()
+{
+ return trd_->start();
+}
+
+void SrsExecutorCoroutine::interrupt()
+{
+ trd_->interrupt();
+}
+
+srs_error_t SrsExecutorCoroutine::pull()
+{
+ return trd_->pull();
+}
+
+const SrsContextId& SrsExecutorCoroutine::cid()
+{
+ return trd_->cid();
+}
+
+void SrsExecutorCoroutine::set_cid(const SrsContextId& cid)
+{
+ trd_->set_cid(cid);
+}
+
+srs_error_t SrsExecutorCoroutine::cycle()
+{
+ srs_error_t err = handler_->cycle();
+ if (callback_) callback_->on_executor_done(this);
+ manager_->remove(this);
+ return err;
+}
+
+const SrsContextId& SrsExecutorCoroutine::get_id()
+{
+ return resource_->get_id();
+}
+
+std::string SrsExecutorCoroutine::desc()
+{
+ return resource_->desc();
+}
+
diff --git a/trunk/src/app/srs_app_st.hpp b/trunk/src/app/srs_app_st.hpp
index 51282bca2..d7315b20c 100644
--- a/trunk/src/app/srs_app_st.hpp
+++ b/trunk/src/app/srs_app_st.hpp
@@ -15,8 +15,10 @@
#include
#include
#include
+#include
class SrsFastCoroutine;
+class SrsExecutorCoroutine;
// Each ST-coroutine must implements this interface,
// to do the cycle job and handle some events.
@@ -64,21 +66,46 @@ public:
virtual srs_error_t start() = 0;
};
-// The corotine object.
-class SrsCoroutine : public ISrsStartable
+// Allow user to interrupt the coroutine, for example, to stop it.
+class ISrsInterruptable
+{
+public:
+ ISrsInterruptable();
+ virtual ~ISrsInterruptable();
+public:
+ virtual void interrupt() = 0;
+ virtual srs_error_t pull() = 0;
+};
+
+// Get the context id.
+class ISrsContextIdSetter
+{
+public:
+ ISrsContextIdSetter();
+ virtual ~ISrsContextIdSetter();
+public:
+ virtual void set_cid(const SrsContextId& cid) = 0;
+};
+
+// Set the context id.
+class ISrsContextIdGetter
+{
+public:
+ ISrsContextIdGetter();
+ virtual ~ISrsContextIdGetter();
+public:
+ virtual const SrsContextId& cid() = 0;
+};
+
+// The coroutine object.
+class SrsCoroutine : public ISrsStartable, public ISrsInterruptable
+ , public ISrsContextIdSetter, public ISrsContextIdGetter
{
public:
SrsCoroutine();
virtual ~SrsCoroutine();
public:
virtual void stop() = 0;
- virtual void interrupt() = 0;
- // @return a copy of error, which should be freed by user.
- // NULL if not terminated and user should pull again.
- virtual srs_error_t pull() = 0;
- // Get and set the context id of coroutine.
- virtual const SrsContextId& cid() = 0;
- virtual void set_cid(const SrsContextId& cid) = 0;
};
// An empty coroutine, user can default to this object before create any real coroutine.
@@ -192,7 +219,7 @@ private:
static void* pfn(void* arg);
};
-// Like goroytine sync.WaitGroup.
+// Like goroutine sync.WaitGroup.
class SrsWaitGroup
{
private:
@@ -206,9 +233,72 @@ public:
void add(int n);
// When coroutine is done.
void done();
- // Wait for all corotine to be done.
+ // Wait for all coroutine to be done.
void wait();
};
+// The callback when executor cycle done.
+class ISrsExecutorHandler
+{
+public:
+ ISrsExecutorHandler();
+ virtual ~ISrsExecutorHandler();
+public:
+ virtual void on_executor_done(ISrsInterruptable* executor) = 0;
+};
+
+// Start a coroutine for resource executor, to execute the handler and delete resource and itself when
+// handler cycle done.
+//
+// Note that the executor will free itself by manager, then free the resource by manager. This is a helper
+// that is used for a goroutine to execute a handler and free itself after the cycle is done.
+//
+// Generally, the handler, resource, and callback generally are the same object. But we do not define a single
+// interface, because shared resource is a special interface.
+//
+// Note that the resource may live longer than executor, because it is shared resource, so you should process
+// the callback. For example, you should never use the executor after it's stopped and deleted.
+//
+// Usage:
+// ISrsResourceManager* manager = ...;
+// ISrsResource* resource, ISrsCoroutineHandler* handler, ISrsExecutorHandler* callback = ...; // Resource, handler, and callback are the same object.
+// SrsExecutorCoroutine* executor = new SrsExecutorCoroutine(manager, resource, handler);
+// if ((err = executor->start()) != srs_success) {
+// srs_freep(executor);
+// return err;
+// }
+class SrsExecutorCoroutine : public ISrsResource, public ISrsStartable, public ISrsInterruptable
+ , public ISrsContextIdSetter, public ISrsContextIdGetter, public ISrsCoroutineHandler
+{
+private:
+ ISrsResourceManager* manager_;
+ ISrsResource* resource_;
+ ISrsCoroutineHandler* handler_;
+ ISrsExecutorHandler* callback_;
+private:
+ SrsCoroutine* trd_;
+public:
+ SrsExecutorCoroutine(ISrsResourceManager* m, ISrsResource* r, ISrsCoroutineHandler* h, ISrsExecutorHandler* cb);
+ virtual ~SrsExecutorCoroutine();
+// Interface ISrsStartable
+public:
+ virtual srs_error_t start();
+// Interface ISrsInterruptable
+public:
+ virtual void interrupt();
+ virtual srs_error_t pull();
+// Interface ISrsContextId
+public:
+ virtual const SrsContextId& cid();
+ virtual void set_cid(const SrsContextId& cid);
+// Interface ISrsOneCycleThreadHandler
+public:
+ virtual srs_error_t cycle();
+// Interface ISrsResource
+public:
+ virtual const SrsContextId& get_id();
+ virtual std::string desc();
+};
+
#endif
diff --git a/trunk/src/app/srs_app_threads.cpp b/trunk/src/app/srs_app_threads.cpp
index b42a163be..d5c6e4229 100644
--- a/trunk/src/app/srs_app_threads.cpp
+++ b/trunk/src/app/srs_app_threads.cpp
@@ -335,7 +335,6 @@ srs_error_t srs_global_initialize()
#ifdef SRS_GB28181
_srs_gb_manager = new SrsResourceManager("GB", true);
#endif
- _srs_gc = new SrsLazySweepGc();
// Initialize global pps, which depends on _srs_clock
_srs_pps_ids = new SrsPps();
diff --git a/trunk/src/core/srs_core_autofree.hpp b/trunk/src/core/srs_core_autofree.hpp
index e8f281519..28f766276 100644
--- a/trunk/src/core/srs_core_autofree.hpp
+++ b/trunk/src/core/srs_core_autofree.hpp
@@ -81,4 +81,103 @@ public:
}
};
+// Shared ptr smart pointer, see https://github.com/ossrs/srs/discussions/3667#discussioncomment-8969107
+// Usage:
+// SrsSharedPtr ptr(new MyClass());
+// ptr->do_something();
+//
+// SrsSharedPtr cp = ptr;
+// cp->do_something();
+template
+class SrsSharedPtr
+{
+private:
+ // The pointer to the object.
+ T* ptr_;
+ // The reference count of the object.
+ uint32_t* ref_count_;
+public:
+ // Create a shared ptr with the object.
+ SrsSharedPtr(T* ptr) {
+ ptr_ = ptr;
+ ref_count_ = new uint32_t(1);
+ }
+ // Copy the shared ptr.
+ SrsSharedPtr(const SrsSharedPtr& cp) {
+ copy(cp);
+ }
+ // Dispose and delete the shared ptr.
+ virtual ~SrsSharedPtr() {
+ reset();
+ }
+private:
+ // Reset the shared ptr.
+ void reset() {
+ if (!ref_count_) return;
+
+ (*ref_count_)--;
+ if (*ref_count_ == 0) {
+ delete ptr_;
+ delete ref_count_;
+ }
+
+ ptr_ = NULL;
+ ref_count_ = NULL;
+ }
+ // Copy from other shared ptr.
+ void copy(const SrsSharedPtr& cp) {
+ ptr_ = cp.ptr_;
+ ref_count_ = cp.ref_count_;
+ if (ref_count_) (*ref_count_)++;
+ }
+ // Move from other shared ptr.
+ void move(SrsSharedPtr& cp) {
+ ptr_ = cp.ptr_;
+ ref_count_ = cp.ref_count_;
+ cp.ptr_ = NULL;
+ cp.ref_count_ = NULL;
+ }
+public:
+ // Get the object.
+ T* get() {
+ return ptr_;
+ }
+ // Overload the -> operator.
+ T* operator->() {
+ return ptr_;
+ }
+ // The assign operator.
+ SrsSharedPtr& operator=(const SrsSharedPtr& cp) {
+ if (this != &cp) {
+ reset();
+ copy(cp);
+ }
+ return *this;
+ }
+private:
+ // Overload the * operator.
+ T& operator*() {
+ return *ptr_;
+ }
+ // Overload the bool operator.
+ operator bool() const {
+ return ptr_ != NULL;
+ }
+#if __cplusplus >= 201103L // C++11
+public:
+ // The move constructor.
+ SrsSharedPtr(SrsSharedPtr&& cp) {
+ move(cp);
+ };
+ // The move assign operator.
+ SrsSharedPtr& operator=(SrsSharedPtr&& cp) {
+ if (this != &cp) {
+ reset();
+ move(cp);
+ }
+ return *this;
+ };
+#endif
+};
+
#endif
diff --git a/trunk/src/core/srs_core_version6.hpp b/trunk/src/core/srs_core_version6.hpp
index d12f389b6..e8bac6484 100644
--- a/trunk/src/core/srs_core_version6.hpp
+++ b/trunk/src/core/srs_core_version6.hpp
@@ -9,6 +9,6 @@
#define VERSION_MAJOR 6
#define VERSION_MINOR 0
-#define VERSION_REVISION 125
+#define VERSION_REVISION 126
#endif
diff --git a/trunk/src/protocol/srs_protocol_conn.cpp b/trunk/src/protocol/srs_protocol_conn.cpp
index 0c988b9b6..b0fba2dab 100644
--- a/trunk/src/protocol/srs_protocol_conn.cpp
+++ b/trunk/src/protocol/srs_protocol_conn.cpp
@@ -40,35 +40,3 @@ ISrsConnection::~ISrsConnection()
{
}
-SrsLazyObject::SrsLazyObject()
-{
- gc_ref_ = 0;
-}
-
-SrsLazyObject::~SrsLazyObject()
-{
-}
-
-void SrsLazyObject::gc_use()
-{
- gc_ref_++;
-}
-
-void SrsLazyObject::gc_dispose()
-{
- gc_ref_--;
-}
-
-int32_t SrsLazyObject::gc_ref()
-{
- return gc_ref_;
-}
-
-ISrsLazyGc::ISrsLazyGc()
-{
-}
-
-ISrsLazyGc::~ISrsLazyGc()
-{
-}
-
diff --git a/trunk/src/protocol/srs_protocol_conn.hpp b/trunk/src/protocol/srs_protocol_conn.hpp
index b136716ad..ee17aed56 100644
--- a/trunk/src/protocol/srs_protocol_conn.hpp
+++ b/trunk/src/protocol/srs_protocol_conn.hpp
@@ -33,7 +33,9 @@ public:
ISrsResourceManager();
virtual ~ISrsResourceManager();
public:
- // Remove then free the specified connection.
+ // Remove then free the specified connection. Note that the manager always free c resource,
+ // in the same coroutine or another coroutine. Some manager may support add c to a map, it
+ // should always free it even if it's in the map.
virtual void remove(ISrsResource* c) = 0;
};
@@ -48,36 +50,5 @@ public:
virtual std::string remote_ip() = 0;
};
-// Lazy-sweep resource, never sweep util all wrappers are freed.
-// See https://github.com/ossrs/srs/issues/3176#lazy-sweep
-class SrsLazyObject
-{
-private:
- // The reference count of resource, 0 is no wrapper and safe to sweep.
- int32_t gc_ref_;
-public:
- SrsLazyObject();
- virtual ~SrsLazyObject();
-public:
- // For wrapper to use this resource.
- virtual void gc_use();
- // For wrapper to dispose this resource.
- virtual void gc_dispose();
- // The current reference count of resource.
- virtual int32_t gc_ref();
-};
-
-// The lazy-sweep GC, wait for a long time to dispose resource even when resource is disposable.
-// See https://github.com/ossrs/srs/issues/3176#lazy-sweep
-class ISrsLazyGc
-{
-public:
- ISrsLazyGc();
- virtual ~ISrsLazyGc();
-public:
- // Remove then free the specified resource.
- virtual void remove(SrsLazyObject* c) = 0;
-};
-
#endif
diff --git a/trunk/src/utest/srs_utest_core.cpp b/trunk/src/utest/srs_utest_core.cpp
index 1b399fd77..2344e911e 100644
--- a/trunk/src/utest/srs_utest_core.cpp
+++ b/trunk/src/utest/srs_utest_core.cpp
@@ -8,6 +8,8 @@
using namespace std;
#include
+#include
+#include
VOID TEST(CoreAutoFreeTest, Free)
{
@@ -86,3 +88,343 @@ VOID TEST(CoreLogger, CheckVsnprintf)
}
}
+VOID TEST(CoreLogger, SharedPtrTypical)
+{
+ if (true) {
+ SrsSharedPtr p(new int(100));
+ EXPECT_TRUE(p);
+ EXPECT_EQ(100, *p);
+ }
+
+ if (true) {
+ SrsSharedPtr p(new int(100));
+ SrsSharedPtr q = p;
+ EXPECT_EQ(p.get(), q.get());
+ }
+
+ if (true) {
+ SrsSharedPtr p(new int(100));
+ SrsSharedPtr q(p);
+ EXPECT_EQ(p.get(), q.get());
+ }
+
+ if (true) {
+ SrsSharedPtr p(new int(100));
+ SrsSharedPtr q = p;
+ EXPECT_TRUE(p);
+ EXPECT_TRUE(q);
+ EXPECT_EQ(100, *p);
+ EXPECT_EQ(100, *q);
+ }
+}
+
+VOID TEST(CoreLogger, SharedPtrReset)
+{
+ if (true) {
+ SrsSharedPtr p(new int(100));
+ SrsSharedPtr q = p;
+ p.reset();
+ EXPECT_FALSE(p);
+ EXPECT_TRUE(q);
+ EXPECT_EQ(100, *q);
+ }
+
+ if (true) {
+ SrsSharedPtr p(new int(100));
+ SrsSharedPtr q = p;
+ q.reset();
+ EXPECT_TRUE(p);
+ EXPECT_FALSE(q);
+ EXPECT_EQ(100, *p);
+ }
+}
+
+VOID TEST(CoreLogger, SharedPtrObject)
+{
+ SrsSharedPtr p(new MyNormalObject(100));
+ EXPECT_TRUE(p);
+ EXPECT_EQ(100, p->id());
+}
+
+VOID TEST(CoreLogger, SharedPtrNullptr)
+{
+ SrsSharedPtr p(NULL);
+ EXPECT_FALSE(p);
+
+ p.reset();
+ EXPECT_FALSE(p);
+
+ SrsSharedPtr q = p;
+ EXPECT_FALSE(q);
+}
+
+class MockWrapper
+{
+public:
+ int* ptr;
+public:
+ MockWrapper(int* p) {
+ ptr = p;
+ *ptr = *ptr + 1;
+ }
+ ~MockWrapper() {
+ *ptr = *ptr - 1;
+ }
+};
+
+VOID TEST(CoreLogger, SharedPtrWrapper)
+{
+ int* ptr = new int(100);
+ SrsAutoFree(int, ptr);
+ EXPECT_EQ(100, *ptr);
+
+ if (true) {
+ SrsSharedPtr p(new MockWrapper(ptr));
+ EXPECT_EQ(101, *ptr);
+ EXPECT_EQ(101, *p->ptr);
+
+ SrsSharedPtr q = p;
+ EXPECT_EQ(101, *ptr);
+ EXPECT_EQ(101, *p->ptr);
+ EXPECT_EQ(101, *q->ptr);
+
+ SrsSharedPtr r(new MockWrapper(ptr));
+ EXPECT_EQ(102, *ptr);
+ EXPECT_EQ(102, *p->ptr);
+ EXPECT_EQ(102, *q->ptr);
+ EXPECT_EQ(102, *r->ptr);
+
+ SrsSharedPtr s(new MockWrapper(ptr));
+ EXPECT_EQ(103, *ptr);
+ EXPECT_EQ(103, *p->ptr);
+ EXPECT_EQ(103, *q->ptr);
+ EXPECT_EQ(103, *r->ptr);
+ EXPECT_EQ(103, *s->ptr);
+ }
+ EXPECT_EQ(100, *ptr);
+
+ if (true) {
+ SrsSharedPtr p(new MockWrapper(ptr));
+ EXPECT_EQ(101, *ptr);
+ EXPECT_EQ(101, *p->ptr);
+ }
+ EXPECT_EQ(100, *ptr);
+}
+
+VOID TEST(CoreLogger, SharedPtrAssign)
+{
+ if (true) {
+ SrsSharedPtr p(new int(100));
+ SrsSharedPtr q(NULL);
+ q = p;
+ EXPECT_EQ(p.get(), q.get());
+ }
+
+ if (true) {
+ SrsSharedPtr p(new int(100));
+ SrsSharedPtr q(new int(101));
+
+ int* q0 = q.get();
+ q = p;
+ EXPECT_EQ(p.get(), q.get());
+ EXPECT_NE(q0, q.get());
+ }
+
+ int* ptr0 = new int(100);
+ SrsAutoFree(int, ptr0);
+ EXPECT_EQ(100, *ptr0);
+
+ int* ptr1 = new int(200);
+ SrsAutoFree(int, ptr1);
+ EXPECT_EQ(200, *ptr1);
+
+ if (true) {
+ SrsSharedPtr p(new MockWrapper(ptr0));
+ EXPECT_EQ(101, *ptr0);
+ EXPECT_EQ(101, *p->ptr);
+
+ SrsSharedPtr q(new MockWrapper(ptr1));
+ EXPECT_EQ(201, *ptr1);
+ EXPECT_EQ(201, *q->ptr);
+
+ q = p;
+ EXPECT_EQ(200, *ptr1);
+ EXPECT_EQ(101, *ptr0);
+ EXPECT_EQ(101, *p->ptr);
+ EXPECT_EQ(101, *q->ptr);
+ }
+
+ EXPECT_EQ(100, *ptr0);
+ EXPECT_EQ(200, *ptr1);
+}
+
+template
+SrsSharedPtr mock_shared_ptr_move_assign(SrsSharedPtr p) {
+ SrsSharedPtr q = p;
+ return q;
+}
+
+template
+SrsSharedPtr mock_shared_ptr_move_ctr(SrsSharedPtr p) {
+ return p;
+}
+
+VOID TEST(CoreLogger, SharedPtrMove)
+{
+ if (true) {
+ SrsSharedPtr p(new int(100));
+ SrsSharedPtr q(new int(101));
+ q = mock_shared_ptr_move_ctr(p);
+ EXPECT_EQ(q.get(), p.get());
+ }
+
+ if (true) {
+ SrsSharedPtr p(new int(100));
+ SrsSharedPtr q(new int(101));
+ q = mock_shared_ptr_move_assign(p);
+ EXPECT_EQ(q.get(), p.get());
+ }
+
+ int* ptr = new int(100);
+ SrsAutoFree(int, ptr);
+ EXPECT_EQ(100, *ptr);
+
+ if (true) {
+ SrsSharedPtr p(new MockWrapper(ptr));
+ EXPECT_EQ(101, *ptr);
+ EXPECT_EQ(101, *p->ptr);
+
+ SrsSharedPtr q(new MockWrapper(ptr));
+ q = mock_shared_ptr_move_ctr(p);
+ EXPECT_EQ(101, *ptr);
+ EXPECT_EQ(101, *q->ptr);
+ }
+ EXPECT_EQ(100, *ptr);
+
+ if (true) {
+ SrsSharedPtr p(new MockWrapper(ptr));
+ EXPECT_EQ(101, *ptr);
+ EXPECT_EQ(101, *p->ptr);
+
+ SrsSharedPtr q(new MockWrapper(ptr));
+ q = mock_shared_ptr_move_assign(p);
+ EXPECT_EQ(101, *ptr);
+ EXPECT_EQ(101, *q->ptr);
+ }
+ EXPECT_EQ(100, *ptr);
+
+ // Note that this will not trigger the move constructor or move assignment operator.
+ if (true) {
+ SrsSharedPtr p(new int(100));
+ SrsSharedPtr q = mock_shared_ptr_move_assign(p);
+ EXPECT_EQ(q.get(), p.get());
+ }
+
+ // Note that this will not trigger the move constructor or move assignment operator.
+ if (true) {
+ SrsSharedPtr p = SrsSharedPtr(new int(100));
+ EXPECT_TRUE(p);
+ EXPECT_EQ(100, *p);
+ }
+}
+
+class MockIntResource : public ISrsResource
+{
+public:
+ SrsContextId id_;
+ int value_;
+public:
+ MockIntResource(int value) : value_(value) {
+ }
+ virtual ~MockIntResource() {
+ }
+public:
+ virtual const SrsContextId& get_id() {
+ return id_;
+ }
+ virtual std::string desc() {
+ return id_.c_str();
+ }
+};
+
+VOID TEST(CoreLogger, SharedResourceTypical)
+{
+ if (true) {
+ SrsSharedResource* p = new SrsSharedResource(new MockIntResource(100));
+ EXPECT_TRUE(*p);
+ EXPECT_EQ(100, (*p)->value_);
+ srs_freep(p);
+ }
+
+ if (true) {
+ SrsSharedResource p(new MockIntResource(100));
+ EXPECT_TRUE(p);
+ EXPECT_EQ(100, p->value_);
+ }
+
+ if (true) {
+ SrsSharedResource p = SrsSharedResource(new MockIntResource(100));
+ EXPECT_TRUE(p);
+ EXPECT_EQ(100, p->value_);
+ }
+
+ if (true) {
+ SrsSharedResource p(new MockIntResource(100));
+ SrsSharedResource q = p;
+ EXPECT_EQ(p.get(), q.get());
+ }
+
+ if (true) {
+ SrsSharedResource p(new MockIntResource(100));
+ SrsSharedResource q(NULL);
+ q = p;
+ EXPECT_EQ(p.get(), q.get());
+ }
+
+ if (true) {
+ SrsSharedResource p(new MockIntResource(100));
+ SrsSharedResource q(new MockIntResource(200));
+ q = p;
+ EXPECT_EQ(p.get(), q.get());
+ }
+
+ if (true) {
+ SrsSharedResource p(new MockIntResource(100));
+ SrsSharedResource q = p;
+ EXPECT_TRUE(p);
+ EXPECT_TRUE(q);
+ EXPECT_EQ(100, p->value_);
+ EXPECT_EQ(100, q->value_);
+ }
+}
+
+template
+SrsSharedResource mock_shared_resource_move_assign(SrsSharedResource p) {
+ SrsSharedResource q = p;
+ return q;
+}
+
+template
+SrsSharedResource mock_shared_resource_move_ctr(SrsSharedResource p) {
+ return p;
+}
+
+VOID TEST(CoreLogger, SharedResourceMove)
+{
+ if (true) {
+ SrsSharedResource p(new MockIntResource(100));
+ SrsSharedResource q(new MockIntResource(101));
+ q = mock_shared_resource_move_ctr(p);
+ EXPECT_EQ(100, q->value_);
+ EXPECT_EQ(q.get(), p.get());
+ }
+
+ if (true) {
+ SrsSharedResource p(new MockIntResource(100));
+ SrsSharedResource q(new MockIntResource(101));
+ q = mock_shared_resource_move_assign(p);
+ EXPECT_EQ(100, q->value_);
+ EXPECT_EQ(q.get(), p.get());
+ }
+}
+
diff --git a/trunk/src/utest/srs_utest_core.hpp b/trunk/src/utest/srs_utest_core.hpp
index 1c7795b97..8c7306384 100644
--- a/trunk/src/utest/srs_utest_core.hpp
+++ b/trunk/src/utest/srs_utest_core.hpp
@@ -14,5 +14,18 @@
#include
+class MyNormalObject
+{
+private:
+ int id_;
+public:
+ MyNormalObject(int id) {
+ id_ = id;
+ }
+ int id() {
+ return id_;
+ }
+};
+
#endif