From 776f24cf3de45297f2d946f29da5efeb0480e420 Mon Sep 17 00:00:00 2001 From: winlin Date: Fri, 2 Oct 2020 09:13:41 +0800 Subject: [PATCH] Refine resource manager, ignore unsubscribed handler --- trunk/src/app/srs_app_conn.cpp | 54 ++++++++- trunk/src/app/srs_app_conn.hpp | 5 + trunk/src/app/srs_app_log.hpp | 3 +- trunk/src/utest/srs_utest_rtc.cpp | 180 ++++++++++++++++++++++++++---- 4 files changed, 214 insertions(+), 28 deletions(-) diff --git a/trunk/src/app/srs_app_conn.cpp b/trunk/src/app/srs_app_conn.cpp index 720ef21f8..34c12c3be 100644 --- a/trunk/src/app/srs_app_conn.cpp +++ b/trunk/src/app/srs_app_conn.cpp @@ -32,6 +32,7 @@ using namespace std; #include #include #include +#include ISrsDisposingHandler::ISrsDisposingHandler() { @@ -48,6 +49,7 @@ SrsResourceManager::SrsResourceManager(const std::string& label, bool verbose) cond = srs_cond_new(); trd = NULL; p_disposing_ = NULL; + removing_ = false; } SrsResourceManager::~SrsResourceManager() @@ -151,6 +153,12 @@ void SrsResourceManager::subscribe(ISrsDisposingHandler* h) if (std::find(handlers_.begin(), handlers_.end(), h) == handlers_.end()) { handlers_.push_back(h); } + + // Restore the handler from unsubscribing handlers. + vector::iterator it; + if ((it = std::find(unsubs_.begin(), unsubs_.end(), h)) != unsubs_.end()) { + unsubs_.erase(it); + } } void SrsResourceManager::unsubscribe(ISrsDisposingHandler* h) @@ -159,9 +167,21 @@ void SrsResourceManager::unsubscribe(ISrsDisposingHandler* h) if (it != handlers_.end()) { handlers_.erase(it); } + + // Put it to the unsubscribing handlers. + if (std::find(unsubs_.begin(), unsubs_.end(), h) == unsubs_.end()) { + unsubs_.push_back(h); + } } void SrsResourceManager::remove(ISrsResource* c) +{ + removing_ = true; + do_remove(c); + removing_ = false; +} + +void SrsResourceManager::do_remove(ISrsResource* c) { SrsContextRestore(cid_); if (verbose_) { @@ -186,9 +206,19 @@ void SrsResourceManager::remove(ISrsResource* c) // Push to zombies, we will free it in another coroutine. zombies_.push_back(c); + // We should copy all handlers, because it may change during callback. + vector handlers = handlers_; + // Notify other handlers to handle the before-dispose event. - for (int i = 0; i < (int)handlers_.size(); i++) { - ISrsDisposingHandler* h = handlers_.at(i); + for (int i = 0; i < (int)handlers.size(); i++) { + ISrsDisposingHandler* h = handlers.at(i); + + // Ignore if handler is unsubscribing. + if (!unsubs_.empty() && std::find(unsubs_.begin(), unsubs_.end(), h) != unsubs_.end()) { + srs_warn2(TAG_RESOURCE_UNSUB, "ignore before-dispose for %p", h); + continue; + } + h->on_before_dispose(c); } @@ -207,6 +237,11 @@ void SrsResourceManager::clear() srs_trace("clear zombies=%d connections", (int)zombies_.size()); } + // Clear all unsubscribing handlers, if not removing any resource. + if (!removing_ && !unsubs_.empty()) { + vector().swap(unsubs_); + } + do_clear(); // Reset it for it points to a local object. @@ -260,8 +295,19 @@ void SrsResourceManager::dispose(ISrsResource* c) conns_.erase(it); } - for (int i = 0; i < (int)handlers_.size(); i++) { - ISrsDisposingHandler* h = handlers_.at(i); + // We should copy all handlers, because it may change during callback. + vector handlers = handlers_; + + // Notify other handlers to handle the disposing event. + for (int i = 0; i < (int)handlers.size(); i++) { + ISrsDisposingHandler* h = handlers.at(i); + + // Ignore if handler is unsubscribing. + if (!unsubs_.empty() && std::find(unsubs_.begin(), unsubs_.end(), h) != unsubs_.end()) { + srs_warn2(TAG_RESOURCE_UNSUB, "ignore disposing for %p", h); + continue; + } + h->on_disposing(c); } diff --git a/trunk/src/app/srs_app_conn.hpp b/trunk/src/app/srs_app_conn.hpp index f9706ef8d..7f8fd1748 100644 --- a/trunk/src/app/srs_app_conn.hpp +++ b/trunk/src/app/srs_app_conn.hpp @@ -62,6 +62,10 @@ private: srs_cond_t cond; // Callback handlers. std::vector handlers_; + // Unsubscribing handlers, skip it for notifying. + std::vector unsubs_; + // Whether we are removing resources. + bool removing_; // The zombie connections, we will delete it asynchronously. std::vector zombies_; std::vector* p_disposing_; @@ -96,6 +100,7 @@ public: public: virtual void remove(ISrsResource* c); private: + void do_remove(ISrsResource* c); void clear(); void do_clear(); void dispose(ISrsResource* c); diff --git a/trunk/src/app/srs_app_log.hpp b/trunk/src/app/srs_app_log.hpp index f09185c45..70d58e2e5 100644 --- a/trunk/src/app/srs_app_log.hpp +++ b/trunk/src/app/srs_app_log.hpp @@ -35,7 +35,8 @@ // For log TAGs. #define TAG_MAIN "MAIN" #define TAG_MAYBE "MAYBE" -#define TAG_DTLS_ALERT "DTLSALERT" +#define TAG_DTLS_ALERT "DTLS_ALERT" +#define TAG_RESOURCE_UNSUB "RESOURCE_UNSUB" // Use memory/disk cache and donot flush when write log. // it's ok to use it without config, which will log to console, and default trace level. diff --git a/trunk/src/utest/srs_utest_rtc.cpp b/trunk/src/utest/srs_utest_rtc.cpp index 40fedaada..cb8d5d441 100644 --- a/trunk/src/utest/srs_utest_rtc.cpp +++ b/trunk/src/utest/srs_utest_rtc.cpp @@ -37,25 +37,20 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include using namespace std; -class MockResourceHookOwner : public ISrsResource, public ISrsDisposingHandler +class MockResource : public ISrsDisposingHandler, public ISrsResource { public: - ISrsResource* owner_; SrsResourceManager* manager_; - MockResourceHookOwner(SrsResourceManager* manager) { + MockResource(SrsResourceManager* manager) { manager_ = manager; - owner_ = NULL; - manager_->subscribe(this); - } - virtual ~MockResourceHookOwner() { - manager_->unsubscribe(this); - } - virtual void on_before_dispose(ISrsResource* c) { - if (c == owner_) { // Remove self if its owner is disposing. - manager_->remove(this); + if (manager_) { + manager_->subscribe(this); } } - virtual void on_disposing(ISrsResource* c) { + virtual ~MockResource() { + if (manager_) { + manager_->unsubscribe(this); + } } virtual const SrsContextId& get_id() { return _srs_context->get_id(); @@ -65,19 +60,33 @@ public: } }; -class MockResourceSelf : public ISrsResource, public ISrsDisposingHandler +class MockResourceHookOwner : public MockResource +{ +public: + ISrsResource* owner_; + MockResourceHookOwner(SrsResourceManager* manager) : MockResource(manager) { + owner_ = NULL; + } + virtual ~MockResourceHookOwner() { + } + virtual void on_before_dispose(ISrsResource* c) { + if (c == owner_) { // Remove self if its owner is disposing. + manager_->remove(this); + } + } + virtual void on_disposing(ISrsResource* c) { + } +}; + +class MockResourceSelf : public MockResource { public: bool remove_in_before_dispose; bool remove_in_disposing; - SrsResourceManager* manager_; - MockResourceSelf(SrsResourceManager* manager) { + MockResourceSelf(SrsResourceManager* manager) : MockResource(manager) { remove_in_before_dispose = remove_in_disposing = false; - manager_ = manager; - manager_->subscribe(this); } virtual ~MockResourceSelf() { - manager_->unsubscribe(this); } virtual void on_before_dispose(ISrsResource* c) { if (remove_in_before_dispose) { @@ -89,11 +98,37 @@ public: manager_->remove(this); } } - virtual const SrsContextId& get_id() { - return _srs_context->get_id(); +}; + +class MockResourceUnsubscribe : public MockResource +{ +public: + int nn_before_dispose; + int nn_disposing; + bool unsubscribe_in_before_dispose; + bool unsubscribe_in_disposing; + MockResourceUnsubscribe* result; + MockResourceUnsubscribe(SrsResourceManager* manager) : MockResource(manager) { + unsubscribe_in_before_dispose = unsubscribe_in_disposing = false; + nn_before_dispose = nn_disposing = 0; + result = NULL; } - virtual std::string desc() { - return ""; + virtual ~MockResourceUnsubscribe() { + if (result) { // Copy result before disposing it. + *result = *this; + } + } + virtual void on_before_dispose(ISrsResource* c) { + nn_before_dispose++; + if (unsubscribe_in_before_dispose) { + manager_->unsubscribe(this); + } + } + virtual void on_disposing(ISrsResource* c) { + nn_disposing++; + if (unsubscribe_in_disposing) { + manager_->unsubscribe(this); + } } }; @@ -101,6 +136,105 @@ VOID TEST(KernelRTCTest, ConnectionManagerTest) { srs_error_t err = srs_success; + // When notifying, the handlers changed, disposing event may lost. + if (true) { + SrsResourceManager manager("mgr"); + HELPER_EXPECT_SUCCESS(manager.start()); + EXPECT_EQ(0, manager.size()); EXPECT_TRUE(manager.empty()); + + MockResourceUnsubscribe* conn0 = new MockResourceUnsubscribe(&manager); + conn0->unsubscribe_in_disposing = true; + manager.add(conn0); + + MockResourceUnsubscribe* conn1 = new MockResourceUnsubscribe(&manager); + manager.add(conn1); + + MockResourceUnsubscribe* conn2 = new MockResourceUnsubscribe(&manager); + manager.add(conn2); + + // When removing conn0, it will unsubscribe and change the handlers, + // which should not cause the conn1 lost event. + manager.remove(conn0); + srs_usleep(0); + ASSERT_EQ(2, manager.size()); + + EXPECT_EQ(1, conn1->nn_before_dispose); + EXPECT_EQ(1, conn1->nn_disposing); // Should get event. + + EXPECT_EQ(1, conn2->nn_before_dispose); + EXPECT_EQ(1, conn2->nn_disposing); + } + + // When notifying, the handlers changed, before-dispose event may lost. + if (true) { + SrsResourceManager manager("mgr"); + HELPER_EXPECT_SUCCESS(manager.start()); + EXPECT_EQ(0, manager.size()); EXPECT_TRUE(manager.empty()); + + MockResourceUnsubscribe* conn0 = new MockResourceUnsubscribe(&manager); + conn0->unsubscribe_in_before_dispose = true; + manager.add(conn0); + + MockResourceUnsubscribe* conn1 = new MockResourceUnsubscribe(&manager); + manager.add(conn1); + + MockResourceUnsubscribe* conn2 = new MockResourceUnsubscribe(&manager); + manager.add(conn2); + + // When removing conn0, it will unsubscribe and change the handlers, + // which should not cause the conn1 lost event. + manager.remove(conn0); + srs_usleep(0); + ASSERT_EQ(2, manager.size()); + + EXPECT_EQ(1, conn1->nn_before_dispose); // Should get event. + EXPECT_EQ(1, conn1->nn_disposing); + + EXPECT_EQ(1, conn2->nn_before_dispose); + EXPECT_EQ(1, conn2->nn_disposing); + } + + // Subscribe or unsubscribe for multiple times. + if (true) { + SrsResourceManager manager("mgr"); + HELPER_EXPECT_SUCCESS(manager.start()); + EXPECT_EQ(0, manager.size()); EXPECT_TRUE(manager.empty()); + + MockResourceUnsubscribe* resource = new MockResourceUnsubscribe(&manager); + resource->unsubscribe_in_before_dispose = true; + manager.add(resource); + + MockResourceUnsubscribe result(NULL); // No manager for result. + resource->result = &result; + + manager.remove(resource); + srs_usleep(0); + ASSERT_EQ(0, manager.size()); + + EXPECT_EQ(1, result.nn_before_dispose); + EXPECT_EQ(0, result.nn_disposing); // No disposing event, because we unsubscribe in before-dispose. + } + + // Count the event for disposing. + if (true) { + SrsResourceManager manager("mgr"); + HELPER_EXPECT_SUCCESS(manager.start()); + EXPECT_EQ(0, manager.size()); EXPECT_TRUE(manager.empty()); + + MockResourceUnsubscribe* resource = new MockResourceUnsubscribe(&manager); + manager.add(resource); + + MockResourceUnsubscribe result(NULL); // No manager for result. + resource->result = &result; + + manager.remove(resource); + srs_usleep(0); + ASSERT_EQ(0, manager.size()); + + EXPECT_EQ(1, result.nn_before_dispose); + EXPECT_EQ(1, result.nn_disposing); + } + // When hooks disposing, remove itself again. if (true) { SrsResourceManager manager("mgr");