mirror of
https://github.com/ossrs/srs.git
synced 2025-03-09 15:49:59 +00:00
Refine resource manager, fix loop and context switching bug
This commit is contained in:
parent
033e2f9210
commit
1a33452e95
3 changed files with 93 additions and 2 deletions
|
@ -47,6 +47,7 @@ SrsResourceManager::SrsResourceManager(const std::string& label, bool verbose)
|
||||||
label_ = label;
|
label_ = label;
|
||||||
cond = srs_cond_new();
|
cond = srs_cond_new();
|
||||||
trd = NULL;
|
trd = NULL;
|
||||||
|
p_disposing_ = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
SrsResourceManager::~SrsResourceManager()
|
SrsResourceManager::~SrsResourceManager()
|
||||||
|
@ -168,15 +169,30 @@ void SrsResourceManager::remove(ISrsResource* c)
|
||||||
srs_trace("before dispose resource(%s), zombies=%d", c->desc().c_str(), (int)zombies_.size());
|
srs_trace("before dispose resource(%s), zombies=%d", c->desc().c_str(), (int)zombies_.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
if (std::find(zombies_.begin(), zombies_.end(), c) == zombies_.end()) {
|
// Only notify when not removed(in zombies_).
|
||||||
zombies_.push_back(c);
|
vector<ISrsResource*>::iterator it = std::find(zombies_.begin(), zombies_.end(), c);
|
||||||
|
if (it != zombies_.end()) {
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Also ignore when we are disposing it.
|
||||||
|
if (p_disposing_) {
|
||||||
|
it = std::find(p_disposing_->begin(), p_disposing_->end(), c);
|
||||||
|
if (it != p_disposing_->end()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Push to zombies, we will free it in another coroutine.
|
||||||
|
zombies_.push_back(c);
|
||||||
|
|
||||||
|
// Notify other handlers to handle the before-dispose event.
|
||||||
for (int i = 0; i < (int)handlers_.size(); i++) {
|
for (int i = 0; i < (int)handlers_.size(); i++) {
|
||||||
ISrsDisposingHandler* h = handlers_.at(i);
|
ISrsDisposingHandler* h = handlers_.at(i);
|
||||||
h->on_before_dispose(c);
|
h->on_before_dispose(c);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Notify the coroutine to free it.
|
||||||
srs_cond_signal(cond);
|
srs_cond_signal(cond);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -191,10 +207,19 @@ void SrsResourceManager::clear()
|
||||||
srs_trace("clear zombies=%d connections", (int)zombies_.size());
|
srs_trace("clear zombies=%d connections", (int)zombies_.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
do_clear();
|
||||||
|
|
||||||
|
// Reset it for it points to a local object.
|
||||||
|
p_disposing_ = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
void SrsResourceManager::do_clear()
|
||||||
|
{
|
||||||
// To prevent thread switch when delete connection,
|
// To prevent thread switch when delete connection,
|
||||||
// we copy all connections then free one by one.
|
// we copy all connections then free one by one.
|
||||||
vector<ISrsResource*> copy;
|
vector<ISrsResource*> copy;
|
||||||
copy.swap(zombies_);
|
copy.swap(zombies_);
|
||||||
|
p_disposing_ = ©
|
||||||
|
|
||||||
vector<ISrsResource*>::iterator it;
|
vector<ISrsResource*>::iterator it;
|
||||||
for (it = copy.begin(); it != copy.end(); ++it) {
|
for (it = copy.begin(); it != copy.end(); ++it) {
|
||||||
|
|
|
@ -64,6 +64,7 @@ private:
|
||||||
std::vector<ISrsDisposingHandler*> handlers_;
|
std::vector<ISrsDisposingHandler*> handlers_;
|
||||||
// The zombie connections, we will delete it asynchronously.
|
// The zombie connections, we will delete it asynchronously.
|
||||||
std::vector<ISrsResource*> zombies_;
|
std::vector<ISrsResource*> zombies_;
|
||||||
|
std::vector<ISrsResource*>* p_disposing_;
|
||||||
private:
|
private:
|
||||||
// The connections without any id.
|
// The connections without any id.
|
||||||
std::vector<ISrsResource*> conns_;
|
std::vector<ISrsResource*> conns_;
|
||||||
|
@ -96,6 +97,7 @@ public:
|
||||||
virtual void remove(ISrsResource* c);
|
virtual void remove(ISrsResource* c);
|
||||||
private:
|
private:
|
||||||
void clear();
|
void clear();
|
||||||
|
void do_clear();
|
||||||
void dispose(ISrsResource* c);
|
void dispose(ISrsResource* c);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -65,10 +65,74 @@ public:
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
class MockResourceSelf : public ISrsResource, public ISrsDisposingHandler
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
bool remove_in_before_dispose;
|
||||||
|
bool remove_in_disposing;
|
||||||
|
SrsResourceManager* manager_;
|
||||||
|
MockResourceSelf(SrsResourceManager* manager) {
|
||||||
|
remove_in_before_dispose = remove_in_disposing = false;
|
||||||
|
manager_ = manager;
|
||||||
|
manager_->subscribe(this);
|
||||||
|
}
|
||||||
|
virtual ~MockResourceSelf() {
|
||||||
|
manager_->unsubscribe(this);
|
||||||
|
}
|
||||||
|
virtual void on_before_dispose(ISrsResource* c) {
|
||||||
|
if (remove_in_before_dispose) {
|
||||||
|
manager_->remove(this);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
virtual void on_disposing(ISrsResource* c) {
|
||||||
|
if (remove_in_disposing) {
|
||||||
|
manager_->remove(this);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
virtual const SrsContextId& get_id() {
|
||||||
|
return _srs_context->get_id();
|
||||||
|
}
|
||||||
|
virtual std::string desc() {
|
||||||
|
return "";
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
VOID TEST(KernelRTCTest, ConnectionManagerTest)
|
VOID TEST(KernelRTCTest, ConnectionManagerTest)
|
||||||
{
|
{
|
||||||
srs_error_t err = srs_success;
|
srs_error_t err = srs_success;
|
||||||
|
|
||||||
|
// When hooks disposing, remove itself again.
|
||||||
|
if (true) {
|
||||||
|
SrsResourceManager manager("mgr");
|
||||||
|
HELPER_EXPECT_SUCCESS(manager.start());
|
||||||
|
EXPECT_EQ(0, manager.size()); EXPECT_TRUE(manager.empty());
|
||||||
|
|
||||||
|
MockResourceSelf* resource = new MockResourceSelf(&manager);
|
||||||
|
resource->remove_in_disposing = true;
|
||||||
|
manager.add(resource);
|
||||||
|
EXPECT_EQ(1, manager.size());
|
||||||
|
|
||||||
|
manager.remove(resource);
|
||||||
|
srs_usleep(0);
|
||||||
|
ASSERT_EQ(0, manager.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
// When hooks before-dispose, remove itself again.
|
||||||
|
if (true) {
|
||||||
|
SrsResourceManager manager("mgr");
|
||||||
|
HELPER_EXPECT_SUCCESS(manager.start());
|
||||||
|
EXPECT_EQ(0, manager.size()); EXPECT_TRUE(manager.empty());
|
||||||
|
|
||||||
|
MockResourceSelf* resource = new MockResourceSelf(&manager);
|
||||||
|
resource->remove_in_before_dispose = true;
|
||||||
|
manager.add(resource);
|
||||||
|
EXPECT_EQ(1, manager.size());
|
||||||
|
|
||||||
|
manager.remove(resource);
|
||||||
|
srs_usleep(0);
|
||||||
|
ASSERT_EQ(0, manager.size());
|
||||||
|
}
|
||||||
|
|
||||||
// Cover all normal scenarios.
|
// Cover all normal scenarios.
|
||||||
if (true) {
|
if (true) {
|
||||||
SrsResourceManager manager("mgr", true);
|
SrsResourceManager manager("mgr", true);
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue