1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

SuqashSRS4: Build SRT native

This commit is contained in:
winlin 2021-05-16 16:14:00 +08:00
parent a1d7fe46c1
commit e3bca883e1
150 changed files with 45007 additions and 398 deletions

View file

@ -3678,7 +3678,7 @@ srs_error_t SrsConfig::check_normal_config()
for (int i = 0; conf && i < (int)conf->directives.size(); i++) {
string n = conf->at(i)->name;
if (n != "enabled" && n != "listen" && n != "dir" && n != "candidate" && n != "ecdsa"
&& n != "encrypt" && n != "reuseport" && n != "merge_nalus" && n != "perf_stat" && n != "black_hole"
&& n != "encrypt" && n != "reuseport" && n != "merge_nalus" && n != "black_hole"
&& n != "ip_family") {
return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal rtc_server.%s", n.c_str());
}
@ -5033,23 +5033,6 @@ bool SrsConfig::get_rtc_server_merge_nalus()
return SRS_CONF_PERFER_TRUE(conf->arg0());
}
bool SrsConfig::get_rtc_server_perf_stat()
{
static bool DEFAULT = false;
SrsConfDirective* conf = root->get("rtc_server");
if (!conf) {
return DEFAULT;
}
conf = conf->get("perf_stat");
if (!conf || conf->arg0().empty()) {
return DEFAULT;
}
return SRS_CONF_PERFER_FALSE(conf->arg0());
}
bool SrsConfig::get_rtc_server_black_hole()
{
static bool DEFAULT = false;

View file

@ -542,7 +542,6 @@ public:
virtual bool get_rtc_server_encrypt();
virtual int get_rtc_server_reuseport();
virtual bool get_rtc_server_merge_nalus();
virtual bool get_rtc_server_perf_stat();
public:
virtual bool get_rtc_server_black_hole();
virtual std::string get_rtc_server_black_hole_addr();

View file

@ -139,7 +139,7 @@ SrsRequest* SrsCoWorkers::find_stream_info(string vhost, string app, string stre
return it->second;
}
srs_error_t SrsCoWorkers::on_publish(SrsSource* s, SrsRequest* r)
srs_error_t SrsCoWorkers::on_publish(SrsLiveSource* s, SrsRequest* r)
{
srs_error_t err = srs_success;
@ -157,7 +157,7 @@ srs_error_t SrsCoWorkers::on_publish(SrsSource* s, SrsRequest* r)
return err;
}
void SrsCoWorkers::on_unpublish(SrsSource* s, SrsRequest* r)
void SrsCoWorkers::on_unpublish(SrsLiveSource* s, SrsRequest* r)
{
string url = r->get_stream_url();

View file

@ -31,7 +31,7 @@
class SrsJsonAny;
class SrsRequest;
class SrsSource;
class SrsLiveSource;
// For origin cluster.
class SrsCoWorkers
@ -50,8 +50,8 @@ public:
private:
virtual SrsRequest* find_stream_info(std::string vhost, std::string app, std::string stream);
public:
virtual srs_error_t on_publish(SrsSource* s, SrsRequest* r);
virtual void on_unpublish(SrsSource* s, SrsRequest* r);
virtual srs_error_t on_publish(SrsLiveSource* s, SrsRequest* r);
virtual void on_unpublish(SrsLiveSource* s, SrsRequest* r);
};
#endif

View file

@ -29,7 +29,7 @@
#include <string>
#include <sstream>
class SrsSource;
class SrsLiveSource;
class SrsOriginHub;
class SrsRequest;
class SrsBuffer;

View file

@ -421,7 +421,7 @@ SrsEdgeIngester::~SrsEdgeIngester()
srs_freep(trd);
}
srs_error_t SrsEdgeIngester::initialize(SrsSource* s, SrsPlayEdge* e, SrsRequest* r)
srs_error_t SrsEdgeIngester::initialize(SrsLiveSource* s, SrsPlayEdge* e, SrsRequest* r)
{
source = s;
edge = e;
@ -714,7 +714,7 @@ void SrsEdgeForwarder::set_queue_size(srs_utime_t queue_size)
return queue->set_queue_size(queue_size);
}
srs_error_t SrsEdgeForwarder::initialize(SrsSource* s, SrsPublishEdge* e, SrsRequest* r)
srs_error_t SrsEdgeForwarder::initialize(SrsLiveSource* s, SrsPublishEdge* e, SrsRequest* r)
{
source = s;
edge = e;
@ -917,7 +917,7 @@ SrsPlayEdge::~SrsPlayEdge()
srs_freep(ingester);
}
srs_error_t SrsPlayEdge::initialize(SrsSource* source, SrsRequest* req)
srs_error_t SrsPlayEdge::initialize(SrsLiveSource* source, SrsRequest* req)
{
srs_error_t err = srs_success;
@ -997,7 +997,7 @@ void SrsPublishEdge::set_queue_size(srs_utime_t queue_size)
return forwarder->set_queue_size(queue_size);
}
srs_error_t SrsPublishEdge::initialize(SrsSource* source, SrsRequest* req)
srs_error_t SrsPublishEdge::initialize(SrsLiveSource* source, SrsRequest* req)
{
srs_error_t err = srs_success;

View file

@ -32,7 +32,7 @@
class SrsStSocket;
class SrsRtmpServer;
class SrsSource;
class SrsLiveSource;
class SrsRequest;
class SrsPlayEdge;
class SrsPublishEdge;
@ -153,7 +153,7 @@ public:
class SrsEdgeIngester : public ISrsCoroutineHandler
{
private:
SrsSource* source;
SrsLiveSource* source;
SrsPlayEdge* edge;
SrsRequest* req;
SrsCoroutine* trd;
@ -163,7 +163,7 @@ public:
SrsEdgeIngester();
virtual ~SrsEdgeIngester();
public:
virtual srs_error_t initialize(SrsSource* s, SrsPlayEdge* e, SrsRequest* r);
virtual srs_error_t initialize(SrsLiveSource* s, SrsPlayEdge* e, SrsRequest* r);
virtual srs_error_t start();
virtual void stop();
virtual std::string get_curr_origin();
@ -181,7 +181,7 @@ private:
class SrsEdgeForwarder : public ISrsCoroutineHandler
{
private:
SrsSource* source;
SrsLiveSource* source;
SrsPublishEdge* edge;
SrsRequest* req;
SrsCoroutine* trd;
@ -200,7 +200,7 @@ public:
public:
virtual void set_queue_size(srs_utime_t queue_size);
public:
virtual srs_error_t initialize(SrsSource* s, SrsPublishEdge* e, SrsRequest* r);
virtual srs_error_t initialize(SrsLiveSource* s, SrsPublishEdge* e, SrsRequest* r);
virtual srs_error_t start();
virtual void stop();
// Interface ISrsReusableThread2Handler
@ -225,7 +225,7 @@ public:
// Always use the req of source,
// For we assume all client to edge is invalid,
// if auth open, edge must valid it from origin, then service it.
virtual srs_error_t initialize(SrsSource* source, SrsRequest* req);
virtual srs_error_t initialize(SrsLiveSource* source, SrsRequest* req);
// When client play stream on edge.
virtual srs_error_t on_client_play();
// When all client stopped play, disconnect to origin.
@ -248,7 +248,7 @@ public:
public:
virtual void set_queue_size(srs_utime_t queue_size);
public:
virtual srs_error_t initialize(SrsSource* source, SrsRequest* req);
virtual srs_error_t initialize(SrsLiveSource* source, SrsRequest* req);
virtual bool can_publish();
// When client publish stream on edge.
virtual srs_error_t on_client_publish();

View file

@ -76,7 +76,7 @@ srs_error_t SrsForwarder::initialize(SrsRequest* r, string ep)
srs_error_t err = srs_success;
// it's ok to use the request object,
// SrsSource already copy it and never delete it.
// SrsLiveSource already copy it and never delete it.
req = r;
// the ep(endpoint) to forward to

View file

@ -37,7 +37,7 @@ class SrsMessageQueue;
class SrsRtmpJitter;
class SrsRtmpClient;
class SrsRequest;
class SrsSource;
class SrsLiveSource;
class SrsOriginHub;
class SrsKbps;
class SrsSimpleRtmpClient;

View file

@ -1432,7 +1432,7 @@ srs_error_t SrsGb28181RtmpMuxer::initialize(SrsServer *s, SrsRequest* r)
req = r;
server = s;
if ((err = _srs_sources->fetch_or_create(req, (ISrsSourceHandler*)server, &source)) != srs_success) {
if ((err = _srs_sources->fetch_or_create(req, (ISrsLiveSourceHandler*)server, &source)) != srs_success) {
return srs_error_wrap(err, "create source");
}

View file

@ -100,7 +100,7 @@ class SrsGb28181StreamChannel;
class SrsGb28181SipSession;
class SrsRtpJitterBuffer;
class SrsServer;
class SrsSource;
class SrsLiveSource;
class SrsRequest;
class SrsResourceManager;
class SrsGb28181Conn;
@ -326,7 +326,7 @@ private:
std::string aac_specific_config;
SrsRequest* req;
SrsSource* source;
SrsLiveSource* source;
SrsServer* server;
SrsRtpJitterBuffer *jitter_buffer;

View file

@ -33,7 +33,7 @@
class SrsRequest;
class SrsSharedPtrMessage;
class SrsHdsFragment;
class SrsSource;
class SrsLiveSource;
// Mux RTMP to Adobe HDS streaming.
class SrsHds

View file

@ -41,7 +41,7 @@ class SrsRtmpJitter;
class SrsTsContextWriter;
class SrsRequest;
class SrsPithyPrint;
class SrsSource;
class SrsLiveSource;
class SrsOriginHub;
class SrsFileWriter;
class SrsSimpleStream;

View file

@ -533,12 +533,12 @@ srs_error_t SrsHttpServer::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage
return http_static->mux.serve_http(w, r);
}
srs_error_t SrsHttpServer::http_mount(SrsSource* s, SrsRequest* r)
srs_error_t SrsHttpServer::http_mount(SrsLiveSource* s, SrsRequest* r)
{
return http_stream->http_mount(s, r);
}
void SrsHttpServer::http_unmount(SrsSource* s, SrsRequest* r)
void SrsHttpServer::http_unmount(SrsLiveSource* s, SrsRequest* r)
{
http_stream->http_unmount(s, r);
}

View file

@ -38,9 +38,9 @@
#include <srs_app_source.hpp>
class SrsServer;
class SrsSource;
class SrsLiveSource;
class SrsRequest;
class SrsConsumer;
class SrsLiveConsumer;
class SrsStSocket;
class SrsHttpParser;
class ISrsHttpMessage;
@ -208,8 +208,8 @@ public:
public:
virtual srs_error_t serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r);
public:
virtual srs_error_t http_mount(SrsSource* s, SrsRequest* r);
virtual void http_unmount(SrsSource* s, SrsRequest* r);
virtual srs_error_t http_mount(SrsLiveSource* s, SrsRequest* r);
virtual void http_unmount(SrsLiveSource* s, SrsRequest* r);
};
#endif

View file

@ -57,7 +57,7 @@ using namespace std;
#include <srs_app_recv_thread.hpp>
#include <srs_app_http_hooks.hpp>
SrsBufferCache::SrsBufferCache(SrsSource* s, SrsRequest* r)
SrsBufferCache::SrsBufferCache(SrsLiveSource* s, SrsRequest* r)
{
req = r->copy()->as_http();
source = s;
@ -76,7 +76,7 @@ SrsBufferCache::~SrsBufferCache()
srs_freep(req);
}
srs_error_t SrsBufferCache::update_auth(SrsSource* s, SrsRequest* r)
srs_error_t SrsBufferCache::update_auth(SrsLiveSource* s, SrsRequest* r)
{
srs_freep(req);
req = r->copy();
@ -96,7 +96,7 @@ srs_error_t SrsBufferCache::start()
return err;
}
srs_error_t SrsBufferCache::dump_cache(SrsConsumer* consumer, SrsRtmpJitterAlgorithm jitter)
srs_error_t SrsBufferCache::dump_cache(SrsLiveConsumer* consumer, SrsRtmpJitterAlgorithm jitter)
{
srs_error_t err = srs_success;
@ -104,7 +104,7 @@ srs_error_t SrsBufferCache::dump_cache(SrsConsumer* consumer, SrsRtmpJitterAlgor
return err;
}
// the jitter is get from SrsSource, which means the time_jitter of vhost.
// the jitter is get from SrsLiveSource, which means the time_jitter of vhost.
if ((err = queue->dump_packets(consumer, false, jitter)) != srs_success) {
return srs_error_wrap(err, "dump packets");
}
@ -127,8 +127,8 @@ srs_error_t SrsBufferCache::cycle()
// the stream cache will create consumer to cache stream,
// which will trigger to fetch stream from origin for edge.
SrsConsumer* consumer = NULL;
SrsAutoFree(SrsConsumer, consumer);
SrsLiveConsumer* consumer = NULL;
SrsAutoFree(SrsLiveConsumer, consumer);
if ((err = source->create_consumer(consumer)) != srs_success) {
return srs_error_wrap(err, "create consumer");
}
@ -241,11 +241,11 @@ srs_error_t SrsTsStreamEncoder::write_metadata(int64_t /*timestamp*/, char* /*da
bool SrsTsStreamEncoder::has_cache()
{
// for ts stream, use gop cache of SrsSource is ok.
// for ts stream, use gop cache of SrsLiveSource is ok.
return false;
}
srs_error_t SrsTsStreamEncoder::dump_cache(SrsConsumer* /*consumer*/, SrsRtmpJitterAlgorithm /*jitter*/)
srs_error_t SrsTsStreamEncoder::dump_cache(SrsLiveConsumer* /*consumer*/, SrsRtmpJitterAlgorithm /*jitter*/)
{
// for ts stream, ignore cache.
return srs_success;
@ -308,11 +308,11 @@ srs_error_t SrsFlvStreamEncoder::write_metadata(int64_t timestamp, char* data, i
bool SrsFlvStreamEncoder::has_cache()
{
// for flv stream, use gop cache of SrsSource is ok.
// for flv stream, use gop cache of SrsLiveSource is ok.
return false;
}
srs_error_t SrsFlvStreamEncoder::dump_cache(SrsConsumer* /*consumer*/, SrsRtmpJitterAlgorithm /*jitter*/)
srs_error_t SrsFlvStreamEncoder::dump_cache(SrsLiveConsumer* /*consumer*/, SrsRtmpJitterAlgorithm /*jitter*/)
{
// for flv stream, ignore cache.
return srs_success;
@ -412,7 +412,7 @@ bool SrsAacStreamEncoder::has_cache()
return true;
}
srs_error_t SrsAacStreamEncoder::dump_cache(SrsConsumer* consumer, SrsRtmpJitterAlgorithm jitter)
srs_error_t SrsAacStreamEncoder::dump_cache(SrsLiveConsumer* consumer, SrsRtmpJitterAlgorithm jitter)
{
srs_assert(cache);
return cache->dump_cache(consumer, jitter);
@ -468,7 +468,7 @@ bool SrsMp3StreamEncoder::has_cache()
return true;
}
srs_error_t SrsMp3StreamEncoder::dump_cache(SrsConsumer* consumer, SrsRtmpJitterAlgorithm jitter)
srs_error_t SrsMp3StreamEncoder::dump_cache(SrsLiveConsumer* consumer, SrsRtmpJitterAlgorithm jitter)
{
srs_assert(cache);
return cache->dump_cache(consumer, jitter);
@ -515,7 +515,7 @@ srs_error_t SrsBufferWriter::writev(const iovec* iov, int iovcnt, ssize_t* pnwri
return writer->writev(iov, iovcnt, pnwrite);
}
SrsLiveStream::SrsLiveStream(SrsSource* s, SrsRequest* r, SrsBufferCache* c)
SrsLiveStream::SrsLiveStream(SrsLiveSource* s, SrsRequest* r, SrsBufferCache* c)
{
source = s;
cache = c;
@ -527,7 +527,7 @@ SrsLiveStream::~SrsLiveStream()
srs_freep(req);
}
srs_error_t SrsLiveStream::update_auth(SrsSource* s, SrsRequest* r)
srs_error_t SrsLiveStream::update_auth(SrsLiveSource* s, SrsRequest* r)
{
source = s;
@ -585,8 +585,8 @@ srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess
w->write_header(SRS_CONSTS_HTTP_OK);
// create consumer of souce, ignore gop cache, use the audio gop cache.
SrsConsumer* consumer = NULL;
SrsAutoFree(SrsConsumer, consumer);
SrsLiveConsumer* consumer = NULL;
SrsAutoFree(SrsLiveConsumer, consumer);
if ((err = source->create_consumer(consumer)) != srs_success) {
return srs_error_wrap(err, "create consumer");
}
@ -893,7 +893,7 @@ srs_error_t SrsHttpStreamServer::initialize()
}
// TODO: FIXME: rename for HTTP FLV mount.
srs_error_t SrsHttpStreamServer::http_mount(SrsSource* s, SrsRequest* r)
srs_error_t SrsHttpStreamServer::http_mount(SrsLiveSource* s, SrsRequest* r)
{
srs_error_t err = srs_success;
@ -967,7 +967,7 @@ srs_error_t SrsHttpStreamServer::http_mount(SrsSource* s, SrsRequest* r)
return err;
}
void SrsHttpStreamServer::http_unmount(SrsSource* s, SrsRequest* r)
void SrsHttpStreamServer::http_unmount(SrsLiveSource* s, SrsRequest* r)
{
std::string sid = r->get_stream_url();
@ -1000,7 +1000,7 @@ srs_error_t SrsHttpStreamServer::on_reload_vhost_http_remux_updated(string vhost
return srs_error_wrap(err, "init flv entry");
}
// http mount need SrsRequest and SrsSource param, only create a mapping template entry
// http mount need SrsRequest and SrsLiveSource param, only create a mapping template entry
// and do mount automatically on playing http flv if this stream is a new http_remux stream.
return err;
}
@ -1019,7 +1019,7 @@ srs_error_t SrsHttpStreamServer::on_reload_vhost_http_remux_updated(string vhost
continue;
}
SrsSource* source = entry->source;
SrsLiveSource* source = entry->source;
if (_srs_config->get_vhost_http_remux_enabled(vhost)) {
http_mount(source, req);
} else {
@ -1127,7 +1127,7 @@ srs_error_t SrsHttpStreamServer::hijack(ISrsHttpMessage* request, ISrsHttpHandle
}
}
SrsSource* s = NULL;
SrsLiveSource* s = NULL;
if ((err = _srs_sources->fetch_or_create(r, server, &s)) != srs_success) {
return srs_error_wrap(err, "source create");
}

View file

@ -40,16 +40,16 @@ private:
srs_utime_t fast_cache;
private:
SrsMessageQueue* queue;
SrsSource* source;
SrsLiveSource* source;
SrsRequest* req;
SrsCoroutine* trd;
public:
SrsBufferCache(SrsSource* s, SrsRequest* r);
SrsBufferCache(SrsLiveSource* s, SrsRequest* r);
virtual ~SrsBufferCache();
virtual srs_error_t update_auth(SrsSource* s, SrsRequest* r);
virtual srs_error_t update_auth(SrsLiveSource* s, SrsRequest* r);
public:
virtual srs_error_t start();
virtual srs_error_t dump_cache(SrsConsumer* consumer, SrsRtmpJitterAlgorithm jitter);
virtual srs_error_t dump_cache(SrsLiveConsumer* consumer, SrsRtmpJitterAlgorithm jitter);
// Interface ISrsEndlessThreadHandler.
public:
virtual srs_error_t cycle();
@ -72,11 +72,11 @@ public:
virtual srs_error_t write_metadata(int64_t timestamp, char* data, int size) = 0;
public:
// For some stream, for example, mp3 and aac, the audio stream,
// we use large gop cache in encoder, for the gop cache of SrsSource is ignore audio.
// @return true to use gop cache of encoder; otherwise, use SrsSource.
// we use large gop cache in encoder, for the gop cache of SrsLiveSource is ignore audio.
// @return true to use gop cache of encoder; otherwise, use SrsLiveSource.
virtual bool has_cache() = 0;
// Dumps the cache of encoder to consumer.
virtual srs_error_t dump_cache(SrsConsumer* consumer, SrsRtmpJitterAlgorithm jitter) = 0;
virtual srs_error_t dump_cache(SrsLiveConsumer* consumer, SrsRtmpJitterAlgorithm jitter) = 0;
};
// Transmux RTMP to HTTP Live Streaming.
@ -95,7 +95,7 @@ public:
virtual srs_error_t write_metadata(int64_t timestamp, char* data, int size);
public:
virtual bool has_cache();
virtual srs_error_t dump_cache(SrsConsumer* consumer, SrsRtmpJitterAlgorithm jitter);
virtual srs_error_t dump_cache(SrsLiveConsumer* consumer, SrsRtmpJitterAlgorithm jitter);
public:
// Write the tags in a time.
virtual srs_error_t write_tags(SrsSharedPtrMessage** msgs, int count);
@ -118,7 +118,7 @@ public:
virtual srs_error_t write_metadata(int64_t timestamp, char* data, int size);
public:
virtual bool has_cache();
virtual srs_error_t dump_cache(SrsConsumer* consumer, SrsRtmpJitterAlgorithm jitter);
virtual srs_error_t dump_cache(SrsLiveConsumer* consumer, SrsRtmpJitterAlgorithm jitter);
};
// Transmux RTMP with AAC stream to HTTP AAC Streaming.
@ -137,7 +137,7 @@ public:
virtual srs_error_t write_metadata(int64_t timestamp, char* data, int size);
public:
virtual bool has_cache();
virtual srs_error_t dump_cache(SrsConsumer* consumer, SrsRtmpJitterAlgorithm jitter);
virtual srs_error_t dump_cache(SrsLiveConsumer* consumer, SrsRtmpJitterAlgorithm jitter);
};
// Transmux RTMP with MP3 stream to HTTP MP3 Streaming.
@ -156,7 +156,7 @@ public:
virtual srs_error_t write_metadata(int64_t timestamp, char* data, int size);
public:
virtual bool has_cache();
virtual srs_error_t dump_cache(SrsConsumer* consumer, SrsRtmpJitterAlgorithm jitter);
virtual srs_error_t dump_cache(SrsLiveConsumer* consumer, SrsRtmpJitterAlgorithm jitter);
};
// Write stream to http response direclty.
@ -184,12 +184,12 @@ class SrsLiveStream : public ISrsHttpHandler
{
private:
SrsRequest* req;
SrsSource* source;
SrsLiveSource* source;
SrsBufferCache* cache;
public:
SrsLiveStream(SrsSource* s, SrsRequest* r, SrsBufferCache* c);
SrsLiveStream(SrsLiveSource* s, SrsRequest* r, SrsBufferCache* c);
virtual ~SrsLiveStream();
virtual srs_error_t update_auth(SrsSource* s, SrsRequest* r);
virtual srs_error_t update_auth(SrsLiveSource* s, SrsRequest* r);
public:
virtual srs_error_t serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r);
private:
@ -211,7 +211,7 @@ public:
// We will free the request.
SrsRequest* req;
// Shared source.
SrsSource* source;
SrsLiveSource* source;
public:
// For template, the mount contains variables.
// For concrete stream, the mount is url to access.
@ -249,8 +249,8 @@ public:
virtual srs_error_t initialize();
public:
// HTTP flv/ts/mp3/aac stream
virtual srs_error_t http_mount(SrsSource* s, SrsRequest* r);
virtual void http_unmount(SrsSource* s, SrsRequest* r);
virtual srs_error_t http_mount(SrsLiveSource* s, SrsRequest* r);
virtual void http_unmount(SrsLiveSource* s, SrsRequest* r);
// Interface ISrsReloadHandler.
public:
virtual srs_error_t on_reload_vhost_added(std::string vhost);

View file

@ -164,7 +164,7 @@ srs_error_t SrsRecvThread::do_cycle()
return err;
}
SrsQueueRecvThread::SrsQueueRecvThread(SrsConsumer* consumer, SrsRtmpServer* rtmp_sdk, srs_utime_t tm, SrsContextId parent_cid)
SrsQueueRecvThread::SrsQueueRecvThread(SrsLiveConsumer* consumer, SrsRtmpServer* rtmp_sdk, srs_utime_t tm, SrsContextId parent_cid)
: trd(this, rtmp_sdk, tm, parent_cid)
{
_consumer = consumer;
@ -278,7 +278,7 @@ void SrsQueueRecvThread::on_stop()
}
SrsPublishRecvThread::SrsPublishRecvThread(SrsRtmpServer* rtmp_sdk, SrsRequest* _req,
int mr_sock_fd, srs_utime_t tm, SrsRtmpConn* conn, SrsSource* source, SrsContextId parent_cid)
int mr_sock_fd, srs_utime_t tm, SrsRtmpConn* conn, SrsLiveSource* source, SrsContextId parent_cid)
: trd(this, rtmp_sdk, tm, parent_cid)
{
rtmp = rtmp_sdk;

View file

@ -37,9 +37,9 @@
class SrsRtmpServer;
class SrsCommonMessage;
class SrsRtmpConn;
class SrsSource;
class SrsLiveSource;
class SrsRequest;
class SrsConsumer;
class SrsLiveConsumer;
class SrsHttpConn;
class SrsResponseOnlyHttpConn;
@ -114,10 +114,10 @@ private:
SrsRtmpServer* rtmp;
// The recv thread error code.
srs_error_t recv_error;
SrsConsumer* _consumer;
SrsLiveConsumer* _consumer;
public:
// TODO: FIXME: Refine timeout in time unit.
SrsQueueRecvThread(SrsConsumer* consumer, SrsRtmpServer* rtmp_sdk, srs_utime_t tm, SrsContextId parent_cid);
SrsQueueRecvThread(SrsLiveConsumer* consumer, SrsRtmpServer* rtmp_sdk, srs_utime_t tm, SrsContextId parent_cid);
virtual ~SrsQueueRecvThread();
public:
virtual srs_error_t start();
@ -164,7 +164,7 @@ private:
srs_error_t recv_error;
SrsRtmpConn* _conn;
// The params for conn callback.
SrsSource* _source;
SrsLiveSource* _source;
// The error timeout cond
// @see https://github.com/ossrs/srs/issues/244
srs_cond_t error;
@ -173,7 +173,7 @@ private:
SrsContextId ncid;
public:
SrsPublishRecvThread(SrsRtmpServer* rtmp_sdk, SrsRequest* _req,
int mr_sock_fd, srs_utime_t tm, SrsRtmpConn* conn, SrsSource* source, SrsContextId parent_cid);
int mr_sock_fd, srs_utime_t tm, SrsRtmpConn* conn, SrsLiveSource* source, SrsContextId parent_cid);
virtual ~SrsPublishRecvThread();
public:
// Wait for error for some timeout.

View file

@ -463,7 +463,7 @@ srs_error_t SrsRtcPlayStream::initialize(SrsRequest* req, std::map<uint32_t, Srs
return err;
}
void SrsRtcPlayStream::on_stream_change(SrsRtcStreamDescription* desc)
void SrsRtcPlayStream::on_stream_change(SrsRtcSourceDescription* desc)
{
// Refresh the relation for audio.
// TODO: FIXME: Match by label?
@ -555,7 +555,7 @@ srs_error_t SrsRtcPlayStream::cycle()
{
srs_error_t err = srs_success;
SrsRtcStream* source = source_;
SrsRtcSource* source = source_;
SrsRtcConsumer* consumer = NULL;
SrsAutoFree(SrsRtcConsumer, consumer);
@ -1009,7 +1009,7 @@ SrsRtcPublishStream::~SrsRtcPublishStream()
srs_freep(req);
}
srs_error_t SrsRtcPublishStream::initialize(SrsRequest* r, SrsRtcStreamDescription* stream_desc)
srs_error_t SrsRtcPublishStream::initialize(SrsRequest* r, SrsRtcSourceDescription* stream_desc)
{
srs_error_t err = srs_success;
@ -1073,7 +1073,7 @@ srs_error_t SrsRtcPublishStream::initialize(SrsRequest* r, SrsRtcStreamDescripti
#if defined(SRS_RTC) && defined(SRS_FFMPEG_FIT)
bool rtc_to_rtmp = _srs_config->get_rtc_to_rtmp(req->vhost);
if (rtc_to_rtmp) {
SrsSource *rtmp = NULL;
SrsLiveSource *rtmp = NULL;
if ((err = _srs_sources->fetch_or_create(r, _srs_hybrid->srs()->instance(), &rtmp)) != srs_success) {
return srs_error_wrap(err, "create source");
}
@ -1848,8 +1848,8 @@ srs_error_t SrsRtcConnection::add_publisher(SrsRtcUserConfig* ruc, SrsSdp& local
SrsRequest* req = ruc->req_;
SrsRtcStreamDescription* stream_desc = new SrsRtcStreamDescription();
SrsAutoFree(SrsRtcStreamDescription, stream_desc);
SrsRtcSourceDescription* stream_desc = new SrsRtcSourceDescription();
SrsAutoFree(SrsRtcSourceDescription, stream_desc);
// TODO: FIXME: Change to api of stream desc.
if ((err = negotiate_publish_capability(ruc, stream_desc)) != srs_success) {
@ -1860,7 +1860,7 @@ srs_error_t SrsRtcConnection::add_publisher(SrsRtcUserConfig* ruc, SrsSdp& local
return srs_error_wrap(err, "generate local sdp");
}
SrsRtcStream* source = NULL;
SrsRtcSource* source = NULL;
if ((err = _srs_rtc_sources->fetch_or_create(req, &source)) != srs_success) {
return srs_error_wrap(err, "create source");
}
@ -1905,8 +1905,8 @@ srs_error_t SrsRtcConnection::add_player(SrsRtcUserConfig* ruc, SrsSdp& local_sd
return srs_error_new(ERROR_RTC_SDP_EXCHANGE, "no play relations");
}
SrsRtcStreamDescription* stream_desc = new SrsRtcStreamDescription();
SrsAutoFree(SrsRtcStreamDescription, stream_desc);
SrsRtcSourceDescription* stream_desc = new SrsRtcSourceDescription();
SrsAutoFree(SrsRtcSourceDescription, stream_desc);
std::map<uint32_t, SrsRtcTrackDescription*>::iterator it = play_sub_relations.begin();
while (it != play_sub_relations.end()) {
SrsRtcTrackDescription* track_desc = it->second;
@ -2676,7 +2676,7 @@ bool srs_sdp_has_h264_profile(const SrsSdp& sdp, const string& profile)
return false;
}
srs_error_t SrsRtcConnection::negotiate_publish_capability(SrsRtcUserConfig* ruc, SrsRtcStreamDescription* stream_desc)
srs_error_t SrsRtcConnection::negotiate_publish_capability(SrsRtcUserConfig* ruc, SrsRtcSourceDescription* stream_desc)
{
srs_error_t err = srs_success;
@ -2914,7 +2914,7 @@ srs_error_t SrsRtcConnection::negotiate_publish_capability(SrsRtcUserConfig* ruc
return err;
}
srs_error_t SrsRtcConnection::generate_publish_local_sdp(SrsRequest* req, SrsSdp& local_sdp, SrsRtcStreamDescription* stream_desc, bool unified_plan)
srs_error_t SrsRtcConnection::generate_publish_local_sdp(SrsRequest* req, SrsSdp& local_sdp, SrsRtcSourceDescription* stream_desc, bool unified_plan)
{
srs_error_t err = srs_success;
@ -3031,7 +3031,7 @@ srs_error_t SrsRtcConnection::negotiate_play_capability(SrsRtcUserConfig* ruc, s
// TODO: FIME: Should check packetization-mode=1 also.
bool has_42e01f = srs_sdp_has_h264_profile(remote_sdp, "42e01f");
SrsRtcStream* source = NULL;
SrsRtcSource* source = NULL;
if ((err = _srs_rtc_sources->fetch_or_create(req, &source)) != srs_success) {
return srs_error_wrap(err, "fetch rtc source");
}
@ -3152,7 +3152,7 @@ srs_error_t SrsRtcConnection::fetch_source_capability(SrsRequest* req, std::map<
bool nack_enabled = _srs_config->get_rtc_nack_enabled(req->vhost);
bool twcc_enabled = _srs_config->get_rtc_twcc_enabled(req->vhost);
SrsRtcStream* source = NULL;
SrsRtcSource* source = NULL;
if ((err = _srs_rtc_sources->fetch_or_create(req, &source)) != srs_success) {
return srs_error_wrap(err, "fetch rtc source");
}
@ -3249,7 +3249,7 @@ void video_track_generate_play_offer(SrsRtcTrackDescription* track, string mid,
}
}
srs_error_t SrsRtcConnection::generate_play_local_sdp(SrsRequest* req, SrsSdp& local_sdp, SrsRtcStreamDescription* stream_desc, bool unified_plan)
srs_error_t SrsRtcConnection::generate_play_local_sdp(SrsRequest* req, SrsSdp& local_sdp, SrsRtcSourceDescription* stream_desc, bool unified_plan)
{
srs_error_t err = srs_success;
@ -3443,7 +3443,7 @@ srs_error_t SrsRtcConnection::create_player(SrsRequest* req, std::map<uint32_t,
return err;
}
srs_error_t SrsRtcConnection::create_publisher(SrsRequest* req, SrsRtcStreamDescription* stream_desc)
srs_error_t SrsRtcConnection::create_publisher(SrsRequest* req, SrsRtcSourceDescription* stream_desc)
{
srs_error_t err = srs_success;

View file

@ -47,12 +47,12 @@
#include <sys/socket.h>
class SrsUdpMuxSocket;
class SrsConsumer;
class SrsLiveConsumer;
class SrsStunPacket;
class SrsRtcServer;
class SrsRtcConnection;
class SrsSharedPtrMessage;
class SrsRtcStream;
class SrsRtcSource;
class SrsRtpPacket;
class ISrsCodec;
class SrsRtpNackForReceiver;
@ -214,7 +214,7 @@ public:
// A RTC play stream, client pull and play stream from SRS.
class SrsRtcPlayStream : public ISrsCoroutineHandler, public ISrsReloadHandler
, public ISrsRtcPLIWorkerHandler, public ISrsRtcStreamChangeCallback
, public ISrsRtcPLIWorkerHandler, public ISrsRtcSourceChangeCallback
{
private:
SrsContextId cid_;
@ -223,7 +223,7 @@ private:
SrsRtcPLIWorker* pli_worker_;
private:
SrsRequest* req_;
SrsRtcStream* source_;
SrsRtcSource* source_;
// key: publish_ssrc, value: send track to process rtp/rtcp
std::map<uint32_t, SrsRtcAudioSendTrack*> audio_tracks_;
std::map<uint32_t, SrsRtcVideoSendTrack*> video_tracks_;
@ -252,9 +252,9 @@ public:
virtual ~SrsRtcPlayStream();
public:
srs_error_t initialize(SrsRequest* request, std::map<uint32_t, SrsRtcTrackDescription*> sub_relations);
// Interface ISrsRtcStreamChangeCallback
// Interface ISrsRtcSourceChangeCallback
public:
void on_stream_change(SrsRtcStreamDescription* desc);
void on_stream_change(SrsRtcSourceDescription* desc);
// interface ISrsReloadHandler
public:
virtual srs_error_t on_reload_vhost_play(std::string vhost);
@ -335,7 +335,7 @@ private:
SrsErrorPithyPrint* pli_epp;
private:
SrsRequest* req;
SrsRtcStream* source;
SrsRtcSource* source;
// Simulators.
int nn_simulate_nack_drop;
private:
@ -353,7 +353,7 @@ public:
SrsRtcPublishStream(SrsRtcConnection* session, const SrsContextId& cid);
virtual ~SrsRtcPublishStream();
public:
srs_error_t initialize(SrsRequest* req, SrsRtcStreamDescription* stream_desc);
srs_error_t initialize(SrsRequest* req, SrsRtcSourceDescription* stream_desc);
srs_error_t start();
// Directly set the status of track, generally for init to set the default value.
void set_all_tracks_status(bool status);
@ -553,15 +553,15 @@ public:
private:
srs_error_t on_binding_request(SrsStunPacket* r);
// publish media capabilitiy negotiate
srs_error_t negotiate_publish_capability(SrsRtcUserConfig* ruc, SrsRtcStreamDescription* stream_desc);
srs_error_t generate_publish_local_sdp(SrsRequest* req, SrsSdp& local_sdp, SrsRtcStreamDescription* stream_desc, bool unified_plan);
srs_error_t negotiate_publish_capability(SrsRtcUserConfig* ruc, SrsRtcSourceDescription* stream_desc);
srs_error_t generate_publish_local_sdp(SrsRequest* req, SrsSdp& local_sdp, SrsRtcSourceDescription* stream_desc, bool unified_plan);
// play media capabilitiy negotiate
//TODO: Use StreamDescription to negotiate and remove first negotiate_play_capability function
srs_error_t negotiate_play_capability(SrsRtcUserConfig* ruc, std::map<uint32_t, SrsRtcTrackDescription*>& sub_relations);
srs_error_t fetch_source_capability(SrsRequest* req, std::map<uint32_t, SrsRtcTrackDescription*>& sub_relations);
srs_error_t generate_play_local_sdp(SrsRequest* req, SrsSdp& local_sdp, SrsRtcStreamDescription* stream_desc, bool unified_plan);
srs_error_t generate_play_local_sdp(SrsRequest* req, SrsSdp& local_sdp, SrsRtcSourceDescription* stream_desc, bool unified_plan);
srs_error_t create_player(SrsRequest* request, std::map<uint32_t, SrsRtcTrackDescription*> sub_relations);
srs_error_t create_publisher(SrsRequest* request, SrsRtcStreamDescription* stream_desc);
srs_error_t create_publisher(SrsRequest* request, SrsRtcSourceDescription* stream_desc);
};
class ISrsRtcHijacker

View file

@ -469,7 +469,7 @@ srs_error_t SrsRtcServer::create_session(SrsRtcUserConfig* ruc, SrsSdp& local_sd
SrsRequest* req = ruc->req_;
SrsRtcStream* source = NULL;
SrsRtcSource* source = NULL;
if ((err = _srs_rtc_sources->fetch_or_create(req, &source)) != srs_success) {
return srs_error_wrap(err, "create source");
}

View file

@ -40,7 +40,7 @@ class SrsHourGlass;
class SrsRtcConnection;
class SrsRequest;
class SrsSdp;
class SrsRtcStream;
class SrsRtcSource;
class SrsResourceManager;
// The UDP black hole, for developer to use wireshark to catch plaintext packets.

View file

@ -152,15 +152,15 @@ SrsNtp SrsNtp::to_time_ms(uint64_t ntp)
return srs_ntp;
}
ISrsRtcStreamChangeCallback::ISrsRtcStreamChangeCallback()
ISrsRtcSourceChangeCallback::ISrsRtcSourceChangeCallback()
{
}
ISrsRtcStreamChangeCallback::~ISrsRtcStreamChangeCallback()
ISrsRtcSourceChangeCallback::~ISrsRtcSourceChangeCallback()
{
}
SrsRtcConsumer::SrsRtcConsumer(SrsRtcStream* s)
SrsRtcConsumer::SrsRtcConsumer(SrsRtcSource* s)
{
source = s;
should_update_source_id = false;
@ -240,24 +240,24 @@ void SrsRtcConsumer::wait(int nb_msgs)
srs_cond_wait(mw_wait);
}
void SrsRtcConsumer::on_stream_change(SrsRtcStreamDescription* desc)
void SrsRtcConsumer::on_stream_change(SrsRtcSourceDescription* desc)
{
if (handler_) {
handler_->on_stream_change(desc);
}
}
SrsRtcStreamManager::SrsRtcStreamManager()
SrsRtcSourceManager::SrsRtcSourceManager()
{
lock = srs_mutex_new();
}
SrsRtcStreamManager::~SrsRtcStreamManager()
SrsRtcSourceManager::~SrsRtcSourceManager()
{
srs_mutex_destroy(lock);
}
srs_error_t SrsRtcStreamManager::fetch_or_create(SrsRequest* r, SrsRtcStream** pps)
srs_error_t SrsRtcSourceManager::fetch_or_create(SrsRequest* r, SrsRtcSource** pps)
{
srs_error_t err = srs_success;
@ -265,7 +265,7 @@ srs_error_t SrsRtcStreamManager::fetch_or_create(SrsRequest* r, SrsRtcStream** p
// @bug https://github.com/ossrs/srs/issues/1230
SrsLocker(lock);
SrsRtcStream* source = NULL;
SrsRtcSource* source = NULL;
if ((source = fetch(r)) != NULL) {
*pps = source;
return err;
@ -279,7 +279,7 @@ srs_error_t SrsRtcStreamManager::fetch_or_create(SrsRequest* r, SrsRtcStream** p
srs_trace("new source, stream_url=%s", stream_url.c_str());
source = new SrsRtcStream();
source = new SrsRtcSource();
if ((err = source->initialize(r)) != srs_success) {
return srs_error_wrap(err, "init source %s", r->get_stream_url().c_str());
}
@ -291,9 +291,9 @@ srs_error_t SrsRtcStreamManager::fetch_or_create(SrsRequest* r, SrsRtcStream** p
return err;
}
SrsRtcStream* SrsRtcStreamManager::fetch(SrsRequest* r)
SrsRtcSource* SrsRtcSourceManager::fetch(SrsRequest* r)
{
SrsRtcStream* source = NULL;
SrsRtcSource* source = NULL;
string stream_url = r->get_stream_url();
if (pool.find(stream_url) == pool.end()) {
@ -310,7 +310,7 @@ SrsRtcStream* SrsRtcStreamManager::fetch(SrsRequest* r)
return source;
}
SrsRtcStreamManager* _srs_rtc_sources = NULL;
SrsRtcSourceManager* _srs_rtc_sources = NULL;
ISrsRtcPublishStream::ISrsRtcPublishStream()
{
@ -320,11 +320,11 @@ ISrsRtcPublishStream::~ISrsRtcPublishStream()
{
}
ISrsRtcStreamEventHandler::ISrsRtcStreamEventHandler()
ISrsRtcSourceEventHandler::ISrsRtcSourceEventHandler()
{
}
ISrsRtcStreamEventHandler::~ISrsRtcStreamEventHandler()
ISrsRtcSourceEventHandler::~ISrsRtcSourceEventHandler()
{
}
@ -336,7 +336,7 @@ ISrsRtcSourceBridger::~ISrsRtcSourceBridger()
{
}
SrsRtcStream::SrsRtcStream()
SrsRtcSource::SrsRtcSource()
{
is_created_ = false;
is_delivering_packets_ = false;
@ -350,7 +350,7 @@ SrsRtcStream::SrsRtcStream()
pli_for_rtmp_ = pli_elapsed_ = 0;
}
SrsRtcStream::~SrsRtcStream()
SrsRtcSource::~SrsRtcSource()
{
// never free the consumers,
// for all consumers are auto free.
@ -361,7 +361,7 @@ SrsRtcStream::~SrsRtcStream()
srs_freep(stream_desc_);
}
srs_error_t SrsRtcStream::initialize(SrsRequest* r)
srs_error_t SrsRtcSource::initialize(SrsRequest* r)
{
srs_error_t err = srs_success;
@ -370,12 +370,12 @@ srs_error_t SrsRtcStream::initialize(SrsRequest* r)
return err;
}
void SrsRtcStream::update_auth(SrsRequest* r)
void SrsRtcSource::update_auth(SrsRequest* r)
{
req->update_auth(r);
}
srs_error_t SrsRtcStream::on_source_changed()
srs_error_t SrsRtcSource::on_source_changed()
{
srs_error_t err = srs_success;
@ -408,23 +408,23 @@ srs_error_t SrsRtcStream::on_source_changed()
return err;
}
SrsContextId SrsRtcStream::source_id()
SrsContextId SrsRtcSource::source_id()
{
return _source_id;
}
SrsContextId SrsRtcStream::pre_source_id()
SrsContextId SrsRtcSource::pre_source_id()
{
return _pre_source_id;
}
void SrsRtcStream::set_bridger(ISrsRtcSourceBridger *bridger)
void SrsRtcSource::set_bridger(ISrsRtcSourceBridger *bridger)
{
srs_freep(bridger_);
bridger_ = bridger;
}
srs_error_t SrsRtcStream::create_consumer(SrsRtcConsumer*& consumer)
srs_error_t SrsRtcSource::create_consumer(SrsRtcConsumer*& consumer)
{
srs_error_t err = srs_success;
@ -436,7 +436,7 @@ srs_error_t SrsRtcStream::create_consumer(SrsRtcConsumer*& consumer)
return err;
}
srs_error_t SrsRtcStream::consumer_dumps(SrsRtcConsumer* consumer, bool ds, bool dm, bool dg)
srs_error_t SrsRtcSource::consumer_dumps(SrsRtcConsumer* consumer, bool ds, bool dm, bool dg)
{
srs_error_t err = srs_success;
@ -446,7 +446,7 @@ srs_error_t SrsRtcStream::consumer_dumps(SrsRtcConsumer* consumer, bool ds, bool
return err;
}
void SrsRtcStream::on_consumer_destroy(SrsRtcConsumer* consumer)
void SrsRtcSource::on_consumer_destroy(SrsRtcConsumer* consumer)
{
std::vector<SrsRtcConsumer*>::iterator it;
it = std::find(consumers.begin(), consumers.end(), consumer);
@ -457,26 +457,26 @@ void SrsRtcStream::on_consumer_destroy(SrsRtcConsumer* consumer)
// When all consumers finished, notify publisher to handle it.
if (publish_stream_ && consumers.empty()) {
for (size_t i = 0; i < event_handlers_.size(); i++) {
ISrsRtcStreamEventHandler* h = event_handlers_.at(i);
ISrsRtcSourceEventHandler* h = event_handlers_.at(i);
h->on_consumers_finished();
}
}
}
bool SrsRtcStream::can_publish()
bool SrsRtcSource::can_publish()
{
// TODO: FIXME: Should check the status of bridger.
return !is_created_;
}
void SrsRtcStream::set_stream_created()
void SrsRtcSource::set_stream_created()
{
srs_assert(!is_created_ && !is_delivering_packets_);
is_created_ = true;
}
srs_error_t SrsRtcStream::on_publish()
srs_error_t SrsRtcSource::on_publish()
{
srs_error_t err = srs_success;
@ -502,7 +502,7 @@ srs_error_t SrsRtcStream::on_publish()
// The PLI interval for RTC2RTMP.
pli_for_rtmp_ = _srs_config->get_rtc_pli_for_rtmp(req->vhost);
// @see SrsRtcStream::on_timer()
// @see SrsRtcSource::on_timer()
_srs_hybrid->timer100ms()->subscribe(this);
}
@ -511,7 +511,7 @@ srs_error_t SrsRtcStream::on_publish()
return err;
}
void SrsRtcStream::on_unpublish()
void SrsRtcSource::on_unpublish()
{
// ignore when already unpublished.
if (!is_created_) {
@ -529,13 +529,13 @@ void SrsRtcStream::on_unpublish()
_source_id = SrsContextId();
for (size_t i = 0; i < event_handlers_.size(); i++) {
ISrsRtcStreamEventHandler* h = event_handlers_.at(i);
ISrsRtcSourceEventHandler* h = event_handlers_.at(i);
h->on_unpublish();
}
//free bridger resource
if (bridger_) {
// For SrsRtcStream::on_timer()
// For SrsRtcSource::on_timer()
_srs_hybrid->timer100ms()->unsubscribe(this);
bridger_->on_unpublish();
@ -548,33 +548,33 @@ void SrsRtcStream::on_unpublish()
// TODO: FIXME: Handle by statistic.
}
void SrsRtcStream::subscribe(ISrsRtcStreamEventHandler* h)
void SrsRtcSource::subscribe(ISrsRtcSourceEventHandler* h)
{
if (std::find(event_handlers_.begin(), event_handlers_.end(), h) == event_handlers_.end()) {
event_handlers_.push_back(h);
}
}
void SrsRtcStream::unsubscribe(ISrsRtcStreamEventHandler* h)
void SrsRtcSource::unsubscribe(ISrsRtcSourceEventHandler* h)
{
std::vector<ISrsRtcStreamEventHandler*>::iterator it;
std::vector<ISrsRtcSourceEventHandler*>::iterator it;
it = std::find(event_handlers_.begin(), event_handlers_.end(), h);
if (it != event_handlers_.end()) {
event_handlers_.erase(it);
}
}
ISrsRtcPublishStream* SrsRtcStream::publish_stream()
ISrsRtcPublishStream* SrsRtcSource::publish_stream()
{
return publish_stream_;
}
void SrsRtcStream::set_publish_stream(ISrsRtcPublishStream* v)
void SrsRtcSource::set_publish_stream(ISrsRtcPublishStream* v)
{
publish_stream_ = v;
}
srs_error_t SrsRtcStream::on_rtp(SrsRtpPacket* pkt)
srs_error_t SrsRtcSource::on_rtp(SrsRtpPacket* pkt)
{
srs_error_t err = srs_success;
@ -598,12 +598,12 @@ srs_error_t SrsRtcStream::on_rtp(SrsRtpPacket* pkt)
return err;
}
bool SrsRtcStream::has_stream_desc()
bool SrsRtcSource::has_stream_desc()
{
return stream_desc_;
}
void SrsRtcStream::set_stream_desc(SrsRtcStreamDescription* stream_desc)
void SrsRtcSource::set_stream_desc(SrsRtcSourceDescription* stream_desc)
{
srs_freep(stream_desc_);
@ -612,7 +612,7 @@ void SrsRtcStream::set_stream_desc(SrsRtcStreamDescription* stream_desc)
}
}
std::vector<SrsRtcTrackDescription*> SrsRtcStream::get_track_desc(std::string type, std::string media_name)
std::vector<SrsRtcTrackDescription*> SrsRtcSource::get_track_desc(std::string type, std::string media_name)
{
std::vector<SrsRtcTrackDescription*> track_descs;
if (!stream_desc_) {
@ -636,7 +636,7 @@ std::vector<SrsRtcTrackDescription*> SrsRtcStream::get_track_desc(std::string ty
return track_descs;
}
srs_error_t SrsRtcStream::on_timer(srs_utime_t interval)
srs_error_t SrsRtcSource::on_timer(srs_utime_t interval)
{
srs_error_t err = srs_success;
@ -662,7 +662,7 @@ srs_error_t SrsRtcStream::on_timer(srs_utime_t interval)
}
#ifdef SRS_FFMPEG_FIT
SrsRtcFromRtmpBridger::SrsRtcFromRtmpBridger(SrsRtcStream* source)
SrsRtcFromRtmpBridger::SrsRtcFromRtmpBridger(SrsRtcSource* source)
{
req = NULL;
source_ = source;
@ -675,8 +675,8 @@ SrsRtcFromRtmpBridger::SrsRtcFromRtmpBridger(SrsRtcStream* source)
audio_sequence = 0;
video_sequence = 0;
SrsRtcStreamDescription* stream_desc = new SrsRtcStreamDescription();
SrsAutoFree(SrsRtcStreamDescription, stream_desc);
SrsRtcSourceDescription* stream_desc = new SrsRtcSourceDescription();
SrsAutoFree(SrsRtcSourceDescription, stream_desc);
// audio track description
if (true) {
@ -989,7 +989,7 @@ srs_error_t SrsRtcFromRtmpBridger::filter(SrsSharedPtrMessage* msg, SrsFormat* f
return err;
}
srs_error_t SrsRtcFromRtmpBridger::package_stap_a(SrsRtcStream* source, SrsSharedPtrMessage* msg, SrsRtpPacket* pkt)
srs_error_t SrsRtcFromRtmpBridger::package_stap_a(SrsRtcSource* source, SrsSharedPtrMessage* msg, SrsRtpPacket* pkt)
{
srs_error_t err = srs_success;
@ -1231,7 +1231,7 @@ srs_error_t SrsRtcFromRtmpBridger::consume_packets(vector<SrsRtpPacket*>& pkts)
return err;
}
SrsRtmpFromRtcBridger::SrsRtmpFromRtcBridger(SrsSource *src)
SrsRtmpFromRtcBridger::SrsRtmpFromRtcBridger(SrsLiveSource *src)
{
source_ = src;
codec_ = NULL;
@ -2130,12 +2130,12 @@ SrsRtcTrackDescription* SrsRtcTrackDescription::copy()
return cp;
}
SrsRtcStreamDescription::SrsRtcStreamDescription()
SrsRtcSourceDescription::SrsRtcSourceDescription()
{
audio_track_desc_ = NULL;
}
SrsRtcStreamDescription::~SrsRtcStreamDescription()
SrsRtcSourceDescription::~SrsRtcSourceDescription()
{
srs_freep(audio_track_desc_);
@ -2145,9 +2145,9 @@ SrsRtcStreamDescription::~SrsRtcStreamDescription()
video_track_descs_.clear();
}
SrsRtcStreamDescription* SrsRtcStreamDescription::copy()
SrsRtcSourceDescription* SrsRtcSourceDescription::copy()
{
SrsRtcStreamDescription* stream_desc = new SrsRtcStreamDescription();
SrsRtcSourceDescription* stream_desc = new SrsRtcSourceDescription();
if (audio_track_desc_) {
stream_desc->audio_track_desc_ = audio_track_desc_->copy();
@ -2160,7 +2160,7 @@ SrsRtcStreamDescription* SrsRtcStreamDescription::copy()
return stream_desc;
}
SrsRtcTrackDescription* SrsRtcStreamDescription::find_track_description_by_ssrc(uint32_t ssrc)
SrsRtcTrackDescription* SrsRtcSourceDescription::find_track_description_by_ssrc(uint32_t ssrc)
{
if (audio_track_desc_ && audio_track_desc_->has_ssrc(ssrc)) {
return audio_track_desc_;
@ -2328,7 +2328,7 @@ void SrsRtcAudioRecvTrack::on_before_decode_payload(SrsRtpPacket* pkt, SrsBuffer
*ppt = SrsRtspPacketPayloadTypeRaw;
}
srs_error_t SrsRtcAudioRecvTrack::on_rtp(SrsRtcStream* source, SrsRtpPacket* pkt)
srs_error_t SrsRtcAudioRecvTrack::on_rtp(SrsRtcSource* source, SrsRtpPacket* pkt)
{
srs_error_t err = srs_success;
@ -2384,7 +2384,7 @@ void SrsRtcVideoRecvTrack::on_before_decode_payload(SrsRtpPacket* pkt, SrsBuffer
}
}
srs_error_t SrsRtcVideoRecvTrack::on_rtp(SrsRtcStream* source, SrsRtpPacket* pkt)
srs_error_t SrsRtcVideoRecvTrack::on_rtp(SrsRtcSource* source, SrsRtpPacket* pkt)
{
srs_error_t err = srs_success;

View file

@ -43,12 +43,12 @@ class SrsMetaCache;
class SrsSharedPtrMessage;
class SrsCommonMessage;
class SrsMessageArray;
class SrsRtcStream;
class SrsRtcSource;
class SrsRtcFromRtmpBridger;
class SrsAudioTranscoder;
class SrsRtpPacket;
class SrsSample;
class SrsRtcStreamDescription;
class SrsRtcSourceDescription;
class SrsRtcTrackDescription;
class SrsRtcConnection;
class SrsRtpRingBuffer;
@ -74,20 +74,20 @@ public:
};
// When RTC stream publish and re-publish.
class ISrsRtcStreamChangeCallback
class ISrsRtcSourceChangeCallback
{
public:
ISrsRtcStreamChangeCallback();
virtual ~ISrsRtcStreamChangeCallback();
ISrsRtcSourceChangeCallback();
virtual ~ISrsRtcSourceChangeCallback();
public:
virtual void on_stream_change(SrsRtcStreamDescription* desc) = 0;
virtual void on_stream_change(SrsRtcSourceDescription* desc) = 0;
};
// The RTC stream consumer, consume packets from RTC stream source.
class SrsRtcConsumer
{
private:
SrsRtcStream* source;
SrsRtcSource* source;
std::vector<SrsRtpPacket*> queue;
// when source id changed, notice all consumers
bool should_update_source_id;
@ -98,9 +98,9 @@ private:
int mw_min_msgs;
private:
// The callback for stream change event.
ISrsRtcStreamChangeCallback* handler_;
ISrsRtcSourceChangeCallback* handler_;
public:
SrsRtcConsumer(SrsRtcStream* s);
SrsRtcConsumer(SrsRtcSource* s);
virtual ~SrsRtcConsumer();
public:
// When source id changed, notice client to print.
@ -113,31 +113,31 @@ public:
// Wait for at-least some messages incoming in queue.
virtual void wait(int nb_msgs);
public:
void set_handler(ISrsRtcStreamChangeCallback* h) { handler_ = h; } // SrsRtcConsumer::set_handler()
void on_stream_change(SrsRtcStreamDescription* desc);
void set_handler(ISrsRtcSourceChangeCallback* h) { handler_ = h; } // SrsRtcConsumer::set_handler()
void on_stream_change(SrsRtcSourceDescription* desc);
};
class SrsRtcStreamManager
class SrsRtcSourceManager
{
private:
srs_mutex_t lock;
std::map<std::string, SrsRtcStream*> pool;
std::map<std::string, SrsRtcSource*> pool;
public:
SrsRtcStreamManager();
virtual ~SrsRtcStreamManager();
SrsRtcSourceManager();
virtual ~SrsRtcSourceManager();
public:
// create source when fetch from cache failed.
// @param r the client request.
// @param pps the matched source, if success never be NULL.
virtual srs_error_t fetch_or_create(SrsRequest* r, SrsRtcStream** pps);
virtual srs_error_t fetch_or_create(SrsRequest* r, SrsRtcSource** pps);
private:
// Get the exists source, NULL when not exists.
// update the request and return the exists source.
virtual SrsRtcStream* fetch(SrsRequest* r);
virtual SrsRtcSource* fetch(SrsRequest* r);
};
// Global singleton instance.
extern SrsRtcStreamManager* _srs_rtc_sources;
extern SrsRtcSourceManager* _srs_rtc_sources;
// A publish stream interface, for source to callback with.
class ISrsRtcPublishStream
@ -150,11 +150,11 @@ public:
virtual void request_keyframe(uint32_t ssrc) = 0;
};
class ISrsRtcStreamEventHandler
class ISrsRtcSourceEventHandler
{
public:
ISrsRtcStreamEventHandler();
virtual ~ISrsRtcStreamEventHandler();
ISrsRtcSourceEventHandler();
virtual ~ISrsRtcSourceEventHandler();
public:
// stream unpublish, sync API.
virtual void on_unpublish() = 0;
@ -162,7 +162,7 @@ public:
virtual void on_consumers_finished() = 0;
};
// SrsRtcStream bridge to SrsSource
// SrsRtcSource bridge to SrsLiveSource
class ISrsRtcSourceBridger
{
public:
@ -175,7 +175,7 @@ public:
};
// A Source is a stream, to publish and to play with, binding to SrsRtcPublishStream and SrsRtcPlayStream.
class SrsRtcStream : public ISrsFastTimer
class SrsRtcSource : public ISrsFastTimer
{
private:
// For publish, it's the publish client id.
@ -188,7 +188,7 @@ private:
SrsRequest* req;
ISrsRtcPublishStream* publish_stream_;
// Steam description for this steam.
SrsRtcStreamDescription* stream_desc_;
SrsRtcSourceDescription* stream_desc_;
// The Source bridger, bridger stream to other source.
ISrsRtcSourceBridger* bridger_;
private:
@ -199,14 +199,14 @@ private:
// Whether stream is delivering data, that is, DTLS is done.
bool is_delivering_packets_;
// Notify stream event to event handler
std::vector<ISrsRtcStreamEventHandler*> event_handlers_;
std::vector<ISrsRtcSourceEventHandler*> event_handlers_;
private:
// The PLI for RTC2RTMP.
srs_utime_t pli_for_rtmp_;
srs_utime_t pli_elapsed_;
public:
SrsRtcStream();
virtual ~SrsRtcStream();
SrsRtcSource();
virtual ~SrsRtcSource();
public:
virtual srs_error_t initialize(SrsRequest* r);
// Update the authentication information in request.
@ -241,8 +241,8 @@ public:
virtual void on_unpublish();
public:
// For event handler
void subscribe(ISrsRtcStreamEventHandler* h);
void unsubscribe(ISrsRtcStreamEventHandler* h);
void subscribe(ISrsRtcSourceEventHandler* h);
void unsubscribe(ISrsRtcSourceEventHandler* h);
public:
// Get and set the publisher, passed to consumer to process requests such as PLI.
ISrsRtcPublishStream* publish_stream();
@ -251,7 +251,7 @@ public:
srs_error_t on_rtp(SrsRtpPacket* pkt);
// Set and get stream description for souce
bool has_stream_desc();
void set_stream_desc(SrsRtcStreamDescription* stream_desc);
void set_stream_desc(SrsRtcSourceDescription* stream_desc);
std::vector<SrsRtcTrackDescription*> get_track_desc(std::string type, std::string media_type);
// interface ISrsFastTimer
private:
@ -259,11 +259,11 @@ private:
};
#ifdef SRS_FFMPEG_FIT
class SrsRtcFromRtmpBridger : public ISrsSourceBridger
class SrsRtcFromRtmpBridger : public ISrsLiveSourceBridger
{
private:
SrsRequest* req;
SrsRtcStream* source_;
SrsRtcSource* source_;
// The format, codec information.
SrsRtmpFormat* format;
// The metadata cache.
@ -278,7 +278,7 @@ private:
uint32_t audio_ssrc;
uint32_t video_ssrc;
public:
SrsRtcFromRtmpBridger(SrsRtcStream* source);
SrsRtcFromRtmpBridger(SrsRtcSource* source);
virtual ~SrsRtcFromRtmpBridger();
public:
virtual srs_error_t initialize(SrsRequest* r);
@ -292,7 +292,7 @@ public:
virtual srs_error_t on_video(SrsSharedPtrMessage* msg);
private:
srs_error_t filter(SrsSharedPtrMessage* msg, SrsFormat* format, bool& has_idr, std::vector<SrsSample*>& samples);
srs_error_t package_stap_a(SrsRtcStream* source, SrsSharedPtrMessage* msg, SrsRtpPacket* pkt);
srs_error_t package_stap_a(SrsRtcSource* source, SrsSharedPtrMessage* msg, SrsRtpPacket* pkt);
srs_error_t package_nalus(SrsSharedPtrMessage* msg, const std::vector<SrsSample*>& samples, std::vector<SrsRtpPacket*>& pkts);
srs_error_t package_single_nalu(SrsSharedPtrMessage* msg, SrsSample* sample, std::vector<SrsRtpPacket*>& pkts);
srs_error_t package_fu_a(SrsSharedPtrMessage* msg, SrsSample* sample, int fu_payload_size, std::vector<SrsRtpPacket*>& pkts);
@ -302,7 +302,7 @@ private:
class SrsRtmpFromRtcBridger : public ISrsRtcSourceBridger
{
private:
SrsSource *source_;
SrsLiveSource *source_;
SrsAudioTranscoder *codec_;
bool is_first_audio;
bool is_first_video;
@ -323,7 +323,7 @@ private:
uint16_t lost_sn_;
int64_t key_frame_ts_;
public:
SrsRtmpFromRtcBridger(SrsSource *src);
SrsRtmpFromRtcBridger(SrsLiveSource *src);
virtual ~SrsRtmpFromRtcBridger();
public:
srs_error_t initialize(SrsRequest* r);
@ -504,7 +504,7 @@ public:
SrsRtcTrackDescription* copy();
};
class SrsRtcStreamDescription
class SrsRtcSourceDescription
{
public:
// the id for this stream;
@ -513,11 +513,11 @@ public:
SrsRtcTrackDescription* audio_track_desc_;
std::vector<SrsRtcTrackDescription*> video_track_descs_;
public:
SrsRtcStreamDescription();
virtual ~SrsRtcStreamDescription();
SrsRtcSourceDescription();
virtual ~SrsRtcSourceDescription();
public:
SrsRtcStreamDescription* copy();
SrsRtcSourceDescription* copy();
SrsRtcTrackDescription* find_track_description_by_ssrc(uint32_t ssrc);
};
@ -556,7 +556,7 @@ public:
// set to NULL, nack nerver copy it but set the pkt to NULL.
srs_error_t on_nack(SrsRtpPacket** ppkt);
public:
virtual srs_error_t on_rtp(SrsRtcStream* source, SrsRtpPacket* pkt) = 0;
virtual srs_error_t on_rtp(SrsRtcSource* source, SrsRtpPacket* pkt) = 0;
virtual srs_error_t check_send_nacks() = 0;
protected:
virtual srs_error_t do_check_send_nacks(uint32_t& timeout_nacks);
@ -570,7 +570,7 @@ public:
public:
virtual void on_before_decode_payload(SrsRtpPacket* pkt, SrsBuffer* buf, ISrsRtpPayloader** ppayload, SrsRtspPacketPayloadType* ppt);
public:
virtual srs_error_t on_rtp(SrsRtcStream* source, SrsRtpPacket* pkt);
virtual srs_error_t on_rtp(SrsRtcSource* source, SrsRtpPacket* pkt);
virtual srs_error_t check_send_nacks();
};
@ -582,7 +582,7 @@ public:
public:
virtual void on_before_decode_payload(SrsRtpPacket* pkt, SrsBuffer* buf, ISrsRtpPayloader** ppayload, SrsRtspPacketPayloadType* ppt);
public:
virtual srs_error_t on_rtp(SrsRtcStream* source, SrsRtpPacket* pkt);
virtual srs_error_t on_rtp(SrsRtcSource* source, SrsRtpPacket* pkt);
virtual srs_error_t check_send_nacks();
};

View file

@ -526,7 +526,7 @@ srs_error_t SrsRtmpConn::stream_service_cycle()
rtmp->set_send_timeout(SRS_CONSTS_RTMP_TIMEOUT);
// find a source to serve.
SrsSource* source = NULL;
SrsLiveSource* source = NULL;
if ((err = _srs_sources->fetch_or_create(req, server, &source)) != srs_success) {
return srs_error_wrap(err, "rtmp: fetch source");
}
@ -621,7 +621,7 @@ srs_error_t SrsRtmpConn::check_vhost(bool try_default_vhost)
return err;
}
srs_error_t SrsRtmpConn::playing(SrsSource* source)
srs_error_t SrsRtmpConn::playing(SrsLiveSource* source)
{
srs_error_t err = srs_success;
@ -677,8 +677,8 @@ srs_error_t SrsRtmpConn::playing(SrsSource* source)
set_sock_options();
// Create a consumer of source.
SrsConsumer* consumer = NULL;
SrsAutoFree(SrsConsumer, consumer);
SrsLiveConsumer* consumer = NULL;
SrsAutoFree(SrsLiveConsumer, consumer);
if ((err = source->create_consumer(consumer)) != srs_success) {
return srs_error_wrap(err, "rtmp: create consumer");
}
@ -709,7 +709,7 @@ srs_error_t SrsRtmpConn::playing(SrsSource* source)
return err;
}
srs_error_t SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRecvThread* rtrd)
srs_error_t SrsRtmpConn::do_playing(SrsLiveSource* source, SrsLiveConsumer* consumer, SrsQueueRecvThread* rtrd)
{
srs_error_t err = srs_success;
@ -836,7 +836,7 @@ srs_error_t SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, Sr
return err;
}
srs_error_t SrsRtmpConn::publishing(SrsSource* source)
srs_error_t SrsRtmpConn::publishing(SrsLiveSource* source)
{
srs_error_t err = srs_success;
@ -875,7 +875,7 @@ srs_error_t SrsRtmpConn::publishing(SrsSource* source)
return err;
}
srs_error_t SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* rtrd)
srs_error_t SrsRtmpConn::do_publishing(SrsLiveSource* source, SrsPublishRecvThread* rtrd)
{
srs_error_t err = srs_success;
@ -955,7 +955,7 @@ srs_error_t SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread*
return err;
}
srs_error_t SrsRtmpConn::acquire_publish(SrsSource* source)
srs_error_t SrsRtmpConn::acquire_publish(SrsLiveSource* source)
{
srs_error_t err = srs_success;
@ -963,7 +963,7 @@ srs_error_t SrsRtmpConn::acquire_publish(SrsSource* source)
// Check whether RTC stream is busy.
#ifdef SRS_RTC
SrsRtcStream *rtc = NULL;
SrsRtcSource *rtc = NULL;
bool rtc_server_enabled = _srs_config->get_rtc_server_enabled();
bool rtc_enabled = _srs_config->get_rtc_enabled(req->vhost);
if (rtc_server_enabled && rtc_enabled && !info->edge) {
@ -1003,7 +1003,7 @@ srs_error_t SrsRtmpConn::acquire_publish(SrsSource* source)
}
}
void SrsRtmpConn::release_publish(SrsSource* source)
void SrsRtmpConn::release_publish(SrsLiveSource* source)
{
// when edge, notice edge to change state.
// when origin, notice all service to unpublish.
@ -1014,7 +1014,7 @@ void SrsRtmpConn::release_publish(SrsSource* source)
}
}
srs_error_t SrsRtmpConn::handle_publish_message(SrsSource* source, SrsCommonMessage* msg)
srs_error_t SrsRtmpConn::handle_publish_message(SrsLiveSource* source, SrsCommonMessage* msg)
{
srs_error_t err = srs_success;
@ -1055,7 +1055,7 @@ srs_error_t SrsRtmpConn::handle_publish_message(SrsSource* source, SrsCommonMess
return err;
}
srs_error_t SrsRtmpConn::process_publish_message(SrsSource* source, SrsCommonMessage* msg)
srs_error_t SrsRtmpConn::process_publish_message(SrsLiveSource* source, SrsCommonMessage* msg)
{
srs_error_t err = srs_success;
@ -1111,7 +1111,7 @@ srs_error_t SrsRtmpConn::process_publish_message(SrsSource* source, SrsCommonMes
return err;
}
srs_error_t SrsRtmpConn::process_play_control_msg(SrsConsumer* consumer, SrsCommonMessage* msg)
srs_error_t SrsRtmpConn::process_play_control_msg(SrsLiveConsumer* consumer, SrsCommonMessage* msg)
{
srs_error_t err = srs_success;

View file

@ -38,9 +38,9 @@ class SrsServer;
class SrsRtmpServer;
class SrsRequest;
class SrsResponse;
class SrsSource;
class SrsLiveSource;
class SrsRefer;
class SrsConsumer;
class SrsLiveConsumer;
class SrsCommonMessage;
class SrsStSocket;
class SrsHttpHooks;
@ -161,15 +161,15 @@ private:
// The stream(play/publish) service cycle, identify client first.
virtual srs_error_t stream_service_cycle();
virtual srs_error_t check_vhost(bool try_default_vhost);
virtual srs_error_t playing(SrsSource* source);
virtual srs_error_t do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRecvThread* trd);
virtual srs_error_t publishing(SrsSource* source);
virtual srs_error_t do_publishing(SrsSource* source, SrsPublishRecvThread* trd);
virtual srs_error_t acquire_publish(SrsSource* source);
virtual void release_publish(SrsSource* source);
virtual srs_error_t handle_publish_message(SrsSource* source, SrsCommonMessage* msg);
virtual srs_error_t process_publish_message(SrsSource* source, SrsCommonMessage* msg);
virtual srs_error_t process_play_control_msg(SrsConsumer* consumer, SrsCommonMessage* msg);
virtual srs_error_t playing(SrsLiveSource* source);
virtual srs_error_t do_playing(SrsLiveSource* source, SrsLiveConsumer* consumer, SrsQueueRecvThread* trd);
virtual srs_error_t publishing(SrsLiveSource* source);
virtual srs_error_t do_publishing(SrsLiveSource* source, SrsPublishRecvThread* trd);
virtual srs_error_t acquire_publish(SrsLiveSource* source);
virtual void release_publish(SrsLiveSource* source);
virtual srs_error_t handle_publish_message(SrsLiveSource* source, SrsCommonMessage* msg);
virtual srs_error_t process_publish_message(SrsLiveSource* source, SrsCommonMessage* msg);
virtual srs_error_t process_play_control_msg(SrsLiveConsumer* consumer, SrsCommonMessage* msg);
virtual void set_sock_options();
private:
virtual srs_error_t check_edge_token_traverse_auth();

View file

@ -1782,7 +1782,7 @@ srs_error_t SrsServer::on_reload_http_stream_updated()
return err;
}
srs_error_t SrsServer::on_publish(SrsSource* s, SrsRequest* r)
srs_error_t SrsServer::on_publish(SrsLiveSource* s, SrsRequest* r)
{
srs_error_t err = srs_success;
@ -1798,7 +1798,7 @@ srs_error_t SrsServer::on_publish(SrsSource* s, SrsRequest* r)
return err;
}
void SrsServer::on_unpublish(SrsSource* s, SrsRequest* r)
void SrsServer::on_unpublish(SrsLiveSource* s, SrsRequest* r)
{
http_server->http_unmount(s, r);

View file

@ -262,7 +262,7 @@ public:
// TODO: FIXME: Rename to SrsLiveServer.
// SRS RTMP server, initialize and listen, start connection service thread, destroy client.
class SrsServer : public ISrsReloadHandler, public ISrsSourceHandler
class SrsServer : public ISrsReloadHandler, public ISrsLiveSourceHandler
, public ISrsResourceManager, public ISrsCoroutineHandler
, public ISrsHourGlass
{
@ -394,10 +394,10 @@ public:
virtual srs_error_t on_reload_http_stream_enabled();
virtual srs_error_t on_reload_http_stream_disabled();
virtual srs_error_t on_reload_http_stream_updated();
// Interface ISrsSourceHandler
// Interface ISrsLiveSourceHandler
public:
virtual srs_error_t on_publish(SrsSource* s, SrsRequest* r);
virtual void on_unpublish(SrsSource* s, SrsRequest* r);
virtual srs_error_t on_publish(SrsLiveSource* s, SrsRequest* r);
virtual void on_unpublish(SrsLiveSource* s, SrsRequest* r);
};
// The SRS server adapter, the master server.

View file

@ -329,7 +329,7 @@ srs_error_t SrsMessageQueue::dump_packets(int max_count, SrsSharedPtrMessage** p
return err;
}
srs_error_t SrsMessageQueue::dump_packets(SrsConsumer* consumer, bool atc, SrsRtmpJitterAlgorithm ag)
srs_error_t SrsMessageQueue::dump_packets(SrsLiveConsumer* consumer, bool atc, SrsRtmpJitterAlgorithm ag)
{
srs_error_t err = srs_success;
@ -418,7 +418,7 @@ ISrsWakable::~ISrsWakable()
{
}
SrsConsumer::SrsConsumer(SrsSource* s)
SrsLiveConsumer::SrsLiveConsumer(SrsLiveSource* s)
{
source = s;
paused = false;
@ -434,7 +434,7 @@ SrsConsumer::SrsConsumer(SrsSource* s)
#endif
}
SrsConsumer::~SrsConsumer()
SrsLiveConsumer::~SrsLiveConsumer()
{
source->on_consumer_destroy(this);
srs_freep(jitter);
@ -445,22 +445,22 @@ SrsConsumer::~SrsConsumer()
#endif
}
void SrsConsumer::set_queue_size(srs_utime_t queue_size)
void SrsLiveConsumer::set_queue_size(srs_utime_t queue_size)
{
queue->set_queue_size(queue_size);
}
void SrsConsumer::update_source_id()
void SrsLiveConsumer::update_source_id()
{
should_update_source_id = true;
}
int64_t SrsConsumer::get_time()
int64_t SrsLiveConsumer::get_time()
{
return jitter->get_time();
}
srs_error_t SrsConsumer::enqueue(SrsSharedPtrMessage* shared_msg, bool atc, SrsRtmpJitterAlgorithm ag)
srs_error_t SrsLiveConsumer::enqueue(SrsSharedPtrMessage* shared_msg, bool atc, SrsRtmpJitterAlgorithm ag)
{
srs_error_t err = srs_success;
@ -504,7 +504,7 @@ srs_error_t SrsConsumer::enqueue(SrsSharedPtrMessage* shared_msg, bool atc, SrsR
return err;
}
srs_error_t SrsConsumer::dump_packets(SrsMessageArray* msgs, int& count)
srs_error_t SrsLiveConsumer::dump_packets(SrsMessageArray* msgs, int& count)
{
srs_error_t err = srs_success;
@ -537,7 +537,7 @@ srs_error_t SrsConsumer::dump_packets(SrsMessageArray* msgs, int& count)
}
#ifdef SRS_PERF_QUEUE_COND_WAIT
void SrsConsumer::wait(int nb_msgs, srs_utime_t msgs_duration)
void SrsLiveConsumer::wait(int nb_msgs, srs_utime_t msgs_duration)
{
if (paused) {
srs_usleep(SRS_CONSTS_RTMP_PULSE);
@ -563,7 +563,7 @@ void SrsConsumer::wait(int nb_msgs, srs_utime_t msgs_duration)
}
#endif
srs_error_t SrsConsumer::on_play_client_pause(bool is_pause)
srs_error_t SrsLiveConsumer::on_play_client_pause(bool is_pause)
{
srs_error_t err = srs_success;
@ -573,7 +573,7 @@ srs_error_t SrsConsumer::on_play_client_pause(bool is_pause)
return err;
}
void SrsConsumer::wakeup()
void SrsLiveConsumer::wakeup()
{
#ifdef SRS_PERF_QUEUE_COND_WAIT
if (mw_waiting) {
@ -681,7 +681,7 @@ void SrsGopCache::clear()
audio_after_last_video_count = 0;
}
srs_error_t SrsGopCache::dump(SrsConsumer* consumer, bool atc, SrsRtmpJitterAlgorithm jitter_algorithm)
srs_error_t SrsGopCache::dump(SrsLiveConsumer* consumer, bool atc, SrsRtmpJitterAlgorithm jitter_algorithm)
{
srs_error_t err = srs_success;
@ -719,11 +719,11 @@ bool SrsGopCache::pure_audio()
return cached_video_count == 0;
}
ISrsSourceHandler::ISrsSourceHandler()
ISrsLiveSourceHandler::ISrsLiveSourceHandler()
{
}
ISrsSourceHandler::~ISrsSourceHandler()
ISrsLiveSourceHandler::~ISrsLiveSourceHandler()
{
}
@ -861,7 +861,7 @@ SrsOriginHub::~SrsOriginHub()
#endif
}
srs_error_t SrsOriginHub::initialize(SrsSource* s, SrsRequest* r)
srs_error_t SrsOriginHub::initialize(SrsLiveSource* s, SrsRequest* r)
{
srs_error_t err = srs_success;
@ -1213,7 +1213,7 @@ srs_error_t SrsOriginHub::on_dvr_request_sh()
// feed the dvr the metadata/sequence header,
// when reload to start dvr, dvr will never get the sequence header in stream,
// use the SrsSource.on_dvr_request_sh to push the sequence header to DVR.
// use the SrsLiveSource.on_dvr_request_sh to push the sequence header to DVR.
if (cache_metadata && (err = dvr->on_meta_data(cache_metadata)) != srs_success) {
return srs_error_wrap(err, "dvr metadata");
}
@ -1328,7 +1328,7 @@ srs_error_t SrsOriginHub::on_reload_vhost_hls(string vhost)
// when reload, we must fetch the sequence header from source cache.
// notice the source to get the cached sequence header.
// when reload to start hls, hls will never get the sequence header in stream,
// use the SrsSource.on_hls_start to push the sequence header to HLS.
// use the SrsLiveSource.on_hls_start to push the sequence header to HLS.
SrsSharedPtrMessage* cache_sh_video = source->meta->vsh();
if (cache_sh_video) {
if ((err = format->on_video(cache_sh_video)) != srs_success) {
@ -1561,7 +1561,7 @@ SrsFormat* SrsMetaCache::ash_format()
return aformat;
}
srs_error_t SrsMetaCache::dumps(SrsConsumer* consumer, bool atc, SrsRtmpJitterAlgorithm ag, bool dm, bool ds)
srs_error_t SrsMetaCache::dumps(SrsLiveConsumer* consumer, bool atc, SrsRtmpJitterAlgorithm ag, bool dm, bool ds)
{
srs_error_t err = srs_success;
@ -1684,26 +1684,26 @@ srs_error_t SrsMetaCache::update_vsh(SrsSharedPtrMessage* msg)
return vformat->on_video(msg);
}
SrsSourceManager* _srs_sources = NULL;
SrsLiveSourceManager* _srs_sources = NULL;
SrsSourceManager::SrsSourceManager()
SrsLiveSourceManager::SrsLiveSourceManager()
{
lock = srs_mutex_new();
timer_ = new SrsHourGlass("sources", this, 1 * SRS_UTIME_SECONDS);
}
SrsSourceManager::~SrsSourceManager()
SrsLiveSourceManager::~SrsLiveSourceManager()
{
srs_mutex_destroy(lock);
srs_freep(timer_);
}
srs_error_t SrsSourceManager::initialize()
srs_error_t SrsLiveSourceManager::initialize()
{
return setup_ticks();
}
srs_error_t SrsSourceManager::fetch_or_create(SrsRequest* r, ISrsSourceHandler* h, SrsSource** pps)
srs_error_t SrsLiveSourceManager::fetch_or_create(SrsRequest* r, ISrsLiveSourceHandler* h, SrsLiveSource** pps)
{
srs_error_t err = srs_success;
@ -1712,7 +1712,7 @@ srs_error_t SrsSourceManager::fetch_or_create(SrsRequest* r, ISrsSourceHandler*
// TODO: FIXME: Use smaller lock.
SrsLocker(lock);
SrsSource* source = NULL;
SrsLiveSource* source = NULL;
if ((source = fetch(r)) != NULL) {
*pps = source;
return err;
@ -1726,7 +1726,7 @@ srs_error_t SrsSourceManager::fetch_or_create(SrsRequest* r, ISrsSourceHandler*
srs_trace("new source, stream_url=%s", stream_url.c_str());
source = new SrsSource();
source = new SrsLiveSource();
if ((err = source->initialize(r, h)) != srs_success) {
err = srs_error_wrap(err, "init source %s", r->get_stream_url().c_str());
goto failed;
@ -1741,9 +1741,9 @@ failed:
return err;
}
SrsSource* SrsSourceManager::fetch(SrsRequest* r)
SrsLiveSource* SrsLiveSourceManager::fetch(SrsRequest* r)
{
SrsSource* source = NULL;
SrsLiveSource* source = NULL;
string stream_url = r->get_stream_url();
if (pool.find(stream_url) == pool.end()) {
@ -1760,17 +1760,17 @@ SrsSource* SrsSourceManager::fetch(SrsRequest* r)
return source;
}
void SrsSourceManager::dispose()
void SrsLiveSourceManager::dispose()
{
std::map<std::string, SrsSource*>::iterator it;
std::map<std::string, SrsLiveSource*>::iterator it;
for (it = pool.begin(); it != pool.end(); ++it) {
SrsSource* source = it->second;
SrsLiveSource* source = it->second;
source->dispose();
}
return;
}
srs_error_t SrsSourceManager::setup_ticks()
srs_error_t SrsLiveSourceManager::setup_ticks()
{
srs_error_t err = srs_success;
@ -1785,13 +1785,13 @@ srs_error_t SrsSourceManager::setup_ticks()
return err;
}
srs_error_t SrsSourceManager::notify(int event, srs_utime_t interval, srs_utime_t tick)
srs_error_t SrsLiveSourceManager::notify(int event, srs_utime_t interval, srs_utime_t tick)
{
srs_error_t err = srs_success;
std::map<std::string, SrsSource*>::iterator it;
std::map<std::string, SrsLiveSource*>::iterator it;
for (it = pool.begin(); it != pool.end();) {
SrsSource* source = it->second;
SrsLiveSource* source = it->second;
// Do cycle source to cleanup components, such as hls dispose.
if ((err = source->cycle()) != srs_success) {
@ -1826,25 +1826,25 @@ srs_error_t SrsSourceManager::notify(int event, srs_utime_t interval, srs_utime_
return err;
}
void SrsSourceManager::destroy()
void SrsLiveSourceManager::destroy()
{
std::map<std::string, SrsSource*>::iterator it;
std::map<std::string, SrsLiveSource*>::iterator it;
for (it = pool.begin(); it != pool.end(); ++it) {
SrsSource* source = it->second;
SrsLiveSource* source = it->second;
srs_freep(source);
}
pool.clear();
}
ISrsSourceBridger::ISrsSourceBridger()
ISrsLiveSourceBridger::ISrsLiveSourceBridger()
{
}
ISrsSourceBridger::~ISrsSourceBridger()
ISrsLiveSourceBridger::~ISrsLiveSourceBridger()
{
}
SrsSource::SrsSource()
SrsLiveSource::SrsLiveSource()
{
req = NULL;
jitter_algorithm = SrsRtmpJitterAlgorithmOFF;
@ -1870,7 +1870,7 @@ SrsSource::SrsSource()
atc = false;
}
SrsSource::~SrsSource()
SrsLiveSource::~SrsLiveSource()
{
_srs_config->unsubscribe(this);
@ -1890,14 +1890,14 @@ SrsSource::~SrsSource()
srs_freep(bridger_);
}
void SrsSource::dispose()
void SrsLiveSource::dispose()
{
hub->dispose();
meta->dispose();
gop_cache->dispose();
}
srs_error_t SrsSource::cycle()
srs_error_t SrsLiveSource::cycle()
{
srs_error_t err = hub->cycle();
if (err != srs_success) {
@ -1907,7 +1907,7 @@ srs_error_t SrsSource::cycle()
return srs_success;
}
bool SrsSource::expired()
bool SrsLiveSource::expired()
{
// unknown state?
if (die_at == 0) {
@ -1932,7 +1932,7 @@ bool SrsSource::expired()
return false;
}
srs_error_t SrsSource::initialize(SrsRequest* r, ISrsSourceHandler* h)
srs_error_t SrsLiveSource::initialize(SrsRequest* r, ISrsLiveSourceHandler* h)
{
srs_error_t err = srs_success;
@ -1963,13 +1963,13 @@ srs_error_t SrsSource::initialize(SrsRequest* r, ISrsSourceHandler* h)
return err;
}
void SrsSource::set_bridger(ISrsSourceBridger* v)
void SrsLiveSource::set_bridger(ISrsLiveSourceBridger* v)
{
srs_freep(bridger_);
bridger_ = v;
}
srs_error_t SrsSource::on_reload_vhost_play(string vhost)
srs_error_t SrsLiveSource::on_reload_vhost_play(string vhost)
{
srs_error_t err = srs_success;
@ -2018,10 +2018,10 @@ srs_error_t SrsSource::on_reload_vhost_play(string vhost)
srs_utime_t v = _srs_config->get_queue_length(req->vhost);
if (true) {
std::vector<SrsConsumer*>::iterator it;
std::vector<SrsLiveConsumer*>::iterator it;
for (it = consumers.begin(); it != consumers.end(); ++it) {
SrsConsumer* consumer = *it;
SrsLiveConsumer* consumer = *it;
consumer->set_queue_size(v);
}
@ -2052,7 +2052,7 @@ srs_error_t SrsSource::on_reload_vhost_play(string vhost)
return err;
}
srs_error_t SrsSource::on_source_id_changed(SrsContextId id)
srs_error_t SrsLiveSource::on_source_id_changed(SrsContextId id)
{
srs_error_t err = srs_success;
@ -2066,36 +2066,36 @@ srs_error_t SrsSource::on_source_id_changed(SrsContextId id)
_source_id = id;
// notice all consumer
std::vector<SrsConsumer*>::iterator it;
std::vector<SrsLiveConsumer*>::iterator it;
for (it = consumers.begin(); it != consumers.end(); ++it) {
SrsConsumer* consumer = *it;
SrsLiveConsumer* consumer = *it;
consumer->update_source_id();
}
return err;
}
SrsContextId SrsSource::source_id()
SrsContextId SrsLiveSource::source_id()
{
return _source_id;
}
SrsContextId SrsSource::pre_source_id()
SrsContextId SrsLiveSource::pre_source_id()
{
return _pre_source_id;
}
bool SrsSource::inactive()
bool SrsLiveSource::inactive()
{
return _can_publish;
}
void SrsSource::update_auth(SrsRequest* r)
void SrsLiveSource::update_auth(SrsRequest* r)
{
req->update_auth(r);
}
bool SrsSource::can_publish(bool is_edge)
bool SrsLiveSource::can_publish(bool is_edge)
{
// TODO: FIXME: Should check the status of bridger.
@ -2106,7 +2106,7 @@ bool SrsSource::can_publish(bool is_edge)
return _can_publish;
}
srs_error_t SrsSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata)
srs_error_t SrsLiveSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata)
{
srs_error_t err = srs_success;
@ -2139,9 +2139,9 @@ srs_error_t SrsSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket*
// copy to all consumer
if (!drop_for_reduce) {
std::vector<SrsConsumer*>::iterator it;
std::vector<SrsLiveConsumer*>::iterator it;
for (it = consumers.begin(); it != consumers.end(); ++it) {
SrsConsumer* consumer = *it;
SrsLiveConsumer* consumer = *it;
if ((err = consumer->enqueue(meta->data(), atc, jitter_algorithm)) != srs_success) {
return srs_error_wrap(err, "consume metadata");
}
@ -2152,7 +2152,7 @@ srs_error_t SrsSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket*
return hub->on_meta_data(meta->data(), metadata);
}
srs_error_t SrsSource::on_audio(SrsCommonMessage* shared_audio)
srs_error_t SrsLiveSource::on_audio(SrsCommonMessage* shared_audio)
{
srs_error_t err = srs_success;
@ -2197,7 +2197,7 @@ srs_error_t SrsSource::on_audio(SrsCommonMessage* shared_audio)
return err;
}
srs_error_t SrsSource::on_audio_imp(SrsSharedPtrMessage* msg)
srs_error_t SrsLiveSource::on_audio_imp(SrsSharedPtrMessage* msg)
{
srs_error_t err = srs_success;
@ -2226,7 +2226,7 @@ srs_error_t SrsSource::on_audio_imp(SrsSharedPtrMessage* msg)
// copy to all consumer
if (!drop_for_reduce) {
for (int i = 0; i < (int)consumers.size(); i++) {
SrsConsumer* consumer = consumers.at(i);
SrsLiveConsumer* consumer = consumers.at(i);
if ((err = consumer->enqueue(msg, atc, jitter_algorithm)) != srs_success) {
return srs_error_wrap(err, "consume message");
}
@ -2265,7 +2265,7 @@ srs_error_t SrsSource::on_audio_imp(SrsSharedPtrMessage* msg)
return err;
}
srs_error_t SrsSource::on_video(SrsCommonMessage* shared_video)
srs_error_t SrsLiveSource::on_video(SrsCommonMessage* shared_video)
{
srs_error_t err = srs_success;
@ -2322,7 +2322,7 @@ srs_error_t SrsSource::on_video(SrsCommonMessage* shared_video)
return err;
}
srs_error_t SrsSource::on_video_imp(SrsSharedPtrMessage* msg)
srs_error_t SrsLiveSource::on_video_imp(SrsSharedPtrMessage* msg)
{
srs_error_t err = srs_success;
@ -2356,7 +2356,7 @@ srs_error_t SrsSource::on_video_imp(SrsSharedPtrMessage* msg)
// copy to all consumer
if (!drop_for_reduce) {
for (int i = 0; i < (int)consumers.size(); i++) {
SrsConsumer* consumer = consumers.at(i);
SrsLiveConsumer* consumer = consumers.at(i);
if ((err = consumer->enqueue(msg, atc, jitter_algorithm)) != srs_success) {
return srs_error_wrap(err, "consume video");
}
@ -2386,7 +2386,7 @@ srs_error_t SrsSource::on_video_imp(SrsSharedPtrMessage* msg)
return err;
}
srs_error_t SrsSource::on_aggregate(SrsCommonMessage* msg)
srs_error_t SrsLiveSource::on_aggregate(SrsCommonMessage* msg)
{
srs_error_t err = srs_success;
@ -2476,7 +2476,7 @@ srs_error_t SrsSource::on_aggregate(SrsCommonMessage* msg)
return err;
}
srs_error_t SrsSource::on_publish()
srs_error_t SrsLiveSource::on_publish()
{
srs_error_t err = srs_success;
@ -2523,7 +2523,7 @@ srs_error_t SrsSource::on_publish()
return err;
}
void SrsSource::on_unpublish()
void SrsLiveSource::on_unpublish()
{
// ignore when already unpublished.
if (_can_publish) {
@ -2569,11 +2569,11 @@ void SrsSource::on_unpublish()
}
}
srs_error_t SrsSource::create_consumer(SrsConsumer*& consumer)
srs_error_t SrsLiveSource::create_consumer(SrsLiveConsumer*& consumer)
{
srs_error_t err = srs_success;
consumer = new SrsConsumer(this);
consumer = new SrsLiveConsumer(this);
consumers.push_back(consumer);
// for edge, when play edge stream, check the state
@ -2587,7 +2587,7 @@ srs_error_t SrsSource::create_consumer(SrsConsumer*& consumer)
return err;
}
srs_error_t SrsSource::consumer_dumps(SrsConsumer* consumer, bool ds, bool dm, bool dg)
srs_error_t SrsLiveSource::consumer_dumps(SrsLiveConsumer* consumer, bool ds, bool dm, bool dg)
{
srs_error_t err = srs_success;
@ -2630,9 +2630,9 @@ srs_error_t SrsSource::consumer_dumps(SrsConsumer* consumer, bool ds, bool dm, b
return err;
}
void SrsSource::on_consumer_destroy(SrsConsumer* consumer)
void SrsLiveSource::on_consumer_destroy(SrsLiveConsumer* consumer)
{
std::vector<SrsConsumer*>::iterator it;
std::vector<SrsLiveConsumer*>::iterator it;
it = std::find(consumers.begin(), consumers.end(), consumer);
if (it != consumers.end()) {
consumers.erase(it);
@ -2644,33 +2644,33 @@ void SrsSource::on_consumer_destroy(SrsConsumer* consumer)
}
}
void SrsSource::set_cache(bool enabled)
void SrsLiveSource::set_cache(bool enabled)
{
gop_cache->set(enabled);
}
SrsRtmpJitterAlgorithm SrsSource::jitter()
SrsRtmpJitterAlgorithm SrsLiveSource::jitter()
{
return jitter_algorithm;
}
srs_error_t SrsSource::on_edge_start_publish()
srs_error_t SrsLiveSource::on_edge_start_publish()
{
return publish_edge->on_client_publish();
}
// TODO: FIXME: Use edge strategy pattern.
srs_error_t SrsSource::on_edge_proxy_publish(SrsCommonMessage* msg)
srs_error_t SrsLiveSource::on_edge_proxy_publish(SrsCommonMessage* msg)
{
return publish_edge->on_proxy_publish(msg);
}
void SrsSource::on_edge_proxy_unpublish()
void SrsLiveSource::on_edge_proxy_unpublish()
{
publish_edge->on_proxy_unpublish();
}
string SrsSource::get_curr_origin()
string SrsLiveSource::get_curr_origin()
{
return play_edge->get_curr_origin();
}

View file

@ -38,10 +38,10 @@
class SrsFormat;
class SrsRtmpFormat;
class SrsConsumer;
class SrsLiveConsumer;
class SrsPlayEdge;
class SrsPublishEdge;
class SrsSource;
class SrsLiveSource;
class SrsCommonMessage;
class SrsOnMetaDataPacket;
class SrsSharedPtrMessage;
@ -157,8 +157,8 @@ public:
// @max_count the max count to dequeue, must be positive.
virtual srs_error_t dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& count);
// Dumps packets to consumer, use specified args.
// @remark the atc/tba/tbv/ag are same to SrsConsumer.enqueue().
virtual srs_error_t dump_packets(SrsConsumer* consumer, bool atc, SrsRtmpJitterAlgorithm ag);
// @remark the atc/tba/tbv/ag are same to SrsLiveConsumer.enqueue().
virtual srs_error_t dump_packets(SrsLiveConsumer* consumer, bool atc, SrsRtmpJitterAlgorithm ag);
private:
// Remove a gop from the front.
// if no iframe found, clear it.
@ -182,12 +182,12 @@ public:
virtual void wakeup() = 0;
};
// The consumer for SrsSource, that is a play client.
class SrsConsumer : public ISrsWakable
// The consumer for SrsLiveSource, that is a play client.
class SrsLiveConsumer : public ISrsWakable
{
private:
SrsRtmpJitter* jitter;
SrsSource* source;
SrsLiveSource* source;
SrsMessageQueue* queue;
bool paused;
// when source id changed, notice all consumers
@ -201,8 +201,8 @@ private:
srs_utime_t mw_duration;
#endif
public:
SrsConsumer(SrsSource* s);
virtual ~SrsConsumer();
SrsLiveConsumer(SrsLiveSource* s);
virtual ~SrsLiveConsumer();
public:
// Set the size of queue.
virtual void set_queue_size(srs_utime_t queue_size);
@ -279,7 +279,7 @@ public:
// clear the gop cache.
virtual void clear();
// dump the cached gop to consumer.
virtual srs_error_t dump(SrsConsumer* consumer, bool atc, SrsRtmpJitterAlgorithm jitter_algorithm);
virtual srs_error_t dump(SrsLiveConsumer* consumer, bool atc, SrsRtmpJitterAlgorithm jitter_algorithm);
// used for atc to get the time of gop cache,
// The atc will adjust the sequence header timestamp to gop cache.
virtual bool empty();
@ -294,16 +294,16 @@ public:
// The handler to handle the event of srs source.
// For example, the http flv streaming module handle the event and
// mount http when rtmp start publishing.
class ISrsSourceHandler
class ISrsLiveSourceHandler
{
public:
ISrsSourceHandler();
virtual ~ISrsSourceHandler();
ISrsLiveSourceHandler();
virtual ~ISrsLiveSourceHandler();
public:
// when stream start publish, mount stream.
virtual srs_error_t on_publish(SrsSource* s, SrsRequest* r) = 0;
virtual srs_error_t on_publish(SrsLiveSource* s, SrsRequest* r) = 0;
// when stream stop publish, unmount stream.
virtual void on_unpublish(SrsSource* s, SrsRequest* r) = 0;
virtual void on_unpublish(SrsLiveSource* s, SrsRequest* r) = 0;
};
// The mix queue to correct the timestamp for mix_correct algorithm.
@ -328,7 +328,7 @@ public:
class SrsOriginHub : public ISrsReloadHandler
{
private:
SrsSource* source;
SrsLiveSource* source;
SrsRequest* req;
bool is_active;
private:
@ -356,7 +356,7 @@ public:
public:
// Initialize the hub with source and request.
// @param r The request object, managed by source.
virtual srs_error_t initialize(SrsSource* s, SrsRequest* r);
virtual srs_error_t initialize(SrsLiveSource* s, SrsRequest* r);
// Dispose the hub, release utilities resource,
// For example, delete all HLS pieces.
virtual void dispose();
@ -433,7 +433,7 @@ public:
// Dumps cached metadata to consumer.
// @param dm Whether dumps the metadata.
// @param ds Whether dumps the sequence header.
virtual srs_error_t dumps(SrsConsumer* consumer, bool atc, SrsRtmpJitterAlgorithm ag, bool dm, bool ds);
virtual srs_error_t dumps(SrsLiveConsumer* consumer, bool atc, SrsRtmpJitterAlgorithm ag, bool dm, bool ds);
public:
// Previous exists sequence header.
virtual SrsSharedPtrMessage* previous_vsh();
@ -451,26 +451,26 @@ public:
};
// The source manager to create and refresh all stream sources.
class SrsSourceManager : public ISrsHourGlass
class SrsLiveSourceManager : public ISrsHourGlass
{
private:
srs_mutex_t lock;
std::map<std::string, SrsSource*> pool;
std::map<std::string, SrsLiveSource*> pool;
SrsHourGlass* timer_;
public:
SrsSourceManager();
virtual ~SrsSourceManager();
SrsLiveSourceManager();
virtual ~SrsLiveSourceManager();
public:
virtual srs_error_t initialize();
// create source when fetch from cache failed.
// @param r the client request.
// @param h the event handler for source.
// @param pps the matched source, if success never be NULL.
virtual srs_error_t fetch_or_create(SrsRequest* r, ISrsSourceHandler* h, SrsSource** pps);
virtual srs_error_t fetch_or_create(SrsRequest* r, ISrsLiveSourceHandler* h, SrsLiveSource** pps);
private:
// Get the exists source, NULL when not exists.
// update the request and return the exists source.
virtual SrsSource* fetch(SrsRequest* r);
virtual SrsLiveSource* fetch(SrsRequest* r);
public:
// dispose and cycle all sources.
virtual void dispose();
@ -485,14 +485,14 @@ public:
};
// Global singleton instance.
extern SrsSourceManager* _srs_sources;
extern SrsLiveSourceManager* _srs_sources;
// For RTMP2RTC, bridge SrsSource to SrsRtcStream
class ISrsSourceBridger
// For RTMP2RTC, bridge SrsLiveSource to SrsRtcSource
class ISrsLiveSourceBridger
{
public:
ISrsSourceBridger();
virtual ~ISrsSourceBridger();
ISrsLiveSourceBridger();
virtual ~ISrsLiveSourceBridger();
public:
virtual srs_error_t on_publish() = 0;
virtual srs_error_t on_audio(SrsSharedPtrMessage* audio) = 0;
@ -501,8 +501,7 @@ public:
};
// The live streaming source.
// TODO: FIXME: Rename to SrsLiveStream.
class SrsSource : public ISrsReloadHandler
class SrsLiveSource : public ISrsReloadHandler
{
friend class SrsOriginHub;
private:
@ -516,7 +515,7 @@ private:
// deep copy of client request.
SrsRequest* req;
// To delivery stream to clients.
std::vector<SrsConsumer*> consumers;
std::vector<SrsLiveConsumer*> consumers;
// The time jitter algorithm for vhost.
SrsRtmpJitterAlgorithm jitter_algorithm;
// For play, whether use interlaced/mixed algorithm to correct timestamp.
@ -533,9 +532,9 @@ private:
// The time of the packet we just got.
int64_t last_packet_time;
// The event handler.
ISrsSourceHandler* handler;
ISrsLiveSourceHandler* handler;
// The source bridger for other source.
ISrsSourceBridger* bridger_;
ISrsLiveSourceBridger* bridger_;
// The edge control service
SrsPlayEdge* play_edge;
SrsPublishEdge* publish_edge;
@ -552,8 +551,8 @@ private:
// We will remove the source when source die.
srs_utime_t die_at;
public:
SrsSource();
virtual ~SrsSource();
SrsLiveSource();
virtual ~SrsLiveSource();
public:
virtual void dispose();
virtual srs_error_t cycle();
@ -561,9 +560,9 @@ public:
virtual bool expired();
public:
// Initialize the hls with handlers.
virtual srs_error_t initialize(SrsRequest* r, ISrsSourceHandler* h);
virtual srs_error_t initialize(SrsRequest* r, ISrsLiveSourceHandler* h);
// Bridge to other source, forward packets to it.
void set_bridger(ISrsSourceBridger* v);
void set_bridger(ISrsLiveSourceBridger* v);
// Interface ISrsReloadHandler
public:
virtual srs_error_t on_reload_vhost_play(std::string vhost);
@ -601,13 +600,13 @@ public:
public:
// Create consumer
// @param consumer, output the create consumer.
virtual srs_error_t create_consumer(SrsConsumer*& consumer);
virtual srs_error_t create_consumer(SrsLiveConsumer*& consumer);
// Dumps packets in cache to consumer.
// @param ds, whether dumps the sequence header.
// @param dm, whether dumps the metadata.
// @param dg, whether dumps the gop cache.
virtual srs_error_t consumer_dumps(SrsConsumer* consumer, bool ds = true, bool dm = true, bool dg = true);
virtual void on_consumer_destroy(SrsConsumer* consumer);
virtual srs_error_t consumer_dumps(SrsLiveConsumer* consumer, bool ds = true, bool dm = true, bool dg = true);
virtual void on_consumer_destroy(SrsLiveConsumer* consumer);
virtual void set_cache(bool enabled);
virtual SrsRtmpJitterAlgorithm jitter();
public:

View file

@ -291,8 +291,8 @@ srs_error_t srs_thread_initialize()
// The global objects which depends on ST.
_srs_hybrid = new SrsHybridServer();
_srs_rtc_sources = new SrsRtcStreamManager();
_srs_sources = new SrsSourceManager();
_srs_rtc_sources = new SrsRtcSourceManager();
_srs_sources = new SrsLiveSourceManager();
_srs_stages = new SrsStageManager();
_srs_blackhole = new SrsRtcBlackhole();
_srs_rtc_manager = new SrsResourceManager("RTC", true);

View file

@ -26,6 +26,6 @@
#define VERSION_MAJOR 4
#define VERSION_MINOR 0
#define VERSION_REVISION 111
#define VERSION_REVISION 115
#endif