diff --git a/trunk/doc/CHANGELOG.md b/trunk/doc/CHANGELOG.md
index d14b6eea2..4d75d4cf4 100644
--- a/trunk/doc/CHANGELOG.md
+++ b/trunk/doc/CHANGELOG.md
@@ -7,6 +7,7 @@ The changelog for SRS.
## SRS 6.0 Changelog
+* v6.0, 2024-09-01, Merge [#4165](https://github.com/ossrs/srs/pull/4165): FLV: Refine source and http handler. v6.0.155 (#4165)
* v6.0, 2024-09-01, Merge [#4166](https://github.com/ossrs/srs/pull/4166): Edge: Fix flv edge crash when http unmount. v6.0.154 (#4166)
* v6.0, 2024-08-31, Merge [#4162](https://github.com/ossrs/srs/pull/4162): Fix #3767: RTMP: Do not response empty data packet. v6.0.153 (#4162)
* v6.0, 2024-08-31, Merge [#4164](https://github.com/ossrs/srs/pull/4164): HTTP-FLV: Notify connection to expire when unpublishing. v6.0.152 (#4164)
diff --git a/trunk/src/app/srs_app_http_stream.cpp b/trunk/src/app/srs_app_http_stream.cpp
index df2c4a523..36131c451 100755
--- a/trunk/src/app/srs_app_http_stream.cpp
+++ b/trunk/src/app/srs_app_http_stream.cpp
@@ -39,8 +39,9 @@ using namespace std;
#include
#include
#include
+#include
-SrsBufferCache::SrsBufferCache(SrsRequest* r)
+SrsBufferCache::SrsBufferCache(SrsServer* s, SrsRequest* r)
{
req = r->copy()->as_http();
queue = new SrsMessageQueue(true);
@@ -48,6 +49,7 @@ SrsBufferCache::SrsBufferCache(SrsRequest* r)
// TODO: FIXME: support reload.
fast_cache = _srs_config->get_vhost_http_remux_fast_cache(req->vhost);
+ server_ = s;
}
SrsBufferCache::~SrsBufferCache()
@@ -69,6 +71,11 @@ srs_error_t SrsBufferCache::update_auth(SrsRequest* r)
srs_error_t SrsBufferCache::start()
{
srs_error_t err = srs_success;
+
+ // Not enabled.
+ if (fast_cache <= 0) {
+ return err;
+ }
if ((err = trd->start()) != srs_success) {
return srs_error_wrap(err, "corotine");
@@ -79,11 +86,21 @@ srs_error_t SrsBufferCache::start()
void SrsBufferCache::stop()
{
+ // Not enabled.
+ if (fast_cache <= 0) {
+ return;
+ }
+
trd->stop();
}
bool SrsBufferCache::alive()
{
+ // Not enabled.
+ if (fast_cache <= 0) {
+ return false;
+ }
+
srs_error_t err = trd->pull();
if (err == srs_success) {
return true;
@@ -115,17 +132,12 @@ srs_error_t SrsBufferCache::dump_cache(SrsLiveConsumer* consumer, SrsRtmpJitterA
srs_error_t SrsBufferCache::cycle()
{
srs_error_t err = srs_success;
-
- // TODO: FIXME: support reload.
- if (fast_cache <= 0) {
- srs_usleep(SRS_STREAM_CACHE_CYCLE);
- return err;
- }
- SrsSharedPtr live_source = _srs_sources->fetch(req);
- if (!live_source.get()) {
- return srs_error_new(ERROR_NO_SOURCE, "no source for %s", req->get_stream_url().c_str());
+ SrsSharedPtr live_source;
+ if ((err = _srs_sources->fetch_or_create(req, server_, live_source)) != srs_success) {
+ return srs_error_wrap(err, "source create");
}
+ srs_assert(live_source.get() != NULL);
// the stream cache will create consumer to cache stream,
// which will trigger to fetch stream from origin for edge.
@@ -578,11 +590,12 @@ srs_error_t SrsBufferWriter::writev(const iovec* iov, int iovcnt, ssize_t* pnwri
return writer->writev(iov, iovcnt, pnwrite);
}
-SrsLiveStream::SrsLiveStream(SrsRequest* r, SrsBufferCache* c)
+SrsLiveStream::SrsLiveStream(SrsServer* s, SrsRequest* r, SrsBufferCache* c)
{
cache = c;
req = r->copy()->as_http();
security_ = new SrsSecurity();
+ server_ = s;
}
SrsLiveStream::~SrsLiveStream()
@@ -636,10 +649,17 @@ srs_error_t SrsLiveStream::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage
return srs_error_wrap(err, "http hook");
}
- SrsSharedPtr live_source = _srs_sources->fetch(req);
- if (!live_source.get()) {
- return srs_error_new(ERROR_NO_SOURCE, "no source for %s", req->get_stream_url().c_str());
+ // Always try to create the source, because http handler won't create it.
+ SrsSharedPtr live_source;
+ if ((err = _srs_sources->fetch_or_create(req, server_, live_source)) != srs_success) {
+ return srs_error_wrap(err, "source create");
}
+ srs_assert(live_source.get() != NULL);
+
+ bool enabled_cache = _srs_config->get_gop_cache(req->vhost);
+ int gcmf = _srs_config->get_gop_cache_max_frames(req->vhost);
+ live_source->set_cache(enabled_cache);
+ live_source->set_gop_cache_max_frames(gcmf);
// Create consumer of source, ignore gop cache, use the audio gop cache.
SrsLiveConsumer* consumer_raw = NULL;
@@ -926,6 +946,7 @@ srs_error_t SrsLiveStream::streaming_send_messages(ISrsBufferEncoder* enc, SrsSh
SrsLiveEntry::SrsLiveEntry(std::string m)
{
mount = m;
+ disposing = false;
stream = NULL;
cache = NULL;
@@ -967,6 +988,7 @@ bool SrsLiveEntry::is_mp3()
SrsHttpStreamServer::SrsHttpStreamServer(SrsServer* svr)
{
server = svr;
+ async_ = new SrsAsyncCallWorker();
mux.hijack(this);
_srs_config->subscribe(this);
@@ -976,6 +998,9 @@ SrsHttpStreamServer::~SrsHttpStreamServer()
{
mux.unhijack(this);
_srs_config->unsubscribe(this);
+
+ async_->stop();
+ srs_freep(async_);
if (true) {
std::map::iterator it;
@@ -1003,6 +1028,10 @@ srs_error_t SrsHttpStreamServer::initialize()
if ((err = initialize_flv_streaming()) != srs_success) {
return srs_error_wrap(err, "http flv stream");
}
+
+ if ((err = async_->start()) != srs_success) {
+ return srs_error_wrap(err, "async start");
+ }
return err;
}
@@ -1037,8 +1066,8 @@ srs_error_t SrsHttpStreamServer::http_mount(SrsRequest* r)
entry = new SrsLiveEntry(mount);
entry->req = r->copy()->as_http();
- entry->cache = new SrsBufferCache(r);
- entry->stream = new SrsLiveStream(r, entry->cache);
+ entry->cache = new SrsBufferCache(server, r);
+ entry->stream = new SrsLiveStream(server, r, entry->cache);
// TODO: FIXME: maybe refine the logic of http remux service.
// if user push streams followed:
@@ -1067,6 +1096,12 @@ srs_error_t SrsHttpStreamServer::http_mount(SrsRequest* r)
} else {
// The entry exists, we reuse it and update the request of stream and cache.
entry = streamHandlers[sid];
+
+ // Fail if system is disposing the entry.
+ if (entry->disposing) {
+ return srs_error_new(ERROR_STREAM_DISPOSING, "stream is disposing");
+ }
+
entry->stream->update_auth(r);
entry->cache->update_auth(r);
}
@@ -1088,36 +1123,19 @@ void SrsHttpStreamServer::http_unmount(SrsRequest* r)
return;
}
- // Free all HTTP resources.
- SrsUniquePtr entry(it->second);
- streamHandlers.erase(it);
-
- SrsUniquePtr stream(entry->stream);
- SrsUniquePtr cache(entry->cache);
-
- // Notify cache and stream to stop.
- if (stream->entry) stream->entry->enabled = false;
- stream->expire();
- cache->stop();
-
- // Wait for cache and stream to stop.
- int i = 0;
- for (; i < 1024; i++) {
- if (!cache->alive() && !stream->alive()) {
- break;
- }
- srs_usleep(100 * SRS_UTIME_MILLISECONDS);
+ // Set the entry to disposing, which will prevent the stream to be reused.
+ SrsLiveEntry* entry = it->second;
+ if (entry->disposing) {
+ return;
}
+ entry->disposing = true;
- if (cache->alive() || stream->alive()) {
- srs_warn("http: try to free a alive stream, cache=%d, stream=%d", cache->alive(), stream->alive());
+ // Use async worker to execute the task, which will destroy the stream.
+ srs_error_t err = srs_success;
+ if ((err = async_->execute(new SrsHttpStreamDestroy(&mux, &streamHandlers, sid))) != srs_success) {
+ srs_warn("http: ignore unmount stream failed, sid=%s, err=%s", sid.c_str(), srs_error_desc(err).c_str());
+ srs_freep(err);
}
-
- // Unmount the HTTP handler, which will free the entry. Note that we must free it after cache and
- // stream stopped for it uses it.
- mux.unhandle(entry->mount, stream.get());
-
- srs_trace("http: unmount flv stream for sid=%s, i=%d", sid.c_str(), i);
}
srs_error_t SrsHttpStreamServer::hijack(ISrsHttpMessage* request, ISrsHttpHandler** ph)
@@ -1214,17 +1232,6 @@ srs_error_t SrsHttpStreamServer::hijack(ISrsHttpMessage* request, ISrsHttpHandle
}
}
- SrsSharedPtr live_source;
- if ((err = _srs_sources->fetch_or_create(r.get(), server, live_source)) != srs_success) {
- return srs_error_wrap(err, "source create");
- }
- srs_assert(live_source.get() != NULL);
-
- bool enabled_cache = _srs_config->get_gop_cache(r->vhost);
- int gcmf = _srs_config->get_gop_cache_max_frames(r->vhost);
- live_source->set_cache(enabled_cache);
- live_source->set_gop_cache_max_frames(gcmf);
-
// create http streaming handler.
if ((err = http_mount(r.get())) != srs_success) {
return srs_error_wrap(err, "http mount");
@@ -1235,11 +1242,8 @@ srs_error_t SrsHttpStreamServer::hijack(ISrsHttpMessage* request, ISrsHttpHandle
entry = streamHandlers[sid];
*ph = entry->stream;
}
-
- // trigger edge to fetch from origin.
- bool vhost_is_edge = _srs_config->get_vhost_is_edge(r->vhost);
- srs_trace("flv: source url=%s, is_edge=%d, source_id=%s/%s",
- r->get_stream_url().c_str(), vhost_is_edge, live_source->source_id().c_str(), live_source->pre_source_id().c_str());
+
+ srs_trace("flv: hijack %s ok", upath.c_str());
return err;
}
@@ -1281,3 +1285,64 @@ srs_error_t SrsHttpStreamServer::initialize_flv_entry(std::string vhost)
return err;
}
+SrsHttpStreamDestroy::SrsHttpStreamDestroy(SrsHttpServeMux* mux, map* handlers, string sid)
+{
+ mux_ = mux;
+ sid_ = sid;
+ streamHandlers_ = handlers;
+}
+
+SrsHttpStreamDestroy::~SrsHttpStreamDestroy()
+{
+}
+
+srs_error_t SrsHttpStreamDestroy::call()
+{
+ srs_error_t err = srs_success;
+
+ std::map::iterator it = streamHandlers_->find(sid_);
+ if (it == streamHandlers_->end()) {
+ return err;
+ }
+
+ // Free all HTTP resources.
+ SrsUniquePtr entry(it->second);
+ srs_assert(entry->disposing);
+
+ SrsUniquePtr stream(entry->stream);
+ SrsUniquePtr cache(entry->cache);
+
+ // Notify cache and stream to stop.
+ if (stream->entry) stream->entry->enabled = false;
+ stream->expire();
+ cache->stop();
+
+ // Wait for cache and stream to stop.
+ int i = 0;
+ for (; i < 1024; i++) {
+ if (!cache->alive() && !stream->alive()) {
+ break;
+ }
+ srs_usleep(100 * SRS_UTIME_MILLISECONDS);
+ }
+
+ if (cache->alive() || stream->alive()) {
+ srs_warn("http: try to free a alive stream, cache=%d, stream=%d", cache->alive(), stream->alive());
+ }
+
+ // Remove the entry from handlers.
+ streamHandlers_->erase(it);
+
+ // Unmount the HTTP handler, which will free the entry. Note that we must free it after cache and
+ // stream stopped for it uses it.
+ mux_->unhandle(entry->mount, stream.get());
+
+ srs_trace("http: unmount flv stream for sid=%s, i=%d", sid_.c_str(), i);
+ return err;
+}
+
+string SrsHttpStreamDestroy::to_string()
+{
+ return "destroy";
+}
+
diff --git a/trunk/src/app/srs_app_http_stream.hpp b/trunk/src/app/srs_app_http_stream.hpp
index cf3737982..352c4f99f 100755
--- a/trunk/src/app/srs_app_http_stream.hpp
+++ b/trunk/src/app/srs_app_http_stream.hpp
@@ -10,6 +10,7 @@
#include
#include
#include
+#include
#include
@@ -17,18 +18,20 @@ class SrsAacTransmuxer;
class SrsMp3Transmuxer;
class SrsFlvTransmuxer;
class SrsTsTransmuxer;
+class SrsAsyncCallWorker;
// A cache for HTTP Live Streaming encoder, to make android(weixin) happy.
class SrsBufferCache : public ISrsCoroutineHandler
{
private:
srs_utime_t fast_cache;
+ SrsServer* server_;
private:
SrsMessageQueue* queue;
SrsRequest* req;
SrsCoroutine* trd;
public:
- SrsBufferCache(SrsRequest* r);
+ SrsBufferCache(SrsServer* s, SrsRequest* r);
virtual ~SrsBufferCache();
virtual srs_error_t update_auth(SrsRequest* r);
public:
@@ -184,12 +187,13 @@ private:
SrsRequest* req;
SrsBufferCache* cache;
SrsSecurity* security_;
+ SrsServer* server_;
// For multiple viewers, which means there will more than one alive viewers for a live stream, so we must
// use an int value to represent if there is any viewer is alive. We should never do cleanup unless all
// viewers closed the connection.
std::vector viewers_;
public:
- SrsLiveStream(SrsRequest* r, SrsBufferCache* c);
+ SrsLiveStream(SrsServer* s, SrsRequest* r, SrsBufferCache* c);
virtual ~SrsLiveStream();
virtual srs_error_t update_auth(SrsRequest* r);
public:
@@ -223,6 +227,9 @@ public:
SrsLiveStream* stream;
SrsBufferCache* cache;
+
+ // Whether is disposing the entry.
+ bool disposing;
SrsLiveEntry(std::string m);
virtual ~SrsLiveEntry();
@@ -240,6 +247,7 @@ class SrsHttpStreamServer : public ISrsReloadHandler
{
private:
SrsServer* server;
+ SrsAsyncCallWorker* async_;
public:
SrsHttpServeMux mux;
// The http live streaming template, to create streams.
@@ -263,5 +271,19 @@ private:
virtual srs_error_t initialize_flv_entry(std::string vhost);
};
+class SrsHttpStreamDestroy : public ISrsAsyncCallTask
+{
+private:
+ std::string sid_;
+ std::map* streamHandlers_;
+ SrsHttpServeMux* mux_;
+public:
+ SrsHttpStreamDestroy(SrsHttpServeMux* mux, std::map* handlers, std::string sid);
+ virtual ~SrsHttpStreamDestroy();
+public:
+ virtual srs_error_t call();
+ virtual std::string to_string();
+};
+
#endif
diff --git a/trunk/src/core/srs_core_version6.hpp b/trunk/src/core/srs_core_version6.hpp
index ed3ffb50d..32328d99a 100644
--- a/trunk/src/core/srs_core_version6.hpp
+++ b/trunk/src/core/srs_core_version6.hpp
@@ -9,6 +9,6 @@
#define VERSION_MAJOR 6
#define VERSION_MINOR 0
-#define VERSION_REVISION 154
+#define VERSION_REVISION 155
#endif
diff --git a/trunk/src/kernel/srs_kernel_error.hpp b/trunk/src/kernel/srs_kernel_error.hpp
index af9acf12d..dcd818483 100644
--- a/trunk/src/kernel/srs_kernel_error.hpp
+++ b/trunk/src/kernel/srs_kernel_error.hpp
@@ -107,7 +107,8 @@
XX(ERROR_BACKTRACE_ADDR2LINE , 1094, "BacktraceAddr2Line", "Backtrace addr2line failed") \
XX(ERROR_SYSTEM_FILE_NOT_OPEN , 1095, "FileNotOpen", "File is not opened") \
XX(ERROR_SYSTEM_FILE_SETVBUF , 1096, "FileSetVBuf", "Failed to set file vbuf") \
- XX(ERROR_NO_SOURCE , 1097, "NoSource", "No source found")
+ XX(ERROR_NO_SOURCE , 1097, "NoSource", "No source found") \
+ XX(ERROR_STREAM_DISPOSING , 1098, "StreamDisposing", "Stream is disposing")
/**************************************************/
/* RTMP protocol error. */