1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-02-15 04:42:04 +00:00
srs/trunk/src/app/srs_app_source.cpp

2696 lines
71 KiB
C++
Raw Normal View History

2017-03-25 09:21:39 +00:00
/**
* The MIT License (MIT)
*
2019-12-30 02:10:35 +00:00
* Copyright (c) 2013-2020 Winlin
2017-03-25 09:21:39 +00:00
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#include <srs_app_source.hpp>
2014-05-29 06:16:34 +00:00
#include <sstream>
#include <algorithm>
2013-12-15 04:04:28 +00:00
using namespace std;
#include <srs_kernel_log.hpp>
2015-01-23 02:07:20 +00:00
#include <srs_rtmp_stack.hpp>
2015-09-22 01:05:21 +00:00
#include <srs_protocol_amf0.hpp>
2014-05-28 09:37:15 +00:00
#include <srs_kernel_codec.hpp>
2020-05-11 04:07:55 +00:00
#include <srs_kernel_rtc_rtp.hpp>
#include <srs_app_hls.hpp>
#include <srs_app_forward.hpp>
#include <srs_app_config.hpp>
#include <srs_app_encoder.hpp>
2015-06-13 08:04:59 +00:00
#include <srs_rtmp_stack.hpp>
2014-04-16 01:28:02 +00:00
#include <srs_app_dvr.hpp>
2015-09-22 00:48:55 +00:00
#include <srs_kernel_buffer.hpp>
#include <srs_app_edge.hpp>
#include <srs_kernel_utility.hpp>
#include <srs_kernel_codec.hpp>
2015-01-23 02:07:20 +00:00
#include <srs_rtmp_msg_array.hpp>
2015-03-11 05:34:58 +00:00
#include <srs_app_hds.hpp>
2015-03-08 07:33:08 +00:00
#include <srs_app_statistic.hpp>
#include <srs_core_autofree.hpp>
#include <srs_protocol_utility.hpp>
#include <srs_app_ng_exec.hpp>
2017-02-11 13:14:28 +00:00
#include <srs_app_dash.hpp>
#include <srs_protocol_format.hpp>
2020-05-12 11:53:21 +00:00
#include <srs_app_rtc_source.hpp>
// The bounds (in ms) of the timestamp delta between two adjacent packets;
// a delta outside [CONST_MAX_JITTER_MS_NEG, CONST_MAX_JITTER_MS] is treated as jitter.
#define CONST_MAX_JITTER_MS 250
// Parenthesized, so the negative literal is safe inside any expression.
#define CONST_MAX_JITTER_MS_NEG (-250)
// The delta (in ms) used instead of the real one when jitter is detected.
#define DEFAULT_FRAME_TIME_MS 10

// for 26ms per audio packet,
// 115 packets is 3s.
#define SRS_PURE_AUDIO_GUESS_COUNT 115

// when got these videos or audios, pure audio or video, mix ok.
#define SRS_MIX_CORRECT_PURE_AV 10

// the time to cleanup source.
#define SRS_SOURCE_CLEANUP (30 * SRS_UTIME_SECONDS)
2016-09-05 06:13:37 +00:00
// Translate the time jitter name to the jitter algorithm value:
// "full" and "zero" select their algorithm, any other text disables the correction.
int _srs_time_jitter_string2int(std::string time_jitter)
{
    if (time_jitter == "full") {
        return SrsRtmpJitterAlgorithmFULL;
    }

    if (time_jitter == "zero") {
        return SrsRtmpJitterAlgorithmZERO;
    }

    return SrsRtmpJitterAlgorithmOFF;
}
SrsRtmpJitter::SrsRtmpJitter()
{
    last_pkt_time = 0;
    // -1 means no packet has been corrected yet, see the ZERO algorithm of correct().
    last_pkt_correct_time = -1;
}

SrsRtmpJitter::~SrsRtmpJitter()
{
    // Nothing to release.
}
// Rewrite msg->timestamp in place according to the jitter algorithm ag.
// Always returns srs_success.
srs_error_t SrsRtmpJitter::correct(SrsSharedPtrMessage* msg, SrsRtmpJitterAlgorithm ag)
{
    srs_error_t err = srs_success;

    // OFF: all jitter correct features are disabled, keep the timestamp.
    if (ag == SrsRtmpJitterAlgorithmOFF) {
        return err;
    }

    // ZERO: start at zero, but do not ensure monotonically increasing.
    if (ag == SrsRtmpJitterAlgorithmZERO) {
        // The first packet defines the base time, because last_pkt_correct_time is -1.
        if (last_pkt_correct_time == -1) {
            last_pkt_correct_time = msg->timestamp;
        }
        msg->timestamp -= last_pkt_correct_time;
        return err;
    }

    // Any other algorithm except FULL is ignored.
    if (ag != SrsRtmpJitterAlgorithmFULL) {
        return err;
    }

    // FULL: non audio/video messages (for example metadata) are set to 0.
    if (!msg->is_av()) {
        msg->timestamp = 0;
        return err;
    }

    /**
     * A very simple time jitter detect/correct algorithm:
     * 1. delta: the delta to the previous original time must be in
     *       [CONST_MAX_JITTER_MS_NEG, CONST_MAX_JITTER_MS],
     *       otherwise it is replaced by DEFAULT_FRAME_TIME_MS.
     * 2. last_pkt_time: the original packet time, used to detect the next jitter.
     * 3. last_pkt_correct_time: accumulates the delta and never goes below zero.
     */
    int64_t origin_time = msg->timestamp;
    int64_t delta = origin_time - last_pkt_time;

    bool jitter_detected = (delta < CONST_MAX_JITTER_MS_NEG) || (delta > CONST_MAX_JITTER_MS);
    if (jitter_detected) {
        // use default 10ms to notice the problem of stream.
        // @see https://github.com/ossrs/srs/issues/425
        delta = DEFAULT_FRAME_TIME_MS;
    }

    last_pkt_correct_time = srs_max(0, last_pkt_correct_time + delta);
    last_pkt_time = origin_time;

    msg->timestamp = last_pkt_correct_time;

    return err;
}
2017-09-23 14:12:33 +00:00
int64_t SrsRtmpJitter::get_time()
{
2017-09-23 14:12:33 +00:00
return last_pkt_correct_time;
}
#ifdef SRS_PERF_QUEUE_FAST_VECTOR
SrsFastVector::SrsFastVector()
{
    // Start with a small array of 8 slots, push_back() grows it when full.
    nb_msgs = 8;
    msgs = new SrsSharedPtrMessage*[nb_msgs];
    count = 0;
}
SrsFastVector::~SrsFastVector()
{
    // Free the messages still stored, then the array of pointers.
    free();
    srs_freepa(msgs);
}
// The number of messages stored in the vector.
int SrsFastVector::size()
{
    return count;
}
// The index of the first message, always 0.
int SrsFastVector::begin()
{
    return 0;
}
// The index after the last message, which equals the number of messages.
int SrsFastVector::end()
{
    return count;
}
// The underlying array of message pointers.
SrsSharedPtrMessage** SrsFastVector::data()
{
    return msgs;
}
// Get the message at index, which must be in [0, count).
SrsSharedPtrMessage* SrsFastVector::at(int index)
{
    // A negative index is as invalid as one past the end, so reject both.
    srs_assert(index >= 0 && index < count);
    return msgs[index];
}
// Forget all messages without freeing them; use free() to release them.
void SrsFastVector::clear()
{
    count = 0;
}
// Remove the messages in [_begin, _end) by moving the tail forward.
// The removed messages are not freed here.
void SrsFastVector::erase(int _begin, int _end)
{
    // The range must be non-empty and inside the stored messages,
    // otherwise the count below would be corrupted.
    srs_assert(_begin >= 0 && _begin < _end && _end <= count);

    // move the messages after the erased range to the position of the range.
    int nb_tail = count - _end;
    for (int i = 0; i < nb_tail; i++) {
        msgs[_begin + i] = msgs[_end + i];
    }

    // update the count.
    count -= _end - _begin;
}
// Append msg to the vector, growing the array when it is full.
void SrsFastVector::push_back(SrsSharedPtrMessage* msg)
{
    if (count >= nb_msgs) {
        // At least double the capacity, and never less than SRS_PERF_MW_MSGS * 8.
        int capacity = srs_max(SRS_PERF_MW_MSGS * 8, nb_msgs * 2);
        SrsSharedPtrMessage** grown = new SrsSharedPtrMessage*[capacity];

        int i = 0;
        while (i < nb_msgs) {
            grown[i] = msgs[i];
            i++;
        }
        srs_info("fast vector incrase %d=>%d", nb_msgs, capacity);

        // Switch to the new array.
        srs_freepa(msgs);
        msgs = grown;
        nb_msgs = capacity;
    }

    msgs[count] = msg;
    count++;
}
// Free all stored messages and reset the vector to empty.
void SrsFastVector::free()
{
    int i = 0;
    while (i < count) {
        SrsSharedPtrMessage* msg = msgs[i];
        srs_freep(msg);
        i++;
    }

    count = 0;
}
#endif
SrsMessageQueue::SrsMessageQueue(bool ignore_shrink)
{
    // -1 means no audio/video message has been enqueued yet.
    av_start_time = -1;
    av_end_time = -1;

    // A max size of 0 (or less) disables the shrink in enqueue().
    max_queue_size = 0;
    _ignore_shrink = ignore_shrink;
}
2013-12-15 10:25:55 +00:00
SrsMessageQueue::~SrsMessageQueue()
{
    // Free all messages still in the queue.
    clear();
}
int SrsMessageQueue::size()
{
return (int)msgs.size();
}
// The time span between the start and the end audio/video time of the queue.
srs_utime_t SrsMessageQueue::duration()
{
    srs_utime_t span = av_end_time - av_start_time;
    return span;
}
// Set the max duration of the queue, which is checked by enqueue().
void SrsMessageQueue::set_queue_size(srs_utime_t queue_size)
{
    max_queue_size = queue_size;
}
// Append msg to the queue and shrink the queue when its duration exceeds the max size.
// @param is_overflow optional, set to true when the queue was shrunk; never set to false here.
srs_error_t SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg, bool* is_overflow)
{
    srs_error_t err = srs_success;

    msgs.push_back(msg);

    // Only audio/video messages move the time range of the queue.
    if (msg->is_av()) {
        if (av_start_time == -1) {
            av_start_time = srs_utime_t(msg->timestamp * SRS_UTIME_MILLISECONDS);
        }
        av_end_time = srs_utime_t(msg->timestamp * SRS_UTIME_MILLISECONDS);
    }

    // The size limit is disabled.
    if (max_queue_size <= 0) {
        return err;
    }

    while (av_end_time - av_start_time > max_queue_size) {
        // notice the caller that the queue overflowed and was shrunk.
        if (is_overflow) {
            *is_overflow = true;
        }

        shrink();
    }

    return err;
}
// Move at most max_count messages from the head of the queue to pmsgs.
// @param pmsgs the output array, which must have room for max_count pointers.
// @param count output, the number of messages moved; 0 when the queue is empty.
srs_error_t SrsMessageQueue::dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& count)
{
    srs_error_t err = srs_success;

    int nb_msgs = (int)msgs.size();
    if (nb_msgs <= 0) {
        // Always set the output, so the caller never reads a stale or uninitialized count.
        count = 0;
        return err;
    }

    srs_assert(max_count > 0);
    count = srs_min(max_count, nb_msgs);

    SrsSharedPtrMessage** omsgs = msgs.data();
    memcpy(pmsgs, omsgs, count * sizeof(SrsSharedPtrMessage*));

    // The queue now starts at the time of the last dumped message.
    SrsSharedPtrMessage* last = omsgs[count - 1];
    av_start_time = srs_utime_t(last->timestamp * SRS_UTIME_MILLISECONDS);

    if (count >= nb_msgs) {
        // the pmsgs is big enough and clear msgs at most time.
        msgs.clear();
    } else {
        // erase some vector elements may cause memory copy,
        // maybe can use more efficient vector.swap to avoid copy.
        // @remark for the pmsgs is big enough, for instance, SRS_PERF_MW_MSGS 128,
        //      the rtmp play client will get 128msgs once, so this branch rarely execute.
        msgs.erase(msgs.begin(), msgs.begin() + count);
    }

    return err;
}
2017-09-23 14:12:33 +00:00
// Feed every message of the queue to the consumer; the messages stay in this queue.
srs_error_t SrsMessageQueue::dump_packets(SrsConsumer* consumer, bool atc, SrsRtmpJitterAlgorithm ag)
{
    srs_error_t err = srs_success;

    int total = (int)msgs.size();
    if (total <= 0) {
        return err;
    }

    SrsSharedPtrMessage** all = msgs.data();
    int index = 0;
    while (index < total) {
        SrsSharedPtrMessage* msg = all[index];
        if ((err = consumer->enqueue(msg, atc, ag)) != srs_success) {
            return srs_error_wrap(err, "consume message");
        }
        index++;
    }

    return err;
}
2013-12-15 10:25:55 +00:00
void SrsMessageQueue::shrink()
{
SrsSharedPtrMessage* video_sh = NULL;
SrsSharedPtrMessage* audio_sh = NULL;
int msgs_size = (int)msgs.size();
2014-03-18 03:32:58 +00:00
// remove all msg
// igone the sequence header
for (int i = 0; i < (int)msgs.size(); i++) {
SrsSharedPtrMessage* msg = msgs.at(i);
2017-03-25 09:21:39 +00:00
2017-02-12 12:38:39 +00:00
if (msg->is_video() && SrsFlvVideo::sh(msg->payload, msg->size)) {
srs_freep(video_sh);
video_sh = msg;
continue;
2014-03-18 03:32:58 +00:00
}
2017-02-12 12:38:39 +00:00
else if (msg->is_audio() && SrsFlvAudio::sh(msg->payload, msg->size)) {
srs_freep(audio_sh);
audio_sh = msg;
continue;
}
2017-03-25 09:21:39 +00:00
srs_freep(msg);
2014-03-18 03:32:58 +00:00
}
2017-03-25 09:21:39 +00:00
msgs.clear();
// update av_start_time
av_start_time = av_end_time;
//push_back secquence header and update timestamp
if (video_sh) {
2019-04-12 02:00:39 +00:00
video_sh->timestamp = srsu2ms(av_end_time);
msgs.push_back(video_sh);
}
if (audio_sh) {
2019-04-12 02:00:39 +00:00
audio_sh->timestamp = srsu2ms(av_end_time);
msgs.push_back(audio_sh);
2014-03-18 03:32:58 +00:00
}
2018-01-01 11:39:57 +00:00
if (!_ignore_shrink) {
srs_trace("shrinking, size=%d, removed=%d, max=%dms", (int)msgs.size(), msgs_size - (int)msgs.size(), srsu2msi(max_queue_size));
2014-03-18 03:32:58 +00:00
}
}
2013-12-15 10:25:55 +00:00
void SrsMessageQueue::clear()
{
#ifndef SRS_PERF_QUEUE_FAST_VECTOR
std::vector<SrsSharedPtrMessage*>::iterator it;
2017-03-25 09:21:39 +00:00
2014-03-18 03:32:58 +00:00
for (it = msgs.begin(); it != msgs.end(); ++it) {
SrsSharedPtrMessage* msg = *it;
2014-03-18 03:32:58 +00:00
srs_freep(msg);
}
#else
msgs.free();
#endif
2017-03-25 09:21:39 +00:00
2014-03-18 03:32:58 +00:00
msgs.clear();
av_start_time = av_end_time = -1;
2013-12-15 10:25:55 +00:00
}
// The interface has no state to initialize.
ISrsWakable::ISrsWakable()
{
}

// The interface has no state to release.
ISrsWakable::~ISrsWakable()
{
}
2020-05-12 05:19:31 +00:00
// The interface has no state to initialize.
ISrsConsumerQueue::ISrsConsumerQueue()
{
}

// The interface has no state to release.
ISrsConsumerQueue::~ISrsConsumerQueue()
{
}
// Create a consumer of source s for connection c, with its own jitter and message queue.
SrsConsumer::SrsConsumer(SrsSource* s, SrsConnection* c)
{
    source = s;
    conn = c;

    paused = false;
    should_update_source_id = false;

    jitter = new SrsRtmpJitter();
    queue = new SrsMessageQueue();

#ifdef SRS_PERF_QUEUE_COND_WAIT
    // The state of the merged-write wait, see wait() and enqueue().
    mw_wait = srs_cond_new();
    mw_min_msgs = 0;
    mw_duration = 0;
    mw_waiting = false;
#endif
}
SrsConsumer::~SrsConsumer()
{
    // Notify the source first, then release what the constructor created.
    source->on_consumer_destroy(this);

    srs_freep(jitter);
    srs_freep(queue);

#ifdef SRS_PERF_QUEUE_COND_WAIT
    srs_cond_destroy(mw_wait);
#endif
}
// Set the max duration of the message queue of this consumer.
void SrsConsumer::set_queue_size(srs_utime_t queue_size)
{
    queue->set_queue_size(queue_size);
}
// Mark that the source id changed; the next dump_packets() logs it and clears the mark.
void SrsConsumer::update_source_id()
{
    should_update_source_id = true;
}
2017-09-23 14:12:33 +00:00
int64_t SrsConsumer::get_time()
2013-12-15 10:25:55 +00:00
{
2014-03-18 03:32:58 +00:00
return jitter->get_time();
2013-12-15 10:25:55 +00:00
}
2017-09-23 14:12:33 +00:00
// Enqueue a copy of shared_msg for this consumer; shared_msg itself is not kept.
// @param atc when true, the timestamp is not corrected by the jitter.
// @param ag the jitter algorithm to use when atc is false.
srs_error_t SrsConsumer::enqueue(SrsSharedPtrMessage* shared_msg, bool atc, SrsRtmpJitterAlgorithm ag)
{
    srs_error_t err = srs_success;

    SrsSharedPtrMessage* msg = shared_msg->copy();

    if (!atc) {
        if ((err = jitter->correct(msg, ag)) != srs_success) {
            // The copy is not in the queue yet, so it must be freed here to avoid a leak.
            srs_freep(msg);
            return srs_error_wrap(err, "consume message");
        }
    }

    // The queue stores msg before anything else, so msg is not freed here on error.
    if ((err = queue->enqueue(msg, NULL)) != srs_success) {
        return srs_error_wrap(err, "enqueue message");
    }

#ifdef SRS_PERF_QUEUE_COND_WAIT
    // fire the mw when msgs is enough.
    if (mw_waiting) {
        // For RTMP, we wait for messages and duration.
        srs_utime_t duration = queue->duration();
        bool match_min_msgs = queue->size() > mw_min_msgs;

        // For ATC, maybe the SH timestamp bigger than A/V packet,
        // when encoder republish or overflow.
        // @see https://github.com/ossrs/srs/pull/749
        if (atc && duration < 0) {
            srs_cond_signal(mw_wait);
            mw_waiting = false;
            return err;
        }

        // when duration ok, signal to flush.
        if (match_min_msgs && duration > mw_duration) {
            srs_cond_signal(mw_wait);
            mw_waiting = false;
            return err;
        }
    }
#endif

    return err;
}
2017-09-23 14:12:33 +00:00
// Move the queued messages to msgs.
// @param count as input, the max messages to get when positive (limited by msgs->max);
//      as output, the number of messages got, 0 when paused or nothing queued.
srs_error_t SrsConsumer::dump_packets(SrsMessageArray* msgs, int& count)
{
    srs_error_t err = srs_success;

    srs_assert(count >= 0);
    srs_assert(msgs->max > 0);

    // A positive count lowers the limit, otherwise use the capacity of the array.
    int max = msgs->max;
    if (count) {
        max = srs_min(count, msgs->max);
    }

    // From here on the count is the output, which must be 0 when got nothing.
    count = 0;

    if (should_update_source_id) {
        srs_trace("update source_id=%d[%d]", source->source_id(), source->source_id());
        should_update_source_id = false;
    }

    // A paused consumer gets nothing.
    if (paused) {
        return err;
    }

    // pump msgs from queue.
    if ((err = queue->dump_packets(max, msgs->msgs, count)) != srs_success) {
        return srs_error_wrap(err, "dump packets");
    }

    return err;
}
#ifdef SRS_PERF_QUEUE_COND_WAIT
void SrsConsumer::wait(int nb_msgs, srs_utime_t msgs_duration)
{
if (paused) {
srs_usleep(SRS_CONSTS_RTMP_PULSE);
return;
}
2017-03-25 09:21:39 +00:00
mw_min_msgs = nb_msgs;
mw_duration = msgs_duration;
2017-03-25 09:21:39 +00:00
srs_utime_t duration = queue->duration();
bool match_min_msgs = queue->size() > mw_min_msgs;
// when duration ok, signal to flush.
if (match_min_msgs && duration > mw_duration) {
return;
}
// the enqueue will notify this cond.
mw_waiting = true;
// use cond block wait for high performance mode.
srs_cond_wait(mw_wait);
}
#endif
2017-09-23 14:12:33 +00:00
// Change the paused state of the consumer; always returns srs_success.
srs_error_t SrsConsumer::on_play_client_pause(bool is_pause)
{
    srs_trace("stream consumer change pause state %d=>%d", paused, is_pause);
    paused = is_pause;

    return srs_success;
}
// Signal the consumer when it is blocked in wait(); otherwise do nothing.
void SrsConsumer::wakeup()
{
#ifdef SRS_PERF_QUEUE_COND_WAIT
    if (!mw_waiting) {
        return;
    }

    srs_cond_signal(mw_wait);
    mw_waiting = false;
#endif
}
SrsGopCache::SrsGopCache()
{
    // The cache is enabled by default and starts empty.
    enable_gop_cache = true;
    cached_video_count = 0;
    audio_after_last_video_count = 0;
}
SrsGopCache::~SrsGopCache()
{
    // Free all cached messages.
    clear();
}
2015-06-07 01:27:47 +00:00
// Release the cached messages; the cache can still be used afterwards.
void SrsGopCache::dispose()
{
    clear();
}
void SrsGopCache::set(bool v)
{
enable_gop_cache = v;
2014-03-18 03:32:58 +00:00
if (!v) {
2014-03-18 03:32:58 +00:00
clear();
return;
}
}
// Whether the gop cache is enabled.
bool SrsGopCache::enabled()
{
    return enable_gop_cache;
}
2017-09-23 14:12:33 +00:00
// Cache a copy of shared_msg when the cache is enabled and the stream has h.264 video.
// A video keyframe restarts the cache; too many audios without video clear it.
// Always returns srs_success.
srs_error_t SrsGopCache::cache(SrsSharedPtrMessage* shared_msg)
{
    srs_error_t err = srs_success;

    if (!enable_gop_cache) {
        return err;
    }

    // Count the video when it is h.264, and drop any other video.
    if (shared_msg->is_video()) {
        if (!SrsFlvVideo::h264(shared_msg->payload, shared_msg->size)) {
            return err;
        }

        cached_video_count++;
        audio_after_last_video_count = 0;
    }

    // No acceptable video yet, so it may be pure audio; do not cache.
    if (pure_audio()) {
        return err;
    }

    // Count the audios which arrived after the last video.
    if (shared_msg->is_audio()) {
        audio_after_last_video_count++;
    }

    // Too many audios without any video: guess the stream turned to pure audio.
    if (audio_after_last_video_count > SRS_PURE_AUDIO_GUESS_COUNT) {
        srs_warn("clear gop cache for guess pure audio overflow");
        clear();
        return err;
    }

    // A keyframe starts a new gop.
    if (shared_msg->is_video() && SrsFlvVideo::keyframe(shared_msg->payload, shared_msg->size)) {
        clear();

        // The keyframe itself is the first cached video.
        cached_video_count = 1;
    }

    // The cache owns its own copy of the message.
    gop_cache.push_back(shared_msg->copy());

    return err;
}
void SrsGopCache::clear()
{
std::vector<SrsSharedPtrMessage*>::iterator it;
2014-03-18 03:32:58 +00:00
for (it = gop_cache.begin(); it != gop_cache.end(); ++it) {
SrsSharedPtrMessage* msg = *it;
2014-03-18 03:32:58 +00:00
srs_freep(msg);
}
gop_cache.clear();
2017-03-25 09:21:39 +00:00
2014-03-18 03:32:58 +00:00
cached_video_count = 0;
audio_after_last_video_count = 0;
}
2017-03-25 09:21:39 +00:00
2017-09-23 14:12:33 +00:00
// Feed all cached messages to the consumer, in the order they were cached.
// The messages stay in the cache.
srs_error_t SrsGopCache::dump(SrsConsumer* consumer, bool atc, SrsRtmpJitterAlgorithm jitter_algorithm)
{
    srs_error_t err = srs_success;

    std::vector<SrsSharedPtrMessage*>::iterator it;
    for (it = gop_cache.begin(); it != gop_cache.end(); ++it) {
        SrsSharedPtrMessage* msg = *it;
        if ((err = consumer->enqueue(msg, atc, jitter_algorithm)) != srs_success) {
            return srs_error_wrap(err, "enqueue message");
        }
    }

    // get_time() returns int64_t, so it is printed with a matching conversion
    // instead of %d, which expects an int.
    srs_trace("dispatch cached gop success. count=%d, duration=%lld", (int)gop_cache.size(), (long long)consumer->get_time());

    return err;
}
// Whether no message is cached.
bool SrsGopCache::empty()
{
    return gop_cache.empty();
}
// The time of the first cached message, or 0 when the cache is empty.
srs_utime_t SrsGopCache::start_time()
{
    if (empty()) {
        return 0;
    }

    SrsSharedPtrMessage* first = gop_cache[0];
    srs_assert(first);

    return srs_utime_t(first->timestamp * SRS_UTIME_MILLISECONDS);
}
// Whether no video has been counted, which is treated as a pure audio stream.
bool SrsGopCache::pure_audio()
{
    return cached_video_count == 0;
}
// The interface has no state to initialize.
ISrsSourceHandler::ISrsSourceHandler()
{
}

// The interface has no state to release.
ISrsSourceHandler::~ISrsSourceHandler()
{
}
// TODO: FIXME: Remove it?
// Whether the HLS error ret of msg can be ignored: only a decode error, for a msg
// which has the same size as the sequence header sh (sh may be NULL).
bool srs_hls_can_continue(int ret, SrsSharedPtrMessage* sh, SrsSharedPtrMessage* msg)
{
    // only continue for decode error.
    if (ret != ERROR_HLS_DECODE_ERROR) {
        return false;
    }

    // Without a sequence header, or with a different size, the error is real.
    if (!sh || sh->size != msg->size) {
        return false;
    }

    // when video size equals to sequence header,
    // the video actually maybe a sequence header,
    // continue to make ffmpeg happy.
    srs_warn("the msg is actually a sequence header, ignore this packet.");
    return true;
}
2017-01-22 09:07:55 +00:00
SrsMixQueue::SrsMixQueue()
{
    // The queue starts empty.
    nb_audios = 0;
    nb_videos = 0;
}
SrsMixQueue::~SrsMixQueue()
{
    // Free all messages still in the queue.
    clear();
}
// Free all queued messages and reset the video/audio counters.
void SrsMixQueue::clear()
{
    std::multimap<int64_t, SrsSharedPtrMessage*>::iterator it = msgs.begin();
    while (it != msgs.end()) {
        SrsSharedPtrMessage* msg = it->second;
        srs_freep(msg);
        ++it;
    }
    msgs.clear();

    nb_audios = 0;
    nb_videos = 0;
}
// Insert msg ordered by its timestamp; any message that is not video is counted as audio.
void SrsMixQueue::push(SrsSharedPtrMessage* msg)
{
    msgs.insert(std::make_pair(msg->timestamp, msg));

    if (!msg->is_video()) {
        nb_audios++;
        return;
    }

    nb_videos++;
}
// Pop the message with the smallest timestamp when the mix is ready, otherwise NULL.
// The mix is ready when both video and audio are queued, or when
// SRS_MIX_CORRECT_PURE_AV or more messages of only one kind are queued.
SrsSharedPtrMessage* SrsMixQueue::pop()
{
    bool pure_video = nb_videos >= SRS_MIX_CORRECT_PURE_AV && nb_audios == 0;
    bool pure_audio = nb_audios >= SRS_MIX_CORRECT_PURE_AV && nb_videos == 0;
    bool got_both = nb_videos >= 1 && nb_audios >= 1;

    if (!pure_video && !pure_audio && !got_both) {
        return NULL;
    }

    // The multimap is ordered by timestamp, so the first one is the earliest.
    std::multimap<int64_t, SrsSharedPtrMessage*>::iterator first = msgs.begin();
    SrsSharedPtrMessage* msg = first->second;
    msgs.erase(first);

    // Keep the counters in sync with push().
    if (!msg->is_video()) {
        nb_audios--;
    } else {
        nb_videos--;
    }

    return msg;
}
2017-02-11 13:14:28 +00:00
// Create the hub with all its sub-components; source and req are set by initialize().
SrsOriginHub::SrsOriginHub()
{
    source = NULL;
    req = NULL;
    is_active = false;

    hls = new SrsHls();
    dash = new SrsDash();
    dvr = new SrsDvr();
    encoder = new SrsEncoder();
#ifdef SRS_HDS
    hds = new SrsHds();
#endif
    ng_exec = new SrsNgExec();
    format = new SrsRtmpFormat();

    // Subscribe to the config; the destructor unsubscribes.
    _srs_config->subscribe(this);
}
SrsOriginHub::~SrsOriginHub()
{
    _srs_config->unsubscribe(this);

    // Free all forwarders.
    for (int i = 0; i < (int)forwarders.size(); i++) {
        SrsForwarder* forwarder = forwarders[i];
        srs_freep(forwarder);
    }
    forwarders.clear();

    // Free the sub-components created by the constructor.
    srs_freep(ng_exec);

    srs_freep(format);
    srs_freep(hls);
    srs_freep(dash);
    srs_freep(dvr);
    srs_freep(encoder);
#ifdef SRS_HDS
    srs_freep(hds);
#endif
}
2017-06-11 01:40:07 +00:00
// Bind the hub to source s and request r, then initialize format, hls, dash and dvr.
// Returns the wrapped error of the first component which fails.
srs_error_t SrsOriginHub::initialize(SrsSource* s, SrsRequest* r)
{
    srs_error_t err = srs_success;

    source = s;
    req = r;

    err = format->initialize();
    if (err != srs_success) {
        return srs_error_wrap(err, "format initialize");
    }

    err = hls->initialize(this, req);
    if (err != srs_success) {
        return srs_error_wrap(err, "hls initialize");
    }

    err = dash->initialize(this, req);
    if (err != srs_success) {
        return srs_error_wrap(err, "dash initialize");
    }

    err = dvr->initialize(this, req);
    if (err != srs_success) {
        return srs_error_wrap(err, "dvr initialize");
    }

    return err;
}
// Dispose the sub-components; only HLS is disposed for now.
void SrsOriginHub::dispose()
{
    hls->dispose();

    // TODO: Support dispose DASH.
}
2017-06-11 01:40:07 +00:00
// Run the cycle of the sub-components; only HLS is cycled for now.
srs_error_t SrsOriginHub::cycle()
{
    srs_error_t err = hls->cycle();
    if (err != srs_success) {
        return srs_error_wrap(err, "hls cycle");
    }

    // TODO: Support cycle DASH.

    return err;
}
// Whether the hub is active: set by a successful on_publish(), cleared by on_unpublish().
bool SrsOriginHub::active()
{
    return is_active;
}
2017-09-23 14:12:33 +00:00
// Deliver the metadata to the format parser, all forwarders and the DVR.
// Returns the wrapped error of the first one which fails.
srs_error_t SrsOriginHub::on_meta_data(SrsSharedPtrMessage* shared_metadata, SrsOnMetaDataPacket* packet)
{
    srs_error_t err = srs_success;

    if ((err = format->on_metadata(packet)) != srs_success) {
        return srs_error_wrap(err, "Format parse metadata");
    }

    // copy to all forwarders
    for (int i = 0; i < (int)forwarders.size(); i++) {
        SrsForwarder* forwarder = forwarders[i];
        if ((err = forwarder->on_meta_data(shared_metadata)) != srs_success) {
            return srs_error_wrap(err, "Forwarder consume metadata");
        }
    }

    if ((err = dvr->on_meta_data(shared_metadata)) != srs_success) {
        return srs_error_wrap(err, "DVR consume metadata");
    }

    return err;
}
2017-09-23 14:12:33 +00:00
// Deliver an audio message to format, HLS, DASH, DVR, HDS (if built) and the forwarders.
// Errors of DASH/DVR/HDS are logged and ignored; HLS errors follow the configured
// hls_on_error strategy; errors of format, statistic and forwarders are returned.
srs_error_t SrsOriginHub::on_audio(SrsSharedPtrMessage* shared_audio)
{
    srs_error_t err = srs_success;

    SrsSharedPtrMessage* msg = shared_audio;

    // TODO: FIXME: Support parsing OPUS for RTC.
    if ((err = format->on_audio(msg)) != srs_success) {
        return srs_error_wrap(err, "format consume audio");
    }

    // Report the audio stream info when got the AAC sequence header.
    if (format->is_aac_sequence_header()) {
        srs_assert(format->acodec);
        SrsAudioCodecConfig* c = format->acodec;

        // Tables indexed by the FLV sound_size and sound_type, only for the log below.
        static int flv_sample_sizes[] = {8, 16, 0};
        static int flv_sound_types[] = {1, 2, 0};

        // when got audio stream info.
        SrsStatistic* stat = SrsStatistic::instance();
        if ((err = stat->on_audio_info(req, SrsAudioCodecIdAAC, c->sound_rate, c->sound_type, c->aac_object)) != srs_success) {
            return srs_error_wrap(err, "stat audio");
        }

        srs_trace("%dB audio sh, codec(%d, profile=%s, %dchannels, %dkbps, %dHZ), flv(%dbits, %dchannels, %dHZ)",
            msg->size, c->id, srs_aac_object2str(c->aac_object).c_str(), c->aac_channels,
            c->audio_data_rate / 1000, srs_aac_srates[c->aac_sample_rate],
            flv_sample_sizes[c->sound_size], flv_sound_types[c->sound_type],
            srs_flv_srates[c->sound_rate]);
    }

    if ((err = hls->on_audio(msg, format)) != srs_success) {
        // apply the error strategy for hls.
        // @see https://github.com/ossrs/srs/issues/264
        std::string strategy = _srs_config->get_hls_on_error(req->vhost);

        if (srs_config_hls_is_on_error_ignore(strategy)) {
            srs_warn("hls: ignore audio error %s", srs_error_desc(err).c_str());
            hls->on_unpublish();
            srs_error_reset(err);
        } else {
            // Only the continue strategy may recover, and only for some errors.
            bool can_continue = srs_config_hls_is_on_error_continue(strategy)
                && srs_hls_can_continue(srs_error_code(err), source->meta->ash(), msg);
            if (!can_continue) {
                return srs_error_wrap(err, "hls: audio");
            }
            srs_error_reset(err);
        }
    }

    if ((err = dash->on_audio(msg, format)) != srs_success) {
        srs_warn("dash: ignore audio error %s", srs_error_desc(err).c_str());
        srs_error_reset(err);
        dash->on_unpublish();
    }

    if ((err = dvr->on_audio(msg, format)) != srs_success) {
        srs_warn("dvr: ignore audio error %s", srs_error_desc(err).c_str());
        srs_error_reset(err);
        dvr->on_unpublish();
    }

#ifdef SRS_HDS
    if ((err = hds->on_audio(msg)) != srs_success) {
        srs_warn("hds: ignore audio error %s", srs_error_desc(err).c_str());
        srs_error_reset(err);
        hds->on_unpublish();
    }
#endif

    // copy to all forwarders.
    for (int i = 0; i < (int)forwarders.size(); i++) {
        SrsForwarder* forwarder = forwarders[i];
        if ((err = forwarder->on_audio(msg)) != srs_success) {
            return srs_error_wrap(err, "forward: audio");
        }
    }

    return err;
}
2017-09-23 14:12:33 +00:00
// Deliver a video message to format, HLS, DASH, DVR, HDS (if built) and the forwarders.
// Errors of DASH/DVR/HDS are logged and ignored; HLS errors follow the configured
// hls_on_error strategy; errors of format, statistic and forwarders are returned.
srs_error_t SrsOriginHub::on_video(SrsSharedPtrMessage* shared_video, bool is_sequence_header)
{
    srs_error_t err = srs_success;

    SrsSharedPtrMessage* msg = shared_video;

    // user can disable the sps parse to workaround when parse sps failed.
    // @see https://github.com/ossrs/srs/issues/474
    if (is_sequence_header) {
        format->avc_parse_sps = _srs_config->get_parse_sps(req->vhost);
    }

    if ((err = format->on_video(msg)) != srs_success) {
        return srs_error_wrap(err, "format consume video");
    }

    // Report the video stream info when got the AVC sequence header.
    if (format->is_avc_sequence_header()) {
        SrsVideoCodecConfig* c = format->vcodec;
        srs_assert(c);

        // when got video stream info.
        SrsStatistic* stat = SrsStatistic::instance();
        if ((err = stat->on_video_info(req, SrsVideoCodecIdAVC, c->avc_profile, c->avc_level, c->width, c->height)) != srs_success) {
            return srs_error_wrap(err, "stat video");
        }

        srs_trace("%dB video sh, codec(%d, profile=%s, level=%s, %dx%d, %dkbps, %.1ffps, %.1fs)",
            msg->size, c->id, srs_avc_profile2str(c->avc_profile).c_str(),
            srs_avc_level2str(c->avc_level).c_str(), c->width, c->height,
            c->video_data_rate / 1000, c->frame_rate, c->duration);
    }

    // Ignore video data when no sps/pps
    // @bug https://github.com/ossrs/srs/issues/703#issuecomment-578393155
    if (format->vcodec && !format->vcodec->is_avc_codec_ok()) {
        return err;
    }

    if ((err = hls->on_video(msg, format)) != srs_success) {
        // TODO: We should support more strategies.
        // apply the error strategy for hls.
        // @see https://github.com/ossrs/srs/issues/264
        std::string strategy = _srs_config->get_hls_on_error(req->vhost);

        if (srs_config_hls_is_on_error_ignore(strategy)) {
            srs_warn("hls: ignore video error %s", srs_error_desc(err).c_str());
            hls->on_unpublish();
            srs_error_reset(err);
        } else {
            // Only the continue strategy may recover, and only for some errors.
            bool can_continue = srs_config_hls_is_on_error_continue(strategy)
                && srs_hls_can_continue(srs_error_code(err), source->meta->vsh(), msg);
            if (!can_continue) {
                return srs_error_wrap(err, "hls: video");
            }
            srs_error_reset(err);
        }
    }

    if ((err = dash->on_video(msg, format)) != srs_success) {
        srs_warn("dash: ignore video error %s", srs_error_desc(err).c_str());
        srs_error_reset(err);
        dash->on_unpublish();
    }

    if ((err = dvr->on_video(msg, format)) != srs_success) {
        srs_warn("dvr: ignore video error %s", srs_error_desc(err).c_str());
        srs_error_reset(err);
        dvr->on_unpublish();
    }

#ifdef SRS_HDS
    if ((err = hds->on_video(msg)) != srs_success) {
        srs_warn("hds: ignore video error %s", srs_error_desc(err).c_str());
        srs_error_reset(err);
        hds->on_unpublish();
    }
#endif

    // copy to all forwarders.
    for (int i = 0; i < (int)forwarders.size(); i++) {
        SrsForwarder* forwarder = forwarders[i];
        if ((err = forwarder->on_video(msg)) != srs_success) {
            return srs_error_wrap(err, "forward video");
        }
    }

    return err;
}
2017-09-23 14:12:33 +00:00
// Start the forwarders, encoder, HLS, DASH, DVR, HDS (if built) and exec, in this order.
// The hub becomes active only when all of them succeed; the first error is returned.
srs_error_t SrsOriginHub::on_publish()
{
    srs_error_t err = srs_success;

    // create forwarders
    err = create_forwarders();
    if (err != srs_success) {
        return srs_error_wrap(err, "create forwarders");
    }

    // TODO: FIXME: use initialize to set req.
    err = encoder->on_publish(req);
    if (err != srs_success) {
        return srs_error_wrap(err, "encoder publish");
    }

    err = hls->on_publish();
    if (err != srs_success) {
        return srs_error_wrap(err, "hls publish");
    }

    err = dash->on_publish();
    if (err != srs_success) {
        return srs_error_wrap(err, "dash publish");
    }

    err = dvr->on_publish();
    if (err != srs_success) {
        return srs_error_wrap(err, "dvr publish");
    }

    // TODO: FIXME: use initialize to set req.
#ifdef SRS_HDS
    err = hds->on_publish(req);
    if (err != srs_success) {
        return srs_error_wrap(err, "hds publish");
    }
#endif

    // TODO: FIXME: use initialize to set req.
    err = ng_exec->on_publish(req);
    if (err != srs_success) {
        return srs_error_wrap(err, "exec publish");
    }

    is_active = true;

    return err;
}
void SrsOriginHub::on_unpublish()
{
2017-02-09 06:33:56 +00:00
is_active = false;
// destroy all forwarders
destroy_forwarders();
encoder->on_unpublish();
hls->on_unpublish();
2017-02-11 13:14:28 +00:00
dash->on_unpublish();
dvr->on_unpublish();
#ifdef SRS_HDS
hds->on_unpublish();
#endif
ng_exec->on_unpublish();
}
2017-09-23 14:12:33 +00:00
// Feed a starting forwarder with the cached metadata and sequence headers
// of the source, for example when a reload enables the forwarder.
// Each cached message is only forwarded when it exists; the first failure
// is wrapped and returned.
srs_error_t SrsOriginHub::on_forwarder_start(SrsForwarder* forwarder)
{
    srs_error_t err = srs_success;

    SrsSharedPtrMessage* metadata = source->meta->data();
    SrsSharedPtrMessage* video_sh = source->meta->vsh();
    SrsSharedPtrMessage* audio_sh = source->meta->ash();

    // Metadata goes first, then the video and audio sequence headers.
    if (metadata) {
        err = forwarder->on_meta_data(metadata);
        if (err != srs_success) {
            return srs_error_wrap(err, "forward metadata");
        }
    }

    if (video_sh) {
        err = forwarder->on_video(video_sh);
        if (err != srs_success) {
            return srs_error_wrap(err, "forward video sh");
        }
    }

    if (audio_sh) {
        err = forwarder->on_audio(audio_sh);
        if (err != srs_success) {
            return srs_error_wrap(err, "forward audio sh");
        }
    }

    return err;
}
2017-09-23 14:12:33 +00:00
// Push the cached metadata and sequence headers of the source to the DVR.
// When a reload starts the DVR, it will never get the sequence header from
// the stream, so the cached ones are fed here. Missing cache entries are
// skipped; the first failure is wrapped and returned.
srs_error_t SrsOriginHub::on_dvr_request_sh()
{
    srs_error_t err = srs_success;

    SrsSharedPtrMessage* metadata = source->meta->data();
    SrsSharedPtrMessage* video_sh = source->meta->vsh();
    SrsSharedPtrMessage* audio_sh = source->meta->ash();

    if (metadata) {
        err = dvr->on_meta_data(metadata);
        if (err != srs_success) {
            return srs_error_wrap(err, "dvr metadata");
        }
    }

    // The sequence headers are delivered with the format parsed by the meta cache.
    if (video_sh) {
        err = dvr->on_video(video_sh, source->meta->vsh_format());
        if (err != srs_success) {
            return srs_error_wrap(err, "dvr video");
        }
    }

    if (audio_sh) {
        err = dvr->on_audio(audio_sh, source->meta->ash_format());
        if (err != srs_success) {
            return srs_error_wrap(err, "dvr audio");
        }
    }

    return err;
}
2017-09-22 08:14:30 +00:00
// Reload handler for the forward config of a vhost.
// Does nothing for other vhosts. Otherwise all forwarders are destroyed,
// and recreated only when the hub is active.
srs_error_t SrsOriginHub::on_reload_vhost_forward(string vhost)
{
    srs_error_t err = srs_success;

    bool is_my_vhost = (req->vhost == vhost);
    if (!is_my_vhost) {
        return err;
    }

    // TODO: FIXME: maybe should ignore when publish already stopped?

    // forwarders
    destroy_forwarders();

    // Don't start forwarders when source is not active.
    if (!is_active) {
        return err;
    }

    err = create_forwarders();
    if (err != srs_success) {
        return srs_error_wrap(err, "create forwarders");
    }

    srs_trace("vhost %s forwarders reload success", vhost.c_str());

    return err;
}
2017-09-22 08:14:30 +00:00
// Reload handler for the DASH config of a vhost.
// Does nothing for other vhosts. Otherwise DASH is stopped, and when the hub
// is active it is started again and fed with the cached video and audio
// sequence headers of the source.
srs_error_t SrsOriginHub::on_reload_vhost_dash(string vhost)
{
    srs_error_t err = srs_success;

    bool is_my_vhost = (req->vhost == vhost);
    if (!is_my_vhost) {
        return err;
    }

    dash->on_unpublish();

    // Don't start DASH when source is not active.
    if (!is_active) {
        return err;
    }

    err = dash->on_publish();
    if (err != srs_success) {
        return srs_error_wrap(err, "dash start publish");
    }

    // Feed the cached video sequence header: parse it by format, then deliver to DASH.
    SrsSharedPtrMessage* video_sh = source->meta->vsh();
    if (video_sh) {
        err = format->on_video(video_sh);
        if (err != srs_success) {
            return srs_error_wrap(err, "format on_video");
        }

        err = dash->on_video(video_sh, format);
        if (err != srs_success) {
            return srs_error_wrap(err, "dash on_video");
        }
    }

    // Feed the cached audio sequence header in the same way.
    SrsSharedPtrMessage* audio_sh = source->meta->ash();
    if (audio_sh) {
        err = format->on_audio(audio_sh);
        if (err != srs_success) {
            return srs_error_wrap(err, "format on_audio");
        }

        err = dash->on_audio(audio_sh, format);
        if (err != srs_success) {
            return srs_error_wrap(err, "dash on_audio");
        }
    }

    return err;
}
2017-09-22 08:14:30 +00:00
// Reload handler for the HLS config of a vhost.
// Does nothing for other vhosts. Otherwise HLS is stopped, and when the hub
// is active it is started again and fed with the cached video and audio
// sequence headers of the source.
srs_error_t SrsOriginHub::on_reload_vhost_hls(string vhost)
{
    srs_error_t err = srs_success;

    bool is_my_vhost = (req->vhost == vhost);
    if (!is_my_vhost) {
        return err;
    }

    // TODO: FIXME: maybe should ignore when publish already stopped?

    hls->on_unpublish();

    // Don't start HLS when source is not active.
    if (!is_active) {
        return err;
    }

    err = hls->on_publish();
    if (err != srs_success) {
        return srs_error_wrap(err, "hls publish failed");
    }
    srs_trace("vhost %s hls reload success", vhost.c_str());

    // when publish, don't need to fetch sequence header, which is old and maybe corrupt.
    // when reload, we must fetch the sequence header from source cache.
    // notice the source to get the cached sequence header.
    // when reload to start hls, hls will never get the sequence header in stream,
    // use the SrsSource.on_hls_start to push the sequence header to HLS.
    SrsSharedPtrMessage* video_sh = source->meta->vsh();
    if (video_sh) {
        err = format->on_video(video_sh);
        if (err != srs_success) {
            return srs_error_wrap(err, "format on_video");
        }

        err = hls->on_video(video_sh, format);
        if (err != srs_success) {
            return srs_error_wrap(err, "hls on_video");
        }
    }

    SrsSharedPtrMessage* audio_sh = source->meta->ash();
    if (audio_sh) {
        err = format->on_audio(audio_sh);
        if (err != srs_success) {
            return srs_error_wrap(err, "format on_audio");
        }

        err = hls->on_audio(audio_sh, format);
        if (err != srs_success) {
            return srs_error_wrap(err, "hls on_audio");
        }
    }

    return err;
}
2017-09-22 08:14:30 +00:00
// Reload handler for the HDS config of a vhost.
// Does nothing for other vhosts, and nothing at all when SRS_HDS is not
// defined. Otherwise HDS is stopped, and started again when the hub is active.
srs_error_t SrsOriginHub::on_reload_vhost_hds(string vhost)
{
    srs_error_t err = srs_success;

    bool is_my_vhost = (req->vhost == vhost);
    if (!is_my_vhost) {
        return err;
    }

    // TODO: FIXME: maybe should ignore when publish already stopped?

#ifdef SRS_HDS
    hds->on_unpublish();

    // Don't start HDS when source is not active.
    if (!is_active) {
        return err;
    }

    err = hds->on_publish(req);
    if (err != srs_success) {
        return srs_error_wrap(err, "hds publish failed");
    }
    srs_trace("vhost %s hds reload success", vhost.c_str());
#endif

    return err;
}
2017-09-22 08:14:30 +00:00
// Reload handler for the DVR config of a vhost.
// Does nothing for other vhosts. Otherwise the DVR is stopped, and when the
// hub is active it is reinitialized, started again and fed with the cached
// metadata and sequence headers.
srs_error_t SrsOriginHub::on_reload_vhost_dvr(string vhost)
{
    srs_error_t err = srs_success;

    bool is_my_vhost = (req->vhost == vhost);
    if (!is_my_vhost) {
        return err;
    }

    // TODO: FIXME: maybe should ignore when publish already stopped?

    // cleanup dvr
    dvr->on_unpublish();

    // Don't start DVR when source is not active.
    if (!is_active) {
        return err;
    }

    // reinitialize the dvr, update plan.
    err = dvr->initialize(this, req);
    if (err != srs_success) {
        return srs_error_wrap(err, "reload dvr");
    }

    // start to publish by new plan.
    err = dvr->on_publish();
    if (err != srs_success) {
        return srs_error_wrap(err, "dvr publish failed");
    }

    // Push the cached sequence headers to the restarted DVR.
    err = on_dvr_request_sh();
    if (err != srs_success) {
        return srs_error_wrap(err, "request sh");
    }

    srs_trace("vhost %s dvr reload success", vhost.c_str());

    return err;
}
2017-09-22 08:14:30 +00:00
// Reload handler for the transcode config of a vhost.
// Does nothing for other vhosts. Otherwise the encoder is stopped, and
// started again when the hub is active.
srs_error_t SrsOriginHub::on_reload_vhost_transcode(string vhost)
{
    srs_error_t err = srs_success;

    bool is_my_vhost = (req->vhost == vhost);
    if (!is_my_vhost) {
        return err;
    }

    // TODO: FIXME: maybe should ignore when publish already stopped?

    encoder->on_unpublish();

    // Don't start transcode when source is not active.
    if (!is_active) {
        return err;
    }

    err = encoder->on_publish(req);
    if (err != srs_success) {
        return srs_error_wrap(err, "start encoder failed");
    }
    srs_trace("vhost %s transcode reload success", vhost.c_str());

    return err;
}
2017-09-22 08:14:30 +00:00
// Reload handler for the exec config of a vhost.
// Does nothing for other vhosts. Otherwise the exec component is stopped,
// and started again when the hub is active.
srs_error_t SrsOriginHub::on_reload_vhost_exec(string vhost)
{
    srs_error_t err = srs_success;

    bool is_my_vhost = (req->vhost == vhost);
    if (!is_my_vhost) {
        return err;
    }

    // TODO: FIXME: maybe should ignore when publish already stopped?

    ng_exec->on_unpublish();

    // Don't start exec when source is not active.
    if (!is_active) {
        return err;
    }

    err = ng_exec->on_publish(req);
    if (err != srs_success) {
        return srs_error_wrap(err, "start exec failed");
    }
    srs_trace("vhost %s exec reload success", vhost.c_str());

    return err;
}
2017-09-23 14:12:33 +00:00
// Create, initialize and start one forwarder for each forward target
// configured for the vhost of the request.
// Does nothing when forward is disabled or no targets are configured.
// Each forwarder is added to the list before it is initialized, so a
// forwarder which fails to initialize or start remains in the list.
srs_error_t SrsOriginHub::create_forwarders()
{
    srs_error_t err = srs_success;

    if (!_srs_config->get_forward_enabled(req->vhost)) {
        return err;
    }

    SrsConfDirective* conf = _srs_config->get_forwards(req->vhost);
    if (!conf) {
        return err;
    }

    for (int i = 0; i < (int)conf->args.size(); i++) {
        std::string forward_server = conf->args.at(i);

        SrsForwarder* forwarder = new SrsForwarder(this);
        forwarders.push_back(forwarder);

        // initialize the forwarder with request.
        err = forwarder->initialize(req, forward_server);
        if (err != srs_success) {
            return srs_error_wrap(err, "init forwarder");
        }

        // Apply the queue length of the vhost to the forwarder.
        srs_utime_t queue_size = _srs_config->get_queue_length(req->vhost);
        forwarder->set_queue_size(queue_size);

        err = forwarder->on_publish();
        if (err != srs_success) {
            return srs_error_wrap(err, "start forwarder failed, vhost=%s, app=%s, stream=%s, forward-to=%s",
                req->vhost.c_str(), req->app.c_str(), req->stream.c_str(), forward_server.c_str());
        }
    }

    return err;
}
// Stop and free all forwarders, in list order, then empty the list.
void SrsOriginHub::destroy_forwarders()
{
    for (int i = 0; i < (int)forwarders.size(); i++) {
        SrsForwarder* forwarder = forwarders.at(i);

        // Stop the forwarder before releasing it.
        forwarder->on_unpublish();
        srs_freep(forwarder);
    }

    forwarders.clear();
}
// Create an empty cache: no metadata, no sequence headers, no previous
// sequence headers. The video and audio formats are allocated here.
SrsMetaCache::SrsMetaCache()
{
    meta = NULL;
    video = NULL;
    audio = NULL;

    previous_video = NULL;
    previous_audio = NULL;

    vformat = new SrsRtmpFormat();
    aformat = new SrsRtmpFormat();
}
// Release everything owned by the cache.
// dispose() frees the cached messages (metadata, sequence headers and the
// previous sequence headers). The formats allocated in the constructor are
// not freed by dispose() or clear(), so free them here to avoid leaking
// them for every destroyed cache.
SrsMetaCache::~SrsMetaCache()
{
    dispose();

    srs_freep(vformat);
    srs_freep(aformat);
}
// Free all cached messages: the current metadata and sequence headers
// (by clear()), and the previous sequence headers.
void SrsMetaCache::dispose()
{
    clear();

    srs_freep(previous_video);
    srs_freep(previous_audio);
}
void SrsMetaCache::clear()
{
2017-01-19 07:51:34 +00:00
srs_freep(meta);
srs_freep(video);
srs_freep(audio);
}
// Get the cached metadata message, NULL when there is none.
SrsSharedPtrMessage* SrsMetaCache::data()
{
    return meta;
}
// Get the cached video sequence header, NULL when there is none.
SrsSharedPtrMessage* SrsMetaCache::vsh()
{
    return video;
}
// Get the format which is updated by update_vsh() with each video sequence header.
SrsFormat* SrsMetaCache::vsh_format()
{
    return vformat;
}
// Get the cached audio sequence header, NULL when there is none.
SrsSharedPtrMessage* SrsMetaCache::ash()
{
    return audio;
}
// Get the format which is updated by update_ash() with each audio sequence header.
SrsFormat* SrsMetaCache::ash_format()
{
    return aformat;
}
2020-05-12 05:19:31 +00:00
// Enqueue the cached messages to the consumer.
// @param consumer The queue to enqueue the messages to.
// @param atc, ag Passed through to consumer->enqueue() for each message.
// @param dm Whether to enqueue the metadata, when it is cached.
// @param ds Whether to enqueue the sequence headers, when they are cached.
// @return The wrapped error of the first failed enqueue, or srs_success.
srs_error_t SrsMetaCache::dumps(ISrsConsumerQueue* consumer, bool atc, SrsRtmpJitterAlgorithm ag, bool dm, bool ds)
{
    srs_error_t err = srs_success;

    // copy metadata.
    if (dm && meta) {
        err = consumer->enqueue(meta, atc, ag);
        if (err != srs_success) {
            return srs_error_wrap(err, "enqueue metadata");
        }
    }

    // copy sequence header
    // copy audio sequence first, for hls to fast parse the "right" audio codec.
    // @see https://github.com/ossrs/srs/issues/301
    if (ds && audio) {
        err = consumer->enqueue(audio, atc, ag);
        if (err != srs_success) {
            return srs_error_wrap(err, "enqueue audio sh");
        }
    }

    if (ds && video) {
        err = consumer->enqueue(video, atc, ag);
        if (err != srs_success) {
            return srs_error_wrap(err, "enqueue video sh");
        }
    }

    return err;
}
// Get the copy of the video sequence header saved by update_previous_vsh(),
// NULL when there is none.
SrsSharedPtrMessage* SrsMetaCache::previous_vsh()
{
    return previous_video;
}
// Get the copy of the audio sequence header saved by update_previous_ash(),
// NULL when there is none.
SrsSharedPtrMessage* SrsMetaCache::previous_ash()
{
    return previous_audio;
}
// Replace the previous video sequence header with a copy of the current
// one, or with NULL when no video sequence header is cached.
void SrsMetaCache::update_previous_vsh()
{
    srs_freep(previous_video);

    if (video) {
        previous_video = video->copy();
    } else {
        previous_video = NULL;
    }
}
// Replace the previous audio sequence header with a copy of the current
// one, or with NULL when no audio sequence header is cached.
void SrsMetaCache::update_previous_ash()
{
    srs_freep(previous_audio);

    if (audio) {
        previous_audio = audio->copy();
    } else {
        previous_audio = NULL;
    }
}
2017-09-23 14:12:33 +00:00
// Rewrite the metadata packet, encode it and cache it as the metadata message.
// The "duration" property is removed, and the server signature and version
// are set in the packet before encoding.
// @param header The message header used to create the cached message.
// @param metadata The metadata packet, which is modified in place.
// @param updated Set to true once the cached message is replaced; it stays
//      false when encoding fails or the encoded size is not positive.
// @return The wrapped error of encoding or creating the message, or srs_success.
srs_error_t SrsMetaCache::update_data(SrsMessageHeader* header, SrsOnMetaDataPacket* metadata, bool& updated)
{
    updated = false;

    srs_error_t err = srs_success;

    // when exists the duration, remove it to make ExoPlayer happy.
    if (metadata->metadata->get_property("duration") != NULL) {
        metadata->metadata->remove("duration");
    }

    // generate metadata info to print
    // Each entry is the property name and the label used in the log.
    static const char* const fields[][2] = {
        {"width", ", width="},
        {"height", ", height="},
        {"videocodecid", ", vcodec="},
        {"audiocodecid", ", acodec="}
    };

    std::stringstream ss;
    for (int i = 0; i < (int)(sizeof(fields) / sizeof(fields[0])); i++) {
        SrsAmf0Any* prop = metadata->metadata->ensure_property_number(fields[i][0]);
        if (prop != NULL) {
            ss << fields[i][1] << (int)prop->to_number();
        }
    }
    srs_trace("got metadata%s", ss.str().c_str());

    // add server info to metadata
    metadata->metadata->set("server", SrsAmf0Any::str(RTMP_SIG_SRS_SERVER));

    // version, for example, 1.0.0
    // add version to metadata, please donot remove it, for debug.
    metadata->metadata->set("server_version", SrsAmf0Any::str(RTMP_SIG_SRS_VERSION));

    // encode the metadata to payload
    int size = 0;
    char* payload = NULL;
    err = metadata->encode(size, payload);
    if (err != srs_success) {
        return srs_error_wrap(err, "encode metadata");
    }

    if (size <= 0) {
        srs_warn("ignore the invalid metadata. size=%d", size);
        return err;
    }

    // create a shared ptr message.
    srs_freep(meta);
    meta = new SrsSharedPtrMessage();
    updated = true;

    // dump message to shared ptr message.
    // the payload/size managed by cache_metadata, user should not free it.
    err = meta->create(header, payload, size);
    if (err != srs_success) {
        return srs_error_wrap(err, "create metadata");
    }

    return err;
}
2017-09-23 14:12:33 +00:00
// Cache a copy of msg as the audio sequence header, save it as the previous
// one too, then parse msg by the audio format.
// @return The result of the audio format parsing msg.
srs_error_t SrsMetaCache::update_ash(SrsSharedPtrMessage* msg)
{
    // Replace the cached audio sequence header.
    srs_freep(audio);
    audio = msg->copy();

    // Keep the previous one in sync with the new header.
    update_previous_ash();

    srs_error_t err = aformat->on_audio(msg);
    return err;
}
2017-09-23 14:12:33 +00:00
// Cache a copy of msg as the video sequence header, save it as the previous
// one too, then parse msg by the video format.
// @return The result of the video format parsing msg.
srs_error_t SrsMetaCache::update_vsh(SrsSharedPtrMessage* msg)
{
    // Replace the cached video sequence header.
    srs_freep(video);
    video = msg->copy();

    // Keep the previous one in sync with the new header.
    update_previous_vsh();

    srs_error_t err = vformat->on_video(msg);
    return err;
}
// The global source manager, allocated when this namespace-scope variable is initialized.
SrsSourceManager* _srs_sources = new SrsSourceManager();
// The lock is not created here, it's lazily created by fetch_or_create().
SrsSourceManager::SrsSourceManager()
{
    lock = NULL;
}
// Destroy the lock. The sources in the pool are not freed here, see destroy().
SrsSourceManager::~SrsSourceManager()
{
    srs_mutex_destroy(lock);
}
// Fetch the source of the stream of request r, or create and initialize a
// new one when it's not in the pool.
// @param r The request which identifies the stream by its stream url.
// @param h The handler to initialize a new source with.
// @param pps Output, set to the fetched or created source on success; it's
//      not written on failure.
// @return The wrapped error when the RTC source (SRS_RTC only) or the new
//      source fails to initialize, in which case the new source is freed
//      and not added to the pool.
srs_error_t SrsSourceManager::fetch_or_create(SrsRequest* r, ISrsSourceHandler* h, SrsSource** pps)
{
    srs_error_t err = srs_success;

    // Lazy create lock, because ST is not ready in SrsSourceManager constructor.
    if (!lock) {
        lock = srs_mutex_new();
    }

    // Use lock to protect coroutine switch.
    // @bug https://github.com/ossrs/srs/issues/1230
    // TODO: FIXME: Use smaller lock.
    SrsLocker(lock);

    SrsSource* source = NULL;
    if ((source = fetch(r)) != NULL) {
        *pps = source;
        return err;
    }

    // The url is the key of pool, and is reused by the logs and errors below.
    string stream_url = r->get_stream_url();

    // should always not exists for create a source.
    srs_assert (pool.find(stream_url) == pool.end());

#ifdef SRS_RTC
    bool rtc_server_enabled = _srs_config->get_rtc_server_enabled();
    bool rtc_enabled = _srs_config->get_rtc_enabled(r->vhost);

    // Get the RTC source and bridger.
    SrsRtcSource* rtc = NULL;
    if (rtc_server_enabled && rtc_enabled) {
        if ((err = _srs_rtc_sources->fetch_or_create(r, &rtc)) != srs_success) {
            // No source is created yet, so there is nothing to free.
            return srs_error_wrap(err, "init rtc %s", stream_url.c_str());
        }
    }
#endif

    srs_trace("new source, stream_url=%s", stream_url.c_str());

    source = new SrsSource();
    if ((err = source->initialize(r, h)) != srs_success) {
        // The source is not in the pool yet, so free it here.
        err = srs_error_wrap(err, "init source %s", stream_url.c_str());
        srs_freep(source);
        return err;
    }

#ifdef SRS_RTC
    // If rtc enabled, bridge RTMP source to RTC,
    // all RTMP packets will be forwarded to RTC source.
    if (source && rtc) {
        source->bridge_to(rtc->bridger());
    }
#endif

    pool[stream_url] = source;
    *pps = source;

    return err;
}
// Find the source of the stream of request r in the pool.
// @return NULL when not found. Otherwise the source, whose auth is updated
//      from r before returning.
SrsSource* SrsSourceManager::fetch(SrsRequest* r)
{
    string stream_url = r->get_stream_url();

    std::map<std::string, SrsSource*>::iterator it = pool.find(stream_url);
    if (it == pool.end()) {
        return NULL;
    }

    SrsSource* source = it->second;

    // we always update the request of resource,
    // for origin auth is on, the token in request maybe invalid,
    // and we only need to update the token of request, it's simple.
    source->update_auth(r);

    return source;
}
// Dispose all sources in the pool. The sources stay in the pool.
void SrsSourceManager::dispose()
{
    std::map<std::string, SrsSource*>::iterator it = pool.begin();
    while (it != pool.end()) {
        it->second->dispose();
        ++it;
    }
}
// Run do_cycle(), then restore the context id which was active before it.
srs_error_t SrsSourceManager::cycle()
{
    // Remember the current context id, to set it again after the cycle.
    int saved_cid = _srs_context->get_id();

    srs_error_t err = do_cycle();

    _srs_context->set_id(saved_cid);

    return err;
}
// Cycle each source in the pool. The first failure stops the iteration and
// its wrapped error is returned. The cleanup of expired sources is disabled
// by the preprocessor.
srs_error_t SrsSourceManager::do_cycle()
{
    srs_error_t err = srs_success;

    std::map<std::string, SrsSource*>::iterator it = pool.begin();
    while (it != pool.end()) {
        SrsSource* source = it->second;

        // Do cycle source to cleanup components, such as hls dispose.
        err = source->cycle();
        if (err != srs_success) {
            return srs_error_wrap(err, "source=%d/%d cycle", source->source_id(), source->pre_source_id());
        }

        // TODO: FIXME: support source cleanup.
        // @see https://github.com/ossrs/srs/issues/713
        // @see https://github.com/ossrs/srs/issues/714
#if 0
        // When source expired, remove it.
        if (source->expired()) {
            int cid = source->source_id();
            if (cid == -1 && source->pre_source_id() > 0) {
                cid = source->pre_source_id();
            }
            if (cid > 0) {
                _srs_context->set_id(cid);
            }
            srs_trace("cleanup die source, total=%d", (int)pool.size());
            srs_freep(source);
            pool.erase(it++);
        } else {
            ++it;
        }
#else
        ++it;
#endif
    }

    return err;
}
// Free all sources in the pool, then empty the pool.
void SrsSourceManager::destroy()
{
    std::map<std::string, SrsSource*>::iterator it = pool.begin();
    while (it != pool.end()) {
        SrsSource* source = it->second;
        srs_freep(source);
        ++it;
    }

    pool.clear();
}
2020-05-12 11:53:21 +00:00
// Nothing to initialize.
ISrsSourceBridger::ISrsSourceBridger()
{
}
// Nothing to release.
ISrsSourceBridger::~ISrsSourceBridger()
{
}
// Create a source without a request: the request, handler and bridger are
// NULL until set later. The components owned by the source are allocated
// here, and the source subscribes to the config.
SrsSource::SrsSource()
{
    // The state which is set up later.
    req = NULL;
    handler = NULL;
    bridger = NULL;

    // The defaults of plain fields.
    jitter_algorithm = SrsRtmpJitterAlgorithmOFF;
    mix_correct = false;
    _can_publish = true;
    _source_id = -1;
    _pre_source_id = -1;
    die_at = 0;
    is_monotonically_increase = false;
    last_packet_time = 0;
    atc = false;

    // The components owned by the source.
    mix_queue = new SrsMixQueue();
    play_edge = new SrsPlayEdge();
    publish_edge = new SrsPublishEdge();
    gop_cache = new SrsGopCache();
    hub = new SrsOriginHub();
    meta = new SrsMetaCache();

    _srs_config->subscribe(this);
}
// Unsubscribe from the config, then free the components and the request
// owned by the source. The consumers are only removed from the list.
SrsSource::~SrsSource()
{
    _srs_config->unsubscribe(this);

    // never free the consumers,
    // for all consumers are auto free.
    consumers.clear();

    // The components allocated by the constructor.
    srs_freep(hub);
    srs_freep(meta);
    srs_freep(mix_queue);
    srs_freep(play_edge);
    srs_freep(publish_edge);
    srs_freep(gop_cache);

    // The request copied by initialize().
    srs_freep(req);
}
// Dispose the hub, the meta cache and the gop cache of the source.
void SrsSource::dispose()
{
    hub->dispose();
    meta->dispose();
    gop_cache->dispose();
}
2017-06-11 01:40:07 +00:00
// Cycle the hub of the source.
// @return The wrapped error of the hub, or srs_success.
srs_error_t SrsSource::cycle()
{
    srs_error_t err = srs_success;

    if ((err = hub->cycle()) != srs_success) {
        return srs_error_wrap(err, "hub cycle");
    }

    return srs_success;
}
// Whether the source is expired: die_at is set, the source can be published,
// there are no consumers, and more than SRS_SOURCE_CLEANUP has elapsed
// since die_at.
bool SrsSource::expired()
{
    // unknown state?
    if (die_at == 0) {
        return false;
    }

    // still publishing?
    if (!(_can_publish && publish_edge->can_publish())) {
        return false;
    }

    // has any consumers?
    if (!consumers.empty()) {
        return false;
    }

    // Expired only when the cleanup interval has passed.
    srs_utime_t now = srs_get_system_time();
    return now > die_at + SRS_SOURCE_CLEANUP;
}
2017-06-11 01:40:07 +00:00
// Initialize the source for the request, which must be done only once.
// @param r The request, which is copied and owned by the source.
// @param h The handler of the source, must not be NULL.
// @return The wrapped error when the hub or one of the edges fails to
//      initialize, or srs_success.
srs_error_t SrsSource::initialize(SrsRequest* r, ISrsSourceHandler* h)
{
    srs_error_t err = srs_success;

    srs_assert(h);
    srs_assert(!req);

    handler = h;
    req = r->copy();
    atc = _srs_config->get_atc(req->vhost);

    // Initialize the hub and the edges with the copied request.
    err = hub->initialize(this, req);
    if (err != srs_success) {
        return srs_error_wrap(err, "hub");
    }

    err = play_edge->initialize(this, req);
    if (err != srs_success) {
        return srs_error_wrap(err, "edge(play)");
    }

    err = publish_edge->initialize(this, req);
    if (err != srs_success) {
        return srs_error_wrap(err, "edge(publish)");
    }

    // Load the remaining config of the vhost.
    srs_utime_t queue_size = _srs_config->get_queue_length(req->vhost);
    publish_edge->set_queue_size(queue_size);

    jitter_algorithm = (SrsRtmpJitterAlgorithm)_srs_config->get_time_jitter(req->vhost);
    mix_correct = _srs_config->get_mix_correct(req->vhost);

    return err;
}
2020-05-12 11:53:21 +00:00
// Set the bridger of the source. The pointer is stored as is, without a copy.
void SrsSource::set_bridger(ISrsSourceBridger* v)
{
    bridger = v;
}
2017-09-22 08:14:30 +00:00
// Reload handler for the play config of a vhost.
// Does nothing for other vhosts. Otherwise it reloads the time jitter,
// mix_correct, atc, gop cache and the queue size of the consumers.
srs_error_t SrsSource::on_reload_vhost_play(string vhost)
{
    srs_error_t err = srs_success;

    bool is_my_vhost = (req->vhost == vhost);
    if (!is_my_vhost) {
        return err;
    }

    // time_jitter
    jitter_algorithm = (SrsRtmpJitterAlgorithm)_srs_config->get_time_jitter(req->vhost);

    // mix_correct
    {
        bool enabled = _srs_config->get_mix_correct(req->vhost);

        // when changed, clear the mix queue.
        if (enabled != mix_correct) {
            mix_queue->clear();
        }
        mix_correct = enabled;
    }

    // atc changed.
    {
        bool enabled = _srs_config->get_atc(vhost);

        if (enabled != atc) {
            srs_warn("vhost %s atc changed to %d, connected client may corrupt.", vhost.c_str(), enabled);
            gop_cache->clear();
        }
        atc = enabled;
    }

    // gop cache changed.
    {
        bool enabled = _srs_config->get_gop_cache(vhost);

        if (enabled != gop_cache->enabled()) {
            string url = req->get_stream_url();
            srs_trace("vhost %s gop_cache changed to %d, source url=%s", vhost.c_str(), enabled, url.c_str());
            gop_cache->set(enabled);
        }
    }

    // queue length
    {
        srs_utime_t v = _srs_config->get_queue_length(req->vhost);

        // Apply the queue size to all consumers.
        for (int i = 0; i < (int)consumers.size(); i++) {
            SrsConsumer* consumer = consumers.at(i);
            consumer->set_queue_size(v);
        }
        srs_trace("consumers reload queue size success.");

        // TODO: FIXME: https://github.com/ossrs/srs/issues/742#issuecomment-273656897
        // TODO: FIXME: support queue size.
#if 0
        if (true) {
            std::vector<SrsForwarder*>::iterator it;

            for (it = forwarders.begin(); it != forwarders.end(); ++it) {
                SrsForwarder* forwarder = *it;
                forwarder->set_queue_size(v);
            }

            srs_trace("forwarders reload queue size success.");
        }

        if (true) {
            publish_edge->set_queue_size(v);
            srs_trace("publish_edge reload queue size success.");
        }
#endif
    }

    return err;
}
2017-09-23 14:12:33 +00:00
// Update the source id, and notify all consumers about it.
// Does nothing when the id is not changed. The previous source id is set to
// the new id when it was never set (-1), otherwise to the old source id.
// @return Always srs_success.
srs_error_t SrsSource::on_source_id_changed(int id)
{
    srs_error_t err = srs_success;

    if (id == _source_id) {
        return err;
    }

    // Track the previous id before overwriting the current one.
    if (_pre_source_id == -1) {
        _pre_source_id = id;
    } else if (_pre_source_id != _source_id) {
        _pre_source_id = _source_id;
    }

    _source_id = id;

    // notice all consumer
    for (int i = 0; i < (int)consumers.size(); i++) {
        SrsConsumer* consumer = consumers.at(i);
        consumer->update_source_id();
    }

    return err;
}
// Get the current source id, which is -1 until on_source_id_changed() sets it.
int SrsSource::source_id()
{
    return _source_id;
}
2016-09-05 06:13:37 +00:00
// Get the previous source id, which is -1 until on_source_id_changed() sets it.
int SrsSource::pre_source_id()
{
    return _pre_source_id;
}
2018-02-16 08:39:07 +00:00
// The source is reported inactive exactly when it can be published.
bool SrsSource::inactive()
{
    return _can_publish;
}
// Update the auth of the request owned by the source from request r.
void SrsSource::update_auth(SrsRequest* r)
{
    req->update_auth(r);
}
2015-07-08 09:08:29 +00:00
bool SrsSource::can_publish(bool is_edge)
{
2015-07-08 09:08:29 +00:00
if (is_edge) {
return publish_edge->can_publish();
}
2017-03-25 09:21:39 +00:00
2015-07-08 09:08:29 +00:00
return _can_publish;
}
2017-09-23 14:12:33 +00:00
// Handle a metadata packet of the stream: detect atc, update the meta cache,
// deliver the cached metadata to all consumers and then to the hub.
// @param msg The message of the metadata, its header is used for the cached message.
// @param metadata The decoded metadata packet, which is modified by the meta cache.
// @return The wrapped error of the first failure, or the result of the hub.
srs_error_t SrsSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata)
{
    srs_error_t err = srs_success;

    // if allow atc_auto and bravo-atc detected, open atc for vhost.
    SrsAmf0Any* prop = NULL;
    atc = _srs_config->get_atc(req->vhost);
    if (_srs_config->get_atc_auto(req->vhost)) {
        if ((prop = metadata->metadata->get_property("bravo_atc")) != NULL) {
            if (prop->is_string() && prop->to_str() == "true") {
                atc = true;
            }
        }
    }

    // Whether there was metadata cached BEFORE this packet. It must be sampled
    // before the update, because after a successful update the cache always
    // holds the new metadata, so testing it afterwards is always true and
    // would drop even the first metadata for the connected consumers.
    bool has_previous_metadata = (meta->data() != NULL);

    // Update the meta cache.
    bool updated = false;
    if ((err = meta->update_data(&msg->header, metadata, updated)) != srs_success) {
        return srs_error_wrap(err, "update metadata");
    }
    if (!updated) {
        return err;
    }

    // when already got metadata, drop when reduce sequence header.
    bool drop_for_reduce = false;
    if (has_previous_metadata && _srs_config->get_reduce_sequence_header(req->vhost)) {
        drop_for_reduce = true;
        srs_warn("drop for reduce sh metadata, size=%d", msg->size);
    }

    // copy to all consumer
    if (!drop_for_reduce) {
        std::vector<SrsConsumer*>::iterator it;
        for (it = consumers.begin(); it != consumers.end(); ++it) {
            SrsConsumer* consumer = *it;
            if ((err = consumer->enqueue(meta->data(), atc, jitter_algorithm)) != srs_success) {
                return srs_error_wrap(err, "consume metadata");
            }
        }
    }

    // Copy to hub to all utilities.
    return hub->on_meta_data(meta->data(), metadata);
}
2017-09-23 14:12:33 +00:00
// Handle an audio message of the stream.
// Without mix_correct, the message is processed directly. With mix_correct,
// a copy is pushed to the mix queue, and the message popped from the queue
// (audio or video), if any, is processed instead.
// @param shared_audio The audio message; its payload is transferred to an
//      internal message, so the caller should not use it again.
srs_error_t SrsSource::on_audio(SrsCommonMessage* shared_audio)
{
    srs_error_t err = srs_success;

    // Detect whether the timestamp stops increasing monotonically.
    if (!mix_correct && is_monotonically_increase) {
        bool is_rollback = last_packet_time > 0 && shared_audio->header.timestamp < last_packet_time;
        if (is_rollback) {
            is_monotonically_increase = false;
            srs_warn("AUDIO: stream not monotonically increase, please open mix_correct.");
        }
    }
    last_packet_time = shared_audio->header.timestamp;

    // convert shared_audio to msg, user should not use shared_audio again.
    // the payload is transfer to msg, and set to NULL in shared_audio.
    SrsSharedPtrMessage msg;
    err = msg.create(shared_audio);
    if (err != srs_success) {
        return srs_error_wrap(err, "create message");
    }

    // directly process the audio message.
    if (!mix_correct) {
        return on_audio_imp(&msg);
    }

    // insert msg to the queue.
    mix_queue->push(msg.copy());

    // fetch someone from mix queue.
    SrsSharedPtrMessage* m = mix_queue->pop();
    if (!m) {
        return err;
    }

    // consume the monotonically increase message.
    err = m->is_audio() ? on_audio_imp(m) : on_video_imp(m);
    srs_freep(m);

    return err;
}
2017-09-23 14:12:33 +00:00
// Process an audio message: deliver it to the hub, the bridger and the
// consumers, then update the meta cache and the gop cache.
// A sequence header which equals to the previous one is not delivered to the
// consumers when reduce_sequence_header is enabled for the vhost, but it's
// still delivered to the hub and the bridger.
// @param msg The audio message to process.
// @return The wrapped error of the first failure, or srs_success.
srs_error_t SrsSource::on_audio_imp(SrsSharedPtrMessage* msg)
{
    srs_error_t err = srs_success;

    bool is_aac_sequence_header = SrsFlvAudio::sh(msg->payload, msg->size);
    bool is_sequence_header = is_aac_sequence_header;

    // whether consumer should drop for the duplicated sequence header.
    bool drop_for_reduce = false;
    if (is_sequence_header && meta->previous_ash() && _srs_config->get_reduce_sequence_header(req->vhost)) {
        if (meta->previous_ash()->size == msg->size) {
            drop_for_reduce = srs_bytes_equals(meta->previous_ash()->payload, msg->payload, msg->size);

            // Only warn when the header is really dropped. Headers with the
            // same size but different bytes are delivered as usual.
            if (drop_for_reduce) {
                srs_warn("drop for reduce sh audio, size=%d", msg->size);
            }
        }
    }

    // Copy to hub to all utilities.
    if ((err = hub->on_audio(msg)) != srs_success) {
        return srs_error_wrap(err, "consume audio");
    }

    // For bridger to consume the message.
    if (bridger && (err = bridger->on_audio(msg)) != srs_success) {
        return srs_error_wrap(err, "bridger consume audio");
    }

    // copy to all consumer
    if (!drop_for_reduce) {
        for (int i = 0; i < (int)consumers.size(); i++) {
            SrsConsumer* consumer = consumers.at(i);
            if ((err = consumer->enqueue(msg, atc, jitter_algorithm)) != srs_success) {
                return srs_error_wrap(err, "consume message");
            }
        }
    }

    // cache the sequence header of aac, or first packet of mp3.
    // for example, the mp3 is used for hls to write the "right" audio codec.
    // TODO: FIXME: to refine the stream info system.
    if (is_aac_sequence_header || !meta->ash()) {
        if ((err = meta->update_ash(msg)) != srs_success) {
            return srs_error_wrap(err, "meta consume audio");
        }
    }

    // when sequence header, donot push to gop cache and adjust the timestamp.
    if (is_sequence_header) {
        return err;
    }

    // cache the last gop packets
    if ((err = gop_cache->cache(msg)) != srs_success) {
        return srs_error_wrap(err, "gop cache consume audio");
    }

    // if atc, update the sequence header to abs time.
    if (atc) {
        if (meta->ash()) {
            meta->ash()->timestamp = msg->timestamp;
        }
        if (meta->data()) {
            meta->data()->timestamp = msg->timestamp;
        }
    }

    return err;
}
2017-09-23 14:12:33 +00:00
// Entry point for a video message received from the publisher.
//
// The message is checked for monotonically increasing timestamps (only while
// mix_correct is off), dropped when its header is not acceptable, converted to
// a shared-pointer message and then dispatched. With mix_correct on, the
// message goes through the mix queue and whatever packet the queue releases
// (audio or video) is dispatched instead.
//
// @param shared_video The received message. Its payload is transferred to the
//      internal message, so the caller should not use it again afterwards.
// @return srs_success (also when the message is dropped or merely queued), or
//      the wrapped error of the step that failed.
srs_error_t SrsSource::on_video(SrsCommonMessage* shared_video)
{
    srs_error_t err = srs_success;

    // monotically increase detect.
    if (!mix_correct && is_monotonically_increase) {
        if (last_packet_time > 0 && shared_video->header.timestamp < last_packet_time) {
            is_monotonically_increase = false;
            srs_warn("VIDEO: stream not monotonically increase, please open mix_correct.");
        }
    }
    last_packet_time = shared_video->header.timestamp;

    // drop any unknown header video.
    // @see https://github.com/ossrs/srs/issues/421
    if (!SrsFlvVideo::acceptable(shared_video->payload, shared_video->size)) {
        // Go through unsigned char so a byte >= 0x80 is logged as 0x80..0xff
        // instead of being sign-extended to 0xffffffXX where char is signed.
        unsigned int b0 = 0x00;
        if (shared_video->size > 0) {
            b0 = (unsigned char)shared_video->payload[0];
        }

        srs_warn("drop unknown header video, size=%d, bytes[0]=%#x", shared_video->size, b0);
        return err;
    }

    // convert shared_video to msg, user should not use shared_video again.
    // the payload is transfer to msg, and set to NULL in shared_video.
    SrsSharedPtrMessage msg;
    if ((err = msg.create(shared_video)) != srs_success) {
        return srs_error_wrap(err, "create message");
    }

    // directly process the video message.
    if (!mix_correct) {
        return on_video_imp(&msg);
    }

    // insert msg to the queue.
    mix_queue->push(msg.copy());

    // fetch someone from mix queue.
    SrsSharedPtrMessage* m = mix_queue->pop();
    if (!m) {
        return err;
    }

    // consume the monotonically increase message.
    if (m->is_audio()) {
        err = on_audio_imp(m);
    } else {
        err = on_video_imp(m);
    }
    srs_freep(m);

    return err;
}
2017-09-23 14:12:33 +00:00
// Dispatch one video packet to every sink of this source.
//
// A sequence header first refreshes the cached video sequence header. The
// packet is then handed to the hub, to the bridger (when one is set) and to
// each consumer; packets that are not a sequence header are finally appended
// to the gop cache.
//
// @param msg The video packet to dispatch. It is only borrowed here; the
//      callers visible in this file release it themselves after the call.
// @return srs_success, or the wrapped error of the first step that failed.
srs_error_t SrsSource::on_video_imp(SrsSharedPtrMessage* msg)
{
    srs_error_t err = srs_success;

    bool is_sequence_header = SrsFlvVideo::sh(msg->payload, msg->size);

    // whether consumer should drop for the duplicated sequence header.
    bool drop_for_reduce = false;
    if (is_sequence_header && meta->previous_vsh() && _srs_config->get_reduce_sequence_header(req->vhost)) {
        if (meta->previous_vsh()->size == msg->size) {
            drop_for_reduce = srs_bytes_equals(meta->previous_vsh()->payload, msg->payload, msg->size);
        }

        // Only report a drop when the payload really is a duplicate; a sequence
        // header of the same size but different content is still delivered.
        if (drop_for_reduce) {
            srs_warn("drop for reduce sh video, size=%d", msg->size);
        }
    }

    // cache the sequence header if h264
    // donot cache the sequence header to gop_cache, return here.
    if (is_sequence_header && (err = meta->update_vsh(msg)) != srs_success) {
        return srs_error_wrap(err, "meta update video");
    }

    // Copy to hub to all utilities.
    if ((err = hub->on_video(msg, is_sequence_header)) != srs_success) {
        return srs_error_wrap(err, "hub consume video");
    }

    // For bridger to consume the message.
    if (bridger && (err = bridger->on_video(msg)) != srs_success) {
        return srs_error_wrap(err, "bridger consume video");
    }

    // copy to all consumer
    if (!drop_for_reduce) {
        for (int i = 0; i < (int)consumers.size(); i++) {
            SrsConsumer* consumer = consumers.at(i);
            if ((err = consumer->enqueue(msg, atc, jitter_algorithm)) != srs_success) {
                return srs_error_wrap(err, "consume video");
            }
        }
    }

    // when sequence header, donot push to gop cache and adjust the timestamp.
    if (is_sequence_header) {
        return err;
    }

    // cache the last gop packets
    if ((err = gop_cache->cache(msg)) != srs_success) {
        return srs_error_wrap(err, "gop cache consume video");
    }

    // if atc, update the sequence header to abs time.
    if (atc) {
        if (meta->vsh()) {
            meta->vsh()->timestamp = msg->timestamp;
        }
        if (meta->data()) {
            meta->data()->timestamp = msg->timestamp;
        }
    }

    return err;
}
2017-09-23 14:12:33 +00:00
// Split an aggregate message into its sub-messages and dispatch each of them.
//
// The payload is parsed as a sequence of records, each laid out as:
//      type(1) | data size(3) | timestamp(3) | timestamp high bits(1) |
//      stream id(3) | data(data size) | previous tag size(4)
// Audio records are forwarded to on_audio() and video records to on_video();
// records of any other type are parsed and ignored.
//
// @param msg The aggregate message; only its payload and header are read.
// @return srs_success, ERROR_RTMP_AGGREGATE when the payload is truncated or
//      malformed, or the wrapped error of the sub-message that failed.
srs_error_t SrsSource::on_aggregate(SrsCommonMessage* msg)
{
    srs_error_t err = srs_success;

    SrsBuffer* stream = new SrsBuffer(msg->payload, msg->size);
    SrsAutoFree(SrsBuffer, stream);

    // the aggregate message always use abs time.
    // The offset is taken from the first sub-message and applied to all of
    // them. A separate flag marks it as initialized, so that a genuine offset
    // of -1 is not mistaken for "not yet computed" and recomputed every time.
    int delta = 0;
    bool delta_initialized = false;

    while (!stream->empty()) {
        if (!stream->require(1)) {
            return srs_error_new(ERROR_RTMP_AGGREGATE, "aggregate");
        }
        int8_t type = stream->read_1bytes();

        if (!stream->require(3)) {
            return srs_error_new(ERROR_RTMP_AGGREGATE, "aggregate");
        }
        int32_t data_size = stream->read_3bytes();

        if (data_size < 0) {
            return srs_error_new(ERROR_RTMP_AGGREGATE, "aggregate size");
        }

        if (!stream->require(3)) {
            return srs_error_new(ERROR_RTMP_AGGREGATE, "aggregate time");
        }
        int32_t timestamp = stream->read_3bytes();

        if (!stream->require(1)) {
            return srs_error_new(ERROR_RTMP_AGGREGATE, "aggregate time(high bits)");
        }
        // Combine the high byte in unsigned arithmetic: left-shifting a
        // negative signed value is not well defined before C++20. After the
        // mask the value fits in a non-negative int32_t.
        uint32_t time_h = (uint8_t)stream->read_1bytes();
        timestamp = (int32_t)(((uint32_t)timestamp | (time_h << 24)) & 0x7FFFFFFF);

        // adjust abs timestamp in aggregate msg.
        if (!delta_initialized) {
            delta = (int)msg->header.timestamp - (int)timestamp;
            delta_initialized = true;
        }
        timestamp += delta;

        if (!stream->require(3)) {
            return srs_error_new(ERROR_RTMP_AGGREGATE, "aggregate stream id");
        }
        int32_t stream_id = stream->read_3bytes();

        if (data_size > 0 && !stream->require(data_size)) {
            return srs_error_new(ERROR_RTMP_AGGREGATE, "aggregate data");
        }

        // to common message.
        SrsCommonMessage o;

        o.header.message_type = type;
        o.header.payload_length = data_size;
        o.header.timestamp_delta = timestamp;
        o.header.timestamp = timestamp;
        o.header.stream_id = stream_id;
        o.header.perfer_cid = msg->header.perfer_cid;

        if (data_size > 0) {
            o.size = data_size;
            o.payload = new char[o.size];
            stream->read_bytes(o.payload, o.size);
        }

        if (!stream->require(4)) {
            return srs_error_new(ERROR_RTMP_AGGREGATE, "aggregate previous tag size");
        }
        // The previous tag size is read only to advance past it.
        stream->read_4bytes();

        // process parsed message
        if (o.header.is_audio()) {
            if ((err = on_audio(&o)) != srs_success) {
                return srs_error_wrap(err, "consume audio");
            }
        } else if (o.header.is_video()) {
            if ((err = on_video(&o)) != srs_success) {
                return srs_error_wrap(err, "consume video");
            }
        }
    }

    return err;
}
2017-09-23 14:12:33 +00:00
// Called when a publisher starts publishing to this source.
//
// Marks the source as occupied, records the publishing context id as the
// source id, resets the per-publish state (mix queue, metadata cache and the
// monotonic-timestamp detector) and then notifies the hub, the handler, the
// bridger (when one is set) and the statistics, in that order.
//
// @return srs_success, or the wrapped error of the first notification that failed.
srs_error_t SrsSource::on_publish()
{
    srs_error_t err = srs_success;

    // The request must have been set before publishing.
    srs_assert(req);

    _can_publish = false;

    // Whether this is the origin or an edge source, the publishing thread
    // becomes the source id.
    err = on_source_id_changed(_srs_context->get_id());
    if (err != srs_success) {
        return srs_error_wrap(err, "source id change");
    }

    // Start from an empty mix queue.
    mix_queue->clear();

    // Drop the cached metadata, to make VLC happy when disable/enable stream.
    // @see https://github.com/ossrs/srs/issues/1630#issuecomment-597979448
    meta->clear();

    // Restart the monotonic-timestamp detection.
    is_monotonically_increase = true;
    last_packet_time = 0;

    // Tell the hub that publishing started.
    err = hub->on_publish();
    if (err != srs_success) {
        return srs_error_wrap(err, "hub publish");
    }

    // Tell the handler.
    srs_assert(handler);
    err = handler->on_publish(this, req);
    if (err != srs_success) {
        return srs_error_wrap(err, "handle publish");
    }

    // Tell the bridger, if any.
    if (bridger) {
        err = bridger->on_publish();
        if (err != srs_success) {
            return srs_error_wrap(err, "bridger publish");
        }
    }

    SrsStatistic::instance()->on_stream_publish(req, _source_id);

    return err;
}
void SrsSource::on_unpublish()
{
2016-09-05 06:13:37 +00:00
// ignore when already unpublished.
if (_can_publish) {
return;
}
// Notify the hub about the unpublish event.
hub->on_unpublish();
2017-03-25 09:21:39 +00:00
// only clear the gop cache,
// donot clear the sequence header, for it maybe not changed,
// when drop dup sequence header, drop the metadata also.
2014-03-18 03:32:58 +00:00
gop_cache->clear();
// Reset the metadata cache, to make VLC happy when disable/enable stream.
// @see https://github.com/ossrs/srs/issues/1630#issuecomment-597979448
meta->update_previous_vsh();
meta->update_previous_ash();
2014-05-29 06:16:34 +00:00
srs_trace("cleanup when unpublish");
2014-03-18 03:32:58 +00:00
_can_publish = true;
_source_id = -1;
2017-03-25 09:21:39 +00:00
// notify the handler.
srs_assert(handler);
2015-05-08 08:45:25 +00:00
SrsStatistic* stat = SrsStatistic::instance();
stat->on_stream_close(req);
2020-05-12 11:53:21 +00:00
handler->on_unpublish(this, req);
2020-05-12 11:53:21 +00:00
if (bridger) {
bridger->on_unpublish();
}
2016-09-05 06:13:37 +00:00
// no consumer, stream is die.
if (consumers.empty()) {
die_at = srs_get_system_time();
2016-09-05 06:13:37 +00:00
}
}
// Create a consumer for the connection and register it with this source.
//
// @param conn The connection the consumer is created for.
// @param consumer Output: the newly created consumer. It is assigned and
//      registered even when an error is returned afterwards.
// @return srs_success, or the wrapped error when the vhost is an edge and
//      starting the play edge failed.
srs_error_t SrsSource::create_consumer(SrsConnection* conn, SrsConsumer*& consumer)
{
    srs_error_t err = srs_success;

    consumer = new SrsConsumer(this, conn);
    consumers.push_back(consumer);

    // Only an edge vhost needs further work.
    if (!_srs_config->get_vhost_is_edge(req->vhost)) {
        return err;
    }

    // Notify the play edge that a client starts playing.
    err = play_edge->on_client_play();
    if (err != srs_success) {
        return srs_error_wrap(err, "play edge");
    }

    return err;
}
// Prepare a freshly created consumer: set its queue size and, while the
// stream is being published, feed it the cached metadata, sequence headers
// and gop cache.
//
// @param consumer The consumer to fill.
// @param ds Passed to meta->dumps() as its last argument.
// @param dm Passed to meta->dumps() before ds.
// @param dg Whether to dump the gop cache to the consumer.
// @return srs_success, or the wrapped error of the dump that failed.
srs_error_t SrsSource::consumer_dumps(SrsConsumer* consumer, bool ds, bool dm, bool dg)
{
    srs_error_t err = srs_success;

    srs_utime_t queue_size = _srs_config->get_queue_length(req->vhost);
    consumer->set_queue_size(queue_size);

    // if atc, update the sequence header to gop cache time.
    if (atc && !gop_cache->empty()) {
        if (meta->data()) {
            meta->data()->timestamp = srsu2ms(gop_cache->start_time());
        }
        if (meta->vsh()) {
            meta->vsh()->timestamp = srsu2ms(gop_cache->start_time());
        }
        if (meta->ash()) {
            meta->ash()->timestamp = srsu2ms(gop_cache->start_time());
        }
    }

    // If stream is publishing, dumps the sequence header and gop cache.
    if (hub->active()) {
        // Copy metadata and sequence header to consumer.
        if ((err = meta->dumps(consumer, atc, jitter_algorithm, dm, ds)) != srs_success) {
            return srs_error_wrap(err, "meta dumps");
        }

        // copy gop cache to client.
        if (dg && (err = gop_cache->dump(consumer, atc, jitter_algorithm)) != srs_success) {
            return srs_error_wrap(err, "gop cache dumps");
        }
    }

    // print status.
    if (dg) {
        // The queue size used to be passed as-is to "%.2f", which does not
        // match a srs_utime_t argument. The explicit cast makes the argument
        // agree with "%d" whatever the underlying type is.
        // NOTE(review): assumes srs_utime_t counts microseconds, so srsu2ms()
        // yields milliseconds - confirm.
        srs_trace("create consumer, active=%d, queue_size=%dms, jitter=%d", hub->active(), (int)srsu2ms(queue_size), jitter_algorithm);
    } else {
        srs_trace("create consumer, active=%d, ignore gop cache, jitter=%d", hub->active(), jitter_algorithm);
    }

    return err;
}
// Unregister a consumer from this source.
//
// When the last consumer is gone, the play edge is told that all clients
// stopped and the current system time is recorded in die_at.
//
// @param consumer The consumer to remove; ignored when it is not registered.
void SrsSource::on_consumer_destroy(SrsConsumer* consumer)
{
    std::vector<SrsConsumer*>::iterator pos = std::find(consumers.begin(), consumers.end(), consumer);
    if (pos != consumers.end()) {
        consumers.erase(pos);
    }

    // Other consumers are still attached, keep the source as it is.
    if (!consumers.empty()) {
        return;
    }

    play_edge->on_all_client_stop();
    die_at = srs_get_system_time();
}
void SrsSource::set_cache(bool enabled)
{
2014-03-18 03:32:58 +00:00
gop_cache->set(enabled);
}
// Get the jitter algorithm this source passes to its consumers, metadata
// dumps and gop cache dumps.
SrsRtmpJitterAlgorithm SrsSource::jitter()
{
    return jitter_algorithm;
}
2017-09-23 14:12:33 +00:00
// Delegate the start of an edge publish to the publish edge.
//
// @return Whatever publish_edge->on_client_publish() returns.
srs_error_t SrsSource::on_edge_start_publish()
{
    srs_error_t err = publish_edge->on_client_publish();
    return err;
}
// TODO: FIXME: Use edge strategy pattern.
2017-09-23 14:12:33 +00:00
srs_error_t SrsSource::on_edge_proxy_publish(SrsCommonMessage* msg)
{
2014-04-27 06:57:28 +00:00
return publish_edge->on_proxy_publish(msg);
}
// Delegate the end of an edge publish to the publish edge.
void SrsSource::on_edge_proxy_unpublish()
{
    publish_edge->on_proxy_unpublish();
}
2015-09-17 05:36:02 +00:00
// Get the current origin as reported by the play edge.
//
// @return Whatever play_edge->get_curr_origin() returns.
string SrsSource::get_curr_origin()
{
    string origin = play_edge->get_curr_origin();
    return origin;
}