1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

SmartPtr: Use shared ptr to manage GB objects. v6.0.126 (#4080)

The object relations: 

![gb](266e8a4e-3f1e-4805-8406-9008d6a63aa0)

Session manages SIP and Media object using shared resource or shared
ptr. Note that I actually use SrsExecutorCoroutine to delete the object
when each coroutine is done, because there is always a dedicate
coroutine for each object.

For SIP and Media object, they directly use the session by raw pointer,
it's safe because session always live longer than session and media
object.

---

Co-authored-by: Jacob Su <suzp1984@gmail.com>
This commit is contained in:
Winlin 2024-06-12 22:40:20 +08:00 committed by GitHub
parent 1656391c67
commit 6834ec208d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
15 changed files with 989 additions and 464 deletions

View file

@ -7,6 +7,7 @@ The changelog for SRS.
<a name="v6-changes"></a> <a name="v6-changes"></a>
## SRS 6.0 Changelog ## SRS 6.0 Changelog
* v6.0, 2024-06-12, Merge [#4080](https://github.com/ossrs/srs/pull/4080): SmartPtr: Use shared ptr to manage GB objects. v6.0.126 (#4080)
* v6.0, 2024-06-03, Merge [#4057](https://github.com/ossrs/srs/pull/4057): RTC: Support dropping h.264 SEI from NALUs. v6.0.125 (#4057) * v6.0, 2024-06-03, Merge [#4057](https://github.com/ossrs/srs/pull/4057): RTC: Support dropping h.264 SEI from NALUs. v6.0.125 (#4057)
* v6.0, 2024-04-26, Merge [#4044](https://github.com/ossrs/srs/pull/4044): fix: correct SRS_ERRNO_MAP_HTTP duplicate error code. v6.0.124 (#4044) * v6.0, 2024-04-26, Merge [#4044](https://github.com/ossrs/srs/pull/4044): fix: correct SRS_ERRNO_MAP_HTTP duplicate error code. v6.0.124 (#4044)
* v6.0, 2024-04-23, Merge [#4038](https://github.com/ossrs/srs/pull/4038): RTMP: Do not response publish start message if hooks fail. v6.0.123 (#4038) * v6.0, 2024-04-23, Merge [#4038](https://github.com/ossrs/srs/pull/4038): RTMP: Do not response publish start message if hooks fail. v6.0.123 (#4038)

View file

@ -413,28 +413,6 @@ void SrsResourceManager::dispose(ISrsResource* c)
} }
} }
SrsLazySweepGc::SrsLazySweepGc()
{
}
SrsLazySweepGc::~SrsLazySweepGc()
{
}
srs_error_t SrsLazySweepGc::start()
{
srs_error_t err = srs_success;
return err;
}
void SrsLazySweepGc::remove(SrsLazyObject* c)
{
// TODO: FIXME: MUST lazy sweep.
srs_freep(c);
}
ISrsLazyGc* _srs_gc = NULL;
ISrsExpire::ISrsExpire() ISrsExpire::ISrsExpire()
{ {
} }

View file

@ -20,6 +20,7 @@
#include <srs_protocol_kbps.hpp> #include <srs_protocol_kbps.hpp>
#include <srs_app_reload.hpp> #include <srs_app_reload.hpp>
#include <srs_protocol_conn.hpp> #include <srs_protocol_conn.hpp>
#include <srs_core_autofree.hpp>
class SrsWallClock; class SrsWallClock;
class SrsBuffer; class SrsBuffer;
@ -125,98 +126,66 @@ private:
void dispose(ISrsResource* c); void dispose(ISrsResource* c);
}; };
// A simple lazy-sweep GC, just wait for a long time to delete the disposable resources. // This class implements the ISrsResource interface using a smart pointer, allowing the Manager to delete this
class SrsLazySweepGc : public ISrsLazyGc // smart pointer resource, such as by implementing delayed release.
{
public:
SrsLazySweepGc();
virtual ~SrsLazySweepGc();
public:
virtual srs_error_t start();
virtual void remove(SrsLazyObject* c);
};
extern ISrsLazyGc* _srs_gc;
// A wrapper template for lazy-sweep resource.
// See https://github.com/ossrs/srs/issues/3176#lazy-sweep
// //
// Usage for resource which manages itself in coroutine cycle, see SrsLazyGbSession: // It embeds an SrsSharedPtr to provide the same interface, but it is not an inheritance relationship. Its usage
// class Resource { // is identical to SrsSharedPtr, but they cannot replace each other. They are not related and cannot be converted
// private: // to one another.
// SrsLazyObjectWrapper<Resource>* wrapper_;
// private:
// friend class SrsLazyObjectWrapper<Resource>;
// Resource(SrsLazyObjectWrapper<Resource>* wrapper) { wrapper_ = wrapper; }
// public:
// srs_error_t Resource::cycle() {
// srs_error_t err = do_cycle();
// _srs_gb_manager->remove(wrapper_);
// return err;
// }
// };
// SrsLazyObjectWrapper<Resource>* obj = new SrsLazyObjectWrapper<Resource>*();
// _srs_gb_manager->add(obj); // Add wrapper to resource manager.
// Start a coroutine to do obj->resource()->cycle().
// //
// Usage for resource managed by other object: // Note that we don't need to implement the move constructor and move assignment operator, because we directly
// class Resource { // use SrsSharedPtr as instance member, so we can only copy it.
// private:
// friend class SrsLazyObjectWrapper<Resource>;
// Resource(SrsLazyObjectWrapper<Resource>* /*wrapper*/) {
// }
// };
// class Manager {
// private:
// SrsLazyObjectWrapper<Resource>* wrapper_;
// public:
// Manager() { wrapper_ = new SrsLazyObjectWrapper<Resource>(); }
// ~Manager() { srs_freep(wrapper_); }
// };
// Manager* manager = new Manager();
// srs_freep(manager);
// //
// Note that under-layer resource are destroyed by _srs_gc, which is literally equal to srs_freep. However, the root // Usage:
// wrapper might be managed by other resource manager, such as _srs_gb_manager for SrsLazyGbSession. Furthermore, other // SrsSharedResource<MyClass>* ptr = new SrsSharedResource<MyClass>(new MyClass());
// copied out wrappers might be freed by srs_freep. All are ok, because all wrapper and resources are simply normal // (*ptr)->do_something();
// object, so if you added to manager then you should use manager to remove it, and you can also directly delete it. //
// ISrsResourceManager* manager = ...;
// manager->remove(ptr);
template<typename T> template<typename T>
class SrsLazyObjectWrapper : public ISrsResource class SrsSharedResource : public ISrsResource
{ {
private: private:
T* resource_; SrsSharedPtr<T> ptr_;
public: public:
SrsLazyObjectWrapper() { SrsSharedResource(T* ptr) : ptr_(ptr) {
init(new T(this));
} }
virtual ~SrsLazyObjectWrapper() { SrsSharedResource(const SrsSharedResource<T>& cp) : ptr_(cp.ptr_) {
resource_->gc_dispose();
if (resource_->gc_ref() == 0) {
_srs_gc->remove(resource_);
} }
virtual ~SrsSharedResource() {
}
public:
// Get the object.
T* get() {
return ptr_.get();
}
// Overload the -> operator.
T* operator->() {
return ptr_.operator->();
}
// The assign operator.
SrsSharedResource<T>& operator=(const SrsSharedResource<T>& cp) {
if (this != &cp) {
ptr_ = cp.ptr_;
}
return *this;
} }
private: private:
SrsLazyObjectWrapper(T* resource) { // Overload the * operator.
init(resource); T& operator*() {
return ptr_.operator*();
} }
void init(T* resource) { // Overload the bool operator.
resource_ = resource; operator bool() const {
resource_->gc_use(); return ptr_.operator bool();
}
public:
SrsLazyObjectWrapper<T>* copy() {
return new SrsLazyObjectWrapper<T>(resource_);
}
T* resource() {
return resource_;
} }
// Interface ISrsResource // Interface ISrsResource
public: public:
virtual const SrsContextId& get_id() { virtual const SrsContextId& get_id() {
return resource_->get_id(); return ptr_->get_id();
} }
virtual std::string desc() { virtual std::string desc() {
return resource_->desc(); return ptr_->desc();
} }
}; };

View file

@ -70,11 +70,12 @@ std::string srs_sip_state(SrsGbSipState ostate, SrsGbSipState state)
return srs_fmt("%s->%s", srs_gb_sip_state(ostate).c_str(), srs_gb_sip_state(state).c_str()); return srs_fmt("%s->%s", srs_gb_sip_state(ostate).c_str(), srs_gb_sip_state(state).c_str());
} }
SrsLazyGbSession::SrsLazyGbSession(SrsLazyObjectWrapper<SrsLazyGbSession>* wrapper_root) SrsGbSession::SrsGbSession() : sip_(new SrsGbSipTcpConn()), media_(new SrsGbMediaTcpConn())
{ {
wrapper_root_ = wrapper_root; wrapper_ = NULL;
sip_ = new SrsLazyObjectWrapper<SrsLazyGbSipTcpConn>(); owner_coroutine_ = NULL;
media_ = new SrsLazyObjectWrapper<SrsLazyGbMediaTcpConn>(); owner_cid_ = NULL;
muxer_ = new SrsGbMuxer(this); muxer_ = new SrsGbMuxer(this);
state_ = SrsGbSessionStateInit; state_ = SrsGbSessionStateInit;
@ -102,41 +103,43 @@ SrsLazyGbSession::SrsLazyGbSession(SrsLazyObjectWrapper<SrsLazyGbSession>* wrapp
cid_ = _srs_context->generate_id(); cid_ = _srs_context->generate_id();
_srs_context->set_id(cid_); // Also change current coroutine cid as session's. _srs_context->set_id(cid_); // Also change current coroutine cid as session's.
trd_ = new SrsSTCoroutine("GBS", this, cid_);
} }
SrsLazyGbSession::~SrsLazyGbSession() SrsGbSession::~SrsGbSession()
{ {
srs_freep(trd_);
srs_freep(sip_);
srs_freep(media_);
srs_freep(muxer_); srs_freep(muxer_);
srs_freep(ppp_); srs_freep(ppp_);
} }
srs_error_t SrsLazyGbSession::initialize(SrsConfDirective* conf) void SrsGbSession::setup(SrsConfDirective* conf)
{ {
srs_error_t err = srs_success;
pip_ = candidate_ = _srs_config->get_stream_caster_sip_candidate(conf); pip_ = candidate_ = _srs_config->get_stream_caster_sip_candidate(conf);
if (candidate_ == "*") { if (candidate_ == "*") {
pip_ = srs_get_public_internet_address(true); pip_ = srs_get_public_internet_address(true);
} }
std::string output = _srs_config->get_stream_caster_output(conf); std::string output = _srs_config->get_stream_caster_output(conf);
if ((err = muxer_->initialize(output)) != srs_success) { muxer_->setup(output);
return srs_error_wrap(err, "muxer");
}
connecting_timeout_ = _srs_config->get_stream_caster_sip_timeout(conf); connecting_timeout_ = _srs_config->get_stream_caster_sip_timeout(conf);
reinvite_wait_ = _srs_config->get_stream_caster_sip_reinvite(conf); reinvite_wait_ = _srs_config->get_stream_caster_sip_reinvite(conf);
srs_trace("Session: Start timeout=%dms, reinvite=%dms, candidate=%s, pip=%s, output=%s", srsu2msi(connecting_timeout_), srs_trace("Session: Start timeout=%dms, reinvite=%dms, candidate=%s, pip=%s, output=%s", srsu2msi(connecting_timeout_),
srsu2msi(reinvite_wait_), candidate_.c_str(), pip_.c_str(), output.c_str()); srsu2msi(reinvite_wait_), candidate_.c_str(), pip_.c_str(), output.c_str());
return err;
} }
void SrsLazyGbSession::on_ps_pack(SrsPackContext* ctx, SrsPsPacket* ps, const std::vector<SrsTsMessage*>& msgs) void SrsGbSession::setup_owner(SrsSharedResource<SrsGbSession>* wrapper, ISrsInterruptable* owner_coroutine, ISrsContextIdSetter* owner_cid)
{
wrapper_ = wrapper;
owner_coroutine_ = owner_coroutine;
owner_cid_ = owner_cid;
}
void SrsGbSession::on_executor_done(ISrsInterruptable* executor)
{
owner_coroutine_ = NULL;
}
void SrsGbSession::on_ps_pack(SrsPackContext* ctx, SrsPsPacket* ps, const std::vector<SrsTsMessage*>& msgs)
{ {
// Got a new context, that is new media transport. // Got a new context, that is new media transport.
if (media_id_ != ctx->media_id_) { if (media_id_ != ctx->media_id_) {
@ -195,57 +198,47 @@ void SrsLazyGbSession::on_ps_pack(SrsPackContext* ctx, SrsPsPacket* ps, const st
} }
} }
void SrsLazyGbSession::on_sip_transport(SrsLazyObjectWrapper<SrsLazyGbSipTcpConn>* sip) void SrsGbSession::on_sip_transport(SrsSharedResource<SrsGbSipTcpConn> sip)
{ {
srs_freep(sip_); sip_ = sip;
sip_ = sip->copy();
// Change id of SIP and all its child coroutines. // Change id of SIP and all its child coroutines.
sip_->resource()->set_cid(cid_); sip_->set_cid(cid_);
} }
SrsLazyObjectWrapper<SrsLazyGbSipTcpConn>* SrsLazyGbSession::sip_transport() SrsSharedResource<SrsGbSipTcpConn> SrsGbSession::sip_transport()
{ {
return sip_; return sip_;
} }
void SrsLazyGbSession::on_media_transport(SrsLazyObjectWrapper<SrsLazyGbMediaTcpConn>* media) void SrsGbSession::on_media_transport(SrsSharedResource<SrsGbMediaTcpConn> media)
{ {
srs_freep(media_); media_ = media;
media_ = media->copy();
// Change id of SIP and all its child coroutines. // Change id of SIP and all its child coroutines.
media_->resource()->set_cid(cid_); media_->set_cid(cid_);
} }
std::string SrsLazyGbSession::pip() std::string SrsGbSession::pip()
{ {
return pip_; return pip_;
} }
srs_error_t SrsLazyGbSession::start() srs_error_t SrsGbSession::cycle()
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
if ((err = trd_->start()) != srs_success) { // Update all context id to cid of session.
return srs_error_wrap(err, "coroutine"); _srs_context->set_id(cid_);
} owner_cid_->set_cid(cid_);
sip_->set_cid(cid_);
media_->set_cid(cid_);
return err; // Drive the session cycle.
} err = do_cycle();
srs_error_t SrsLazyGbSession::cycle()
{
srs_error_t err = do_cycle();
// Interrupt the SIP and media transport when session terminated. // Interrupt the SIP and media transport when session terminated.
sip_->resource()->interrupt(); sip_->interrupt();
media_->resource()->interrupt(); media_->interrupt();
// Note that we added wrapper to manager, so we must free the wrapper, not this connection.
SrsLazyObjectWrapper<SrsLazyGbSession>* wrapper = wrapper_root_;
srs_assert(wrapper); // The creator wrapper MUST never be null, because we created it.
_srs_gb_manager->remove(wrapper);
// success. // success.
if (err == srs_success) { if (err == srs_success) {
@ -274,12 +267,13 @@ srs_error_t SrsLazyGbSession::cycle()
return srs_success; return srs_success;
} }
srs_error_t SrsLazyGbSession::do_cycle() srs_error_t SrsGbSession::do_cycle()
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
while (true) { while (true) {
if ((err = trd_->pull()) != srs_success) { if (!owner_coroutine_) return err;
if ((err = owner_coroutine_->pull()) != srs_success) {
return srs_error_wrap(err, "pull"); return srs_error_wrap(err, "pull");
} }
@ -287,7 +281,7 @@ srs_error_t SrsLazyGbSession::do_cycle()
srs_usleep(SRS_GB_SESSION_DRIVE_INTERVAL); srs_usleep(SRS_GB_SESSION_DRIVE_INTERVAL);
// Client send bye, we should dispose the session. // Client send bye, we should dispose the session.
if (sip_->resource()->is_bye()) { if (sip_->is_bye()) {
return err; return err;
} }
@ -310,35 +304,33 @@ srs_error_t SrsLazyGbSession::do_cycle()
return err; return err;
} }
srs_error_t SrsLazyGbSession::drive_state() srs_error_t SrsGbSession::drive_state()
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
#define SRS_GB_CHANGE_STATE_TO(state) { \ #define SRS_GB_CHANGE_STATE_TO(state) { \
SrsGbSessionState ostate = set_state(state); \ SrsGbSessionState ostate = set_state(state); \
srs_trace("Session: Change device=%s, state=%s", sip_->resource()->device_id().c_str(), \ srs_trace("Session: Change device=%s, state=%s", sip_->device_id().c_str(), \
srs_gb_state(ostate, state_).c_str()); \ srs_gb_state(ostate, state_).c_str()); \
} }
if (state_ == SrsGbSessionStateInit) { if (state_ == SrsGbSessionStateInit) {
// Set to connecting, whatever media is connected or not, because the connecting state will handle it if media // Set to connecting, whatever media is connected or not, because the connecting state will handle it if media
// is connected, so we don't need to handle it here. // is connected, so we don't need to handle it here.
if (sip_->resource()->is_registered()) { if (sip_->is_registered()) {
SRS_GB_CHANGE_STATE_TO(SrsGbSessionStateConnecting); SRS_GB_CHANGE_STATE_TO(SrsGbSessionStateConnecting);
connecting_starttime_ = srs_update_system_time(); connecting_starttime_ = srs_update_system_time();
} }
// Invite if media is not connected. // Invite if media is not connected.
if (sip_->resource()->is_registered() && !media_->resource()->is_connected()) { if (sip_->is_registered() && !media_->is_connected()) {
uint32_t ssrc = 0; uint32_t ssrc = 0;
if ((err = sip_->resource()->invite_request(&ssrc)) != srs_success) { if ((err = sip_->invite_request(&ssrc)) != srs_success) {
return srs_error_wrap(err, "invite"); return srs_error_wrap(err, "invite");
} }
// Now, we're able to query session by ssrc, for media packets. // Now, we're able to query session by ssrc, for media packets.
SrsLazyObjectWrapper<SrsLazyGbSession>* wrapper = wrapper_root_; _srs_gb_manager->add_with_fast_id(ssrc, wrapper_);
srs_assert(wrapper); // It MUST never be NULL, because this method is in the cycle of coroutine.
_srs_gb_manager->add_with_fast_id(ssrc, wrapper);
} }
} }
@ -349,32 +341,32 @@ srs_error_t SrsLazyGbSession::drive_state()
} }
srs_trace("Session: Connecting timeout, nn=%d, state=%s, sip=%s, media=%d", nn_timeout_, srs_gb_session_state(state_).c_str(), srs_trace("Session: Connecting timeout, nn=%d, state=%s, sip=%s, media=%d", nn_timeout_, srs_gb_session_state(state_).c_str(),
srs_gb_sip_state(sip_->resource()->state()).c_str(), media_->resource()->is_connected()); srs_gb_sip_state(sip_->state()).c_str(), media_->is_connected());
sip_->resource()->reset_to_register(); sip_->reset_to_register();
SRS_GB_CHANGE_STATE_TO(SrsGbSessionStateInit); SRS_GB_CHANGE_STATE_TO(SrsGbSessionStateInit);
} }
if (sip_->resource()->is_stable() && media_->resource()->is_connected()) { if (sip_->is_stable() && media_->is_connected()) {
SRS_GB_CHANGE_STATE_TO(SrsGbSessionStateEstablished); SRS_GB_CHANGE_STATE_TO(SrsGbSessionStateEstablished);
} }
} }
if (state_ == SrsGbSessionStateEstablished) { if (state_ == SrsGbSessionStateEstablished) {
if (sip_->resource()->is_bye()) { if (sip_->is_bye()) {
srs_trace("Session: Dispose for client bye"); srs_trace("Session: Dispose for client bye");
return err; return err;
} }
// When media disconnected, we wait for a while then reinvite. // When media disconnected, we wait for a while then reinvite.
if (!media_->resource()->is_connected()) { if (!media_->is_connected()) {
if (!reinviting_starttime_) { if (!reinviting_starttime_) {
reinviting_starttime_ = srs_update_system_time(); reinviting_starttime_ = srs_update_system_time();
} }
if (srs_get_system_time() - reinviting_starttime_ > reinvite_wait_) { if (srs_get_system_time() - reinviting_starttime_ > reinvite_wait_) {
reinviting_starttime_ = 0; reinviting_starttime_ = 0;
srs_trace("Session: Re-invite for disconnect, state=%s, sip=%s, media=%d", srs_gb_session_state(state_).c_str(), srs_trace("Session: Re-invite for disconnect, state=%s, sip=%s, media=%d", srs_gb_session_state(state_).c_str(),
srs_gb_sip_state(sip_->resource()->state()).c_str(), media_->resource()->is_connected()); srs_gb_sip_state(sip_->state()).c_str(), media_->is_connected());
sip_->resource()->reset_to_register(); sip_->reset_to_register();
SRS_GB_CHANGE_STATE_TO(SrsGbSessionStateInit); SRS_GB_CHANGE_STATE_TO(SrsGbSessionStateInit);
} }
} }
@ -383,19 +375,19 @@ srs_error_t SrsLazyGbSession::drive_state()
return err; return err;
} }
SrsGbSessionState SrsLazyGbSession::set_state(SrsGbSessionState v) SrsGbSessionState SrsGbSession::set_state(SrsGbSessionState v)
{ {
SrsGbSessionState state = state_; SrsGbSessionState state = state_;
state_ = v; state_ = v;
return state; return state;
} }
const SrsContextId& SrsLazyGbSession::get_id() const SrsContextId& SrsGbSession::get_id()
{ {
return cid_; return cid_;
} }
std::string SrsLazyGbSession::desc() std::string SrsGbSession::desc()
{ {
return "GBS"; return "GBS";
} }
@ -463,27 +455,33 @@ srs_error_t SrsGbListener::on_tcp_client(ISrsListener* listener, srs_netfd_t stf
// Handle TCP connections. // Handle TCP connections.
if (listener == sip_listener_) { if (listener == sip_listener_) {
SrsLazyObjectWrapper<SrsLazyGbSipTcpConn>* conn = new SrsLazyObjectWrapper<SrsLazyGbSipTcpConn>(); SrsGbSipTcpConn* raw_conn = new SrsGbSipTcpConn();
SrsLazyGbSipTcpConn* resource = dynamic_cast<SrsLazyGbSipTcpConn*>(conn->resource()); raw_conn->setup(conf_, sip_listener_, media_listener_, stfd);
resource->setup(conf_, sip_listener_, media_listener_, stfd);
if ((err = resource->start()) != srs_success) { SrsSharedResource<SrsGbSipTcpConn>* conn = new SrsSharedResource<SrsGbSipTcpConn>(raw_conn);
srs_freep(conn); _srs_gb_manager->add(conn, NULL);
SrsExecutorCoroutine* executor = new SrsExecutorCoroutine(_srs_gb_manager, conn, raw_conn, raw_conn);
raw_conn->setup_owner(conn, executor, executor);
if ((err = executor->start()) != srs_success) {
srs_freep(executor);
return srs_error_wrap(err, "gb sip"); return srs_error_wrap(err, "gb sip");
} }
_srs_gb_manager->add(conn, NULL);
} else if (listener == media_listener_) { } else if (listener == media_listener_) {
SrsLazyObjectWrapper<SrsLazyGbMediaTcpConn>* conn = new SrsLazyObjectWrapper<SrsLazyGbMediaTcpConn>(); SrsGbMediaTcpConn* raw_conn = new SrsGbMediaTcpConn();
SrsLazyGbMediaTcpConn* resource = dynamic_cast<SrsLazyGbMediaTcpConn*>(conn->resource()); raw_conn->setup(stfd);
resource->setup(stfd);
if ((err = resource->start()) != srs_success) { SrsSharedResource<SrsGbMediaTcpConn>* conn = new SrsSharedResource<SrsGbMediaTcpConn>(raw_conn);
srs_freep(conn); _srs_gb_manager->add(conn, NULL);
SrsExecutorCoroutine* executor = new SrsExecutorCoroutine(_srs_gb_manager, conn, raw_conn, raw_conn);
raw_conn->setup_owner(conn, executor, executor);
if ((err = executor->start()) != srs_success) {
srs_freep(executor);
return srs_error_wrap(err, "gb media"); return srs_error_wrap(err, "gb media");
} }
_srs_gb_manager->add(conn, NULL);
} else { } else {
srs_warn("GB: Ignore TCP client"); srs_warn("GB: Ignore TCP client");
srs_close_stfd(stfd); srs_close_stfd(stfd);
@ -492,9 +490,13 @@ srs_error_t SrsGbListener::on_tcp_client(ISrsListener* listener, srs_netfd_t stf
return err; return err;
} }
SrsLazyGbSipTcpConn::SrsLazyGbSipTcpConn(SrsLazyObjectWrapper<SrsLazyGbSipTcpConn>* wrapper_root) SrsGbSipTcpConn::SrsGbSipTcpConn()
{ {
wrapper_root_ = wrapper_root; wrapper_ = NULL;
owner_coroutine_ = NULL;
owner_cid_ = NULL;
cid_ = _srs_context->get_id();
session_ = NULL; session_ = NULL;
state_ = SrsGbSipStateInit; state_ = SrsGbSipStateInit;
register_ = new SrsSipMessage(); register_ = new SrsSipMessage();
@ -507,54 +509,62 @@ SrsLazyGbSipTcpConn::SrsLazyGbSipTcpConn(SrsLazyObjectWrapper<SrsLazyGbSipTcpCon
conn_ = NULL; conn_ = NULL;
receiver_ = NULL; receiver_ = NULL;
sender_ = NULL; sender_ = NULL;
trd_ = new SrsSTCoroutine("sip", this);
} }
SrsLazyGbSipTcpConn::~SrsLazyGbSipTcpConn() SrsGbSipTcpConn::~SrsGbSipTcpConn()
{ {
srs_freep(trd_);
srs_freep(receiver_); srs_freep(receiver_);
srs_freep(sender_); srs_freep(sender_);
srs_freep(conn_); srs_freep(conn_);
srs_freep(session_);
srs_freep(register_); srs_freep(register_);
srs_freep(invite_ok_); srs_freep(invite_ok_);
srs_freep(conf_); srs_freep(conf_);
} }
void SrsLazyGbSipTcpConn::setup(SrsConfDirective* conf, SrsTcpListener* sip, SrsTcpListener* media, srs_netfd_t stfd) void SrsGbSipTcpConn::setup(SrsConfDirective* conf, SrsTcpListener* sip, SrsTcpListener* media, srs_netfd_t stfd)
{ {
srs_freep(conf_); srs_freep(conf_);
conf_ = conf->copy(); conf_ = conf->copy();
session_ = NULL;
sip_listener_ = sip; sip_listener_ = sip;
media_listener_ = media; media_listener_ = media;
conn_ = new SrsTcpConnection(stfd); conn_ = new SrsTcpConnection(stfd);
receiver_ = new SrsLazyGbSipTcpReceiver(this, conn_); receiver_ = new SrsGbSipTcpReceiver(this, conn_);
sender_ = new SrsLazyGbSipTcpSender(conn_); sender_ = new SrsGbSipTcpSender(conn_);
} }
std::string SrsLazyGbSipTcpConn::device_id() void SrsGbSipTcpConn::setup_owner(SrsSharedResource<SrsGbSipTcpConn>* wrapper, ISrsInterruptable* owner_coroutine, ISrsContextIdSetter* owner_cid)
{
wrapper_ = wrapper;
owner_coroutine_ = owner_coroutine;
owner_cid_ = owner_cid;
}
void SrsGbSipTcpConn::on_executor_done(ISrsInterruptable* executor)
{
owner_coroutine_ = NULL;
}
std::string SrsGbSipTcpConn::device_id()
{ {
return register_->device_id(); return register_->device_id();
} }
void SrsLazyGbSipTcpConn::set_cid(const SrsContextId& cid) void SrsGbSipTcpConn::set_cid(const SrsContextId& cid)
{ {
trd_->set_cid(cid); if (owner_cid_) owner_cid_->set_cid(cid);
receiver_->set_cid(cid); receiver_->set_cid(cid);
sender_->set_cid(cid); sender_->set_cid(cid);
cid_ = cid;
} }
void SrsLazyGbSipTcpConn::query_ports(int* sip, int* media) void SrsGbSipTcpConn::query_ports(int* sip, int* media)
{ {
if (sip) *sip = sip_listener_->port(); if (sip) *sip = sip_listener_->port();
if (media) *media = media_listener_->port(); if (media) *media = media_listener_->port();
} }
srs_error_t SrsLazyGbSipTcpConn::on_sip_message(SrsSipMessage* msg) srs_error_t SrsGbSipTcpConn::on_sip_message(SrsSipMessage* msg)
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
@ -603,7 +613,7 @@ srs_error_t SrsLazyGbSipTcpConn::on_sip_message(SrsSipMessage* msg)
return err; return err;
} }
void SrsLazyGbSipTcpConn::enqueue_sip_message(SrsSipMessage* msg) void SrsGbSipTcpConn::enqueue_sip_message(SrsSipMessage* msg)
{ {
// Drive state machine when enqueue message. // Drive state machine when enqueue message.
drive_state(msg); drive_state(msg);
@ -612,7 +622,7 @@ void SrsLazyGbSipTcpConn::enqueue_sip_message(SrsSipMessage* msg)
sender_->enqueue(msg); sender_->enqueue(msg);
} }
void SrsLazyGbSipTcpConn::drive_state(SrsSipMessage* msg) void SrsGbSipTcpConn::drive_state(SrsSipMessage* msg)
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
@ -669,7 +679,7 @@ void SrsLazyGbSipTcpConn::drive_state(SrsSipMessage* msg)
} }
} }
void SrsLazyGbSipTcpConn::register_response(SrsSipMessage* msg) void SrsGbSipTcpConn::register_response(SrsSipMessage* msg)
{ {
SrsSipMessage* res = new SrsSipMessage(); SrsSipMessage* res = new SrsSipMessage();
@ -686,7 +696,7 @@ void SrsLazyGbSipTcpConn::register_response(SrsSipMessage* msg)
enqueue_sip_message(res); enqueue_sip_message(res);
} }
void SrsLazyGbSipTcpConn::message_response(SrsSipMessage* msg, http_status status) void SrsGbSipTcpConn::message_response(SrsSipMessage* msg, http_status status)
{ {
SrsSipMessage* res = new SrsSipMessage(); SrsSipMessage* res = new SrsSipMessage();
@ -701,9 +711,9 @@ void SrsLazyGbSipTcpConn::message_response(SrsSipMessage* msg, http_status statu
enqueue_sip_message(res); enqueue_sip_message(res);
} }
void SrsLazyGbSipTcpConn::invite_ack(SrsSipMessage* msg) void SrsGbSipTcpConn::invite_ack(SrsSipMessage* msg)
{ {
string pip = session_->resource()->pip(); // Parse from CANDIDATE string pip = session_->pip(); // Parse from CANDIDATE
int sip_port; query_ports(&sip_port, NULL); int sip_port; query_ports(&sip_port, NULL);
string gb_device_id = srs_fmt("sip:%s@%s", msg->to_address_user_.c_str(), msg->to_address_host_.c_str()); string gb_device_id = srs_fmt("sip:%s@%s", msg->to_address_user_.c_str(), msg->to_address_host_.c_str());
string branch = srs_random_str(6); string branch = srs_random_str(6);
@ -722,7 +732,7 @@ void SrsLazyGbSipTcpConn::invite_ack(SrsSipMessage* msg)
enqueue_sip_message(req); enqueue_sip_message(req);
} }
void SrsLazyGbSipTcpConn::bye_response(SrsSipMessage* msg) void SrsGbSipTcpConn::bye_response(SrsSipMessage* msg)
{ {
SrsSipMessage* res = new SrsSipMessage(); SrsSipMessage* res = new SrsSipMessage();
@ -737,7 +747,7 @@ void SrsLazyGbSipTcpConn::bye_response(SrsSipMessage* msg)
enqueue_sip_message(res); enqueue_sip_message(res);
} }
srs_error_t SrsLazyGbSipTcpConn::invite_request(uint32_t* pssrc) srs_error_t SrsGbSipTcpConn::invite_request(uint32_t* pssrc)
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
@ -765,7 +775,7 @@ srs_error_t SrsLazyGbSipTcpConn::invite_request(uint32_t* pssrc)
if (pssrc) *pssrc = ssrc_v_; if (pssrc) *pssrc = ssrc_v_;
} }
string pip = session_->resource()->pip(); // Parse from CANDIDATE string pip = session_->pip(); // Parse from CANDIDATE
int sip_port, media_port; query_ports(&sip_port, &media_port); int sip_port, media_port; query_ports(&sip_port, &media_port);
string srs_device_id = srs_fmt("sip:%s@%s", register_->request_uri_user_.c_str(), register_->request_uri_host_.c_str()); string srs_device_id = srs_fmt("sip:%s@%s", register_->request_uri_user_.c_str(), register_->request_uri_host_.c_str());
string gb_device_id = srs_fmt("sip:%s@%s", register_->from_address_user_.c_str(), register_->from_address_host_.c_str()); string gb_device_id = srs_fmt("sip:%s@%s", register_->from_address_user_.c_str(), register_->from_address_host_.c_str());
@ -834,63 +844,59 @@ srs_error_t SrsLazyGbSipTcpConn::invite_request(uint32_t* pssrc)
return err; return err;
} }
void SrsLazyGbSipTcpConn::interrupt() void SrsGbSipTcpConn::interrupt()
{ {
receiver_->interrupt(); receiver_->interrupt();
sender_->interrupt(); sender_->interrupt();
trd_->interrupt(); if (owner_coroutine_) owner_coroutine_->interrupt();
} }
SrsGbSipState SrsLazyGbSipTcpConn::state() SrsGbSipState SrsGbSipTcpConn::state()
{ {
return state_; return state_;
} }
void SrsLazyGbSipTcpConn::reset_to_register() void SrsGbSipTcpConn::reset_to_register()
{ {
state_ = SrsGbSipStateRegistered; state_ = SrsGbSipStateRegistered;
} }
bool SrsLazyGbSipTcpConn::is_registered() bool SrsGbSipTcpConn::is_registered()
{ {
return state_ >= SrsGbSipStateRegistered && state_ <= SrsGbSipStateStable; return state_ >= SrsGbSipStateRegistered && state_ <= SrsGbSipStateStable;
} }
bool SrsLazyGbSipTcpConn::is_stable() bool SrsGbSipTcpConn::is_stable()
{ {
return state_ == SrsGbSipStateStable; return state_ == SrsGbSipStateStable;
} }
bool SrsLazyGbSipTcpConn::is_bye() bool SrsGbSipTcpConn::is_bye()
{ {
return state_ == SrsGbSipStateBye; return state_ == SrsGbSipStateBye;
} }
SrsGbSipState SrsLazyGbSipTcpConn::set_state(SrsGbSipState v) SrsGbSipState SrsGbSipTcpConn::set_state(SrsGbSipState v)
{ {
SrsGbSipState state = state_; SrsGbSipState state = state_;
state_ = v; state_ = v;
return state; return state;
} }
const SrsContextId& SrsLazyGbSipTcpConn::get_id() const SrsContextId& SrsGbSipTcpConn::get_id()
{ {
return trd_->cid(); return cid_;
} }
std::string SrsLazyGbSipTcpConn::desc() std::string SrsGbSipTcpConn::desc()
{ {
return "GB-SIP-TCP"; return "GB-SIP-TCP";
} }
srs_error_t SrsLazyGbSipTcpConn::start() srs_error_t SrsGbSipTcpConn::cycle()
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
if ((err = trd_->start()) != srs_success) {
return srs_error_wrap(err, "sip");
}
if ((err = receiver_->start()) != srs_success) { if ((err = receiver_->start()) != srs_success) {
return srs_error_wrap(err, "receiver"); return srs_error_wrap(err, "receiver");
} }
@ -899,22 +905,13 @@ srs_error_t SrsLazyGbSipTcpConn::start()
return srs_error_wrap(err, "sender"); return srs_error_wrap(err, "sender");
} }
return err; // Wait for the SIP connection to be terminated.
} err = do_cycle();
srs_error_t SrsLazyGbSipTcpConn::cycle()
{
srs_error_t err = do_cycle();
// Interrupt the receiver and sender coroutine. // Interrupt the receiver and sender coroutine.
receiver_->interrupt(); receiver_->interrupt();
sender_->interrupt(); sender_->interrupt();
// Note that we added wrapper to manager, so we must free the wrapper, not this connection.
SrsLazyObjectWrapper<SrsLazyGbSipTcpConn>* wrapper = wrapper_root_;
srs_assert(wrapper); // The creator wrapper MUST never be null, because we created it.
_srs_gb_manager->remove(wrapper);
// success. // success.
if (err == srs_success) { if (err == srs_success) {
srs_trace("client finished."); srs_trace("client finished.");
@ -942,23 +939,23 @@ srs_error_t SrsLazyGbSipTcpConn::cycle()
return srs_success; return srs_success;
} }
srs_error_t SrsLazyGbSipTcpConn::do_cycle() srs_error_t SrsGbSipTcpConn::do_cycle()
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
while (true) { while (true) {
if ((err = trd_->pull()) != srs_success) { if (!owner_coroutine_) return err;
if ((err = owner_coroutine_->pull()) != srs_success) {
return srs_error_wrap(err, "pull"); return srs_error_wrap(err, "pull");
} }
// TODO: Handle other messages.
srs_usleep(SRS_UTIME_NO_TIMEOUT); srs_usleep(SRS_UTIME_NO_TIMEOUT);
} }
return err; return err;
} }
srs_error_t SrsLazyGbSipTcpConn::bind_session(SrsSipMessage* msg, SrsLazyObjectWrapper<SrsLazyGbSession>** psession) srs_error_t SrsGbSipTcpConn::bind_session(SrsSipMessage* msg, SrsGbSession** psession)
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
@ -968,32 +965,29 @@ srs_error_t SrsLazyGbSipTcpConn::bind_session(SrsSipMessage* msg, SrsLazyObjectW
// Only create session for REGISTER request. // Only create session for REGISTER request.
if (msg->type_ != HTTP_REQUEST || msg->method_ != HTTP_REGISTER) return err; if (msg->type_ != HTTP_REQUEST || msg->method_ != HTTP_REGISTER) return err;
// The lazy-sweep wrapper for this resource.
SrsLazyObjectWrapper<SrsLazyGbSipTcpConn>* wrapper = wrapper_root_;
srs_assert(wrapper); // It MUST never be NULL, because this method is in the cycle of coroutine of receiver.
// Find exists session for register, might be created by another object and still alive. // Find exists session for register, might be created by another object and still alive.
SrsLazyObjectWrapper<SrsLazyGbSession>* session = dynamic_cast<SrsLazyObjectWrapper<SrsLazyGbSession>*>(_srs_gb_manager->find_by_id(device)); SrsSharedResource<SrsGbSession>* session = dynamic_cast<SrsSharedResource<SrsGbSession>*>(_srs_gb_manager->find_by_id(device));
SrsGbSession* raw_session = session ? (*session).get() : NULL;
if (!session) { if (!session) {
// Create new GB session. // Create new GB session.
session = new SrsLazyObjectWrapper<SrsLazyGbSession>(); raw_session = new SrsGbSession();
raw_session->setup(conf_);
if ((err = session->resource()->initialize(conf_)) != srs_success) {
srs_freep(session);
return srs_error_wrap(err, "initialize");
}
if ((err = session->resource()->start()) != srs_success) {
srs_freep(session);
return srs_error_wrap(err, "start");
}
session = new SrsSharedResource<SrsGbSession>(raw_session);
_srs_gb_manager->add_with_id(device, session); _srs_gb_manager->add_with_id(device, session);
SrsExecutorCoroutine* executor = new SrsExecutorCoroutine(_srs_gb_manager, session, raw_session, raw_session);
raw_session->setup_owner(session, executor, executor);
if ((err = executor->start()) != srs_success) {
srs_freep(executor);
return srs_error_wrap(err, "gb session");
}
} }
// Try to load state from previous SIP connection. // Try to load state from previous SIP connection.
SrsLazyGbSipTcpConn* pre = dynamic_cast<SrsLazyGbSipTcpConn*>(session->resource()->sip_transport()->resource()); SrsSharedResource<SrsGbSipTcpConn> pre = raw_session->sip_transport();
if (pre) { if (pre.get() && pre.get() != this) {
state_ = pre->state_; state_ = pre->state_;
ssrc_str_ = pre->ssrc_str_; ssrc_str_ = pre->ssrc_str_;
ssrc_v_ = pre->ssrc_v_; ssrc_v_ = pre->ssrc_v_;
@ -1001,36 +995,36 @@ srs_error_t SrsLazyGbSipTcpConn::bind_session(SrsSipMessage* msg, SrsLazyObjectW
srs_freep(invite_ok_); invite_ok_ = pre->invite_ok_->copy(); srs_freep(invite_ok_); invite_ok_ = pre->invite_ok_->copy();
} }
// Notice SIP session to use current SIP connection. // Notice session to use current SIP connection.
session->resource()->on_sip_transport(wrapper); raw_session->on_sip_transport(*wrapper_);
*psession = session->copy(); *psession = raw_session;
return err; return err;
} }
SrsLazyGbSipTcpReceiver::SrsLazyGbSipTcpReceiver(SrsLazyGbSipTcpConn* sip, SrsTcpConnection* conn) SrsGbSipTcpReceiver::SrsGbSipTcpReceiver(SrsGbSipTcpConn* sip, SrsTcpConnection* conn)
{ {
sip_ = sip; sip_ = sip;
conn_ = conn; conn_ = conn;
trd_ = new SrsSTCoroutine("sip-receiver", this); trd_ = new SrsSTCoroutine("sip-receiver", this);
} }
SrsLazyGbSipTcpReceiver::~SrsLazyGbSipTcpReceiver() SrsGbSipTcpReceiver::~SrsGbSipTcpReceiver()
{ {
srs_freep(trd_); srs_freep(trd_);
} }
void SrsLazyGbSipTcpReceiver::interrupt() void SrsGbSipTcpReceiver::interrupt()
{ {
trd_->interrupt(); trd_->interrupt();
} }
void SrsLazyGbSipTcpReceiver::set_cid(const SrsContextId& cid) void SrsGbSipTcpReceiver::set_cid(const SrsContextId& cid)
{ {
trd_->set_cid(cid); trd_->set_cid(cid);
} }
srs_error_t SrsLazyGbSipTcpReceiver::start() srs_error_t SrsGbSipTcpReceiver::start()
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
@ -1041,7 +1035,7 @@ srs_error_t SrsLazyGbSipTcpReceiver::start()
return err; return err;
} }
srs_error_t SrsLazyGbSipTcpReceiver::cycle() srs_error_t SrsGbSipTcpReceiver::cycle()
{ {
srs_error_t err = do_cycle(); srs_error_t err = do_cycle();
@ -1053,7 +1047,7 @@ srs_error_t SrsLazyGbSipTcpReceiver::cycle()
return err; return err;
} }
srs_error_t SrsLazyGbSipTcpReceiver::do_cycle() srs_error_t SrsGbSipTcpReceiver::do_cycle()
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
@ -1092,14 +1086,14 @@ srs_error_t SrsLazyGbSipTcpReceiver::do_cycle()
return err; return err;
} }
SrsLazyGbSipTcpSender::SrsLazyGbSipTcpSender(SrsTcpConnection* conn) SrsGbSipTcpSender::SrsGbSipTcpSender(SrsTcpConnection* conn)
{ {
conn_ = conn; conn_ = conn;
wait_ = srs_cond_new(); wait_ = srs_cond_new();
trd_ = new SrsSTCoroutine("sip-sender", this); trd_ = new SrsSTCoroutine("sip-sender", this);
} }
SrsLazyGbSipTcpSender::~SrsLazyGbSipTcpSender() SrsGbSipTcpSender::~SrsGbSipTcpSender()
{ {
srs_freep(trd_); srs_freep(trd_);
srs_cond_destroy(wait_); srs_cond_destroy(wait_);
@ -1110,23 +1104,23 @@ SrsLazyGbSipTcpSender::~SrsLazyGbSipTcpSender()
} }
} }
void SrsLazyGbSipTcpSender::enqueue(SrsSipMessage* msg) void SrsGbSipTcpSender::enqueue(SrsSipMessage* msg)
{ {
msgs_.push_back(msg); msgs_.push_back(msg);
srs_cond_signal(wait_); srs_cond_signal(wait_);
} }
void SrsLazyGbSipTcpSender::interrupt() void SrsGbSipTcpSender::interrupt()
{ {
trd_->interrupt(); trd_->interrupt();
} }
void SrsLazyGbSipTcpSender::set_cid(const SrsContextId& cid) void SrsGbSipTcpSender::set_cid(const SrsContextId& cid)
{ {
trd_->set_cid(cid); trd_->set_cid(cid);
} }
srs_error_t SrsLazyGbSipTcpSender::start() srs_error_t SrsGbSipTcpSender::start()
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
@ -1137,7 +1131,7 @@ srs_error_t SrsLazyGbSipTcpSender::start()
return err; return err;
} }
srs_error_t SrsLazyGbSipTcpSender::cycle() srs_error_t SrsGbSipTcpSender::cycle()
{ {
srs_error_t err = do_cycle(); srs_error_t err = do_cycle();
@ -1149,7 +1143,7 @@ srs_error_t SrsLazyGbSipTcpSender::cycle()
return err; return err;
} }
srs_error_t SrsLazyGbSipTcpSender::do_cycle() srs_error_t SrsGbSipTcpSender::do_cycle()
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
@ -1219,71 +1213,74 @@ ISrsPsPackHandler::~ISrsPsPackHandler()
{ {
} }
SrsLazyGbMediaTcpConn::SrsLazyGbMediaTcpConn(SrsLazyObjectWrapper<SrsLazyGbMediaTcpConn>* wrapper_root) SrsGbMediaTcpConn::SrsGbMediaTcpConn()
{ {
wrapper_root_ = wrapper_root;
pack_ = new SrsPackContext(this); pack_ = new SrsPackContext(this);
trd_ = new SrsSTCoroutine("media", this);
buffer_ = new uint8_t[65535]; buffer_ = new uint8_t[65535];
conn_ = NULL; conn_ = NULL;
wrapper_ = NULL;
owner_coroutine_ = NULL;
owner_cid_ = NULL;
cid_ = _srs_context->get_id();
session_ = NULL; session_ = NULL;
connected_ = false; connected_ = false;
nn_rtcp_ = 0; nn_rtcp_ = 0;
} }
SrsLazyGbMediaTcpConn::~SrsLazyGbMediaTcpConn() SrsGbMediaTcpConn::~SrsGbMediaTcpConn()
{ {
srs_freep(trd_);
srs_freep(conn_); srs_freep(conn_);
srs_freepa(buffer_); srs_freepa(buffer_);
srs_freep(pack_); srs_freep(pack_);
srs_freep(session_);
} }
void SrsLazyGbMediaTcpConn::setup(srs_netfd_t stfd) void SrsGbMediaTcpConn::setup(srs_netfd_t stfd)
{ {
srs_freep(conn_); srs_freep(conn_);
conn_ = new SrsTcpConnection(stfd); conn_ = new SrsTcpConnection(stfd);
} }
bool SrsLazyGbMediaTcpConn::is_connected() void SrsGbMediaTcpConn::setup_owner(SrsSharedResource<SrsGbMediaTcpConn>* wrapper, ISrsInterruptable* owner_coroutine, ISrsContextIdSetter* owner_cid)
{
wrapper_ = wrapper;
owner_coroutine_ = owner_coroutine;
owner_cid_ = owner_cid;
}
void SrsGbMediaTcpConn::on_executor_done(ISrsInterruptable* executor)
{
owner_coroutine_ = NULL;
}
bool SrsGbMediaTcpConn::is_connected()
{ {
return connected_; return connected_;
} }
void SrsLazyGbMediaTcpConn::interrupt() void SrsGbMediaTcpConn::interrupt()
{ {
trd_->interrupt(); if (owner_coroutine_) owner_coroutine_->interrupt();
} }
void SrsLazyGbMediaTcpConn::set_cid(const SrsContextId& cid) void SrsGbMediaTcpConn::set_cid(const SrsContextId& cid)
{ {
trd_->set_cid(cid); if (owner_cid_) owner_cid_->set_cid(cid);
cid_ = cid;
} }
const SrsContextId& SrsLazyGbMediaTcpConn::get_id() const SrsContextId& SrsGbMediaTcpConn::get_id()
{ {
return _srs_context->get_id(); return cid_;
} }
std::string SrsLazyGbMediaTcpConn::desc() std::string SrsGbMediaTcpConn::desc()
{ {
return "GB-Media-TCP"; return "GB-Media-TCP";
} }
srs_error_t SrsLazyGbMediaTcpConn::start() srs_error_t SrsGbMediaTcpConn::cycle()
{
srs_error_t err = srs_success;
if ((err = trd_->start()) != srs_success) {
return srs_error_wrap(err, "coroutine");
}
return err;
}
srs_error_t SrsLazyGbMediaTcpConn::cycle()
{ {
srs_error_t err = do_cycle(); srs_error_t err = do_cycle();
@ -1295,11 +1292,6 @@ srs_error_t SrsLazyGbMediaTcpConn::cycle()
connected_ = false; connected_ = false;
srs_trace("PS: Media disconnect, code=%d", srs_error_code(err)); srs_trace("PS: Media disconnect, code=%d", srs_error_code(err));
// Note that we added wrapper to manager, so we must free the wrapper, not this connection.
SrsLazyObjectWrapper<SrsLazyGbMediaTcpConn>* wrapper = wrapper_root_;
srs_assert(wrapper); // The creator wrapper MUST never be null, because we created it.
_srs_gb_manager->remove(wrapper);
// success. // success.
if (err == srs_success) { if (err == srs_success) {
srs_trace("client finished."); srs_trace("client finished.");
@ -1327,7 +1319,7 @@ srs_error_t SrsLazyGbMediaTcpConn::cycle()
return srs_success; return srs_success;
} }
srs_error_t SrsLazyGbMediaTcpConn::do_cycle() srs_error_t SrsGbMediaTcpConn::do_cycle()
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
@ -1341,7 +1333,8 @@ srs_error_t SrsLazyGbMediaTcpConn::do_cycle()
uint32_t reserved = 0; uint32_t reserved = 0;
for (;;) { for (;;) {
if ((err = trd_->pull()) != srs_success) { if (!owner_coroutine_) return err;
if ((err = owner_coroutine_->pull()) != srs_success) {
return srs_error_wrap(err, "pull"); return srs_error_wrap(err, "pull");
} }
@ -1426,7 +1419,7 @@ srs_error_t SrsLazyGbMediaTcpConn::do_cycle()
return err; return err;
} }
srs_error_t SrsLazyGbMediaTcpConn::on_ps_pack(SrsPsPacket* ps, const std::vector<SrsTsMessage*>& msgs) srs_error_t SrsGbMediaTcpConn::on_ps_pack(SrsPsPacket* ps, const std::vector<SrsTsMessage*>& msgs)
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
@ -1437,7 +1430,7 @@ srs_error_t SrsLazyGbMediaTcpConn::on_ps_pack(SrsPsPacket* ps, const std::vector
} }
// Notify session about the media pack. // Notify session about the media pack.
session_->resource()->on_ps_pack(pack_, ps, msgs); session_->on_ps_pack(pack_, ps, msgs);
//for (vector<SrsTsMessage*>::const_iterator it = msgs.begin(); it != msgs.end(); ++it) { //for (vector<SrsTsMessage*>::const_iterator it = msgs.begin(); it != msgs.end(); ++it) {
// SrsTsMessage* msg = *it; // SrsTsMessage* msg = *it;
@ -1450,23 +1443,22 @@ srs_error_t SrsLazyGbMediaTcpConn::on_ps_pack(SrsPsPacket* ps, const std::vector
return err; return err;
} }
srs_error_t SrsLazyGbMediaTcpConn::bind_session(uint32_t ssrc, SrsLazyObjectWrapper<SrsLazyGbSession>** psession) srs_error_t SrsGbMediaTcpConn::bind_session(uint32_t ssrc, SrsGbSession** psession)
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
if (!ssrc) return err; if (!ssrc) return err;
// The lazy-sweep wrapper for this resource.
SrsLazyObjectWrapper<SrsLazyGbMediaTcpConn>* wrapper = wrapper_root_;
srs_assert(wrapper); // It MUST never be NULL, because this method is in the cycle of coroutine.
// Find exists session for register, might be created by another object and still alive. // Find exists session for register, might be created by another object and still alive.
SrsLazyObjectWrapper<SrsLazyGbSession>* session = dynamic_cast<SrsLazyObjectWrapper<SrsLazyGbSession>*>(_srs_gb_manager->find_by_fast_id(ssrc)); SrsSharedResource<SrsGbSession>* session = dynamic_cast<SrsSharedResource<SrsGbSession>*>(_srs_gb_manager->find_by_fast_id(ssrc));
if (!session) return err; if (!session) return err;
_srs_gb_manager->add_with_fast_id(ssrc, session); SrsGbSession* raw_session = (*session).get();
session->resource()->on_media_transport(wrapper); srs_assert(raw_session);
*psession = session->copy();
// Notice session to use current media connection.
raw_session->on_media_transport(*wrapper_);
*psession = raw_session;
return err; return err;
} }
@ -1545,7 +1537,7 @@ SrsSharedPtrMessage* SrsMpegpsQueue::dequeue()
return NULL; return NULL;
} }
SrsGbMuxer::SrsGbMuxer(SrsLazyGbSession* session) SrsGbMuxer::SrsGbMuxer(SrsGbSession* session)
{ {
sdk_ = NULL; sdk_ = NULL;
session_ = session; session_ = session;
@ -1580,13 +1572,9 @@ SrsGbMuxer::~SrsGbMuxer()
srs_freep(pprint_); srs_freep(pprint_);
} }
srs_error_t SrsGbMuxer::initialize(std::string output) void SrsGbMuxer::setup(std::string output)
{ {
srs_error_t err = srs_success;
output_ = output; output_ = output;
return err;
} }
srs_error_t SrsGbMuxer::on_ts_message(SrsTsMessage* msg) srs_error_t SrsGbMuxer::on_ts_message(SrsTsMessage* msg)
@ -2095,7 +2083,7 @@ srs_error_t SrsGbMuxer::connect()
// Cleanup the data before connect again. // Cleanup the data before connect again.
close(); close();
string url = srs_string_replace(output_, "[stream]", session_->sip_transport()->resource()->device_id()); string url = srs_string_replace(output_, "[stream]", session_->sip_transport()->device_id());
srs_trace("Muxer: Convert GB to RTMP %s", url.c_str()); srs_trace("Muxer: Convert GB to RTMP %s", url.c_str());
srs_utime_t cto = SRS_CONSTS_RTMP_TIMEOUT; srs_utime_t cto = SRS_CONSTS_RTMP_TIMEOUT;

View file

@ -26,11 +26,11 @@ class SrsCoroutine;
class SrsPackContext; class SrsPackContext;
class SrsBuffer; class SrsBuffer;
class SrsSipMessage; class SrsSipMessage;
class SrsLazyGbSession; class SrsGbSession;
class SrsLazyGbSipTcpConn; class SrsGbSipTcpConn;
class SrsLazyGbMediaTcpConn; class SrsGbMediaTcpConn;
class SrsLazyGbSipTcpReceiver; class SrsGbSipTcpReceiver;
class SrsLazyGbSipTcpSender; class SrsGbSipTcpSender;
class SrsAlonePithyPrint; class SrsAlonePithyPrint;
class SrsGbMuxer; class SrsGbMuxer;
class SrsSimpleRtmpClient; class SrsSimpleRtmpClient;
@ -51,7 +51,7 @@ class SrsRawAacStream;
// established: // established:
// init: media is not connected. // init: media is not connected.
// dispose session: sip is bye. // dispose session: sip is bye.
// Please see SrsLazyGbSession::drive_state for detail. // Please see SrsGbSession::drive_state for detail.
enum SrsGbSessionState enum SrsGbSessionState
{ {
SrsGbSessionStateInit = 0, SrsGbSessionStateInit = 0,
@ -76,7 +76,7 @@ std::string srs_gb_session_state(SrsGbSessionState state);
// to bye: Got bye SIP message from device. // to bye: Got bye SIP message from device.
// re-inviting: // re-inviting:
// to inviting: Got bye OK response from deivce. // to inviting: Got bye OK response from deivce.
// Please see SrsLazyGbSipTcpConn::drive_state for detail. // Please see SrsGbSipTcpConn::drive_state for detail.
enum SrsGbSipState enum SrsGbSipState
{ {
SrsGbSipStateInit = 0, SrsGbSipStateInit = 0,
@ -90,16 +90,23 @@ enum SrsGbSipState
std::string srs_gb_sip_state(SrsGbSipState state); std::string srs_gb_sip_state(SrsGbSipState state);
// The main logic object for GB, the session. // The main logic object for GB, the session.
class SrsLazyGbSession : public SrsLazyObject, public ISrsResource, public ISrsStartable, public ISrsCoroutineHandler // Each session contains a SIP object and a media object, that are managed by session. This means session always
// lives longer than SIP and media, and session will dispose SIP and media when session disposed. In another word,
// SIP and media objects use directly pointer to session, while session use shared ptr.
class SrsGbSession : public ISrsResource, public ISrsCoroutineHandler, public ISrsExecutorHandler
{ {
private: private:
SrsCoroutine* trd_;
SrsContextId cid_; SrsContextId cid_;
private:
// The shared resource which own this object, we should never free it because it's managed by shared ptr.
SrsSharedResource<SrsGbSession>* wrapper_;
// The owner coroutine, allow user to interrupt the loop.
ISrsInterruptable* owner_coroutine_;
ISrsContextIdSetter* owner_cid_;
private: private:
SrsGbSessionState state_; SrsGbSessionState state_;
SrsLazyObjectWrapper<SrsLazyGbSession>* wrapper_root_; SrsSharedResource<SrsGbSipTcpConn> sip_;
SrsLazyObjectWrapper<SrsLazyGbSipTcpConn>* sip_; SrsSharedResource<SrsGbMediaTcpConn> media_;
SrsLazyObjectWrapper<SrsLazyGbMediaTcpConn>* media_;
SrsGbMuxer* muxer_; SrsGbMuxer* muxer_;
private: private:
// The candidate for SDP in configuration. // The candidate for SDP in configuration.
@ -132,26 +139,27 @@ private:
uint64_t media_recovered_; uint64_t media_recovered_;
uint64_t media_msgs_dropped_; uint64_t media_msgs_dropped_;
uint64_t media_reserved_; uint64_t media_reserved_;
private:
friend class SrsLazyObjectWrapper<SrsLazyGbSession>;
SrsLazyGbSession(SrsLazyObjectWrapper<SrsLazyGbSession>* wrapper_root);
public: public:
virtual ~SrsLazyGbSession(); SrsGbSession();
virtual ~SrsGbSession();
public: public:
// Initialize the GB session. // Initialize the GB session.
srs_error_t initialize(SrsConfDirective* conf); void setup(SrsConfDirective* conf);
// Setup the owner, the wrapper is the shared ptr, the interruptable object is the coroutine, and the cid is the context id.
void setup_owner(SrsSharedResource<SrsGbSession>* wrapper, ISrsInterruptable* owner_coroutine, ISrsContextIdSetter* owner_cid);
// Interface ISrsExecutorHandler
public:
virtual void on_executor_done(ISrsInterruptable* executor);
public:
// When got a pack of messages. // When got a pack of messages.
void on_ps_pack(SrsPackContext* ctx, SrsPsPacket* ps, const std::vector<SrsTsMessage*>& msgs); void on_ps_pack(SrsPackContext* ctx, SrsPsPacket* ps, const std::vector<SrsTsMessage*>& msgs);
// When got available SIP transport. // When got available SIP transport.
void on_sip_transport(SrsLazyObjectWrapper<SrsLazyGbSipTcpConn>* sip); void on_sip_transport(SrsSharedResource<SrsGbSipTcpConn> sip);
SrsLazyObjectWrapper<SrsLazyGbSipTcpConn>* sip_transport(); SrsSharedResource<SrsGbSipTcpConn> sip_transport();
// When got available media transport. // When got available media transport.
void on_media_transport(SrsLazyObjectWrapper<SrsLazyGbMediaTcpConn>* media); void on_media_transport(SrsSharedResource<SrsGbMediaTcpConn> media);
// Get the candidate for SDP generation, the public IP address for device to connect to. // Get the candidate for SDP generation, the public IP address for device to connect to.
std::string pip(); std::string pip();
// Interface ISrsStartable
public:
virtual srs_error_t start();
// Interface ISrsOneCycleThreadHandler // Interface ISrsOneCycleThreadHandler
public: public:
virtual srs_error_t cycle(); virtual srs_error_t cycle();
@ -186,12 +194,12 @@ public:
}; };
// A GB28181 TCP SIP connection. // A GB28181 TCP SIP connection.
class SrsLazyGbSipTcpConn : public SrsLazyObject, public ISrsResource, public ISrsStartable, public ISrsCoroutineHandler class SrsGbSipTcpConn : public ISrsResource, public ISrsCoroutineHandler, public ISrsExecutorHandler
{ {
private: private:
SrsGbSipState state_; SrsGbSipState state_;
SrsLazyObjectWrapper<SrsLazyGbSipTcpConn>* wrapper_root_; // The owner session object, note that we use the raw pointer and should never free it.
SrsLazyObjectWrapper<SrsLazyGbSession>* session_; SrsGbSession* session_;
SrsSipMessage* register_; SrsSipMessage* register_;
SrsSipMessage* invite_ok_; SrsSipMessage* invite_ok_;
private: private:
@ -202,18 +210,28 @@ private:
SrsTcpListener* sip_listener_; SrsTcpListener* sip_listener_;
SrsTcpListener* media_listener_; SrsTcpListener* media_listener_;
private: private:
SrsTcpConnection* conn_; // The shared resource which own this object, we should never free it because it's managed by shared ptr.
SrsLazyGbSipTcpReceiver* receiver_; SrsSharedResource<SrsGbSipTcpConn>* wrapper_;
SrsLazyGbSipTcpSender* sender_; // The owner coroutine, allow user to interrupt the loop.
SrsCoroutine* trd_; ISrsInterruptable* owner_coroutine_;
ISrsContextIdSetter* owner_cid_;
SrsContextId cid_;
private: private:
friend class SrsLazyObjectWrapper<SrsLazyGbSipTcpConn>; SrsTcpConnection* conn_;
SrsLazyGbSipTcpConn(SrsLazyObjectWrapper<SrsLazyGbSipTcpConn>* wrapper_root); SrsGbSipTcpReceiver* receiver_;
SrsGbSipTcpSender* sender_;
public: public:
virtual ~SrsLazyGbSipTcpConn(); SrsGbSipTcpConn();
virtual ~SrsGbSipTcpConn();
public: public:
// Setup object, to keep empty constructor. // Setup object, to keep empty constructor.
void setup(SrsConfDirective* conf, SrsTcpListener* sip, SrsTcpListener* media, srs_netfd_t stfd); void setup(SrsConfDirective* conf, SrsTcpListener* sip, SrsTcpListener* media, srs_netfd_t stfd);
// Setup the owner, the wrapper is the shared ptr, the interruptable object is the coroutine, and the cid is the context id.
void setup_owner(SrsSharedResource<SrsGbSipTcpConn>* wrapper, ISrsInterruptable* owner_coroutine, ISrsContextIdSetter* owner_cid);
// Interface ISrsExecutorHandler
public:
virtual void on_executor_done(ISrsInterruptable* executor);
public:
// Get the SIP device id. // Get the SIP device id.
std::string device_id(); std::string device_id();
// Set the cid of all coroutines. // Set the cid of all coroutines.
@ -253,29 +271,26 @@ private:
public: public:
virtual const SrsContextId& get_id(); virtual const SrsContextId& get_id();
virtual std::string desc(); virtual std::string desc();
// Interface ISrsStartable
public:
virtual srs_error_t start();
// Interface ISrsOneCycleThreadHandler // Interface ISrsOneCycleThreadHandler
public: public:
virtual srs_error_t cycle(); virtual srs_error_t cycle();
private: private:
virtual srs_error_t do_cycle(); srs_error_t do_cycle();
private: private:
// Create session if no one, or bind to an existed session. // Create session if no one, or bind to an existed session.
srs_error_t bind_session(SrsSipMessage* msg, SrsLazyObjectWrapper<SrsLazyGbSession>** psession); srs_error_t bind_session(SrsSipMessage* msg, SrsGbSession** psession);
}; };
// Start a coroutine to receive SIP messages. // Start a coroutine to receive SIP messages.
class SrsLazyGbSipTcpReceiver : public ISrsStartable, public ISrsCoroutineHandler class SrsGbSipTcpReceiver : public ISrsStartable, public ISrsCoroutineHandler
{ {
private: private:
SrsCoroutine* trd_; SrsCoroutine* trd_;
SrsTcpConnection* conn_; SrsTcpConnection* conn_;
SrsLazyGbSipTcpConn* sip_; SrsGbSipTcpConn* sip_;
public: public:
SrsLazyGbSipTcpReceiver(SrsLazyGbSipTcpConn* sip, SrsTcpConnection* conn); SrsGbSipTcpReceiver(SrsGbSipTcpConn* sip, SrsTcpConnection* conn);
virtual ~SrsLazyGbSipTcpReceiver(); virtual ~SrsGbSipTcpReceiver();
public: public:
// Interrupt the receiver coroutine. // Interrupt the receiver coroutine.
void interrupt(); void interrupt();
@ -292,7 +307,7 @@ private:
}; };
// Start a coroutine to send out SIP messages. // Start a coroutine to send out SIP messages.
class SrsLazyGbSipTcpSender : public ISrsStartable, public ISrsCoroutineHandler class SrsGbSipTcpSender : public ISrsStartable, public ISrsCoroutineHandler
{ {
private: private:
SrsCoroutine* trd_; SrsCoroutine* trd_;
@ -301,8 +316,8 @@ private:
std::vector<SrsSipMessage*> msgs_; std::vector<SrsSipMessage*> msgs_;
srs_cond_t wait_; srs_cond_t wait_;
public: public:
SrsLazyGbSipTcpSender(SrsTcpConnection* conn); SrsGbSipTcpSender(SrsTcpConnection* conn);
virtual ~SrsLazyGbSipTcpSender(); virtual ~SrsGbSipTcpSender();
public: public:
// Push message to queue, and sender will send out in dedicate coroutine. // Push message to queue, and sender will send out in dedicate coroutine.
void enqueue(SrsSipMessage* msg); void enqueue(SrsSipMessage* msg);
@ -333,27 +348,36 @@ public:
}; };
// A GB28181 TCP media connection, for PS stream. // A GB28181 TCP media connection, for PS stream.
class SrsLazyGbMediaTcpConn : public SrsLazyObject, public ISrsResource, public ISrsStartable, public ISrsCoroutineHandler class SrsGbMediaTcpConn : public ISrsResource, public ISrsCoroutineHandler, public ISrsPsPackHandler, public ISrsExecutorHandler
, public ISrsPsPackHandler
{ {
private: private:
bool connected_; bool connected_;
SrsLazyObjectWrapper<SrsLazyGbMediaTcpConn>* wrapper_root_; // The owner session object, note that we use the raw pointer and should never free it.
SrsLazyObjectWrapper<SrsLazyGbSession>* session_; SrsGbSession* session_;
uint32_t nn_rtcp_; uint32_t nn_rtcp_;
private:
// The shared resource which own this object, we should never free it because it's managed by shared ptr.
SrsSharedResource<SrsGbMediaTcpConn>* wrapper_;
// The owner coroutine, allow user to interrupt the loop.
ISrsInterruptable* owner_coroutine_;
ISrsContextIdSetter* owner_cid_;
SrsContextId cid_;
private: private:
SrsPackContext* pack_; SrsPackContext* pack_;
SrsTcpConnection* conn_; SrsTcpConnection* conn_;
SrsCoroutine* trd_;
uint8_t* buffer_; uint8_t* buffer_;
private:
friend class SrsLazyObjectWrapper<SrsLazyGbMediaTcpConn>;
SrsLazyGbMediaTcpConn(SrsLazyObjectWrapper<SrsLazyGbMediaTcpConn>* wrapper_root);
public: public:
virtual ~SrsLazyGbMediaTcpConn(); SrsGbMediaTcpConn();
virtual ~SrsGbMediaTcpConn();
public: public:
// Setup object, to keep empty constructor. // Setup object, to keep empty constructor.
void setup(srs_netfd_t stfd); void setup(srs_netfd_t stfd);
// Setup the owner, the wrapper is the shared ptr, the interruptable object is the coroutine, and the cid is the context id.
void setup_owner(SrsSharedResource<SrsGbMediaTcpConn>* wrapper, ISrsInterruptable* owner_coroutine, ISrsContextIdSetter* owner_cid);
// Interface ISrsExecutorHandler
public:
virtual void on_executor_done(ISrsInterruptable* executor);
public:
// Whether media is connected. // Whether media is connected.
bool is_connected(); bool is_connected();
// Interrupt transport by session. // Interrupt transport by session.
@ -364,9 +388,6 @@ public:
public: public:
virtual const SrsContextId& get_id(); virtual const SrsContextId& get_id();
virtual std::string desc(); virtual std::string desc();
// Interface ISrsStartable
public:
virtual srs_error_t start();
// Interface ISrsOneCycleThreadHandler // Interface ISrsOneCycleThreadHandler
public: public:
virtual srs_error_t cycle(); virtual srs_error_t cycle();
@ -377,7 +398,7 @@ public:
virtual srs_error_t on_ps_pack(SrsPsPacket* ps, const std::vector<SrsTsMessage*>& msgs); virtual srs_error_t on_ps_pack(SrsPsPacket* ps, const std::vector<SrsTsMessage*>& msgs);
private: private:
// Create session if no one, or bind to an existed session. // Create session if no one, or bind to an existed session.
srs_error_t bind_session(uint32_t ssrc, SrsLazyObjectWrapper<SrsLazyGbSession>** psession); srs_error_t bind_session(uint32_t ssrc, SrsGbSession** psession);
}; };
// The queue for mpegts over udp to send packets. // The queue for mpegts over udp to send packets.
@ -402,7 +423,8 @@ public:
class SrsGbMuxer class SrsGbMuxer
{ {
private: private:
SrsLazyGbSession* session_; // The owner session object, note that we use the raw pointer and should never free it.
SrsGbSession* session_;
std::string output_; std::string output_;
SrsSimpleRtmpClient* sdk_; SrsSimpleRtmpClient* sdk_;
private: private:
@ -428,10 +450,10 @@ private:
SrsMpegpsQueue* queue_; SrsMpegpsQueue* queue_;
SrsPithyPrint* pprint_; SrsPithyPrint* pprint_;
public: public:
SrsGbMuxer(SrsLazyGbSession* session); SrsGbMuxer(SrsGbSession* session);
virtual ~SrsGbMuxer(); virtual ~SrsGbMuxer();
public: public:
srs_error_t initialize(std::string output); void setup(std::string output);
srs_error_t on_ts_message(SrsTsMessage* msg); srs_error_t on_ts_message(SrsTsMessage* msg);
private: private:
virtual srs_error_t on_ts_video(SrsTsMessage* msg, SrsBuffer* avs); virtual srs_error_t on_ts_video(SrsTsMessage* msg, SrsBuffer* avs);

View file

@ -1369,11 +1369,6 @@ srs_error_t SrsServerAdapter::run(SrsWaitGroup* wg)
} }
#endif #endif
SrsLazySweepGc* gc = dynamic_cast<SrsLazySweepGc*>(_srs_gc);
if ((err = gc->start()) != srs_success) {
return srs_error_wrap(err, "start gc");
}
return err; return err;
} }

View file

@ -30,6 +30,30 @@ ISrsStartable::~ISrsStartable()
{ {
} }
ISrsInterruptable::ISrsInterruptable()
{
}
ISrsInterruptable::~ISrsInterruptable()
{
}
ISrsContextIdSetter::ISrsContextIdSetter()
{
}
ISrsContextIdSetter::~ISrsContextIdSetter()
{
}
ISrsContextIdGetter::ISrsContextIdGetter()
{
}
ISrsContextIdGetter::~ISrsContextIdGetter()
{
}
SrsCoroutine::SrsCoroutine() SrsCoroutine::SrsCoroutine()
{ {
} }
@ -342,3 +366,69 @@ void SrsWaitGroup::wait()
} }
} }
ISrsExecutorHandler::ISrsExecutorHandler()
{
}
ISrsExecutorHandler::~ISrsExecutorHandler()
{
}
SrsExecutorCoroutine::SrsExecutorCoroutine(ISrsResourceManager* m, ISrsResource* r, ISrsCoroutineHandler* h, ISrsExecutorHandler* cb)
{
resource_ = r;
handler_ = h;
manager_ = m;
callback_ = cb;
trd_ = new SrsSTCoroutine("ar", this, resource_->get_id());
}
SrsExecutorCoroutine::~SrsExecutorCoroutine()
{
manager_->remove(resource_);
srs_freep(trd_);
}
srs_error_t SrsExecutorCoroutine::start()
{
return trd_->start();
}
void SrsExecutorCoroutine::interrupt()
{
trd_->interrupt();
}
srs_error_t SrsExecutorCoroutine::pull()
{
return trd_->pull();
}
const SrsContextId& SrsExecutorCoroutine::cid()
{
return trd_->cid();
}
void SrsExecutorCoroutine::set_cid(const SrsContextId& cid)
{
trd_->set_cid(cid);
}
srs_error_t SrsExecutorCoroutine::cycle()
{
srs_error_t err = handler_->cycle();
if (callback_) callback_->on_executor_done(this);
manager_->remove(this);
return err;
}
const SrsContextId& SrsExecutorCoroutine::get_id()
{
return resource_->get_id();
}
std::string SrsExecutorCoroutine::desc()
{
return resource_->desc();
}

View file

@ -15,8 +15,10 @@
#include <srs_kernel_error.hpp> #include <srs_kernel_error.hpp>
#include <srs_protocol_st.hpp> #include <srs_protocol_st.hpp>
#include <srs_protocol_io.hpp> #include <srs_protocol_io.hpp>
#include <srs_protocol_conn.hpp>
class SrsFastCoroutine; class SrsFastCoroutine;
class SrsExecutorCoroutine;
// Each ST-coroutine must implements this interface, // Each ST-coroutine must implements this interface,
// to do the cycle job and handle some events. // to do the cycle job and handle some events.
@ -64,21 +66,46 @@ public:
virtual srs_error_t start() = 0; virtual srs_error_t start() = 0;
}; };
// The corotine object. // Allow user to interrupt the coroutine, for example, to stop it.
class SrsCoroutine : public ISrsStartable class ISrsInterruptable
{
public:
ISrsInterruptable();
virtual ~ISrsInterruptable();
public:
virtual void interrupt() = 0;
virtual srs_error_t pull() = 0;
};
// Get the context id.
class ISrsContextIdSetter
{
public:
ISrsContextIdSetter();
virtual ~ISrsContextIdSetter();
public:
virtual void set_cid(const SrsContextId& cid) = 0;
};
// Set the context id.
class ISrsContextIdGetter
{
public:
ISrsContextIdGetter();
virtual ~ISrsContextIdGetter();
public:
virtual const SrsContextId& cid() = 0;
};
// The coroutine object.
class SrsCoroutine : public ISrsStartable, public ISrsInterruptable
, public ISrsContextIdSetter, public ISrsContextIdGetter
{ {
public: public:
SrsCoroutine(); SrsCoroutine();
virtual ~SrsCoroutine(); virtual ~SrsCoroutine();
public: public:
virtual void stop() = 0; virtual void stop() = 0;
virtual void interrupt() = 0;
// @return a copy of error, which should be freed by user.
// NULL if not terminated and user should pull again.
virtual srs_error_t pull() = 0;
// Get and set the context id of coroutine.
virtual const SrsContextId& cid() = 0;
virtual void set_cid(const SrsContextId& cid) = 0;
}; };
// An empty coroutine, user can default to this object before create any real coroutine. // An empty coroutine, user can default to this object before create any real coroutine.
@ -192,7 +219,7 @@ private:
static void* pfn(void* arg); static void* pfn(void* arg);
}; };
// Like goroytine sync.WaitGroup. // Like goroutine sync.WaitGroup.
class SrsWaitGroup class SrsWaitGroup
{ {
private: private:
@ -206,9 +233,72 @@ public:
void add(int n); void add(int n);
// When coroutine is done. // When coroutine is done.
void done(); void done();
// Wait for all corotine to be done. // Wait for all coroutine to be done.
void wait(); void wait();
}; };
// The callback when executor cycle done.
class ISrsExecutorHandler
{
public:
ISrsExecutorHandler();
virtual ~ISrsExecutorHandler();
public:
virtual void on_executor_done(ISrsInterruptable* executor) = 0;
};
// Start a coroutine for resource executor, to execute the handler and delete resource and itself when
// handler cycle done.
//
// Note that the executor will free itself by manager, then free the resource by manager. This is a helper
// that is used for a goroutine to execute a handler and free itself after the cycle is done.
//
// Generally, the handler, resource, and callback generally are the same object. But we do not define a single
// interface, because shared resource is a special interface.
//
// Note that the resource may live longer than executor, because it is shared resource, so you should process
// the callback. For example, you should never use the executor after it's stopped and deleted.
//
// Usage:
// ISrsResourceManager* manager = ...;
// ISrsResource* resource, ISrsCoroutineHandler* handler, ISrsExecutorHandler* callback = ...; // Resource, handler, and callback are the same object.
// SrsExecutorCoroutine* executor = new SrsExecutorCoroutine(manager, resource, handler);
// if ((err = executor->start()) != srs_success) {
// srs_freep(executor);
// return err;
// }
class SrsExecutorCoroutine : public ISrsResource, public ISrsStartable, public ISrsInterruptable
, public ISrsContextIdSetter, public ISrsContextIdGetter, public ISrsCoroutineHandler
{
private:
ISrsResourceManager* manager_;
ISrsResource* resource_;
ISrsCoroutineHandler* handler_;
ISrsExecutorHandler* callback_;
private:
SrsCoroutine* trd_;
public:
SrsExecutorCoroutine(ISrsResourceManager* m, ISrsResource* r, ISrsCoroutineHandler* h, ISrsExecutorHandler* cb);
virtual ~SrsExecutorCoroutine();
// Interface ISrsStartable
public:
virtual srs_error_t start();
// Interface ISrsInterruptable
public:
virtual void interrupt();
virtual srs_error_t pull();
// Interface ISrsContextId
public:
virtual const SrsContextId& cid();
virtual void set_cid(const SrsContextId& cid);
// Interface ISrsOneCycleThreadHandler
public:
virtual srs_error_t cycle();
// Interface ISrsResource
public:
virtual const SrsContextId& get_id();
virtual std::string desc();
};
#endif #endif

View file

@ -335,7 +335,6 @@ srs_error_t srs_global_initialize()
#ifdef SRS_GB28181 #ifdef SRS_GB28181
_srs_gb_manager = new SrsResourceManager("GB", true); _srs_gb_manager = new SrsResourceManager("GB", true);
#endif #endif
_srs_gc = new SrsLazySweepGc();
// Initialize global pps, which depends on _srs_clock // Initialize global pps, which depends on _srs_clock
_srs_pps_ids = new SrsPps(); _srs_pps_ids = new SrsPps();

View file

@ -81,4 +81,103 @@ public:
} }
}; };
// Shared ptr smart pointer, see https://github.com/ossrs/srs/discussions/3667#discussioncomment-8969107
// Usage:
// SrsSharedPtr<MyClass> ptr(new MyClass());
// ptr->do_something();
//
// SrsSharedPtr<MyClass> cp = ptr;
// cp->do_something();
template<class T>
class SrsSharedPtr
{
private:
// The pointer to the object.
T* ptr_;
// The reference count of the object.
uint32_t* ref_count_;
public:
// Create a shared ptr with the object.
SrsSharedPtr(T* ptr) {
ptr_ = ptr;
ref_count_ = new uint32_t(1);
}
// Copy the shared ptr.
SrsSharedPtr(const SrsSharedPtr<T>& cp) {
copy(cp);
}
// Dispose and delete the shared ptr.
virtual ~SrsSharedPtr() {
reset();
}
private:
// Reset the shared ptr.
void reset() {
if (!ref_count_) return;
(*ref_count_)--;
if (*ref_count_ == 0) {
delete ptr_;
delete ref_count_;
}
ptr_ = NULL;
ref_count_ = NULL;
}
// Copy from other shared ptr.
void copy(const SrsSharedPtr<T>& cp) {
ptr_ = cp.ptr_;
ref_count_ = cp.ref_count_;
if (ref_count_) (*ref_count_)++;
}
// Move from other shared ptr.
void move(SrsSharedPtr<T>& cp) {
ptr_ = cp.ptr_;
ref_count_ = cp.ref_count_;
cp.ptr_ = NULL;
cp.ref_count_ = NULL;
}
public:
// Get the object.
T* get() {
return ptr_;
}
// Overload the -> operator.
T* operator->() {
return ptr_;
}
// The assign operator.
SrsSharedPtr<T>& operator=(const SrsSharedPtr<T>& cp) {
if (this != &cp) {
reset();
copy(cp);
}
return *this;
}
private:
// Overload the * operator.
T& operator*() {
return *ptr_;
}
// Overload the bool operator.
operator bool() const {
return ptr_ != NULL;
}
#if __cplusplus >= 201103L // C++11
public:
// The move constructor.
SrsSharedPtr(SrsSharedPtr<T>&& cp) {
move(cp);
};
// The move assign operator.
SrsSharedPtr<T>& operator=(SrsSharedPtr<T>&& cp) {
if (this != &cp) {
reset();
move(cp);
}
return *this;
};
#endif
};
#endif #endif

View file

@ -9,6 +9,6 @@
#define VERSION_MAJOR 6 #define VERSION_MAJOR 6
#define VERSION_MINOR 0 #define VERSION_MINOR 0
#define VERSION_REVISION 125 #define VERSION_REVISION 126
#endif #endif

View file

@ -40,35 +40,3 @@ ISrsConnection::~ISrsConnection()
{ {
} }
SrsLazyObject::SrsLazyObject()
{
gc_ref_ = 0;
}
SrsLazyObject::~SrsLazyObject()
{
}
void SrsLazyObject::gc_use()
{
gc_ref_++;
}
void SrsLazyObject::gc_dispose()
{
gc_ref_--;
}
int32_t SrsLazyObject::gc_ref()
{
return gc_ref_;
}
ISrsLazyGc::ISrsLazyGc()
{
}
ISrsLazyGc::~ISrsLazyGc()
{
}

View file

@ -33,7 +33,9 @@ public:
ISrsResourceManager(); ISrsResourceManager();
virtual ~ISrsResourceManager(); virtual ~ISrsResourceManager();
public: public:
// Remove then free the specified connection. // Remove then free the specified connection. Note that the manager always free c resource,
// in the same coroutine or another coroutine. Some manager may support add c to a map, it
// should always free it even if it's in the map.
virtual void remove(ISrsResource* c) = 0; virtual void remove(ISrsResource* c) = 0;
}; };
@ -48,36 +50,5 @@ public:
virtual std::string remote_ip() = 0; virtual std::string remote_ip() = 0;
}; };
// Lazy-sweep resource, never sweep util all wrappers are freed.
// See https://github.com/ossrs/srs/issues/3176#lazy-sweep
class SrsLazyObject
{
private:
// The reference count of resource, 0 is no wrapper and safe to sweep.
int32_t gc_ref_;
public:
SrsLazyObject();
virtual ~SrsLazyObject();
public:
// For wrapper to use this resource.
virtual void gc_use();
// For wrapper to dispose this resource.
virtual void gc_dispose();
// The current reference count of resource.
virtual int32_t gc_ref();
};
// The lazy-sweep GC, wait for a long time to dispose resource even when resource is disposable.
// See https://github.com/ossrs/srs/issues/3176#lazy-sweep
class ISrsLazyGc
{
public:
ISrsLazyGc();
virtual ~ISrsLazyGc();
public:
// Remove then free the specified resource.
virtual void remove(SrsLazyObject* c) = 0;
};
#endif #endif

View file

@ -8,6 +8,8 @@
using namespace std; using namespace std;
#include <srs_core_autofree.hpp> #include <srs_core_autofree.hpp>
#include <srs_protocol_conn.hpp>
#include <srs_app_conn.hpp>
VOID TEST(CoreAutoFreeTest, Free) VOID TEST(CoreAutoFreeTest, Free)
{ {
@ -86,3 +88,343 @@ VOID TEST(CoreLogger, CheckVsnprintf)
} }
} }
VOID TEST(CoreLogger, SharedPtrTypical)
{
if (true) {
SrsSharedPtr<int> p(new int(100));
EXPECT_TRUE(p);
EXPECT_EQ(100, *p);
}
if (true) {
SrsSharedPtr<int> p(new int(100));
SrsSharedPtr<int> q = p;
EXPECT_EQ(p.get(), q.get());
}
if (true) {
SrsSharedPtr<int> p(new int(100));
SrsSharedPtr<int> q(p);
EXPECT_EQ(p.get(), q.get());
}
if (true) {
SrsSharedPtr<int> p(new int(100));
SrsSharedPtr<int> q = p;
EXPECT_TRUE(p);
EXPECT_TRUE(q);
EXPECT_EQ(100, *p);
EXPECT_EQ(100, *q);
}
}
VOID TEST(CoreLogger, SharedPtrReset)
{
if (true) {
SrsSharedPtr<int> p(new int(100));
SrsSharedPtr<int> q = p;
p.reset();
EXPECT_FALSE(p);
EXPECT_TRUE(q);
EXPECT_EQ(100, *q);
}
if (true) {
SrsSharedPtr<int> p(new int(100));
SrsSharedPtr<int> q = p;
q.reset();
EXPECT_TRUE(p);
EXPECT_FALSE(q);
EXPECT_EQ(100, *p);
}
}
VOID TEST(CoreLogger, SharedPtrObject)
{
SrsSharedPtr<MyNormalObject> p(new MyNormalObject(100));
EXPECT_TRUE(p);
EXPECT_EQ(100, p->id());
}
VOID TEST(CoreLogger, SharedPtrNullptr)
{
SrsSharedPtr<int> p(NULL);
EXPECT_FALSE(p);
p.reset();
EXPECT_FALSE(p);
SrsSharedPtr<int> q = p;
EXPECT_FALSE(q);
}
class MockWrapper
{
public:
int* ptr;
public:
MockWrapper(int* p) {
ptr = p;
*ptr = *ptr + 1;
}
~MockWrapper() {
*ptr = *ptr - 1;
}
};
VOID TEST(CoreLogger, SharedPtrWrapper)
{
int* ptr = new int(100);
SrsAutoFree(int, ptr);
EXPECT_EQ(100, *ptr);
if (true) {
SrsSharedPtr<MockWrapper> p(new MockWrapper(ptr));
EXPECT_EQ(101, *ptr);
EXPECT_EQ(101, *p->ptr);
SrsSharedPtr<MockWrapper> q = p;
EXPECT_EQ(101, *ptr);
EXPECT_EQ(101, *p->ptr);
EXPECT_EQ(101, *q->ptr);
SrsSharedPtr<MockWrapper> r(new MockWrapper(ptr));
EXPECT_EQ(102, *ptr);
EXPECT_EQ(102, *p->ptr);
EXPECT_EQ(102, *q->ptr);
EXPECT_EQ(102, *r->ptr);
SrsSharedPtr<MockWrapper> s(new MockWrapper(ptr));
EXPECT_EQ(103, *ptr);
EXPECT_EQ(103, *p->ptr);
EXPECT_EQ(103, *q->ptr);
EXPECT_EQ(103, *r->ptr);
EXPECT_EQ(103, *s->ptr);
}
EXPECT_EQ(100, *ptr);
if (true) {
SrsSharedPtr<MockWrapper> p(new MockWrapper(ptr));
EXPECT_EQ(101, *ptr);
EXPECT_EQ(101, *p->ptr);
}
EXPECT_EQ(100, *ptr);
}
VOID TEST(CoreLogger, SharedPtrAssign)
{
if (true) {
SrsSharedPtr<int> p(new int(100));
SrsSharedPtr<int> q(NULL);
q = p;
EXPECT_EQ(p.get(), q.get());
}
if (true) {
SrsSharedPtr<int> p(new int(100));
SrsSharedPtr<int> q(new int(101));
int* q0 = q.get();
q = p;
EXPECT_EQ(p.get(), q.get());
EXPECT_NE(q0, q.get());
}
int* ptr0 = new int(100);
SrsAutoFree(int, ptr0);
EXPECT_EQ(100, *ptr0);
int* ptr1 = new int(200);
SrsAutoFree(int, ptr1);
EXPECT_EQ(200, *ptr1);
if (true) {
SrsSharedPtr<MockWrapper> p(new MockWrapper(ptr0));
EXPECT_EQ(101, *ptr0);
EXPECT_EQ(101, *p->ptr);
SrsSharedPtr<MockWrapper> q(new MockWrapper(ptr1));
EXPECT_EQ(201, *ptr1);
EXPECT_EQ(201, *q->ptr);
q = p;
EXPECT_EQ(200, *ptr1);
EXPECT_EQ(101, *ptr0);
EXPECT_EQ(101, *p->ptr);
EXPECT_EQ(101, *q->ptr);
}
EXPECT_EQ(100, *ptr0);
EXPECT_EQ(200, *ptr1);
}
template<typename T>
SrsSharedPtr<T> mock_shared_ptr_move_assign(SrsSharedPtr<T> p) {
SrsSharedPtr<T> q = p;
return q;
}
template<typename T>
SrsSharedPtr<T> mock_shared_ptr_move_ctr(SrsSharedPtr<T> p) {
return p;
}
VOID TEST(CoreLogger, SharedPtrMove)
{
if (true) {
SrsSharedPtr<int> p(new int(100));
SrsSharedPtr<int> q(new int(101));
q = mock_shared_ptr_move_ctr(p);
EXPECT_EQ(q.get(), p.get());
}
if (true) {
SrsSharedPtr<int> p(new int(100));
SrsSharedPtr<int> q(new int(101));
q = mock_shared_ptr_move_assign(p);
EXPECT_EQ(q.get(), p.get());
}
int* ptr = new int(100);
SrsAutoFree(int, ptr);
EXPECT_EQ(100, *ptr);
if (true) {
SrsSharedPtr<MockWrapper> p(new MockWrapper(ptr));
EXPECT_EQ(101, *ptr);
EXPECT_EQ(101, *p->ptr);
SrsSharedPtr<MockWrapper> q(new MockWrapper(ptr));
q = mock_shared_ptr_move_ctr(p);
EXPECT_EQ(101, *ptr);
EXPECT_EQ(101, *q->ptr);
}
EXPECT_EQ(100, *ptr);
if (true) {
SrsSharedPtr<MockWrapper> p(new MockWrapper(ptr));
EXPECT_EQ(101, *ptr);
EXPECT_EQ(101, *p->ptr);
SrsSharedPtr<MockWrapper> q(new MockWrapper(ptr));
q = mock_shared_ptr_move_assign(p);
EXPECT_EQ(101, *ptr);
EXPECT_EQ(101, *q->ptr);
}
EXPECT_EQ(100, *ptr);
// Note that this will not trigger the move constructor or move assignment operator.
if (true) {
SrsSharedPtr<int> p(new int(100));
SrsSharedPtr<int> q = mock_shared_ptr_move_assign(p);
EXPECT_EQ(q.get(), p.get());
}
// Note that this will not trigger the move constructor or move assignment operator.
if (true) {
SrsSharedPtr<int> p = SrsSharedPtr<int>(new int(100));
EXPECT_TRUE(p);
EXPECT_EQ(100, *p);
}
}
class MockIntResource : public ISrsResource
{
public:
SrsContextId id_;
int value_;
public:
MockIntResource(int value) : value_(value) {
}
virtual ~MockIntResource() {
}
public:
virtual const SrsContextId& get_id() {
return id_;
}
virtual std::string desc() {
return id_.c_str();
}
};
VOID TEST(CoreLogger, SharedResourceTypical)
{
if (true) {
SrsSharedResource<MockIntResource>* p = new SrsSharedResource<MockIntResource>(new MockIntResource(100));
EXPECT_TRUE(*p);
EXPECT_EQ(100, (*p)->value_);
srs_freep(p);
}
if (true) {
SrsSharedResource<MockIntResource> p(new MockIntResource(100));
EXPECT_TRUE(p);
EXPECT_EQ(100, p->value_);
}
if (true) {
SrsSharedResource<MockIntResource> p = SrsSharedResource<MockIntResource>(new MockIntResource(100));
EXPECT_TRUE(p);
EXPECT_EQ(100, p->value_);
}
if (true) {
SrsSharedResource<MockIntResource> p(new MockIntResource(100));
SrsSharedResource<MockIntResource> q = p;
EXPECT_EQ(p.get(), q.get());
}
if (true) {
SrsSharedResource<MockIntResource> p(new MockIntResource(100));
SrsSharedResource<MockIntResource> q(NULL);
q = p;
EXPECT_EQ(p.get(), q.get());
}
if (true) {
SrsSharedResource<MockIntResource> p(new MockIntResource(100));
SrsSharedResource<MockIntResource> q(new MockIntResource(200));
q = p;
EXPECT_EQ(p.get(), q.get());
}
if (true) {
SrsSharedResource<MockIntResource> p(new MockIntResource(100));
SrsSharedResource<MockIntResource> q = p;
EXPECT_TRUE(p);
EXPECT_TRUE(q);
EXPECT_EQ(100, p->value_);
EXPECT_EQ(100, q->value_);
}
}
template<typename T>
SrsSharedResource<T> mock_shared_resource_move_assign(SrsSharedResource<T> p) {
SrsSharedResource<T> q = p;
return q;
}
template<typename T>
SrsSharedResource<T> mock_shared_resource_move_ctr(SrsSharedResource<T> p) {
return p;
}
VOID TEST(CoreLogger, SharedResourceMove)
{
if (true) {
SrsSharedResource<MockIntResource> p(new MockIntResource(100));
SrsSharedResource<MockIntResource> q(new MockIntResource(101));
q = mock_shared_resource_move_ctr(p);
EXPECT_EQ(100, q->value_);
EXPECT_EQ(q.get(), p.get());
}
if (true) {
SrsSharedResource<MockIntResource> p(new MockIntResource(100));
SrsSharedResource<MockIntResource> q(new MockIntResource(101));
q = mock_shared_resource_move_assign(p);
EXPECT_EQ(100, q->value_);
EXPECT_EQ(q.get(), p.get());
}
}

View file

@ -14,5 +14,18 @@
#include <string> #include <string>
class MyNormalObject
{
private:
int id_;
public:
MyNormalObject(int id) {
id_ = id;
}
int id() {
return id_;
}
};
#endif #endif