1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

For #299, extract fragment and fragment window for hls

This commit is contained in:
winlin 2017-03-18 21:29:08 +08:00
parent 6da6e0511d
commit da4c390d69
9 changed files with 450 additions and 192 deletions

View file

@ -54,8 +54,6 @@ using namespace std;
// drop the segment when duration of ts too small.
#define SRS_AUTO_HLS_SEGMENT_MIN_DURATION_MS 100
// when hls timestamp jump, reset it.
#define SRS_AUTO_HLS_SEGMENT_TIMESTAMP_JUMP_MS 300
// fragment plus the deviation percent.
#define SRS_HLS_FLOOR_REAP_PERCENT 0.3
@ -64,10 +62,7 @@ using namespace std;
SrsHlsSegment::SrsHlsSegment(SrsTsContext* c, SrsAudioCodecId ac, SrsVideoCodecId vc)
{
duration = 0;
sequence_no = 0;
segment_start_dts = 0;
is_sequence_header = false;
writer = new SrsFileWriter();
tscw = new SrsTsContextWriter(writer, c, ac, vc);
}
@ -78,27 +73,6 @@ SrsHlsSegment::~SrsHlsSegment()
srs_freep(writer);
}
void SrsHlsSegment::update_duration(int64_t current_frame_dts)
{
// we use video/audio to update segment duration,
// so when reap segment, some previous audio frame will
// update the segment duration, which is nagetive,
// just ignore it.
if (current_frame_dts < segment_start_dts) {
// for atc and timestamp jump, reset the start dts.
if (current_frame_dts < segment_start_dts - SRS_AUTO_HLS_SEGMENT_TIMESTAMP_JUMP_MS * 90) {
srs_warn("hls timestamp jump %"PRId64"=>%"PRId64, segment_start_dts, current_frame_dts);
segment_start_dts = current_frame_dts;
}
return;
}
duration = (current_frame_dts - segment_start_dts) / 90000.0;
srs_assert(duration >= 0);
return;
}
SrsDvrAsyncCallOnHls::SrsDvrAsyncCallOnHls(int c, SrsRequest* r, string p, string t, string m, string mu, int s, double d)
{
req = r->copy();
@ -225,17 +199,11 @@ SrsHlsMuxer::SrsHlsMuxer()
current = NULL;
async = new SrsAsyncCallWorker();
context = new SrsTsContext();
segments = new SrsFragmentWindow();
}
SrsHlsMuxer::~SrsHlsMuxer()
{
std::vector<SrsHlsSegment*>::iterator it;
for (it = segments.begin(); it != segments.end(); ++it) {
SrsHlsSegment* segment = *it;
srs_freep(segment);
}
segments.clear();
srs_freep(current);
srs_freep(req);
srs_freep(async);
@ -244,20 +212,13 @@ SrsHlsMuxer::~SrsHlsMuxer()
void SrsHlsMuxer::dispose()
{
std::vector<SrsHlsSegment*>::iterator it;
for (it = segments.begin(); it != segments.end(); ++it) {
SrsHlsSegment* segment = *it;
if (unlink(segment->full_path.c_str()) < 0) {
srs_warn("dispose unlink path failed, file=%s.", segment->full_path.c_str());
}
srs_freep(segment);
}
segments.clear();
int ret = ERROR_SUCCESS;
segments->dispose();
if (current) {
std::string path = current->full_path + ".tmp";
if (unlink(path.c_str()) < 0) {
srs_warn("dispose unlink path failed, file=%s", path.c_str());
if ((ret = current->unlink_tmpfile()) != ERROR_SUCCESS) {
srs_warn("Unlink tmp ts failed, ret=%d", ret);
}
srs_freep(current);
}
@ -266,8 +227,6 @@ void SrsHlsMuxer::dispose()
srs_warn("dispose unlink path failed. file=%s", m3u8.c_str());
}
// TODO: FIXME: support hls dispose in HTTP cache.
srs_trace("gracefully dispose hls %s", req? req->get_stream_url().c_str() : "");
}
@ -283,7 +242,7 @@ string SrsHlsMuxer::ts_url()
double SrsHlsMuxer::duration()
{
return current? current->duration:0;
return current? current->duration()/1000.0:0;
}
int SrsHlsMuxer::deviation()
@ -347,7 +306,7 @@ int SrsHlsMuxer::update_config(SrsRequest* r, string entry_prefix,
return ret;
}
int SrsHlsMuxer::segment_open(int64_t segment_start_dts)
int SrsHlsMuxer::segment_open()
{
int ret = ERROR_SUCCESS;
@ -395,7 +354,6 @@ int SrsHlsMuxer::segment_open(int64_t segment_start_dts)
// new segment.
current = new SrsHlsSegment(context, default_acodec, default_vcodec);
current->sequence_no = _sequence_no++;
current->segment_start_dts = segment_start_dts;
// generate filename.
std::string ts_file = hls_ts_file;
@ -440,11 +398,12 @@ int SrsHlsMuxer::segment_open(int64_t segment_start_dts)
ss << current->sequence_no;
ts_file = srs_string_replace(ts_file, "[seq]", ss.str());
}
current->full_path = hls_path + "/" + ts_file;
current->set_path(hls_path + "/" + ts_file);
srs_info("hls: generate ts path %s, tmpl=%s, floor=%d", ts_file.c_str(), hls_ts_file.c_str(), hls_ts_floor);
// the ts url, relative or absolute url.
std::string ts_url = current->full_path;
// TODO: FIXME: Use url and path manager.
std::string ts_url = current->fullpath();
if (srs_string_starts_with(ts_url, m3u8_dir)) {
ts_url = ts_url.substr(m3u8_dir.length());
}
@ -464,7 +423,7 @@ int SrsHlsMuxer::segment_open(int64_t segment_start_dts)
current->uri += ts_url;
// create dir recursively for hls.
std::string ts_dir = srs_path_dirname(current->full_path);
std::string ts_dir = srs_path_dirname(current->fullpath());
if ((ret = srs_create_dir_recursively(ts_dir)) != ERROR_SUCCESS) {
srs_error("create app dir %s failed. ret=%d", ts_dir.c_str(), ret);
return ret;
@ -472,7 +431,7 @@ int SrsHlsMuxer::segment_open(int64_t segment_start_dts)
srs_info("create ts dir %s ok", ts_dir.c_str());
// open temp ts file.
std::string tmp_file = current->full_path + ".tmp";
std::string tmp_file = current->tmppath();
if ((ret = current->tscw->open(tmp_file.c_str())) != ERROR_SUCCESS) {
srs_error("open hls muxer failed. ret=%d", ret);
return ret;
@ -490,7 +449,7 @@ int SrsHlsMuxer::on_sequence_header()
// set the current segment to sequence header,
// when close the segement, it will write a discontinuity to m3u8 file.
current->is_sequence_header = true;
current->set_sequence_header(true);
return ret;
}
@ -500,7 +459,7 @@ bool SrsHlsMuxer::is_segment_overflow()
srs_assert(current);
// to prevent very small segment.
if (current->duration * 1000 < 2 * SRS_AUTO_HLS_SEGMENT_MIN_DURATION_MS) {
if (current->duration() < 2 * SRS_AUTO_HLS_SEGMENT_MIN_DURATION_MS) {
return false;
}
@ -509,7 +468,7 @@ bool SrsHlsMuxer::is_segment_overflow()
srs_info("hls: dur=%.2f, tar=%.2f, dev=%.2fms/%dp, frag=%.2f",
current->duration, hls_fragment + deviation, deviation, deviation_ts, hls_fragment);
return current->duration >= hls_fragment + deviation;
return current->duration() >= (hls_fragment + deviation) * 1000;
}
bool SrsHlsMuxer::wait_keyframe()
@ -523,7 +482,7 @@ bool SrsHlsMuxer::is_segment_absolutely_overflow()
srs_assert(current);
// to prevent very small segment.
if (current->duration * 1000 < 2 * SRS_AUTO_HLS_SEGMENT_MIN_DURATION_MS) {
if (current->duration() < 2 * SRS_AUTO_HLS_SEGMENT_MIN_DURATION_MS) {
return false;
}
@ -532,7 +491,7 @@ bool SrsHlsMuxer::is_segment_absolutely_overflow()
srs_info("hls: dur=%.2f, tar=%.2f, dev=%.2fms/%dp, frag=%.2f",
current->duration, hls_fragment + deviation, deviation, deviation_ts, hls_fragment);
return current->duration >= hls_aof_ratio * hls_fragment + deviation;
return current->duration() >= (hls_aof_ratio * hls_fragment + deviation) * 1000;
}
bool SrsHlsMuxer::pure_audio()
@ -555,7 +514,7 @@ int SrsHlsMuxer::flush_audio(SrsTsMessageCache* cache)
}
// update the duration of segment.
current->update_duration(cache->audio->pts);
current->append(cache->audio->pts / 90);
if ((ret = current->tscw->write_audio(cache->audio)) != ERROR_SUCCESS) {
return ret;
@ -584,7 +543,7 @@ int SrsHlsMuxer::flush_video(SrsTsMessageCache* cache)
srs_assert(current);
// update the duration of segment.
current->update_duration(cache->video->dts);
current->append(cache->video->dts / 90);
if ((ret = current->tscw->write_video(cache->video)) != ERROR_SUCCESS) {
return ret;
@ -596,7 +555,7 @@ int SrsHlsMuxer::flush_video(SrsTsMessageCache* cache)
return ret;
}
int SrsHlsMuxer::segment_close(string log_desc)
int SrsHlsMuxer::segment_close()
{
int ret = ERROR_SUCCESS;
@ -608,23 +567,16 @@ int SrsHlsMuxer::segment_close(string log_desc)
// when close current segment, the current segment must not be NULL.
srs_assert(current);
// assert segment duplicate.
std::vector<SrsHlsSegment*>::iterator it;
it = std::find(segments.begin(), segments.end(), current);
srs_assert(it == segments.end());
// valid, add to segments if segment duration is ok
// when too small, it maybe not enough data to play.
// when too large, it maybe timestamp corrupt.
// make the segment more acceptable, when in [min, max_td * 2], it's ok.
if (current->duration * 1000 >= SRS_AUTO_HLS_SEGMENT_MIN_DURATION_MS && (int)current->duration <= max_td * 2) {
segments.push_back(current);
if (current->duration() >= SRS_AUTO_HLS_SEGMENT_MIN_DURATION_MS && (int)current->duration() <= max_td * 2 * 1000) {
// use async to call the http hooks, for it will cause thread switch.
if ((ret = async->execute(new SrsDvrAsyncCallOnHls(
_srs_context->get_id(), req,
current->full_path, current->uri, m3u8, m3u8_url,
current->sequence_no, current->duration))) != ERROR_SUCCESS)
current->fullpath(), current->uri, m3u8, m3u8_url,
current->sequence_no, current->duration() / 1000.0))) != ERROR_SUCCESS)
{
return ret;
}
@ -640,69 +592,35 @@ int SrsHlsMuxer::segment_close(string log_desc)
// close the muxer of finished segment.
srs_freep(current->tscw);
std::string full_path = current->full_path;
current = NULL;
// rename from tmp to real path
std::string tmp_file = full_path + ".tmp";
if (rename(tmp_file.c_str(), full_path.c_str()) < 0) {
ret = ERROR_HLS_WRITE_FAILED;
srs_error("rename ts file failed, %s => %s. ret=%d",
tmp_file.c_str(), full_path.c_str(), ret);
if ((ret = current->rename()) != ERROR_SUCCESS) {
return ret;
}
segments->append(current);
current = NULL;
} else {
// reuse current segment index.
_sequence_no--;
srs_trace("%s drop ts segment, sequence_no=%d, uri=%s, duration=%.2f, start=%"PRId64"",
log_desc.c_str(), current->sequence_no, current->uri.c_str(), current->duration,
current->segment_start_dts);
srs_trace("Drop ts segment, sequence_no=%d, uri=%s, duration=%dms", current->sequence_no, current->uri.c_str(), current->duration());
// rename from tmp to real path
std::string tmp_file = current->full_path + ".tmp";
if (unlink(tmp_file.c_str()) < 0) {
srs_warn("ignore unlink path failed, file=%s.", tmp_file.c_str());
if ((ret = current->unlink_tmpfile()) != ERROR_SUCCESS) {
return ret;
}
srs_freep(current);
}
// the segments to remove
std::vector<SrsHlsSegment*> segment_to_remove;
// shrink the segments.
double duration = 0;
int remove_index = -1;
for (int i = (int)segments.size() - 1; i >= 0; i--) {
SrsHlsSegment* segment = segments[i];
duration += segment->duration;
if ((int)duration > hls_window) {
remove_index = i;
break;
}
}
for (int i = 0; i < remove_index && !segments.empty(); i++) {
SrsHlsSegment* segment = *segments.begin();
segments.erase(segments.begin());
segment_to_remove.push_back(segment);
}
segments->shrink(hls_window * 1000);
// refresh the m3u8, donot contains the removed ts
ret = refresh_m3u8();
// remove the ts file.
for (int i = 0; i < (int)segment_to_remove.size(); i++) {
SrsHlsSegment* segment = segment_to_remove[i];
if (hls_cleanup && unlink(segment->full_path.c_str()) < 0) {
srs_warn("cleanup unlink path failed, file=%s.", segment->full_path.c_str());
}
srs_freep(segment);
}
segment_to_remove.clear();
segments->clear_expired(hls_cleanup);
// check ret of refresh m3u8
if (ret != ERROR_SUCCESS) {
@ -718,7 +636,7 @@ int SrsHlsMuxer::refresh_m3u8()
int ret = ERROR_SUCCESS;
// no segments, also no m3u8, return.
if (segments.size() == 0) {
if (segments->empty()) {
return ret;
}
@ -745,7 +663,7 @@ int SrsHlsMuxer::_refresh_m3u8(string m3u8_file)
int ret = ERROR_SUCCESS;
// no segments, return.
if (segments.size() == 0) {
if (segments->empty()) {
return ret;
}
@ -766,7 +684,7 @@ int SrsHlsMuxer::_refresh_m3u8(string m3u8_file)
srs_verbose("write m3u8 header success.");
// #EXT-X-MEDIA-SEQUENCE:4294967295\n
SrsHlsSegment* first = *segments.begin();
SrsHlsSegment* first = dynamic_cast<SrsHlsSegment*>(segments->first());
ss << "#EXT-X-MEDIA-SEQUENCE:" << first->sequence_no << SRS_CONSTS_LF;
srs_verbose("write m3u8 sequence success.");
@ -783,21 +701,17 @@ int SrsHlsMuxer::_refresh_m3u8(string m3u8_file)
* typical target duration is 10 seconds.
*/
// @see https://github.com/ossrs/srs/issues/304#issuecomment-74000081
int target_duration = 0;
for (it = segments.begin(); it != segments.end(); ++it) {
SrsHlsSegment* segment = *it;
target_duration = srs_max(target_duration, (int)ceil(segment->duration));
}
int target_duration = (int)ceil(segments->max_duration() / 1000.0);
target_duration = srs_max(target_duration, max_td);
ss << "#EXT-X-TARGETDURATION:" << target_duration << SRS_CONSTS_LF;
srs_verbose("write m3u8 duration success.");
// write all segments
for (it = segments.begin(); it != segments.end(); ++it) {
SrsHlsSegment* segment = *it;
for (int i = 0; i < segments->size(); i++) {
SrsHlsSegment* segment = dynamic_cast<SrsHlsSegment*>(segments->at(i));
if (segment->is_sequence_header) {
if (segment->is_sequence_header()) {
// #EXT-X-DISCONTINUITY\n
ss << "#EXT-X-DISCONTINUITY" << SRS_CONSTS_LF;
srs_verbose("write m3u8 segment discontinuity success.");
@ -806,7 +720,7 @@ int SrsHlsMuxer::_refresh_m3u8(string m3u8_file)
// "#EXTINF:4294967295.208,\n"
ss.precision(3);
ss.setf(std::ios::fixed, std::ios::floatfield);
ss << "#EXTINF:" << segment->duration << ", no desc" << SRS_CONSTS_LF;
ss << "#EXTINF:" << segment->duration() / 1000.0 << ", no desc" << SRS_CONSTS_LF;
srs_verbose("write m3u8 segment info success.");
// {file name}\n
@ -867,7 +781,7 @@ int SrsHlsController::deviation()
return muxer->deviation();
}
int SrsHlsController::on_publish(SrsRequest* req, int64_t segment_start_dts)
int SrsHlsController::on_publish(SrsRequest* req)
{
int ret = ERROR_SUCCESS;
@ -905,7 +819,7 @@ int SrsHlsController::on_publish(SrsRequest* req, int64_t segment_start_dts)
return ret;
}
if ((ret = muxer->segment_open(segment_start_dts)) != ERROR_SUCCESS) {
if ((ret = muxer->segment_open()) != ERROR_SUCCESS) {
srs_error("m3u8 muxer open segment failed. ret=%d", ret);
return ret;
}
@ -925,7 +839,7 @@ int SrsHlsController::on_unpublish()
return ret;
}
if ((ret = muxer->segment_close("unpublish")) != ERROR_SUCCESS) {
if ((ret = muxer->segment_close()) != ERROR_SUCCESS) {
return ret;
}
@ -962,7 +876,7 @@ int SrsHlsController::write_audio(SrsAudioFrame* frame, int64_t pts)
// @see https://github.com/ossrs/srs/issues/151#issuecomment-71155184
if (tsmc->audio && muxer->is_segment_absolutely_overflow()) {
srs_info("hls: absolute audio reap segment.");
if ((ret = reap_segment("audio", tsmc->audio->pts)) != ERROR_SUCCESS) {
if ((ret = reap_segment()) != ERROR_SUCCESS) {
return ret;
}
}
@ -1001,7 +915,7 @@ int SrsHlsController::write_video(SrsVideoFrame* frame, int64_t dts)
// b. always reap when not wait keyframe.
if (!muxer->wait_keyframe() || frame->frame_type == SrsVideoAvcFrameTypeKeyFrame) {
// reap the segment, which will also flush the video.
if ((ret = reap_segment("video", tsmc->video->dts)) != ERROR_SUCCESS) {
if ((ret = reap_segment()) != ERROR_SUCCESS) {
return ret;
}
}
@ -1016,7 +930,7 @@ int SrsHlsController::write_video(SrsVideoFrame* frame, int64_t dts)
return ret;
}
int SrsHlsController::reap_segment(string log_desc, int64_t segment_start_dts)
int SrsHlsController::reap_segment()
{
int ret = ERROR_SUCCESS;
@ -1024,13 +938,13 @@ int SrsHlsController::reap_segment(string log_desc, int64_t segment_start_dts)
// TODO: fresh segment begin with audio or video?
// close current ts.
if ((ret = muxer->segment_close(log_desc)) != ERROR_SUCCESS) {
if ((ret = muxer->segment_close()) != ERROR_SUCCESS) {
srs_error("m3u8 muxer close segment failed. ret=%d", ret);
return ret;
}
// open new ts.
if ((ret = muxer->segment_open(segment_start_dts)) != ERROR_SUCCESS) {
if ((ret = muxer->segment_open()) != ERROR_SUCCESS) {
srs_error("m3u8 muxer open segment failed. ret=%d", ret);
return ret;
}
@ -1065,7 +979,6 @@ SrsHls::SrsHls()
controller = new SrsHlsController();
pprint = SrsPithyPrint::create_hls();
stream_dts = 0;
}
SrsHls::~SrsHls()
@ -1148,7 +1061,7 @@ int SrsHls::on_publish()
return ret;
}
if ((ret = controller->on_publish(req, stream_dts)) != ERROR_SUCCESS) {
if ((ret = controller->on_publish(req)) != ERROR_SUCCESS) {
return ret;
}
@ -1213,9 +1126,6 @@ int SrsHls::on_audio(SrsSharedPtrMessage* shared_audio, SrsFormat* format)
// the dts calc from rtmp/flv header.
int64_t dts = audio->timestamp * 90;
// for pure audio, we need to update the stream dts also.
stream_dts = dts;
if ((ret = controller->write_audio(format->audio, dts)) != ERROR_SUCCESS) {
srs_error("hls cache write audio failed. ret=%d", ret);
return ret;
@ -1262,7 +1172,6 @@ int SrsHls::on_video(SrsSharedPtrMessage* shared_video, SrsFormat* format)
}
int64_t dts = video->timestamp * 90;
stream_dts = dts;
if ((ret = controller->write_video(format->video, dts)) != ERROR_SUCCESS) {
srs_error("hls cache write video failed. ret=%d", ret);
return ret;
@ -1285,8 +1194,8 @@ void SrsHls::hls_show_mux_log()
// the run time is not equals to stream time,
// @see: https://github.com/ossrs/srs/issues/81#issuecomment-48100994
// it's ok.
srs_trace("-> "SRS_CONSTS_LOG_HLS" time=%"PRId64", stream dts=%"PRId64"(%"PRId64"ms), sno=%d, ts=%s, dur=%.2f, dva=%dp",
pprint->age(), stream_dts, stream_dts / 90, controller->sequence_no(), controller->ts_url().c_str(),
srs_trace("-> "SRS_CONSTS_LOG_HLS" time=%"PRId64", sno=%d, ts=%s, dur=%.2f, dva=%dp",
pprint->age(), controller->sequence_no(), controller->ts_url().c_str(),
controller->duration(), controller->deviation());
}