1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

For #299, refine the codec to format-frame-sample chain.

This commit is contained in:
winlin 2017-02-12 18:18:18 +08:00
parent c4a510b834
commit d7458c4e72
18 changed files with 990 additions and 1150 deletions

View file

@ -62,80 +62,14 @@ using namespace std;
// reset the piece id when deviation overflow this.
#define SRS_JUMP_WHEN_PIECE_DEVIATION 20
SrsHlsCacheWriter::SrsHlsCacheWriter(bool write_cache, bool write_file)
{
should_write_cache = write_cache;
should_write_file = write_file;
}
SrsHlsCacheWriter::~SrsHlsCacheWriter()
{
}
int SrsHlsCacheWriter::open(string file)
{
if (!should_write_file) {
return ERROR_SUCCESS;
}
return impl.open(file);
}
void SrsHlsCacheWriter::close()
{
if (!should_write_file) {
return;
}
impl.close();
}
bool SrsHlsCacheWriter::is_open()
{
if (!should_write_file) {
return true;
}
return impl.is_open();
}
int64_t SrsHlsCacheWriter::tellg()
{
if (!should_write_file) {
return 0;
}
return impl.tellg();
}
int SrsHlsCacheWriter::write(void* buf, size_t count, ssize_t* pnwrite)
{
if (should_write_cache) {
if (count > 0) {
data.append((char*)buf, count);
}
}
if (should_write_file) {
return impl.write(buf, count, pnwrite);
}
return ERROR_SUCCESS;
}
string SrsHlsCacheWriter::cache()
{
return data;
}
SrsHlsSegment::SrsHlsSegment(SrsTsContext* c, bool write_cache, bool write_file, SrsCodecAudio ac, SrsCodecVideo vc)
SrsHlsSegment::SrsHlsSegment(SrsTsContext* c, SrsCodecAudio ac, SrsCodecVideo vc)
{
duration = 0;
sequence_no = 0;
segment_start_dts = 0;
is_sequence_header = false;
writer = new SrsHlsCacheWriter(write_cache, write_file);
muxer = new SrsTSMuxer(writer, c, ac, vc);
writer = new SrsFileWriter();
muxer = new SrsTsMuxer(writer, c, ac, vc);
}
SrsHlsSegment::~SrsHlsSegment()
@ -290,8 +224,6 @@ SrsHlsMuxer::SrsHlsMuxer()
_sequence_no = 0;
current = NULL;
acodec = SrsCodecAudioReserved1;
should_write_cache = false;
should_write_file = true;
async = new SrsAsyncCallWorker();
context = new SrsTsContext();
}
@ -313,28 +245,26 @@ SrsHlsMuxer::~SrsHlsMuxer()
void SrsHlsMuxer::dispose()
{
if (should_write_file) {
std::vector<SrsHlsSegment*>::iterator it;
for (it = segments.begin(); it != segments.end(); ++it) {
SrsHlsSegment* segment = *it;
if (unlink(segment->full_path.c_str()) < 0) {
srs_warn("dispose unlink path failed, file=%s.", segment->full_path.c_str());
}
srs_freep(segment);
std::vector<SrsHlsSegment*>::iterator it;
for (it = segments.begin(); it != segments.end(); ++it) {
SrsHlsSegment* segment = *it;
if (unlink(segment->full_path.c_str()) < 0) {
srs_warn("dispose unlink path failed, file=%s.", segment->full_path.c_str());
}
segments.clear();
if (current) {
std::string path = current->full_path + ".tmp";
if (unlink(path.c_str()) < 0) {
srs_warn("dispose unlink path failed, file=%s", path.c_str());
}
srs_freep(current);
}
if (unlink(m3u8.c_str()) < 0) {
srs_warn("dispose unlink path failed. file=%s", m3u8.c_str());
srs_freep(segment);
}
segments.clear();
if (current) {
std::string path = current->full_path + ".tmp";
if (unlink(path.c_str()) < 0) {
srs_warn("dispose unlink path failed, file=%s", path.c_str());
}
srs_freep(current);
}
if (unlink(m3u8.c_str()) < 0) {
srs_warn("dispose unlink path failed. file=%s", m3u8.c_str());
}
// TODO: FIXME: support hls dispose in HTTP cache.
@ -407,13 +337,9 @@ int SrsHlsMuxer::update_config(SrsRequest* r, string entry_prefix,
// when update config, reset the history target duration.
max_td = (int)(fragment * _srs_config->get_hls_td_ratio(r->vhost));
// TODO: FIXME: refine better for SRS2 only support disk.
should_write_cache = false;
should_write_file = true;
// create m3u8 dir once.
m3u8_dir = srs_path_dirname(m3u8);
if (should_write_file && (ret = srs_create_dir_recursively(m3u8_dir)) != ERROR_SUCCESS) {
if ((ret = srs_create_dir_recursively(m3u8_dir)) != ERROR_SUCCESS) {
srs_error("create app dir %s failed. ret=%d", m3u8_dir.c_str(), ret);
return ret;
}
@ -468,7 +394,7 @@ int SrsHlsMuxer::segment_open(int64_t segment_start_dts)
}
// new segment.
current = new SrsHlsSegment(context, should_write_cache, should_write_file, default_acodec, default_vcodec);
current = new SrsHlsSegment(context, default_acodec, default_vcodec);
current->sequence_no = _sequence_no++;
current->segment_start_dts = segment_start_dts;
@ -540,7 +466,7 @@ int SrsHlsMuxer::segment_open(int64_t segment_start_dts)
// create dir recursively for hls.
std::string ts_dir = srs_path_dirname(current->full_path);
if (should_write_file && (ret = srs_create_dir_recursively(ts_dir)) != ERROR_SUCCESS) {
if ((ret = srs_create_dir_recursively(ts_dir)) != ERROR_SUCCESS) {
srs_error("create app dir %s failed. ret=%d", ts_dir.c_str(), ret);
return ret;
}
@ -735,7 +661,7 @@ int SrsHlsMuxer::segment_close(string log_desc)
// rename from tmp to real path
std::string tmp_file = full_path + ".tmp";
if (should_write_file && rename(tmp_file.c_str(), full_path.c_str()) < 0) {
if (rename(tmp_file.c_str(), full_path.c_str()) < 0) {
ret = ERROR_HLS_WRITE_FAILED;
srs_error("rename ts file failed, %s => %s. ret=%d",
tmp_file.c_str(), full_path.c_str(), ret);
@ -751,10 +677,8 @@ int SrsHlsMuxer::segment_close(string log_desc)
// rename from tmp to real path
std::string tmp_file = current->full_path + ".tmp";
if (should_write_file) {
if (unlink(tmp_file.c_str()) < 0) {
srs_warn("ignore unlink path failed, file=%s.", tmp_file.c_str());
}
if (unlink(tmp_file.c_str()) < 0) {
srs_warn("ignore unlink path failed, file=%s.", tmp_file.c_str());
}
srs_freep(current);
@ -788,10 +712,8 @@ int SrsHlsMuxer::segment_close(string log_desc)
for (int i = 0; i < (int)segment_to_remove.size(); i++) {
SrsHlsSegment* segment = segment_to_remove[i];
if (hls_cleanup && should_write_file) {
if (unlink(segment->full_path.c_str()) < 0) {
srs_warn("cleanup unlink path failed, file=%s.", segment->full_path.c_str());
}
if (hls_cleanup && unlink(segment->full_path.c_str()) < 0) {
srs_warn("cleanup unlink path failed, file=%s.", segment->full_path.c_str());
}
srs_freep(segment);
@ -818,7 +740,7 @@ int SrsHlsMuxer::refresh_m3u8()
std::string temp_m3u8 = m3u8 + ".temp";
if ((ret = _refresh_m3u8(temp_m3u8)) == ERROR_SUCCESS) {
if (should_write_file && rename(temp_m3u8.c_str(), m3u8.c_str()) < 0) {
if (rename(temp_m3u8.c_str(), m3u8.c_str()) < 0) {
ret = ERROR_HLS_WRITE_FAILED;
srs_error("rename m3u8 file failed. %s => %s, ret=%d", temp_m3u8.c_str(), m3u8.c_str(), ret);
}
@ -843,7 +765,7 @@ int SrsHlsMuxer::_refresh_m3u8(string m3u8_file)
return ret;
}
SrsHlsCacheWriter writer(should_write_cache, should_write_file);
SrsFileWriter writer;
if ((ret = writer.open(m3u8_file)) != ERROR_SUCCESS) {
srs_error("open m3u8 file %s failed. ret=%d", m3u8_file.c_str(), ret);
return ret;
@ -919,17 +841,54 @@ int SrsHlsMuxer::_refresh_m3u8(string m3u8_file)
return ret;
}
SrsHlsCache::SrsHlsCache()
SrsHlsController::SrsHlsController()
{
cache = new SrsTsCache();
ts = new SrsTsCache();
muxer = new SrsHlsMuxer();
}
SrsHlsCache::~SrsHlsCache()
SrsHlsController::~SrsHlsController()
{
srs_freep(cache);
srs_freep(muxer);
srs_freep(ts);
}
int SrsHlsCache::on_publish(SrsHlsMuxer* muxer, SrsRequest* req, int64_t segment_start_dts)
int SrsHlsController::initialize()
{
return muxer->initialize();
}
void SrsHlsController::dispose()
{
muxer->dispose();
}
int SrsHlsController::update_acodec(SrsCodecAudio ac)
{
return muxer->update_acodec(ac);
}
int SrsHlsController::sequence_no()
{
return muxer->sequence_no();
}
string SrsHlsController::ts_url()
{
return muxer->ts_url();
}
double SrsHlsController::duration()
{
return muxer->duration();
}
int SrsHlsController::deviation()
{
return muxer->deviation();
}
int SrsHlsController::on_publish(SrsRequest* req, int64_t segment_start_dts)
{
int ret = ERROR_SUCCESS;
@ -978,11 +937,11 @@ int SrsHlsCache::on_publish(SrsHlsMuxer* muxer, SrsRequest* req, int64_t segment
return ret;
}
int SrsHlsCache::on_unpublish(SrsHlsMuxer* muxer)
int SrsHlsController::on_unpublish()
{
int ret = ERROR_SUCCESS;
if ((ret = muxer->flush_audio(cache)) != ERROR_SUCCESS) {
if ((ret = muxer->flush_audio(ts)) != ERROR_SUCCESS) {
srs_error("m3u8 muxer flush audio failed. ret=%d", ret);
return ret;
}
@ -994,7 +953,7 @@ int SrsHlsCache::on_unpublish(SrsHlsMuxer* muxer)
return ret;
}
int SrsHlsCache::on_sequence_header(SrsHlsMuxer* muxer)
int SrsHlsController::on_sequence_header()
{
// TODO: support discontinuity for the same stream
// currently we reap and insert discontinity when encoder republish,
@ -1005,12 +964,12 @@ int SrsHlsCache::on_sequence_header(SrsHlsMuxer* muxer)
return muxer->on_sequence_header();
}
int SrsHlsCache::write_audio(SrsAvcAacCodec* codec, SrsHlsMuxer* muxer, int64_t pts, SrsCodecSample* sample)
int SrsHlsController::write_audio(SrsAudioFrame* frame, int64_t pts)
{
int ret = ERROR_SUCCESS;
// write audio to cache.
if ((ret = cache->cache_audio(codec, pts, sample)) != ERROR_SUCCESS) {
if ((ret = ts->cache_audio(frame, pts)) != ERROR_SUCCESS) {
return ret;
}
@ -1022,16 +981,16 @@ int SrsHlsCache::write_audio(SrsAvcAacCodec* codec, SrsHlsMuxer* muxer, int64_t
// @see https://github.com/ossrs/srs/issues/151
// we use absolutely overflow of segment to make jwplayer/ffplay happy
// @see https://github.com/ossrs/srs/issues/151#issuecomment-71155184
if (cache->audio && muxer->is_segment_absolutely_overflow()) {
if (ts->audio && muxer->is_segment_absolutely_overflow()) {
srs_info("hls: absolute audio reap segment.");
if ((ret = reap_segment("audio", muxer, cache->audio->pts)) != ERROR_SUCCESS) {
if ((ret = reap_segment("audio", ts->audio->pts)) != ERROR_SUCCESS) {
return ret;
}
}
// for pure audio, aggregate some frame to one.
if (muxer->pure_audio() && cache->audio) {
if (pts - cache->audio->start_pts < SRS_CONSTS_HLS_PURE_AUDIO_AGGREGATE) {
if (muxer->pure_audio() &&ts->audio) {
if (pts - ts->audio->start_pts < SRS_CONSTS_HLS_PURE_AUDIO_AGGREGATE) {
return ret;
}
}
@ -1040,19 +999,19 @@ int SrsHlsCache::write_audio(SrsAvcAacCodec* codec, SrsHlsMuxer* muxer, int64_t
// it's ok for the hls overload, or maybe cause the audio corrupt,
// which introduced by aggregate the audios to a big one.
// @see https://github.com/ossrs/srs/issues/512
if ((ret = muxer->flush_audio(cache)) != ERROR_SUCCESS) {
if ((ret = muxer->flush_audio(ts)) != ERROR_SUCCESS) {
return ret;
}
return ret;
}
int SrsHlsCache::write_video(SrsAvcAacCodec* codec, SrsHlsMuxer* muxer, int64_t dts, SrsCodecSample* sample)
int SrsHlsController::write_video(SrsVideoFrame* frame, int64_t dts)
{
int ret = ERROR_SUCCESS;
// write video to cache.
if ((ret = cache->cache_video(codec, dts, sample)) != ERROR_SUCCESS) {
if ((ret = ts->cache_video(frame, dts)) != ERROR_SUCCESS) {
return ret;
}
@ -1061,16 +1020,16 @@ int SrsHlsCache::write_video(SrsAvcAacCodec* codec, SrsHlsMuxer* muxer, int64_t
// do reap ts if any of:
// a. wait keyframe and got keyframe.
// b. always reap when not wait keyframe.
if (!muxer->wait_keyframe() || sample->frame_type == SrsCodecVideoAVCFrameKeyFrame) {
if (!muxer->wait_keyframe() || frame->frame_type == SrsCodecVideoAVCFrameKeyFrame) {
// reap the segment, which will also flush the video.
if ((ret = reap_segment("video", muxer, cache->video->dts)) != ERROR_SUCCESS) {
if ((ret = reap_segment("video", ts->video->dts)) != ERROR_SUCCESS) {
return ret;
}
}
}
// flush video when got one
if ((ret = muxer->flush_video(cache)) != ERROR_SUCCESS) {
if ((ret = muxer->flush_video(ts)) != ERROR_SUCCESS) {
srs_error("m3u8 muxer flush video failed. ret=%d", ret);
return ret;
}
@ -1078,7 +1037,7 @@ int SrsHlsCache::write_video(SrsAvcAacCodec* codec, SrsHlsMuxer* muxer, int64_t
return ret;
}
int SrsHlsCache::reap_segment(string log_desc, SrsHlsMuxer* muxer, int64_t segment_start_dts)
int SrsHlsController::reap_segment(string log_desc, int64_t segment_start_dts)
{
int ret = ERROR_SUCCESS;
@ -1098,7 +1057,7 @@ int SrsHlsCache::reap_segment(string log_desc, SrsHlsMuxer* muxer, int64_t segme
}
// segment open, flush video first.
if ((ret = muxer->flush_video(cache)) != ERROR_SUCCESS) {
if ((ret = muxer->flush_video(ts)) != ERROR_SUCCESS) {
srs_error("m3u8 muxer flush video failed. ret=%d", ret);
return ret;
}
@ -1106,7 +1065,7 @@ int SrsHlsCache::reap_segment(string log_desc, SrsHlsMuxer* muxer, int64_t segme
// segment open, flush the audio.
// @see: ngx_rtmp_hls_open_fragment
/* start fragment with audio to make iPhone happy */
if ((ret = muxer->flush_audio(cache)) != ERROR_SUCCESS) {
if ((ret = muxer->flush_audio(ts)) != ERROR_SUCCESS) {
srs_error("m3u8 muxer flush audio failed. ret=%d", ret);
return ret;
}
@ -1125,9 +1084,7 @@ SrsHls::SrsHls()
last_update_time = 0;
jitter = new SrsRtmpJitter();
muxer = new SrsHlsMuxer();
cache = new SrsHlsCache();
controller = new SrsHlsController();
pprint = SrsPithyPrint::create_hls();
stream_dts = 0;
@ -1136,10 +1093,7 @@ SrsHls::SrsHls()
SrsHls::~SrsHls()
{
srs_freep(jitter);
srs_freep(muxer);
srs_freep(cache);
srs_freep(controller);
srs_freep(pprint);
}
@ -1149,7 +1103,7 @@ void SrsHls::dispose()
on_unpublish();
}
muxer->dispose();
controller->dispose();
}
int SrsHls::cycle()
@ -1194,7 +1148,7 @@ int SrsHls::initialize(SrsOriginHub* h, SrsFormat* f, SrsRequest* r)
req = r;
format = f;
if ((ret = muxer->initialize()) != ERROR_SUCCESS) {
if ((ret = controller->initialize()) != ERROR_SUCCESS) {
return ret;
}
@ -1217,7 +1171,7 @@ int SrsHls::on_publish()
return ret;
}
if ((ret = cache->on_publish(muxer, req, stream_dts)) != ERROR_SUCCESS) {
if ((ret = controller->on_publish(req, stream_dts)) != ERROR_SUCCESS) {
return ret;
}
@ -1239,14 +1193,14 @@ void SrsHls::on_unpublish()
return;
}
if ((ret = cache->on_unpublish(muxer)) != ERROR_SUCCESS) {
if ((ret = controller->on_unpublish()) != ERROR_SUCCESS) {
srs_error("ignore m3u8 muxer flush/close audio failed. ret=%d", ret);
}
enabled = false;
}
int SrsHls::on_audio(SrsSharedPtrMessage* shared_audio)
int SrsHls::on_audio(SrsSharedPtrMessage* shared_audio, SrsFormat* format)
{
int ret = ERROR_SUCCESS;
@ -1260,35 +1214,23 @@ int SrsHls::on_audio(SrsSharedPtrMessage* shared_audio)
SrsSharedPtrMessage* audio = shared_audio->copy();
SrsAutoFree(SrsSharedPtrMessage, audio);
sample->clear();
if ((ret = codec->audio_aac_demux(audio->payload, audio->size, sample)) != ERROR_SUCCESS) {
if (ret != ERROR_HLS_TRY_MP3) {
srs_error("hls aac demux audio failed. ret=%d", ret);
return ret;
}
if ((ret = codec->audio_mp3_demux(audio->payload, audio->size, sample)) != ERROR_SUCCESS) {
srs_error("hls mp3 demux audio failed. ret=%d", ret);
return ret;
}
}
srs_info("audio decoded, type=%d, codec=%d, cts=%d, size=%d, time=%"PRId64,
sample->frame_type, codec->audio_codec_id, sample->cts, audio->size, audio->timestamp);
SrsCodecAudio acodec = (SrsCodecAudio)codec->audio_codec_id;
// ts support audio codec: aac/mp3
srs_assert(format->acodec);
SrsCodecAudio acodec = format->acodec->id;
if (acodec != SrsCodecAudioAAC && acodec != SrsCodecAudioMP3) {
return ret;
}
// when codec changed, write new header.
if ((ret = muxer->update_acodec(acodec)) != ERROR_SUCCESS) {
if ((ret = controller->update_acodec(acodec)) != ERROR_SUCCESS) {
srs_error("http: ts audio write header failed. ret=%d", ret);
return ret;
}
// ignore sequence header
if (acodec == SrsCodecAudioAAC && sample->aac_packet_type == SrsCodecAudioTypeSequenceHeader) {
return cache->on_sequence_header(muxer);
srs_assert(format->audio);
if (acodec == SrsCodecAudioAAC && format->audio->aac_packet_type == SrsCodecAudioTypeSequenceHeader) {
return controller->on_sequence_header();
}
// TODO: FIXME: config the jitter of HLS.
@ -1303,7 +1245,7 @@ int SrsHls::on_audio(SrsSharedPtrMessage* shared_audio)
// for pure audio, we need to update the stream dts also.
stream_dts = dts;
if ((ret = cache->write_audio(codec, muxer, dts, sample)) != ERROR_SUCCESS) {
if ((ret = controller->write_audio(format->audio, dts)) != ERROR_SUCCESS) {
srs_error("hls cache write audio failed. ret=%d", ret);
return ret;
}
@ -1311,7 +1253,7 @@ int SrsHls::on_audio(SrsSharedPtrMessage* shared_audio)
return ret;
}
int SrsHls::on_video(SrsSharedPtrMessage* shared_video, bool is_sps_pps)
int SrsHls::on_video(SrsSharedPtrMessage* shared_video, SrsFormat* format)
{
int ret = ERROR_SUCCESS;
@ -1325,34 +1267,21 @@ int SrsHls::on_video(SrsSharedPtrMessage* shared_video, bool is_sps_pps)
SrsSharedPtrMessage* video = shared_video->copy();
SrsAutoFree(SrsSharedPtrMessage, video);
// user can disable the sps parse to workaround when parse sps failed.
// @see https://github.com/ossrs/srs/issues/474
if (is_sps_pps) {
codec->avc_parse_sps = _srs_config->get_parse_sps(req->vhost);
}
sample->clear();
if ((ret = codec->video_avc_demux(video->payload, video->size, sample)) != ERROR_SUCCESS) {
srs_error("hls codec demux video failed. ret=%d", ret);
return ret;
}
srs_info("video decoded, type=%d, codec=%d, avc=%d, cts=%d, size=%d, time=%"PRId64,
sample->frame_type, codec->video_codec_id, sample->avc_packet_type, sample->cts, video->size, video->timestamp);
// ignore info frame,
// @see https://github.com/ossrs/srs/issues/288#issuecomment-69863909
if (sample->frame_type == SrsCodecVideoAVCFrameVideoInfoFrame) {
srs_assert(format->video);
if (format->video->frame_type == SrsCodecVideoAVCFrameVideoInfoFrame) {
return ret;
}
if (codec->video_codec_id != SrsCodecVideoAVC) {
srs_assert(format->vcodec);
if (format->vcodec->id != SrsCodecVideoAVC) {
return ret;
}
// ignore sequence header
if (sample->frame_type == SrsCodecVideoAVCFrameKeyFrame
&& sample->avc_packet_type == SrsCodecVideoAVCTypeSequenceHeader) {
return cache->on_sequence_header(muxer);
if (format->video->avc_packet_type == SrsCodecVideoAVCTypeSequenceHeader) {
return controller->on_sequence_header();
}
// TODO: FIXME: config the jitter of HLS.
@ -1363,7 +1292,7 @@ int SrsHls::on_video(SrsSharedPtrMessage* shared_video, bool is_sps_pps)
int64_t dts = video->timestamp * 90;
stream_dts = dts;
if ((ret = cache->write_video(codec, muxer, dts, sample)) != ERROR_SUCCESS) {
if ((ret = controller->write_video(format->video, dts)) != ERROR_SUCCESS) {
srs_error("hls cache write video failed. ret=%d", ret);
return ret;
}
@ -1378,15 +1307,16 @@ void SrsHls::hls_show_mux_log()
{
pprint->elapse();
// reportable
if (pprint->can_print()) {
// the run time is not equals to stream time,
// @see: https://github.com/ossrs/srs/issues/81#issuecomment-48100994
// it's ok.
srs_trace("-> "SRS_CONSTS_LOG_HLS" time=%"PRId64", stream dts=%"PRId64"(%"PRId64"ms), sno=%d, ts=%s, dur=%.2f, dva=%dp",
pprint->age(), stream_dts, stream_dts / 90, muxer->sequence_no(), muxer->ts_url().c_str(),
muxer->duration(), muxer->deviation());
if (!pprint->can_print()) {
return;
}
// the run time is not equals to stream time,
// @see: https://github.com/ossrs/srs/issues/81#issuecomment-48100994
// it's ok.
srs_trace("-> "SRS_CONSTS_LOG_HLS" time=%"PRId64", stream dts=%"PRId64"(%"PRId64"ms), sno=%d, ts=%s, dur=%.2f, dva=%dp",
pprint->age(), stream_dts, stream_dts / 90, controller->sequence_no(), controller->ts_url().c_str(),
controller->duration(), controller->deviation());
}

View file

@ -40,7 +40,7 @@ class SrsFormat;
class SrsSharedPtrMessage;
class SrsAmf0Object;
class SrsRtmpJitter;
class SrsTSMuxer;
class SrsTsMuxer;
class SrsRequest;
class SrsPithyPrint;
class SrsSource;
@ -53,41 +53,6 @@ class SrsHlsSegment;
class SrsTsCache;
class SrsTsContext;
/**
* write to file and cache.
*/
class SrsHlsCacheWriter : public SrsFileWriter
{
private:
SrsFileWriter impl;
std::string data;
bool should_write_cache;
bool should_write_file;
public:
SrsHlsCacheWriter(bool write_cache, bool write_file);
virtual ~SrsHlsCacheWriter();
public:
/**
* open file writer, can open then close then open...
*/
virtual int open(std::string file);
virtual void close();
public:
virtual bool is_open();
virtual int64_t tellg();
public:
/**
* write to file.
* @param pnwrite the output nb_write, NULL to ignore.
*/
virtual int write(void* buf, size_t count, ssize_t* pnwrite);
public:
/**
* get the string cache.
*/
virtual std::string cache();
};
/**
* the wrapper of m3u8 segment from specification:
*
@ -106,14 +71,14 @@ public:
// ts full file to write.
std::string full_path;
// the muxer to write ts.
SrsHlsCacheWriter* writer;
SrsTSMuxer* muxer;
SrsFileWriter* writer;
SrsTsMuxer* muxer;
// current segment start dts for m3u8
int64_t segment_start_dts;
// whether current segement is sequence header.
bool is_sequence_header;
public:
SrsHlsSegment(SrsTsContext* c, bool write_cache, bool write_file, SrsCodecAudio ac, SrsCodecVideo vc);
SrsHlsSegment(SrsTsContext* c, SrsCodecAudio ac, SrsCodecVideo vc);
virtual ~SrsHlsSegment();
public:
/**
@ -200,10 +165,6 @@ private:
int max_td;
std::string m3u8;
std::string m3u8_url;
private:
// TODO: FIXME: remove it.
bool should_write_cache;
bool should_write_file;
private:
/**
* m3u8 segments.
@ -303,34 +264,46 @@ private:
* so we must gather audio frame together, and recalc the timestamp @see SrsTsAacJitter,
* we use a aac jitter to correct the audio pts.
*/
class SrsHlsCache
class SrsHlsController
{
private:
SrsTsCache* cache;
// The HLS muxer to reap ts and m3u8.
// The TS is cached to SrsTsCache then flush to ts segment.
SrsHlsMuxer* muxer;
// The TS cache
SrsTsCache* ts;
public:
SrsHlsCache();
virtual ~SrsHlsCache();
SrsHlsController();
virtual ~SrsHlsController();
public:
virtual int initialize();
virtual void dispose();
virtual int update_acodec(SrsCodecAudio ac);
virtual int sequence_no();
virtual std::string ts_url();
virtual double duration();
virtual int deviation();
public:
/**
* when publish or unpublish stream.
*/
virtual int on_publish(SrsHlsMuxer* muxer, SrsRequest* req, int64_t segment_start_dts);
virtual int on_unpublish(SrsHlsMuxer* muxer);
virtual int on_publish(SrsRequest* req, int64_t segment_start_dts);
virtual int on_unpublish();
/**
* when get sequence header,
* must write a #EXT-X-DISCONTINUITY to m3u8.
* @see: hls-m3u8-draft-pantos-http-live-streaming-12.txt
* @see: 3.4.11. EXT-X-DISCONTINUITY
*/
virtual int on_sequence_header(SrsHlsMuxer* muxer);
virtual int on_sequence_header();
/**
* write audio to cache, if need to flush, flush to muxer.
*/
virtual int write_audio(SrsAvcAacCodec* codec, SrsHlsMuxer* muxer, int64_t pts, SrsCodecSample* sample);
virtual int write_audio(SrsAudioFrame* frame, int64_t pts);
/**
* write video to muxer.
*/
virtual int write_video(SrsAvcAacCodec* codec, SrsHlsMuxer* muxer, int64_t dts, SrsCodecSample* sample);
virtual int write_video(SrsVideoFrame* frame, int64_t dts);
private:
/**
* reopen the muxer for a new hls segment,
@ -338,7 +311,7 @@ private:
* then write the key frame to the new segment.
* so, user must reap_segment then flush_video to hls muxer.
*/
virtual int reap_segment(std::string log_desc, SrsHlsMuxer* muxer, int64_t segment_start_dts);
virtual int reap_segment(std::string log_desc, int64_t segment_start_dts);
};
/**
@ -348,8 +321,7 @@ private:
class SrsHls
{
private:
SrsHlsMuxer* muxer;
SrsHlsCache* cache;
SrsHlsController* controller;
private:
SrsRequest* req;
bool enabled;
@ -400,13 +372,14 @@ public:
* mux the audio packets to ts.
* @param shared_audio, directly ptr, copy it if need to save it.
*/
virtual int on_audio(SrsSharedPtrMessage* shared_audio);
virtual int on_audio(SrsSharedPtrMessage* shared_audio, SrsFormat* format);
/**
* mux the video packets to ts.
* @param shared_video, directly ptr, copy it if need to save it.
* @param is_sps_pps whether the video is h.264 sps/pps.
*/
virtual int on_video(SrsSharedPtrMessage* shared_video, bool is_sps_pps);
// TODO: FIXME: Remove param is_sps_pps.
virtual int on_video(SrsSharedPtrMessage* shared_video, SrsFormat* format);
private:
virtual void hls_show_mux_log();
};

View file

@ -653,75 +653,6 @@ bool SrsLiveEntry::is_mp3()
return _is_mp3;
}
SrsHlsM3u8Stream::SrsHlsM3u8Stream()
{
}
SrsHlsM3u8Stream::~SrsHlsM3u8Stream()
{
}
void SrsHlsM3u8Stream::set_m3u8(std::string v)
{
m3u8 = v;
}
int SrsHlsM3u8Stream::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r)
{
int ret = ERROR_SUCCESS;
std::string data = m3u8;
w->header()->set_content_length((int)data.length());
w->header()->set_content_type("application/x-mpegURL;charset=utf-8");
if ((ret = w->write((char*)data.data(), (int)data.length())) != ERROR_SUCCESS) {
if (!srs_is_client_gracefully_close(ret)) {
srs_error("send m3u8 failed. ret=%d", ret);
}
return ret;
}
return ret;
}
SrsHlsTsStream::SrsHlsTsStream()
{
}
SrsHlsTsStream::~SrsHlsTsStream()
{
}
void SrsHlsTsStream::set_ts(std::string v)
{
ts = v;
}
int SrsHlsTsStream::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r)
{
int ret = ERROR_SUCCESS;
std::string data = ts;
w->header()->set_content_length((int)data.length());
w->header()->set_content_type("video/MP2T");
if ((ret = w->write((char*)data.data(), (int)data.length())) != ERROR_SUCCESS) {
if (!srs_is_client_gracefully_close(ret)) {
srs_error("send ts failed. ret=%d", ret);
}
return ret;
}
return ret;
}
SrsHlsEntry::SrsHlsEntry()
{
tmpl = NULL;
}
SrsHttpStreamServer::SrsHttpStreamServer(SrsServer* svr)
{
server = svr;

View file

@ -262,58 +262,6 @@ public:
bool is_aac();
};
/**
* the m3u8 stream handler.
*/
class SrsHlsM3u8Stream : public ISrsHttpHandler
{
private:
std::string m3u8;
public:
SrsHlsM3u8Stream();
virtual ~SrsHlsM3u8Stream();
public:
virtual void set_m3u8(std::string v);
public:
virtual int serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r);
};
/**
* the ts stream handler.
*/
class SrsHlsTsStream : public ISrsHttpHandler
{
private:
std::string ts;
public:
SrsHlsTsStream();
virtual ~SrsHlsTsStream();
public:
virtual void set_ts(std::string v);
public:
virtual int serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r);
};
/**
* the srs hls entry.
*/
// TODO: FIXME: use hte hls template and entry.
struct SrsHlsEntry
{
// for template, the mount contains variables.
// for concrete stream, the mount is url to access.
std::string mount;
// the template to create the entry
SrsHlsEntry* tmpl;
// key: the m3u8/ts file path.
// value: the http handler.
std::map<std::string, ISrsHttpHandler*> streams;
SrsHlsEntry();
};
/**
* the http stream server instance,
* serve http stream, for example, flv/ts/mp3/aac live stream.

View file

@ -44,6 +44,7 @@ using namespace std;
#include <srs_app_pithy_print.hpp>
#include <srs_app_rtmp_conn.hpp>
#include <srs_protocol_utility.hpp>
#include <srs_protocol_format.hpp>
#ifdef SRS_AUTO_STREAM_CASTER
@ -135,13 +136,13 @@ int SrsRtpConn::on_udp_packet(sockaddr_in* from, char* buf, int nb_buf)
SrsRtspAudioCache::SrsRtspAudioCache()
{
dts = 0;
audio_samples = NULL;
audio = NULL;
payload = NULL;
}
SrsRtspAudioCache::~SrsRtspAudioCache()
{
srs_freep(audio_samples);
srs_freep(audio);
srs_freep(payload);
}
@ -456,10 +457,10 @@ int SrsRtspConn::on_rtp_audio(SrsRtpPacket* pkt, int64_t dts)
// cache current audio to kickoff.
acache->dts = dts;
acache->audio_samples = pkt->audio_samples;
acache->audio = pkt->audio;
acache->payload = pkt->payload;
pkt->audio_samples = NULL;
pkt->audio = NULL;
pkt->payload = NULL;
return ret;
@ -474,11 +475,11 @@ int SrsRtspConn::kickoff_audio_cache(SrsRtpPacket* pkt, int64_t dts)
return ret;
}
if (dts - acache->dts > 0 && acache->audio_samples->nb_sample_units > 0) {
int64_t delta = (dts - acache->dts) / acache->audio_samples->nb_sample_units;
for (int i = 0; i < acache->audio_samples->nb_sample_units; i++) {
char* frame = acache->audio_samples->sample_units[i].bytes;
int nb_frame = acache->audio_samples->sample_units[i].size;
if (dts - acache->dts > 0 && acache->audio->nb_samples > 0) {
int64_t delta = (dts - acache->dts) / acache->audio->nb_samples;
for (int i = 0; i < acache->audio->nb_samples; i++) {
char* frame = acache->audio->samples[i].bytes;
int nb_frame = acache->audio->samples[i].size;
int64_t timestamp = (acache->dts + delta * i) / 90;
acodec->aac_packet_type = 1;
if ((ret = write_audio_raw_frame(frame, nb_frame, acodec, (uint32_t)timestamp)) != ERROR_SUCCESS) {
@ -488,7 +489,7 @@ int SrsRtspConn::kickoff_audio_cache(SrsRtpPacket* pkt, int64_t dts)
}
acache->dts = 0;
srs_freep(acache->audio_samples);
srs_freep(acache->audio);
srs_freep(acache->payload);
return ret;
@ -510,13 +511,17 @@ int SrsRtspConn::write_sequence_header()
if (true) {
std::string sh = aac_specific_config;
SrsAvcAacCodec dec;
if ((ret = dec.audio_aac_sequence_header_demux((char*)sh.c_str(), (int)sh.length())) != ERROR_SUCCESS) {
SrsFormat* format = new SrsFormat();
SrsAutoFree(SrsFormat, format);
if ((ret = format->on_aac_sequence_header((char*)sh.c_str(), (int)sh.length())) != ERROR_SUCCESS) {
return ret;
}
SrsAudioCodec* dec = format->acodec;
acodec->sound_format = SrsCodecAudioAAC;
acodec->sound_type = (dec.aac_channels == 2)? SrsCodecAudioSoundTypeStereo : SrsCodecAudioSoundTypeMono;
acodec->sound_type = (dec->aac_channels == 2)? SrsCodecAudioSoundTypeStereo : SrsCodecAudioSoundTypeMono;
acodec->sound_size = SrsCodecAudioSampleSize16bit;
acodec->aac_packet_type = 0;
@ -526,7 +531,7 @@ int SrsRtspConn::write_sequence_header()
16000, 12000, 11025, 8000,
7350, 0, 0, 0
};
switch (aac_sample_rates[dec.aac_sample_rate]) {
switch (aac_sample_rates[dec->aac_sample_rate]) {
case 11025:
acodec->sound_rate = SrsCodecAudioSampleRate11025;
break;

View file

@ -53,7 +53,7 @@ class SrsRawH264Stream;
class SrsRawAacStream;
struct SrsRawAacStreamCodec;
class SrsSharedPtrMessage;
class SrsCodecSample;
class SrsAudioFrame;
class SrsSimpleStream;
class SrsPithyPrint;
class SrsSimpleRtmpClient;
@ -87,7 +87,7 @@ public:
struct SrsRtspAudioCache
{
int64_t dts;
SrsCodecSample* audio_samples;
SrsAudioFrame* audio;
SrsSimpleStream* payload;
SrsRtspAudioCache();

View file

@ -854,7 +854,7 @@ SrsOriginHub::SrsOriginHub()
hds = new SrsHds();
#endif
ng_exec = new SrsNgExec();
format = new SrsFormat();
format = new SrsRtmpFormat();
_srs_config->subscribe(this);
}
@ -971,7 +971,29 @@ int SrsOriginHub::on_audio(SrsSharedPtrMessage* shared_audio)
return ret;
}
if ((ret = hls->on_audio(msg)) != ERROR_SUCCESS) {
// cache the sequence header if aac
// donot cache the sequence header to gop_cache, return here.
if (format->is_aac_sequence_header()) {
srs_assert(format->acodec);
SrsAudioCodec* c = format->acodec;
static int flv_sample_sizes[] = {8, 16, 0};
static int flv_sound_types[] = {1, 2, 0};
// when got audio stream info.
SrsStatistic* stat = SrsStatistic::instance();
if ((ret = stat->on_audio_info(req, SrsCodecAudioAAC, c->sound_rate, c->sound_type, c->aac_object)) != ERROR_SUCCESS) {
return ret;
}
srs_trace("%dB audio sh, codec(%d, profile=%s, %dchannels, %dkbps, %dHZ), flv(%dbits, %dchannels, %dHZ)",
msg->size, c->id, srs_codec_aac_object2str(c->aac_object).c_str(), c->aac_channels,
c->audio_data_rate / 1000, aac_sample_rates[c->aac_sample_rate],
flv_sample_sizes[c->sound_size], flv_sound_types[c->sound_type],
flv_sample_rates[c->sound_rate]);
}
if ((ret = hls->on_audio(msg, format)) != ERROR_SUCCESS) {
// apply the error strategy for hls.
// @see https://github.com/ossrs/srs/issues/264
std::string hls_error_strategy = _srs_config->get_hls_on_error(req->vhost);
@ -1044,12 +1066,36 @@ int SrsOriginHub::on_video(SrsSharedPtrMessage* shared_video, bool is_sequence_h
SrsSharedPtrMessage* msg = shared_video;
if ((ret = format->on_video(msg, is_sequence_header)) != ERROR_SUCCESS) {
// user can disable the sps parse to workaround when parse sps failed.
// @see https://github.com/ossrs/srs/issues/474
if (is_sequence_header) {
format->avc_parse_sps = _srs_config->get_parse_sps(req->vhost);
}
if ((ret = format->on_video(msg)) != ERROR_SUCCESS) {
srs_error("Codec parse video failed, ret=%d", ret);
return ret;
}
if ((ret = hls->on_video(msg, is_sequence_header)) != ERROR_SUCCESS) {
// cache the sequence header if h264
// donot cache the sequence header to gop_cache, return here.
if (format->is_avc_sequence_header()) {
SrsVideoCodec* c = format->vcodec;
srs_assert(c);
// when got video stream info.
SrsStatistic* stat = SrsStatistic::instance();
if ((ret = stat->on_video_info(req, SrsCodecVideoAVC, c->avc_profile, c->avc_level, c->width, c->height)) != ERROR_SUCCESS) {
return ret;
}
srs_trace("%dB video sh, codec(%d, profile=%s, level=%s, %dx%d, %dkbps, %.1ffps, %.1fs)",
msg->size, c->id, srs_codec_avc_profile2str(c->avc_profile).c_str(),
srs_codec_avc_level2str(c->avc_level).c_str(), c->width, c->height,
c->video_data_rate / 1000, c->frame_rate, c->duration);
}
if ((ret = hls->on_video(msg, format)) != ERROR_SUCCESS) {
// apply the error strategy for hls.
// @see https://github.com/ossrs/srs/issues/264
std::string hls_error_strategy = _srs_config->get_hls_on_error(req->vhost);
@ -1341,11 +1387,11 @@ int SrsOriginHub::on_reload_vhost_hls(string vhost)
// when reload to start hls, hls will never get the sequence header in stream,
// use the SrsSource.on_hls_start to push the sequence header to HLS.
// TODO: maybe need to decode the metadata?
if (cache_sh_video && (ret = hls->on_video(cache_sh_video, true)) != ERROR_SUCCESS) {
if (cache_sh_video && (ret = hls->on_video(cache_sh_video, format)) != ERROR_SUCCESS) {
srs_error("hls process video sequence header message failed. ret=%d", ret);
return ret;
}
if (cache_sh_audio && (ret = hls->on_audio(cache_sh_audio)) != ERROR_SUCCESS) {
if (cache_sh_audio && (ret = hls->on_audio(cache_sh_audio, format)) != ERROR_SUCCESS) {
srs_error("hls process audio sequence header message failed. ret=%d", ret);
return ret;
}
@ -2137,35 +2183,6 @@ int SrsSource::on_audio_imp(SrsSharedPtrMessage* msg)
}
}
// cache the sequence header if aac
// donot cache the sequence header to gop_cache, return here.
if (is_aac_sequence_header) {
// parse detail audio codec
SrsAvcAacCodec codec;
SrsCodecSample sample;
if ((ret = codec.audio_aac_demux(msg->payload, msg->size, &sample)) != ERROR_SUCCESS) {
srs_error("source codec demux audio failed. ret=%d", ret);
return ret;
}
static int flv_sample_sizes[] = {8, 16, 0};
static int flv_sound_types[] = {1, 2, 0};
// when got audio stream info.
SrsStatistic* stat = SrsStatistic::instance();
if ((ret = stat->on_audio_info(req, SrsCodecAudioAAC, sample.sound_rate, sample.sound_type, codec.aac_object)) != ERROR_SUCCESS) {
return ret;
}
srs_trace("%dB audio sh, codec(%d, profile=%s, %dchannels, %dkbps, %dHZ), "
"flv(%dbits, %dchannels, %dHZ)",
msg->size, codec.audio_codec_id,
srs_codec_aac_object2str(codec.aac_object).c_str(), codec.aac_channels,
codec.audio_data_rate / 1000, aac_sample_rates[codec.aac_sample_rate],
flv_sample_sizes[sample.sound_size], flv_sound_types[sample.sound_type],
flv_sample_rates[sample.sound_rate]);
}
// copy to all consumer
if (!drop_for_reduce) {
for (int i = 0; i < (int)consumers.size(); i++) {
@ -2296,31 +2313,6 @@ int SrsSource::on_video_imp(SrsSharedPtrMessage* msg)
// donot cache the sequence header to gop_cache, return here.
if (is_sequence_header) {
meta->update_vsh(msg);
// parse detail audio codec
SrsAvcAacCodec codec;
// user can disable the sps parse to workaround when parse sps failed.
// @see https://github.com/ossrs/srs/issues/474
codec.avc_parse_sps = _srs_config->get_parse_sps(req->vhost);
SrsCodecSample sample;
if ((ret = codec.video_avc_demux(msg->payload, msg->size, &sample)) != ERROR_SUCCESS) {
srs_error("source codec demux video failed. ret=%d", ret);
return ret;
}
// when got video stream info.
SrsStatistic* stat = SrsStatistic::instance();
if ((ret = stat->on_video_info(req, SrsCodecVideoAVC, codec.avc_profile, codec.avc_level, codec.width, codec.height)) != ERROR_SUCCESS) {
return ret;
}
srs_trace("%dB video sh, codec(%d, profile=%s, level=%s, %dx%d, %dkbps, %dfps, %ds)",
msg->size, codec.video_codec_id,
srs_codec_avc_profile2str(codec.avc_profile).c_str(),
srs_codec_avc_level2str(codec.avc_level).c_str(), codec.width, codec.height,
codec.video_data_rate / 1000, codec.frame_rate, codec.duration);
}
// Copy to hub to all utilities.

View file

@ -38,7 +38,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_app_reload.hpp>
#include <srs_core_performance.hpp>
class SrsFormat;
class SrsRtmpFormat;
class SrsConsumer;
class SrsPlayEdge;
class SrsPublishEdge;
@ -423,7 +423,7 @@ private:
bool is_active;
private:
// The format, codec information.
SrsFormat* format;
SrsRtmpFormat* format;
// hls handler.
SrsHls* hls;
// The DASH encoder.