1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

SmartPtr: Support load test for source by srs-bench. v6.0.130 (#4097)

1. Add live benchmark support in srs-bench, which only connects and
disconnects without any media transport, to test source creation and
disposal and verify source memory leaks.
2. SmartPtr: Support cleanup of HTTP-FLV stream. Unregister the HTTP-FLV
handler for the pattern and clean up the objects and resources.
3. Support benchmarking RTMP/SRT with srs-bench by integrating the gosrt
and oryx RTMP libraries.
4. Refine SRT and RTC sources by using a timer to clean up the sources,
following the same strategy as the Live source.

---------

Co-authored-by: Haibo Chen <495810242@qq.com>
Co-authored-by: Jacob Su <suzp1984@gmail.com>
This commit is contained in:
Winlin 2024-06-21 07:13:12 +08:00 committed by GitHub
parent e3d74fb045
commit 1f9309ae25
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
508 changed files with 6805 additions and 3299 deletions

View file

@ -6863,7 +6863,7 @@ srs_utime_t SrsConfig::get_dash_dispose(std::string vhost)
{
SRS_OVERWRITE_BY_ENV_SECONDS("srs.vhost.dash.dash_dispose"); // SRS_VHOST_DASH_DASH_DISPOSE
static srs_utime_t DEFAULT = 0;
static srs_utime_t DEFAULT = 120;
SrsConfDirective* conf = get_dash(vhost);
if (!conf) {

View file

@ -705,6 +705,10 @@ void SrsDash::dispose()
srs_error_t SrsDash::cycle()
{
srs_error_t err = srs_success;
if (!enabled) {
return err;
}
if (last_update_time_ <= 0) {
last_update_time_ = srs_get_system_time();
@ -734,6 +738,16 @@ srs_error_t SrsDash::cycle()
return err;
}
srs_utime_t SrsDash::cleanup_delay()
{
if (!enabled) {
return 0;
}
// We use larger timeout to cleanup the HLS, after disposed it if required.
return _srs_config->get_dash_dispose(req->vhost) * 1.1;
}
srs_error_t SrsDash::initialize(SrsOriginHub* h, SrsRequest* r)
{
srs_error_t err = srs_success;

View file

@ -162,6 +162,7 @@ public:
public:
virtual void dispose();
virtual srs_error_t cycle();
srs_utime_t cleanup_delay();
public:
// Initalize the encoder.
virtual srs_error_t initialize(SrsOriginHub* h, SrsRequest* r);

View file

@ -1237,6 +1237,10 @@ srs_error_t SrsHls::cycle()
{
srs_error_t err = srs_success;
if (!enabled) {
return err;
}
if (last_update_time <= 0) {
last_update_time = srs_get_system_time();
}
@ -1272,6 +1276,16 @@ srs_error_t SrsHls::cycle()
return err;
}
srs_utime_t SrsHls::cleanup_delay()
{
if (!enabled) {
return 0;
}
// We use larger timeout to cleanup the HLS, after disposed it if required.
return _srs_config->get_hls_dispose(req->vhost) * 1.1;
}
srs_error_t SrsHls::initialize(SrsOriginHub* h, SrsRequest* r)
{
srs_error_t err = srs_success;

View file

@ -313,6 +313,7 @@ private:
public:
virtual void dispose();
virtual srs_error_t cycle();
srs_utime_t cleanup_delay();
public:
// Initialize the hls by handler and source.
virtual srs_error_t initialize(SrsOriginHub* h, SrsRequest* r);

View file

@ -77,6 +77,22 @@ srs_error_t SrsBufferCache::start()
return err;
}
void SrsBufferCache::stop()
{
trd->stop();
}
bool SrsBufferCache::alive()
{
srs_error_t err = trd->pull();
if (err == srs_success) {
return true;
}
srs_freep(err);
return false;
}
srs_error_t SrsBufferCache::dump_cache(SrsLiveConsumer* consumer, SrsRtmpJitterAlgorithm jitter)
{
srs_error_t err = srs_success;
@ -561,6 +577,7 @@ SrsLiveStream::SrsLiveStream(SrsRequest* r, SrsBufferCache* c)
cache = c;
req = r->copy()->as_http();
security_ = new SrsSecurity();
alive_ = false;
}
SrsLiveStream::~SrsLiveStream()
@ -610,14 +627,21 @@ srs_error_t SrsLiveStream::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage
if ((err = http_hooks_on_play(r)) != srs_success) {
return srs_error_wrap(err, "http hook");
}
alive_ = true;
err = do_serve_http(w, r);
alive_ = false;
http_hooks_on_stop(r);
return err;
}
bool SrsLiveStream::alive()
{
return alive_;
}
srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r)
{
srs_error_t err = srs_success;
@ -929,19 +953,19 @@ SrsHttpStreamServer::~SrsHttpStreamServer()
if (true) {
std::map<std::string, SrsLiveEntry*>::iterator it;
for (it = tflvs.begin(); it != tflvs.end(); ++it) {
for (it = templateHandlers.begin(); it != templateHandlers.end(); ++it) {
SrsLiveEntry* entry = it->second;
srs_freep(entry);
}
tflvs.clear();
templateHandlers.clear();
}
if (true) {
std::map<std::string, SrsLiveEntry*>::iterator it;
for (it = sflvs.begin(); it != sflvs.end(); ++it) {
for (it = streamHandlers.begin(); it != streamHandlers.end(); ++it) {
SrsLiveEntry* entry = it->second;
srs_freep(entry);
}
sflvs.clear();
streamHandlers.clear();
}
}
@ -967,12 +991,12 @@ srs_error_t SrsHttpStreamServer::http_mount(SrsRequest* r)
SrsLiveEntry* entry = NULL;
// create stream from template when not found.
if (sflvs.find(sid) == sflvs.end()) {
if (tflvs.find(r->vhost) == tflvs.end()) {
if (streamHandlers.find(sid) == streamHandlers.end()) {
if (templateHandlers.find(r->vhost) == templateHandlers.end()) {
return err;
}
SrsLiveEntry* tmpl = tflvs[r->vhost];
SrsLiveEntry* tmpl = templateHandlers[r->vhost];
std::string mount = tmpl->mount;
@ -999,16 +1023,16 @@ srs_error_t SrsHttpStreamServer::http_mount(SrsRequest* r)
srs_freep(tmpl->req);
tmpl->req = r->copy()->as_http();
sflvs[sid] = entry;
streamHandlers[sid] = entry;
// mount the http flv stream.
// we must register the handler, then start the thread,
// for the thread will cause thread switch context.
if ((err = mux.handle(mount, entry->stream)) != srs_success) {
return srs_error_wrap(err, "http: mount flv stream for vhost=%s failed", sid.c_str());
}
// start http stream cache thread
if ((err = entry->cache->start()) != srs_success) {
return srs_error_wrap(err, "http: start stream cache failed");
@ -1016,7 +1040,7 @@ srs_error_t SrsHttpStreamServer::http_mount(SrsRequest* r)
srs_trace("http: mount flv stream for sid=%s, mount=%s", sid.c_str(), mount.c_str());
} else {
// The entry exists, we reuse it and update the request of stream and cache.
entry = sflvs[sid];
entry = streamHandlers[sid];
entry->stream->update_auth(r);
entry->cache->update_auth(r);
}
@ -1032,13 +1056,40 @@ srs_error_t SrsHttpStreamServer::http_mount(SrsRequest* r)
void SrsHttpStreamServer::http_unmount(SrsRequest* r)
{
std::string sid = r->get_stream_url();
if (sflvs.find(sid) == sflvs.end()) {
std::map<std::string, SrsLiveEntry*>::iterator it = streamHandlers.find(sid);
if (it == streamHandlers.end()) {
return;
}
SrsLiveEntry* entry = sflvs[sid];
entry->stream->entry->enabled = false;
// Free all HTTP resources.
SrsLiveEntry* entry = it->second;
SrsAutoFree(SrsLiveEntry, entry);
streamHandlers.erase(it);
SrsLiveStream* stream = entry->stream;
SrsAutoFree(SrsLiveStream, stream);
SrsBufferCache* cache = entry->cache;
SrsAutoFree(SrsBufferCache, cache);
// Unmount the HTTP handler.
mux.unhandle(entry->mount, stream);
// Notify cache and stream to stop.
if (stream->entry) stream->entry->enabled = false;
cache->stop();
// Wait for cache and stream to stop.
int i = 0;
for (; i < 1024; i++) {
if (!cache->alive() && !stream->alive()) {
break;
}
srs_usleep(100 * SRS_UTIME_MILLISECONDS);
}
srs_trace("http: unmount flv stream for sid=%s, i=%d", sid.c_str(), i);
}
srs_error_t SrsHttpStreamServer::hijack(ISrsHttpMessage* request, ISrsHttpHandler** ph)
@ -1067,8 +1118,8 @@ srs_error_t SrsHttpStreamServer::hijack(ISrsHttpMessage* request, ISrsHttpHandle
SrsLiveEntry* entry = NULL;
if (true) {
// no http streaming on vhost, ignore.
std::map<std::string, SrsLiveEntry*>::iterator it = tflvs.find(vhost->arg0());
if (it == tflvs.end()) {
std::map<std::string, SrsLiveEntry*>::iterator it = templateHandlers.find(vhost->arg0());
if (it == templateHandlers.end()) {
return err;
}
@ -1124,8 +1175,8 @@ srs_error_t SrsHttpStreamServer::hijack(ISrsHttpMessage* request, ISrsHttpHandle
std::string sid = r->get_stream_url();
// check whether the http remux is enabled,
// for example, user disable the http flv then reload.
if (sflvs.find(sid) != sflvs.end()) {
SrsLiveEntry* s_entry = sflvs[sid];
if (streamHandlers.find(sid) != streamHandlers.end()) {
SrsLiveEntry* s_entry = streamHandlers[sid];
if (!s_entry->stream->entry->enabled) {
// only when the http entry is disabled, check the config whether http flv disable,
// for the http flv edge use hijack to trigger the edge ingester, we always mount it
@ -1154,8 +1205,8 @@ srs_error_t SrsHttpStreamServer::hijack(ISrsHttpMessage* request, ISrsHttpHandle
// use the handler if exists.
if (ph) {
if (sflvs.find(sid) != sflvs.end()) {
entry = sflvs[sid];
if (streamHandlers.find(sid) != streamHandlers.end()) {
entry = streamHandlers[sid];
*ph = entry->stream;
}
}
@ -1198,8 +1249,8 @@ srs_error_t SrsHttpStreamServer::initialize_flv_entry(std::string vhost)
}
SrsLiveEntry* entry = new SrsLiveEntry(_srs_config->get_vhost_http_remux_mount(vhost));
tflvs[vhost] = entry;
templateHandlers[vhost] = entry;
srs_trace("http flv live stream, vhost=%s, mount=%s", vhost.c_str(), entry->mount.c_str());
return err;

View file

@ -31,6 +31,8 @@ public:
virtual srs_error_t update_auth(SrsRequest* r);
public:
virtual srs_error_t start();
virtual void stop();
virtual bool alive();
virtual srs_error_t dump_cache(SrsLiveConsumer* consumer, SrsRtmpJitterAlgorithm jitter);
// Interface ISrsEndlessThreadHandler.
public:
@ -179,12 +181,14 @@ private:
SrsRequest* req;
SrsBufferCache* cache;
SrsSecurity* security_;
bool alive_;
public:
SrsLiveStream(SrsRequest* r, SrsBufferCache* c);
virtual ~SrsLiveStream();
virtual srs_error_t update_auth(SrsRequest* r);
public:
virtual srs_error_t serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r);
virtual bool alive();
private:
virtual srs_error_t do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r);
virtual srs_error_t http_hooks_on_play(ISrsHttpMessage* r);
@ -230,9 +234,9 @@ private:
public:
SrsHttpServeMux mux;
// The http live streaming template, to create streams.
std::map<std::string, SrsLiveEntry*> tflvs;
// The http live streaming streams, crote by template.
std::map<std::string, SrsLiveEntry*> sflvs;
std::map<std::string, SrsLiveEntry*> templateHandlers;
// The http live streaming streams, created by template.
std::map<std::string, SrsLiveEntry*> streamHandlers;
public:
SrsHttpStreamServer(SrsServer* svr);
virtual ~SrsHttpStreamServer();

View file

@ -71,6 +71,9 @@ using namespace std;
const int kRtpMaxPayloadSize = kRtpPacketSize - 300;
#endif
// the time to cleanup source.
#define SRS_RTC_SOURCE_CLEANUP (3 * SRS_UTIME_SECONDS)
// TODO: Add this function into SrsRtpMux class.
srs_error_t aac_raw_append_adts_header(SrsSharedPtrMessage* shared_audio, SrsFormat* format, char** pbuf, int* pnn_buf)
{
@ -244,11 +247,56 @@ void SrsRtcConsumer::on_stream_change(SrsRtcSourceDescription* desc)
SrsRtcSourceManager::SrsRtcSourceManager()
{
lock = srs_mutex_new();
timer_ = new SrsHourGlass("sources", this, 1 * SRS_UTIME_SECONDS);
}
SrsRtcSourceManager::~SrsRtcSourceManager()
{
srs_mutex_destroy(lock);
srs_freep(timer_);
}
srs_error_t SrsRtcSourceManager::initialize()
{
return setup_ticks();
}
srs_error_t SrsRtcSourceManager::setup_ticks()
{
srs_error_t err = srs_success;
if ((err = timer_->tick(1, 3 * SRS_UTIME_SECONDS)) != srs_success) {
return srs_error_wrap(err, "tick");
}
if ((err = timer_->start()) != srs_success) {
return srs_error_wrap(err, "timer");
}
return err;
}
srs_error_t SrsRtcSourceManager::notify(int event, srs_utime_t interval, srs_utime_t tick)
{
srs_error_t err = srs_success;
std::map< std::string, SrsSharedPtr<SrsRtcSource> >::iterator it;
for (it = pool.begin(); it != pool.end();) {
SrsSharedPtr<SrsRtcSource>& source = it->second;
// When source expired, remove it.
// @see https://github.com/ossrs/srs/issues/713
if (source->stream_is_dead()) {
SrsContextId cid = source->source_id();
if (cid.empty()) cid = source->pre_source_id();
srs_trace("RTC: cleanup die source, id=[%s], total=%d", cid.c_str(), (int)pool.size());
pool.erase(it++);
} else {
++it;
}
}
return err;
}
srs_error_t SrsRtcSourceManager::fetch_or_create(SrsRequest* r, SrsSharedPtr<SrsRtcSource>& pps)
@ -305,19 +353,6 @@ SrsSharedPtr<SrsRtcSource> SrsRtcSourceManager::fetch(SrsRequest* r)
return source;
}
void SrsRtcSourceManager::eliminate(SrsRequest* r)
{
// Use lock to protect coroutine switch.
// @bug https://github.com/ossrs/srs/issues/1230
SrsLocker(lock);
string stream_url = r->get_stream_url();
std::map< std::string, SrsSharedPtr<SrsRtcSource> >::iterator it = pool.find(stream_url);
if (it != pool.end()) {
pool.erase(it);
}
}
SrsRtcSourceManager* _srs_rtc_sources = NULL;
ISrsRtcPublishStream::ISrsRtcPublishStream()
@ -351,6 +386,7 @@ SrsRtcSource::SrsRtcSource()
#endif
pli_for_rtmp_ = pli_elapsed_ = 0;
stream_die_at_ = 0;
}
SrsRtcSource::~SrsRtcSource()
@ -365,6 +401,10 @@ SrsRtcSource::~SrsRtcSource()
srs_freep(bridge_);
srs_freep(req);
srs_freep(stream_desc_);
SrsContextId cid = _source_id;
if (cid.empty()) cid = _pre_source_id;
srs_trace("free rtc source id=[%s]", cid.c_str());
}
srs_error_t SrsRtcSource::initialize(SrsRequest* r)
@ -380,6 +420,27 @@ srs_error_t SrsRtcSource::initialize(SrsRequest* r)
return err;
}
bool SrsRtcSource::stream_is_dead()
{
// still publishing?
if (is_created_) {
return false;
}
// has any consumers?
if (!consumers.empty()) {
return false;
}
// Delay cleanup source.
srs_utime_t now = srs_get_system_time();
if (now < stream_die_at_ + SRS_RTC_SOURCE_CLEANUP) {
return false;
}
return true;
}
void SrsRtcSource::init_for_play_before_publishing()
{
// If the stream description has already been setup by RTC publisher,
@ -493,6 +554,8 @@ srs_error_t SrsRtcSource::create_consumer(SrsRtcConsumer*& consumer)
consumer = new SrsRtcConsumer(this);
consumers.push_back(consumer);
stream_die_at_ = 0;
// TODO: FIXME: Implements edge cluster.
return err;
@ -526,7 +589,7 @@ void SrsRtcSource::on_consumer_destroy(SrsRtcConsumer* consumer)
// Destroy and cleanup source when no publishers and consumers.
if (!is_created_ && consumers.empty()) {
_srs_rtc_sources->eliminate(req);
stream_die_at_ = srs_get_system_time();
}
}
@ -629,8 +692,8 @@ void SrsRtcSource::on_unpublish()
stat->on_stream_close(req);
// Destroy and cleanup source when no publishers and consumers.
if (!is_created_ && consumers.empty()) {
_srs_rtc_sources->eliminate(req);
if (consumers.empty()) {
stream_die_at_ = srs_get_system_time();
}
}

View file

@ -111,14 +111,21 @@ public:
void on_stream_change(SrsRtcSourceDescription* desc);
};
class SrsRtcSourceManager
class SrsRtcSourceManager : public ISrsHourGlass
{
private:
srs_mutex_t lock;
std::map< std::string, SrsSharedPtr<SrsRtcSource> > pool;
SrsHourGlass* timer_;
public:
SrsRtcSourceManager();
virtual ~SrsRtcSourceManager();
public:
virtual srs_error_t initialize();
// interface ISrsHourGlass
private:
virtual srs_error_t setup_ticks();
virtual srs_error_t notify(int event, srs_utime_t interval, srs_utime_t tick);
public:
// create source when fetch from cache failed.
// @param r the client request.
@ -127,9 +134,6 @@ public:
public:
// Get the exists source, NULL when not exists.
virtual SrsSharedPtr<SrsRtcSource> fetch(SrsRequest* r);
public:
// Dispose and destroy the source.
virtual void eliminate(SrsRequest* r);
};
// Global singleton instance.
@ -195,11 +199,17 @@ private:
// The PLI for RTC2RTMP.
srs_utime_t pli_for_rtmp_;
srs_utime_t pli_elapsed_;
private:
// The last die time, while die means neither publishers nor players.
srs_utime_t stream_die_at_;
public:
SrsRtcSource();
virtual ~SrsRtcSource();
public:
virtual srs_error_t initialize(SrsRequest* r);
public:
// Whether stream is dead, which is no publisher or player.
virtual bool stream_is_dead();
private:
void init_for_play_before_publishing();
public:

View file

@ -40,10 +40,14 @@ using namespace std;
#ifdef SRS_RTC
#include <srs_app_rtc_network.hpp>
#include <srs_app_rtc_server.hpp>
#include <srs_app_rtc_source.hpp>
#endif
#ifdef SRS_GB28181
#include <srs_app_gb28181.hpp>
#endif
#ifdef SRS_SRT
#include <srs_app_srt_source.hpp>
#endif
SrsSignalManager* SrsSignalManager::instance = NULL;
@ -806,9 +810,21 @@ srs_error_t SrsServer::start(SrsWaitGroup* wg)
srs_error_t err = srs_success;
if ((err = _srs_sources->initialize()) != srs_success) {
return srs_error_wrap(err, "sources");
return srs_error_wrap(err, "live sources");
}
#ifdef SRS_SRT
if ((err = _srs_srt_sources->initialize()) != srs_success) {
return srs_error_wrap(err, "srt sources");
}
#endif
#ifdef SRS_RTC
if ((err = _srs_rtc_sources->initialize()) != srs_success) {
return srs_error_wrap(err, "rtc sources");
}
#endif
if ((err = trd_->start()) != srs_success) {
return srs_error_wrap(err, "start");
}

View file

@ -48,7 +48,7 @@ using namespace std;
#define SRS_MIX_CORRECT_PURE_AV 10
// the time to cleanup source.
#define SRS_SOURCE_CLEANUP (30 * SRS_UTIME_SECONDS)
#define SRS_SOURCE_CLEANUP (3 * SRS_UTIME_SECONDS)
int srs_time_jitter_string2int(std::string time_jitter)
{
@ -910,6 +910,13 @@ bool SrsOriginHub::active()
return is_active;
}
srs_utime_t SrsOriginHub::cleanup_delay()
{
srs_utime_t hls_delay = hls->cleanup_delay();
srs_utime_t dash_delay = dash->cleanup_delay();
return srs_max(hls_delay, dash_delay);
}
srs_error_t SrsOriginHub::on_meta_data(SrsSharedPtrMessage* shared_metadata, SrsOnMetaDataPacket* packet)
{
srs_error_t err = srs_success;
@ -1827,7 +1834,7 @@ srs_error_t SrsLiveSourceManager::setup_ticks()
{
srs_error_t err = srs_success;
if ((err = timer_->tick(1, 1 * SRS_UTIME_SECONDS)) != srs_success) {
if ((err = timer_->tick(1, 3 * SRS_UTIME_SECONDS)) != srs_success) {
return srs_error_wrap(err, "tick");
}
@ -1848,24 +1855,21 @@ srs_error_t SrsLiveSourceManager::notify(int event, srs_utime_t interval, srs_ut
// Do cycle source to cleanup components, such as hls dispose.
if ((err = source->cycle()) != srs_success) {
return srs_error_wrap(err, "source=%s/%s cycle", source->source_id().c_str(), source->pre_source_id().c_str());
SrsContextId cid = source->source_id();
if (cid.empty()) cid = source->pre_source_id();
return srs_error_wrap(err, "source cycle, id=[%s]", cid.c_str());
}
// See SrsSrtSource::on_consumer_destroy
// TODO: FIXME: support source cleanup.
// @see https://github.com/ossrs/srs/issues/713
#if 1
// When source expired, remove it.
// @see https://github.com/ossrs/srs/issues/713
if (source->stream_is_dead()) {
const SrsContextId& cid = source->source_id();
srs_trace("cleanup die source, id=[%s], total=%d", cid.c_str(), (int)pool.size());
SrsContextId cid = source->source_id();
if (cid.empty()) cid = source->pre_source_id();
srs_trace("Live: cleanup die source, id=[%s], total=%d", cid.c_str(), (int)pool.size());
pool.erase(it++);
} else {
++it;
}
#else
++it;
#endif
}
return err;
@ -1923,6 +1927,10 @@ SrsLiveSource::~SrsLiveSource()
srs_freep(req);
srs_freep(bridge_);
SrsContextId cid = _source_id;
if (cid.empty()) cid = _pre_source_id;
srs_trace("free live source id=[%s]", cid.c_str());
}
void SrsLiveSource::dispose()
@ -1944,11 +1952,6 @@ srs_error_t SrsLiveSource::cycle()
bool SrsLiveSource::stream_is_dead()
{
// unknown state?
if (stream_die_at_ == 0) {
return false;
}
// still publishing?
if (!_can_publish || !publish_edge->can_publish()) {
return false;
@ -1958,13 +1961,19 @@ bool SrsLiveSource::stream_is_dead()
if (!consumers.empty()) {
return false;
}
// Delay cleanup source.
srs_utime_t now = srs_get_system_time();
if (now > stream_die_at_ + SRS_SOURCE_CLEANUP) {
return true;
if (now < stream_die_at_ + SRS_SOURCE_CLEANUP) {
return false;
}
// Origin hub delay cleanup.
if (now < stream_die_at_ + hub->cleanup_delay()) {
return false;
}
return false;
return true;
}
bool SrsLiveSource::publisher_is_idle_for(srs_utime_t timeout)
@ -2726,6 +2735,11 @@ void SrsLiveSource::on_consumer_destroy(SrsLiveConsumer* consumer)
if (consumers.empty()) {
play_edge->on_all_client_stop();
// If no publishers, the stream is die.
if (_can_publish) {
stream_die_at_ = srs_get_system_time();
}
// For edge server, the stream die when the last player quit, because the edge stream is created by player
// activities, so it should die when all players quit.
if (_srs_config->get_vhost_is_edge(req->vhost)) {

View file

@ -355,6 +355,8 @@ public:
virtual srs_error_t cycle();
// Whether the stream hub is active, or stream is publishing.
virtual bool active();
// The delay cleanup time.
srs_utime_t cleanup_delay();
public:
// When got a parsed metadata.
virtual srs_error_t on_meta_data(SrsSharedPtrMessage* shared_metadata, SrsOnMetaDataPacket* packet);

View file

@ -20,6 +20,9 @@ using namespace std;
#include <srs_app_statistic.hpp>
#include <srs_app_pithy_print.hpp>
// the time to cleanup source.
#define SRS_SRT_SOURCE_CLEANUP (3 * SRS_UTIME_SECONDS)
SrsSrtPacket::SrsSrtPacket()
{
shared_buffer_ = NULL;
@ -95,11 +98,56 @@ int SrsSrtPacket::size()
SrsSrtSourceManager::SrsSrtSourceManager()
{
lock = srs_mutex_new();
timer_ = new SrsHourGlass("sources", this, 1 * SRS_UTIME_SECONDS);
}
SrsSrtSourceManager::~SrsSrtSourceManager()
{
srs_mutex_destroy(lock);
srs_freep(timer_);
}
srs_error_t SrsSrtSourceManager::initialize()
{
return setup_ticks();
}
srs_error_t SrsSrtSourceManager::setup_ticks()
{
srs_error_t err = srs_success;
if ((err = timer_->tick(1, 3 * SRS_UTIME_SECONDS)) != srs_success) {
return srs_error_wrap(err, "tick");
}
if ((err = timer_->start()) != srs_success) {
return srs_error_wrap(err, "timer");
}
return err;
}
srs_error_t SrsSrtSourceManager::notify(int event, srs_utime_t interval, srs_utime_t tick)
{
srs_error_t err = srs_success;
std::map< std::string, SrsSharedPtr<SrsSrtSource> >::iterator it;
for (it = pool.begin(); it != pool.end();) {
SrsSharedPtr<SrsSrtSource>& source = it->second;
// When source expired, remove it.
// @see https://github.com/ossrs/srs/issues/713
if (source->stream_is_dead()) {
SrsContextId cid = source->source_id();
if (cid.empty()) cid = source->pre_source_id();
srs_trace("SRT: cleanup die source, id=[%s], total=%d", cid.c_str(), (int)pool.size());
pool.erase(it++);
} else {
++it;
}
}
return err;
}
srs_error_t SrsSrtSourceManager::fetch_or_create(SrsRequest* r, SrsSharedPtr<SrsSrtSource>& pps)
@ -137,19 +185,6 @@ srs_error_t SrsSrtSourceManager::fetch_or_create(SrsRequest* r, SrsSharedPtr<Srs
return err;
}
void SrsSrtSourceManager::eliminate(SrsRequest* r)
{
// Use lock to protect coroutine switch.
// @bug https://github.com/ossrs/srs/issues/1230
SrsLocker(lock);
string stream_url = r->get_stream_url();
std::map< std::string, SrsSharedPtr<SrsSrtSource> >::iterator it = pool.find(stream_url);
if (it != pool.end()) {
pool.erase(it);
}
}
SrsSrtSourceManager* _srs_srt_sources = NULL;
SrsSrtConsumer::SrsSrtConsumer(SrsSrtSource* s)
@ -873,6 +908,7 @@ SrsSrtSource::SrsSrtSource()
can_publish_ = true;
frame_builder_ = NULL;
bridge_ = NULL;
stream_die_at_ = 0;
}
SrsSrtSource::~SrsSrtSource()
@ -884,6 +920,10 @@ SrsSrtSource::~SrsSrtSource()
srs_freep(frame_builder_);
srs_freep(bridge_);
srs_freep(req);
SrsContextId cid = _source_id;
if (cid.empty()) cid = _pre_source_id;
srs_trace("free srt source id=[%s]", cid.c_str());
}
srs_error_t SrsSrtSource::initialize(SrsRequest* r)
@ -895,6 +935,27 @@ srs_error_t SrsSrtSource::initialize(SrsRequest* r)
return err;
}
bool SrsSrtSource::stream_is_dead()
{
// still publishing?
if (!can_publish_) {
return false;
}
// has any consumers?
if (!consumers.empty()) {
return false;
}
// Delay cleanup source.
srs_utime_t now = srs_get_system_time();
if (now < stream_die_at_ + SRS_SRT_SOURCE_CLEANUP) {
return false;
}
return true;
}
srs_error_t SrsSrtSource::on_source_id_changed(SrsContextId id)
{
srs_error_t err = srs_success;
@ -949,6 +1010,8 @@ srs_error_t SrsSrtSource::create_consumer(SrsSrtConsumer*& consumer)
consumer = new SrsSrtConsumer(this);
consumers.push_back(consumer);
stream_die_at_ = 0;
return err;
}
@ -972,7 +1035,7 @@ void SrsSrtSource::on_consumer_destroy(SrsSrtConsumer* consumer)
// Destroy and cleanup source when no publishers and consumers.
if (can_publish_ && consumers.empty()) {
_srs_srt_sources->eliminate(req);
stream_die_at_ = srs_get_system_time();
}
}
@ -1029,8 +1092,8 @@ void SrsSrtSource::on_unpublish()
}
// Destroy and cleanup source when no publishers and consumers.
if (can_publish_ && consumers.empty()) {
_srs_srt_sources->eliminate(req);
if (consumers.empty()) {
stream_die_at_ = srs_get_system_time();
}
}

View file

@ -16,6 +16,7 @@
#include <srs_protocol_st.hpp>
#include <srs_app_stream_bridge.hpp>
#include <srs_core_autofree.hpp>
#include <srs_app_hourglass.hpp>
class SrsSharedPtrMessage;
class SrsRequest;
@ -47,21 +48,26 @@ private:
int actual_buffer_size_;
};
class SrsSrtSourceManager
class SrsSrtSourceManager : public ISrsHourGlass
{
private:
srs_mutex_t lock;
std::map< std::string, SrsSharedPtr<SrsSrtSource> > pool;
SrsHourGlass* timer_;
public:
SrsSrtSourceManager();
virtual ~SrsSrtSourceManager();
public:
virtual srs_error_t initialize();
// interface ISrsHourGlass
private:
virtual srs_error_t setup_ticks();
virtual srs_error_t notify(int event, srs_utime_t interval, srs_utime_t tick);
public:
// create source when fetch from cache failed.
// @param r the client request.
// @param pps the matched source, if success never be NULL.
virtual srs_error_t fetch_or_create(SrsRequest* r, SrsSharedPtr<SrsSrtSource>& pps);
// Dispose and destroy the source.
virtual void eliminate(SrsRequest* r);
};
// Global singleton instance.
@ -156,6 +162,9 @@ public:
virtual ~SrsSrtSource();
public:
virtual srs_error_t initialize(SrsRequest* r);
public:
// Whether stream is dead, which is no publisher or player.
virtual bool stream_is_dead();
public:
// The source id changed.
virtual srs_error_t on_source_id_changed(SrsContextId id);
@ -190,6 +199,8 @@ private:
// To delivery packets to clients.
std::vector<SrsSrtConsumer*> consumers;
bool can_publish_;
// The last die time, while die means neither publishers nor players.
srs_utime_t stream_die_at_;
private:
SrsSrtFrameBuilder* frame_builder_;
ISrsStreamBridge* bridge_;

View file

@ -9,6 +9,6 @@
#define VERSION_MAJOR 6
#define VERSION_MINOR 0
#define VERSION_REVISION 129
#define VERSION_REVISION 130
#endif

View file

@ -691,14 +691,12 @@ srs_error_t SrsHttpServeMux::handle(std::string pattern, ISrsHttpHandler* handle
srs_assert(handler);
if (pattern.empty()) {
srs_freep(handler);
return srs_error_new(ERROR_HTTP_PATTERN_EMPTY, "empty pattern");
}
if (entries.find(pattern) != entries.end()) {
SrsHttpMuxEntry* exists = entries[pattern];
if (exists->explicit_match) {
srs_freep(handler);
return srs_error_new(ERROR_HTTP_PATTERN_DUPLICATED, "pattern=%s exists", pattern.c_str());
}
}
@ -754,6 +752,32 @@ srs_error_t SrsHttpServeMux::handle(std::string pattern, ISrsHttpHandler* handle
return srs_success;
}
void SrsHttpServeMux::unhandle(std::string pattern, ISrsHttpHandler* handler)
{
if (true) {
std::map<std::string, SrsHttpMuxEntry*>::iterator it = entries.find(pattern);
if (it != entries.end()) {
SrsHttpMuxEntry* entry = it->second;
entries.erase(it);
// We don't free the handler, because user should free it.
if (entry->handler != handler) {
srs_freep(entry);
}
}
}
std::string vhost = pattern;
if (pattern.at(0) != '/') {
if (pattern.find("/") != string::npos) {
vhost = pattern.substr(0, pattern.find("/"));
}
std::map<std::string, ISrsHttpHandler*>::iterator it = vhosts.find(vhost);
if (it != vhosts.end()) vhosts.erase(it);
}
}
srs_error_t SrsHttpServeMux::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r)
{
srs_error_t err = srs_success;

View file

@ -470,6 +470,8 @@ public:
// Handle registers the handler for the given pattern.
// If a handler already exists for pattern, Handle panics.
virtual srs_error_t handle(std::string pattern, ISrsHttpHandler* handler);
// Remove the handler for pattern. Note that this will not free the handler.
void unhandle(std::string pattern, ISrsHttpHandler* handler);
// Interface ISrsHttpServeMux
public:
virtual srs_error_t serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r);

View file

@ -748,6 +748,7 @@ VOID TEST(ProtocolHTTPTest, HTTPServerMuxerImplicitHandler)
HELPER_ASSERT_SUCCESS(s.handle("/api/", h0));
MockHttpHandler* h1 = new MockHttpHandler("Done");
SrsAutoFree(MockHttpHandler, h1);
HELPER_EXPECT_FAILED(s.handle("/api/", h1));
}
@ -892,6 +893,7 @@ VOID TEST(ProtocolHTTPTest, HTTPServerMuxerBasic)
HELPER_ASSERT_SUCCESS(s.initialize());
MockHttpHandler* h0 = new MockHttpHandler("Hello, world!");
SrsAutoFree(MockHttpHandler, h0);
HELPER_EXPECT_FAILED(s.handle("", h0));
}