mirror of
https://github.com/ossrs/srs.git
synced 2025-02-12 11:21:52 +00:00
For #1568, extract SrsSourceManager from SrsSource.
This commit is contained in:
parent
fea293d0b1
commit
9dbd049e79
5 changed files with 49 additions and 23 deletions
|
@ -1089,7 +1089,7 @@ srs_error_t SrsHttpStreamServer::hijack(ISrsHttpMessage* request, ISrsHttpHandle
|
||||||
}
|
}
|
||||||
|
|
||||||
SrsSource* s = NULL;
|
SrsSource* s = NULL;
|
||||||
if ((err = SrsSource::fetch_or_create(r, server, &s)) != srs_success) {
|
if ((err = _srs_sources->fetch_or_create(r, server, &s)) != srs_success) {
|
||||||
return srs_error_wrap(err, "source create");
|
return srs_error_wrap(err, "source create");
|
||||||
}
|
}
|
||||||
srs_assert(s != NULL);
|
srs_assert(s != NULL);
|
||||||
|
|
|
@ -496,7 +496,7 @@ srs_error_t SrsRtmpConn::stream_service_cycle()
|
||||||
|
|
||||||
// find a source to serve.
|
// find a source to serve.
|
||||||
SrsSource* source = NULL;
|
SrsSource* source = NULL;
|
||||||
if ((err = SrsSource::fetch_or_create(req, server, &source)) != srs_success) {
|
if ((err = _srs_sources->fetch_or_create(req, server, &source)) != srs_success) {
|
||||||
return srs_error_wrap(err, "rtmp: fetch source");
|
return srs_error_wrap(err, "rtmp: fetch source");
|
||||||
}
|
}
|
||||||
srs_assert(source != NULL);
|
srs_assert(source != NULL);
|
||||||
|
|
|
@ -523,7 +523,7 @@ void SrsServer::dispose()
|
||||||
// @remark don't dispose ingesters, for too slow.
|
// @remark don't dispose ingesters, for too slow.
|
||||||
|
|
||||||
// dispose the source for hls and dvr.
|
// dispose the source for hls and dvr.
|
||||||
SrsSource::dispose_all();
|
_srs_sources->dispose();
|
||||||
|
|
||||||
// @remark don't dispose all connections, for too slow.
|
// @remark don't dispose all connections, for too slow.
|
||||||
|
|
||||||
|
@ -952,7 +952,7 @@ srs_error_t SrsServer::do_cycle()
|
||||||
}
|
}
|
||||||
|
|
||||||
// notice the stream sources to cycle.
|
// notice the stream sources to cycle.
|
||||||
if ((err = SrsSource::cycle_all()) != srs_success) {
|
if ((err = _srs_sources->cycle()) != srs_success) {
|
||||||
return srs_error_wrap(err, "source cycle");
|
return srs_error_wrap(err, "source cycle");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1635,9 +1635,17 @@ srs_error_t SrsMetaCache::update_vsh(SrsSharedPtrMessage* msg)
|
||||||
return vformat->on_video(msg);
|
return vformat->on_video(msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
std::map<std::string, SrsSource*> SrsSource::pool;
|
SrsSourceManager* _srs_sources = new SrsSourceManager();
|
||||||
|
|
||||||
srs_error_t SrsSource::fetch_or_create(SrsRequest* r, ISrsSourceHandler* h, SrsSource** pps)
|
SrsSourceManager::SrsSourceManager()
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
SrsSourceManager::~SrsSourceManager()
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
srs_error_t SrsSourceManager::fetch_or_create(SrsRequest* r, ISrsSourceHandler* h, SrsSource** pps)
|
||||||
{
|
{
|
||||||
srs_error_t err = srs_success;
|
srs_error_t err = srs_success;
|
||||||
|
|
||||||
|
@ -1665,7 +1673,7 @@ srs_error_t SrsSource::fetch_or_create(SrsRequest* r, ISrsSourceHandler* h, SrsS
|
||||||
return err;
|
return err;
|
||||||
}
|
}
|
||||||
|
|
||||||
SrsSource* SrsSource::fetch(SrsRequest* r)
|
SrsSource* SrsSourceManager::fetch(SrsRequest* r)
|
||||||
{
|
{
|
||||||
SrsSource* source = NULL;
|
SrsSource* source = NULL;
|
||||||
|
|
||||||
|
@ -1679,12 +1687,12 @@ SrsSource* SrsSource::fetch(SrsRequest* r)
|
||||||
// we always update the request of resource,
|
// we always update the request of resource,
|
||||||
// for origin auth is on, the token in request maybe invalid,
|
// for origin auth is on, the token in request maybe invalid,
|
||||||
// and we only need to update the token of request, it's simple.
|
// and we only need to update the token of request, it's simple.
|
||||||
source->req->update_auth(r);
|
source->update_auth(r);
|
||||||
|
|
||||||
return source;
|
return source;
|
||||||
}
|
}
|
||||||
|
|
||||||
void SrsSource::dispose_all()
|
void SrsSourceManager::dispose()
|
||||||
{
|
{
|
||||||
std::map<std::string, SrsSource*>::iterator it;
|
std::map<std::string, SrsSource*>::iterator it;
|
||||||
for (it = pool.begin(); it != pool.end(); ++it) {
|
for (it = pool.begin(); it != pool.end(); ++it) {
|
||||||
|
@ -1694,16 +1702,16 @@ void SrsSource::dispose_all()
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
srs_error_t SrsSource::cycle_all()
|
srs_error_t SrsSourceManager::cycle()
|
||||||
{
|
{
|
||||||
int cid = _srs_context->get_id();
|
int cid = _srs_context->get_id();
|
||||||
srs_error_t err = do_cycle_all();
|
srs_error_t err = do_cycle();
|
||||||
_srs_context->set_id(cid);
|
_srs_context->set_id(cid);
|
||||||
|
|
||||||
return err;
|
return err;
|
||||||
}
|
}
|
||||||
|
|
||||||
srs_error_t SrsSource::do_cycle_all()
|
srs_error_t SrsSourceManager::do_cycle()
|
||||||
{
|
{
|
||||||
srs_error_t err = srs_success;
|
srs_error_t err = srs_success;
|
||||||
|
|
||||||
|
@ -1744,7 +1752,7 @@ srs_error_t SrsSource::do_cycle_all()
|
||||||
return err;
|
return err;
|
||||||
}
|
}
|
||||||
|
|
||||||
void SrsSource::destroy()
|
void SrsSourceManager::destroy()
|
||||||
{
|
{
|
||||||
std::map<std::string, SrsSource*>::iterator it;
|
std::map<std::string, SrsSource*>::iterator it;
|
||||||
for (it = pool.begin(); it != pool.end(); ++it) {
|
for (it = pool.begin(); it != pool.end(); ++it) {
|
||||||
|
@ -1994,6 +2002,11 @@ bool SrsSource::inactive()
|
||||||
return _can_publish;
|
return _can_publish;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void SrsSource::update_auth(SrsRequest* r)
|
||||||
|
{
|
||||||
|
req->update_auth(r);
|
||||||
|
}
|
||||||
|
|
||||||
bool SrsSource::can_publish(bool is_edge)
|
bool SrsSource::can_publish(bool is_edge)
|
||||||
{
|
{
|
||||||
if (is_edge) {
|
if (is_edge) {
|
||||||
|
|
|
@ -438,32 +438,43 @@ public:
|
||||||
virtual srs_error_t update_vsh(SrsSharedPtrMessage* msg);
|
virtual srs_error_t update_vsh(SrsSharedPtrMessage* msg);
|
||||||
};
|
};
|
||||||
|
|
||||||
// live streaming source.
|
// The source manager to create and refresh all stream sources.
|
||||||
class SrsSource : public ISrsReloadHandler
|
class SrsSourceManager
|
||||||
{
|
{
|
||||||
friend class SrsOriginHub;
|
|
||||||
private:
|
private:
|
||||||
static std::map<std::string, SrsSource*> pool;
|
std::map<std::string, SrsSource*> pool;
|
||||||
|
public:
|
||||||
|
SrsSourceManager();
|
||||||
|
virtual ~SrsSourceManager();
|
||||||
public:
|
public:
|
||||||
// create source when fetch from cache failed.
|
// create source when fetch from cache failed.
|
||||||
// @param r the client request.
|
// @param r the client request.
|
||||||
// @param h the event handler for source.
|
// @param h the event handler for source.
|
||||||
// @param pps the matched source, if success never be NULL.
|
// @param pps the matched source, if success never be NULL.
|
||||||
static srs_error_t fetch_or_create(SrsRequest* r, ISrsSourceHandler* h, SrsSource** pps);
|
virtual srs_error_t fetch_or_create(SrsRequest* r, ISrsSourceHandler* h, SrsSource** pps);
|
||||||
private:
|
private:
|
||||||
// Get the exists source, NULL when not exists.
|
// Get the exists source, NULL when not exists.
|
||||||
// update the request and return the exists source.
|
// update the request and return the exists source.
|
||||||
static SrsSource* fetch(SrsRequest* r);
|
virtual SrsSource* fetch(SrsRequest* r);
|
||||||
public:
|
public:
|
||||||
// dispose and cycle all sources.
|
// dispose and cycle all sources.
|
||||||
static void dispose_all();
|
virtual void dispose();
|
||||||
static srs_error_t cycle_all();
|
virtual srs_error_t cycle();
|
||||||
private:
|
private:
|
||||||
static srs_error_t do_cycle_all();
|
virtual srs_error_t do_cycle();
|
||||||
public:
|
public:
|
||||||
// when system exit, destroy the sources,
|
// when system exit, destroy the sources,
|
||||||
// For gmc to analysis mem leaks.
|
// For gmc to analysis mem leaks.
|
||||||
static void destroy();
|
virtual void destroy();
|
||||||
|
};
|
||||||
|
|
||||||
|
// Global singleton instance.
|
||||||
|
extern SrsSourceManager* _srs_sources;
|
||||||
|
|
||||||
|
// live streaming source.
|
||||||
|
class SrsSource : public ISrsReloadHandler
|
||||||
|
{
|
||||||
|
friend class SrsOriginHub;
|
||||||
private:
|
private:
|
||||||
// For publish, it's the publish client id.
|
// For publish, it's the publish client id.
|
||||||
// For edge, it's the edge ingest id.
|
// For edge, it's the edge ingest id.
|
||||||
|
@ -531,6 +542,8 @@ public:
|
||||||
// Whether source is inactive, which means there is no publishing stream source.
|
// Whether source is inactive, which means there is no publishing stream source.
|
||||||
// @remark For edge, it's inactive util stream has been pulled from origin.
|
// @remark For edge, it's inactive util stream has been pulled from origin.
|
||||||
virtual bool inactive();
|
virtual bool inactive();
|
||||||
|
// Update the authentication information in request.
|
||||||
|
virtual void update_auth(SrsRequest* r);
|
||||||
public:
|
public:
|
||||||
virtual bool can_publish(bool is_edge);
|
virtual bool can_publish(bool is_edge);
|
||||||
virtual srs_error_t on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata);
|
virtual srs_error_t on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata);
|
||||||
|
|
Loading…
Reference in a new issue