From dc20d5ddbc924a4bc3bcd40a6b5f1ee906ff7799 Mon Sep 17 00:00:00 2001 From: winlin Date: Sun, 2 Oct 2022 10:05:01 +0800 Subject: [PATCH] ST: Support set context id while thread running. v5.0.72 --- trunk/3rdparty/st-srs/key.c | 7 +++- trunk/3rdparty/st-srs/public.h | 2 ++ trunk/3rdparty/st-srs/sched.c | 1 - trunk/doc/CHANGELOG.md | 1 + trunk/src/app/srs_app_st.cpp | 18 ++++++++++- trunk/src/app/srs_app_st.hpp | 9 +++++- trunk/src/core/srs_core_version5.hpp | 2 +- trunk/src/protocol/srs_protocol_log.cpp | 17 ++++++---- trunk/src/protocol/srs_protocol_log.hpp | 5 ++- trunk/src/protocol/srs_protocol_st.cpp | 5 +++ trunk/src/protocol/srs_protocol_st.hpp | 1 + trunk/src/utest/srs_utest_app.cpp | 43 +++++++++++++++++++++++-- 12 files changed, 96 insertions(+), 15 deletions(-) diff --git a/trunk/3rdparty/st-srs/key.c b/trunk/3rdparty/st-srs/key.c index 03f73a73a..23746800e 100644 --- a/trunk/3rdparty/st-srs/key.c +++ b/trunk/3rdparty/st-srs/key.c @@ -79,7 +79,12 @@ int st_key_getlimit(void) int st_thread_setspecific(int key, void *value) { _st_thread_t *me = _ST_CURRENT_THREAD(); - + return st_thread_setspecific2(me, key, value); +} + + +int st_thread_setspecific2(_st_thread_t *me, int key, void *value) +{ if (key < 0 || key >= key_max) { errno = EINVAL; return -1; diff --git a/trunk/3rdparty/st-srs/public.h b/trunk/3rdparty/st-srs/public.h index 75f53a4c2..a2ef1677a 100644 --- a/trunk/3rdparty/st-srs/public.h +++ b/trunk/3rdparty/st-srs/public.h @@ -156,6 +156,8 @@ extern int st_sendmsg(st_netfd_t fd, const struct msghdr *msg, int flags, st_uti extern st_netfd_t st_open(const char *path, int oflags, mode_t mode); +extern int st_thread_setspecific2(st_thread_t thread, int key, void *value); + #ifdef DEBUG extern void _st_show_thread_stack(st_thread_t thread, const char *messg); extern void _st_iterate_threads(void); diff --git a/trunk/3rdparty/st-srs/sched.c b/trunk/3rdparty/st-srs/sched.c index 0ef56ca52..9c197659c 100644 --- a/trunk/3rdparty/st-srs/sched.c +++ b/trunk/3rdparty/st-srs/sched.c @@ -689,7 +689,6 @@ _st_thread_t *st_thread_self(void) return _ST_CURRENT_THREAD(); } - #ifdef DEBUG /* ARGSUSED */ void _st_show_thread_stack(_st_thread_t *thread, const char *messg) diff --git a/trunk/doc/CHANGELOG.md b/trunk/doc/CHANGELOG.md index 1b85a3ba8..c23fa438b 100644 --- a/trunk/doc/CHANGELOG.md +++ b/trunk/doc/CHANGELOG.md @@ -7,6 +7,7 @@ The changelog for SRS. ## SRS 5.0 Changelog +* v5.0, 2022-10-02, ST: Support set context id while thread running. v5.0.72 * v5.0, 2022-09-30, RTC: Refine SDP to support GB28181 SSRC spec. v5.0.71 * v5.0, 2022-09-30, GB28181: Refine HTTP parser to support SIP. v5.0.70 * v5.0, 2022-09-30, Kernel: Support lazy sweeping simple GC. v5.0.69 diff --git a/trunk/src/app/srs_app_st.cpp b/trunk/src/app/srs_app_st.cpp index d2576d3b5..dbe328bf0 100755 --- a/trunk/src/app/srs_app_st.cpp +++ b/trunk/src/app/srs_app_st.cpp @@ -66,7 +66,12 @@ srs_error_t SrsDummyCoroutine::pull() const SrsContextId& SrsDummyCoroutine::cid() { - return _srs_context->get_id(); + return cid_; +} + +void SrsDummyCoroutine::set_cid(const SrsContextId& cid) +{ + cid_ = cid; } SrsSTCoroutine::SrsSTCoroutine(string n, ISrsCoroutineHandler* h) @@ -114,6 +119,11 @@ const SrsContextId& SrsSTCoroutine::cid() return impl_->cid(); } +void SrsSTCoroutine::set_cid(const SrsContextId& cid) +{ + impl_->set_cid(cid); +} + SrsFastCoroutine::SrsFastCoroutine(string n, ISrsCoroutineHandler* h) { // TODO: FIXME: Reduce duplicated code. @@ -257,6 +267,12 @@ const SrsContextId& SrsFastCoroutine::cid() return cid_; } +void SrsFastCoroutine::set_cid(const SrsContextId& cid) +{ + cid_ = cid; + srs_context_set_cid_of(trd, cid); +} + srs_error_t SrsFastCoroutine::cycle() { if (_srs_context) { diff --git a/trunk/src/app/srs_app_st.hpp b/trunk/src/app/srs_app_st.hpp index 235614acd..536083a81 100644 --- a/trunk/src/app/srs_app_st.hpp +++ b/trunk/src/app/srs_app_st.hpp @@ -76,13 +76,17 @@ public: // @return a copy of error, which should be freed by user. // NULL if not terminated and user should pull again. virtual srs_error_t pull() = 0; + // Get and set the context id of coroutine. virtual const SrsContextId& cid() = 0; + virtual void set_cid(const SrsContextId& cid) = 0; }; // An empty coroutine, user can default to this object before create any real coroutine. // @see https://github.com/ossrs/srs/pull/908 class SrsDummyCoroutine : public SrsCoroutine { +private: + SrsContextId cid_; public: SrsDummyCoroutine(); virtual ~SrsDummyCoroutine(); @@ -92,6 +96,7 @@ public: virtual void interrupt(); virtual srs_error_t pull(); virtual const SrsContextId& cid(); + virtual void set_cid(const SrsContextId& cid); }; // A ST-coroutine is a lightweight thread, just like the goroutine. @@ -138,8 +143,9 @@ public: // @remark Return ERROR_THREAD_TERMINATED when thread terminated normally without error. // @remark Return ERROR_THREAD_INTERRUPED when thread is interrupted. virtual srs_error_t pull(); - // Get the context id of thread. + // Get and set the context id of thread. virtual const SrsContextId& cid(); + virtual void set_cid(const SrsContextId& cid); }; // High performance coroutine. @@ -180,6 +186,7 @@ public: return srs_error_copy(trd_err); } const SrsContextId& cid(); + virtual void set_cid(const SrsContextId& cid); private: srs_error_t cycle(); static void* pfn(void* arg); diff --git a/trunk/src/core/srs_core_version5.hpp b/trunk/src/core/srs_core_version5.hpp index 22d993e86..6c606b908 100644 --- a/trunk/src/core/srs_core_version5.hpp +++ b/trunk/src/core/srs_core_version5.hpp @@ -9,6 +9,6 @@ #define VERSION_MAJOR 5 #define VERSION_MINOR 0 -#define VERSION_REVISION 71 +#define VERSION_REVISION 72 #endif diff --git a/trunk/src/protocol/srs_protocol_log.cpp b/trunk/src/protocol/srs_protocol_log.cpp index 38cc03fef..a80da9c70 100644 --- a/trunk/src/protocol/srs_protocol_log.cpp +++ b/trunk/src/protocol/srs_protocol_log.cpp @@ -62,10 +62,19 @@ const SrsContextId& SrsThreadContext::get_id() } const SrsContextId& SrsThreadContext::set_id(const SrsContextId& v) +{ + return srs_context_set_cid_of(srs_thread_self(), v); +} + +void SrsThreadContext::clear_cid() +{ +} + +const SrsContextId& srs_context_set_cid_of(srs_thread_t trd, const SrsContextId& v) { ++_srs_pps_cids_set->sugar; - if (!srs_thread_self()) { + if (!trd) { _srs_context_default = v; return v; } @@ -78,16 +87,12 @@ const SrsContextId& SrsThreadContext::set_id(const SrsContextId& v) srs_assert(r0 == 0); } - int r0 = srs_thread_setspecific(_srs_context_key, cid); + int r0 = srs_thread_setspecific2(trd, _srs_context_key, cid); srs_assert(r0 == 0); return v; } -void SrsThreadContext::clear_cid() -{ -} - impl_SrsContextRestore::impl_SrsContextRestore(SrsContextId cid) { cid_ = cid; diff --git a/trunk/src/protocol/srs_protocol_log.hpp b/trunk/src/protocol/srs_protocol_log.hpp index 43beea085..ffd25e1c1 100644 --- a/trunk/src/protocol/srs_protocol_log.hpp +++ b/trunk/src/protocol/srs_protocol_log.hpp @@ -28,10 +28,13 @@ public: virtual SrsContextId generate_id(); virtual const SrsContextId& get_id(); virtual const SrsContextId& set_id(const SrsContextId& v); -public: +private: virtual void clear_cid(); }; +// Set the context id of specified thread, not self. +extern const SrsContextId& srs_context_set_cid_of(srs_thread_t trd, const SrsContextId& v); + // The context restore stores the context and restore it when done. // Usage: // SrsContextRestore(_srs_context->get_id()); diff --git a/trunk/src/protocol/srs_protocol_st.cpp b/trunk/src/protocol/srs_protocol_st.cpp index 946e6bb27..b64561002 100644 --- a/trunk/src/protocol/srs_protocol_st.cpp +++ b/trunk/src/protocol/srs_protocol_st.cpp @@ -412,6 +412,11 @@ void *srs_thread_getspecific(int key) return st_thread_getspecific(key); } +int srs_thread_setspecific2(srs_thread_t thread, int key, void* value) +{ + return st_thread_setspecific2((st_thread_t)thread, key, value); +} + int srs_netfd_fileno(srs_netfd_t stfd) { return st_netfd_fileno((st_netfd_t)stfd); diff --git a/trunk/src/protocol/srs_protocol_st.hpp b/trunk/src/protocol/srs_protocol_st.hpp index ae409a823..fb0e8f6b3 100644 --- a/trunk/src/protocol/srs_protocol_st.hpp +++ b/trunk/src/protocol/srs_protocol_st.hpp @@ -74,6 +74,7 @@ extern int srs_mutex_unlock(srs_mutex_t mutex); extern int srs_key_create(int* keyp, void (*destructor)(void*)); extern int srs_thread_setspecific(int key, void* value); +extern int srs_thread_setspecific2(srs_thread_t thread, int key, void* value); extern void* srs_thread_getspecific(int key); extern int srs_netfd_fileno(srs_netfd_t stfd); diff --git a/trunk/src/utest/srs_utest_app.cpp b/trunk/src/utest/srs_utest_app.cpp index 447b7633b..99b0dcae9 100644 --- a/trunk/src/utest/srs_utest_app.cpp +++ b/trunk/src/utest/srs_utest_app.cpp @@ -133,7 +133,7 @@ VOID TEST(AppCoroutineTest, Dummy) if (true) { SrsContextId v = dc.cid(); - EXPECT_FALSE(v.empty()); + EXPECT_TRUE(v.empty()); srs_error_t err = dc.pull(); EXPECT_TRUE(err != srs_success); @@ -150,7 +150,7 @@ VOID TEST(AppCoroutineTest, Dummy) dc.stop(); SrsContextId v = dc.cid(); - EXPECT_FALSE(v.empty()); + EXPECT_TRUE(v.empty()); srs_error_t err = dc.pull(); EXPECT_TRUE(err != srs_success); @@ -167,7 +167,7 @@ VOID TEST(AppCoroutineTest, Dummy) dc.interrupt(); SrsContextId v = dc.cid(); - EXPECT_FALSE(v.empty()); + EXPECT_TRUE(v.empty()); srs_error_t err = dc.pull(); EXPECT_TRUE(err != srs_success); @@ -205,6 +205,8 @@ public: srs_error_t r0 = srs_success; srs_cond_signal(running); + + // The cid should be generated if empty. cid = _srs_context->get_id(); while (!quit && (r0 = trd->pull()) == srs_success && err == srs_success) { @@ -213,6 +215,9 @@ public: srs_cond_signal(exited); + // The cid might be updated. + cid = _srs_context->get_id(); + if (err != srs_success) { srs_freep(r0); return err; @@ -222,6 +227,38 @@ public: } }; +VOID TEST(AppCoroutineTest, SetCidOfCoroutine) +{ + srs_error_t err = srs_success; + + MockCoroutineHandler ch; + SrsSTCoroutine sc("test", &ch); + ch.trd = ≻ + EXPECT_TRUE(sc.cid().empty()); + + // Start coroutine, which will create the cid. + HELPER_ASSERT_SUCCESS(sc.start()); + HELPER_ASSERT_SUCCESS(sc.pull()); + + srs_cond_timedwait(ch.running, 100 * SRS_UTIME_MILLISECONDS); + EXPECT_TRUE(!sc.cid().empty()); + EXPECT_TRUE(!ch.cid.empty()); + + // Should be a new cid. + SrsContextId cid = _srs_context->generate_id(); + EXPECT_TRUE(sc.cid().compare(cid) != 0); + EXPECT_TRUE(ch.cid.compare(cid) != 0); + + // Set the cid and stop the coroutine. + sc.set_cid(cid); + sc.stop(); + + // Now the cid should be the new one. + srs_cond_timedwait(ch.exited, 100 * SRS_UTIME_MILLISECONDS); + EXPECT_TRUE(sc.cid().compare(cid) == 0); + EXPECT_TRUE(ch.cid.compare(cid) == 0); +} + VOID TEST(AppCoroutineTest, StartStop) { if (true) {