1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

Support RTMP ATC for HLS/HDS to support backup(failover). change to 0.9.35

This commit is contained in:
winlin 2014-03-26 16:25:02 +08:00
parent ebf1139582
commit 0858bd8b6f
8 changed files with 120 additions and 13 deletions

View file

@ -1340,6 +1340,22 @@ bool SrsConfig::get_gop_cache(string vhost)
return true;
}
bool SrsConfig::get_atc(string vhost)
{
SrsConfDirective* conf = get_vhost(vhost);
if (!conf) {
return true;
}
conf = conf->get("atc");
if (conf && conf->arg0() == "on") {
return true;
}
return false;
}
double SrsConfig::get_queue_length(string vhost)
{
SrsConfDirective* conf = get_vhost(vhost);

View file

@ -153,6 +153,7 @@ public:
virtual bool get_deamon();
virtual int get_max_connections();
virtual bool get_gop_cache(std::string vhost);
virtual bool get_atc(std::string vhost);
virtual double get_queue_length(std::string vhost);
virtual SrsConfDirective* get_forward(std::string vhost);
private:

View file

@ -268,10 +268,11 @@ int SrsConsumer::enqueue(SrsSharedPtrMessage* msg, int tba, int tbv)
{
int ret = ERROR_SUCCESS;
/* TODO: to support atc */
if ((ret = jitter->correct(msg, tba, tbv)) != ERROR_SUCCESS) {
srs_freep(msg);
return ret;
if (!source->is_atc()) {
if ((ret = jitter->correct(msg, tba, tbv)) != ERROR_SUCCESS) {
srs_freep(msg);
return ret;
}
}
if ((ret = queue->enqueue(msg)) != ERROR_SUCCESS) {
@ -391,6 +392,23 @@ int SrsGopCache::dump(SrsConsumer* consumer, int tba, int tbv)
return ret;
}
bool SrsGopCache::empty()
{
return gop_cache.empty();
}
int64_t SrsGopCache::get_start_time()
{
if (empty()) {
return 0;
}
SrsSharedPtrMessage* msg = gop_cache[0];
srs_assert(msg);
return msg->header.timestamp;
}
std::map<std::string, SrsSource*> SrsSource::pool;
SrsSource* SrsSource::find(SrsRequest* req)
@ -425,6 +443,7 @@ SrsSource::SrsSource(SrsRequest* _req)
gop_cache = new SrsGopCache();
_srs_config->subscribe(this);
atc = _srs_config->get_atc(req->vhost);
}
SrsSource::~SrsSource()
@ -774,6 +793,16 @@ int SrsSource::on_audio(SrsCommonMessage* audio)
}
srs_verbose("cache gop success.");
// if atc, update the sequence header to abs time.
if (atc) {
if (cache_sh_audio) {
cache_sh_audio->header.timestamp = msg->header.timestamp;
}
if (cache_metadata) {
cache_metadata->header.timestamp = msg->header.timestamp;
}
}
return ret;
}
@ -841,6 +870,16 @@ int SrsSource::on_video(SrsCommonMessage* video)
}
srs_verbose("cache gop success.");
// if atc, update the sequence header to abs time.
if (atc) {
if (cache_sh_video) {
cache_sh_video->header.timestamp = msg->header.timestamp;
}
if (cache_metadata) {
cache_metadata->header.timestamp = msg->header.timestamp;
}
}
return ret;
}
@ -921,6 +960,17 @@ void SrsSource::on_unpublish()
}
srs_info("dispatch metadata success");
// if atc, update the sequence header to gop cache time.
if (atc && !gop_cache->empty()) {
if (cache_sh_video) {
cache_sh_video->header.timestamp = gop_cache->get_start_time();
}
if (cache_sh_audio) {
cache_sh_audio->header.timestamp = gop_cache->get_start_time();
}
}
// copy sequence header
if (cache_sh_video && (ret = consumer->enqueue(cache_sh_video->copy(), sample_rate, frame_rate)) != ERROR_SUCCESS) {
srs_error("dispatch video sequence header failed. ret=%d", ret);
return ret;
@ -933,6 +983,7 @@ void SrsSource::on_unpublish()
}
srs_info("dispatch audio sequence header success");
// copy gop cache to client.
if ((ret = gop_cache->dump(consumer, sample_rate, frame_rate)) != ERROR_SUCCESS) {
return ret;
}
@ -957,6 +1008,11 @@ void SrsSource::set_cache(bool enabled)
gop_cache->set(enabled);
}
bool SrsSource::is_atc()
{
return atc;
}
int SrsSource::create_forwarders()
{
int ret = ERROR_SUCCESS;

View file

@ -191,6 +191,12 @@ public:
virtual int cache(SrsSharedPtrMessage* msg);
virtual void clear();
virtual int dump(SrsConsumer* consumer, int tba, int tbv);
/**
* used for atc to get the time of gop cache,
* the atc will adjust the sequence header timestamp to gop cache.
*/
virtual bool empty();
virtual int64_t get_start_time();
};
/**
@ -238,6 +244,13 @@ private:
* can publish, true when is not streaming
*/
bool _can_publish;
/**
* atc whether atc(use absolute time and donot adjust time),
* directly use msg time and donot adjust if atc is true,
* otherwise, adjust msg time to start from 0 to make flash happy.
*/
// TODO: FIXME: to support reload atc.
bool atc;
private:
SrsSharedPtrMessage* cache_metadata;
// the cached video sequence header.
@ -279,6 +292,10 @@ public:
virtual int create_consumer(SrsConsumer*& consumer);
virtual void on_consumer_destroy(SrsConsumer* consumer);
virtual void set_cache(bool enabled);
// internal
public:
// for consumer, atc feature.
virtual bool is_atc();
private:
virtual int create_forwarders();
virtual void destroy_forwarders();