1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

for #738, refine the dvr segmenter.

This commit is contained in:
winlin 2017-02-06 18:33:26 +08:00
parent 31191f2650
commit 8c01f52372
9 changed files with 477 additions and 293 deletions

View file

@ -42,59 +42,57 @@ using namespace std;
#include <srs_kernel_buffer.hpp>
#include <srs_protocol_json.hpp>
#include <srs_app_utility.hpp>
#include <srs_kernel_mp4.hpp>
// update the flv duration and filesize every this interval in ms.
#define SRS_DVR_UPDATE_DURATION_INTERVAL 60000
SrsFlvSegment::SrsFlvSegment(SrsDvrPlan* p)
SrsDvrSegmenter::SrsDvrSegmenter()
{
req = NULL;
jitter = NULL;
plan = p;
fs = new SrsFileWriter();
enc = new SrsFlvEncoder();
jitter_algorithm = SrsRtmpJitterAlgorithmOFF;
path = "";
has_keyframe = false;
plan = NULL;
duration = 0;
starttime = -1;
stream_starttime = 0;
stream_previous_pkt_time = -1;
stream_duration = 0;
duration_offset = 0;
filesize_offset = 0;
path = "";
fs = new SrsFileWriter();
jitter_algorithm = SrsRtmpJitterAlgorithmOFF;
_srs_config->subscribe(this);
}
SrsFlvSegment::~SrsFlvSegment()
SrsDvrSegmenter::~SrsDvrSegmenter()
{
_srs_config->unsubscribe(this);
srs_freep(jitter);
srs_freep(fs);
srs_freep(enc);
}
int SrsFlvSegment::initialize(SrsRequest* r)
int SrsDvrSegmenter::initialize(SrsDvrPlan* p, SrsRequest* r)
{
int ret = ERROR_SUCCESS;
req = r;
jitter_algorithm = (SrsRtmpJitterAlgorithm)_srs_config->get_dvr_time_jitter(req->vhost);
plan = p;
int jitter = _srs_config->get_dvr_time_jitter(req->vhost);
jitter_algorithm = (SrsRtmpJitterAlgorithm)jitter;
return ret;
}
bool SrsFlvSegment::is_overflow(int64_t max_duration)
string SrsDvrSegmenter::get_path()
{
return path;
}
bool SrsDvrSegmenter::is_overflow(int64_t max_duration)
{
return duration >= max_duration;
}
int SrsFlvSegment::open(bool use_tmp_file)
int SrsDvrSegmenter::open(bool use_tmp_file)
{
int ret = ERROR_SUCCESS;
@ -104,7 +102,14 @@ int SrsFlvSegment::open(bool use_tmp_file)
}
path = generate_path();
bool fresh_flv_file = !srs_path_exists(path);
bool can_append = srs_string_ends_with(path, ".flv");
bool target_exists = srs_path_exists(path);
if (!can_append && target_exists) {
ret = ERROR_DVR_CANNOT_APPEND;
srs_error("DVR can't append to exists path=%s. ret=%d", path.c_str(), ret);
return ret;
}
// create dir first.
std::string dir = srs_path_dirname(path);
@ -115,60 +120,46 @@ int SrsFlvSegment::open(bool use_tmp_file)
srs_info("create dir=%s ok", dir.c_str());
// create jitter.
if ((ret = create_jitter(!fresh_flv_file)) != ERROR_SUCCESS) {
srs_error("create jitter failed, path=%s, fresh=%d. ret=%d", path.c_str(), fresh_flv_file, ret);
if ((ret = create_jitter(target_exists)) != ERROR_SUCCESS) {
srs_error("create jitter failed, path=%s, exists=%d. ret=%d", path.c_str(), target_exists, ret);
return ret;
}
// generate the tmp flv path.
if (!fresh_flv_file || !use_tmp_file) {
if (target_exists || !use_tmp_file) {
// when path exists, always append to it.
// so we must use the target flv path as output flv.
tmp_flv_file = path;
tmp_dvr_file = path;
} else {
// when path not exists, dvr to tmp file.
tmp_flv_file = path + ".tmp";
tmp_dvr_file = path + ".tmp";
}
// open file writer, in append or create mode.
if (!fresh_flv_file) {
if ((ret = fs->open_append(tmp_flv_file)) != ERROR_SUCCESS) {
if (target_exists) {
if ((ret = fs->open_append(tmp_dvr_file)) != ERROR_SUCCESS) {
srs_error("append file stream for file %s failed. ret=%d", path.c_str(), ret);
return ret;
}
srs_trace("dvr: always append to when exists, file=%s.", path.c_str());
} else {
if ((ret = fs->open(tmp_flv_file)) != ERROR_SUCCESS) {
if ((ret = fs->open(tmp_dvr_file)) != ERROR_SUCCESS) {
srs_error("open file stream for file %s failed. ret=%d", path.c_str(), ret);
return ret;
}
}
// initialize the encoder.
if ((ret = enc->initialize(fs)) != ERROR_SUCCESS) {
srs_error("initialize enc by fs for file %s failed. ret=%d", path.c_str(), ret);
if ((ret = open_encoder()) != ERROR_SUCCESS) {
return ret;
}
// when exists, donot write flv header.
if (fresh_flv_file) {
// write the flv header to writer.
if ((ret = enc->write_header()) != ERROR_SUCCESS) {
srs_error("write flv header failed. ret=%d", ret);
return ret;
}
}
// update the duration and filesize offset.
duration_offset = 0;
filesize_offset = 0;
srs_trace("dvr stream %s to file %s", req->stream.c_str(), path.c_str());
return ret;
}
int SrsFlvSegment::close()
int SrsDvrSegmenter::close()
{
int ret = ERROR_SUCCESS;
@ -177,19 +168,19 @@ int SrsFlvSegment::close()
return ret;
}
// update duration and filesize.
if ((ret = update_flv_metadata()) != ERROR_SUCCESS) {
// Close the encoder, then close the fs object.
if ((ret = close_encoder()) != ERROR_SUCCESS) {
return ret;
}
fs->close();
// when tmp flv file exists, reap it.
if (tmp_flv_file != path) {
if (rename(tmp_flv_file.c_str(), path.c_str()) < 0) {
if (tmp_dvr_file != path) {
if (rename(tmp_dvr_file.c_str(), path.c_str()) < 0) {
ret = ERROR_SYSTEM_FILE_RENAME;
srs_error("rename flv file failed, %s => %s. ret=%d",
tmp_flv_file.c_str(), path.c_str(), ret);
tmp_dvr_file.c_str(), path.c_str(), ret);
return ret;
}
}
@ -204,35 +195,179 @@ int SrsFlvSegment::close()
return ret;
}
int SrsFlvSegment::write_metadata(SrsSharedPtrMessage* metadata)
string SrsDvrSegmenter::generate_path()
{
// the path in config, for example,
// /data/[vhost]/[app]/[stream]/[2006]/[01]/[02]/[15].[04].[05].[999].flv
std::string path_config = _srs_config->get_dvr_path(req->vhost);
// add [stream].[timestamp].flv as filename for dir
if (!srs_string_ends_with(path_config, ".flv")) {
path_config += "/[stream].[timestamp].flv";
}
// the flv file path
std::string flv_path = path_config;
flv_path = srs_path_build_stream(flv_path, req->vhost, req->app, req->stream);
flv_path = srs_path_build_timestamp(flv_path);
return flv_path;
}
int SrsDvrSegmenter::create_jitter(bool target_exists)
{
int ret = ERROR_SUCCESS;
// When DVR target file not exists, create new jitter.
if (!target_exists) {
// jitter when publish, ensure whole stream start from 0.
srs_freep(jitter);
jitter = new SrsRtmpJitter();
duration = 0;
if (duration_offset || filesize_offset) {
return ret;
}
// when jitter ok, do nothing.
if (jitter) {
return ret;
}
// always ensure the jitter crote.
// for the first time, initialize jitter from exists file.
jitter = new SrsRtmpJitter();
// TODO: FIXME: implements it.
return ret;
}
int SrsDvrSegmenter::on_reload_vhost_dvr(std::string /*vhost*/)
{
int ret = ERROR_SUCCESS;
jitter_algorithm = (SrsRtmpJitterAlgorithm)_srs_config->get_dvr_time_jitter(req->vhost);
return ret;
}
SrsDvrFlvSegmenter::SrsDvrFlvSegmenter()
{
enc = new SrsFlvEncoder();
duration_offset = 0;
filesize_offset = 0;
has_keyframe = false;
starttime = -1;
stream_starttime = 0;
stream_previous_pkt_time = -1;
stream_duration = 0;
}
SrsDvrFlvSegmenter::~SrsDvrFlvSegmenter()
{
srs_freep(enc);
}
int SrsDvrFlvSegmenter::open_encoder()
{
int ret = ERROR_SUCCESS;
if ((ret = enc->initialize(fs)) != ERROR_SUCCESS) {
srs_error("initialize enc by fs for file %s failed. ret=%d", path.c_str(), ret);
return ret;
}
bool target_exists = srs_path_exists(path);
if (target_exists){
has_keyframe = false;
// fresh stream starting.
starttime = -1;
stream_previous_pkt_time = -1;
stream_starttime = srs_update_system_time_ms();
stream_duration = 0;
// write the flv header to writer.
if ((ret = enc->write_header()) != ERROR_SUCCESS) {
srs_error("write flv header failed. ret=%d", ret);
return ret;
}
}
// update the duration and filesize offset.
duration_offset = 0;
filesize_offset = 0;
return ret;
}
int SrsDvrFlvSegmenter::close_encoder()
{
return refresh_metadata();
}
int SrsDvrFlvSegmenter::on_update_duration(SrsSharedPtrMessage* msg)
{
int ret = ERROR_SUCCESS;
// we must assumpt that the stream timestamp is monotonically increase,
// that is, always use time jitter to correct the timestamp.
// except the time jitter is disabled in config.
// set the segment starttime at first time
if (starttime < 0) {
starttime = msg->timestamp;
}
// no previous packet or timestamp overflow.
if (stream_previous_pkt_time < 0 || stream_previous_pkt_time > msg->timestamp) {
stream_previous_pkt_time = msg->timestamp;
}
// collect segment and stream duration, timestamp overflow is ok.
duration += msg->timestamp - stream_previous_pkt_time;
stream_duration += msg->timestamp - stream_previous_pkt_time;
// update previous packet time
stream_previous_pkt_time = msg->timestamp;
return ret;
}
int SrsDvrFlvSegmenter::write_metadata(SrsSharedPtrMessage* metadata)
{
int ret = ERROR_SUCCESS;
// Ignore when metadata already written.
if (duration_offset || filesize_offset) {
return ret;
}
SrsBuffer stream;
if ((ret = stream.initialize(metadata->payload, metadata->size)) != ERROR_SUCCESS) {
return ret;
}
SrsAmf0Any* name = SrsAmf0Any::str();
SrsAutoFree(SrsAmf0Any, name);
if ((ret = name->read(&stream)) != ERROR_SUCCESS) {
return ret;
}
SrsAmf0Object* obj = SrsAmf0Any::object();
SrsAutoFree(SrsAmf0Object, obj);
if ((ret = obj->read(&stream)) != ERROR_SUCCESS) {
return ret;
}
// remove duration and filesize.
obj->set("filesize", NULL);
obj->set("duration", NULL);
// add properties.
obj->set("service", SrsAmf0Any::str(RTMP_SIG_SRS_SERVER));
obj->set("filesize", SrsAmf0Any::number(0));
@ -241,12 +376,12 @@ int SrsFlvSegment::write_metadata(SrsSharedPtrMessage* metadata)
int size = name->total_size() + obj->total_size();
char* payload = new char[size];
SrsAutoFreeA(char, payload);
// 11B flv header, 3B object EOF, 8B number value, 1B number flag.
duration_offset = fs->tellg() + size + 11 - SrsAmf0Size::object_eof() - SrsAmf0Size::number();
// 2B string flag, 8B number value, 8B string 'duration', 1B number flag
filesize_offset = duration_offset - SrsAmf0Size::utf8("duration") - SrsAmf0Size::number();
// convert metadata to bytes.
if ((ret = stream.initialize(payload, size)) != ERROR_SUCCESS) {
return ret;
@ -266,10 +401,10 @@ int SrsFlvSegment::write_metadata(SrsSharedPtrMessage* metadata)
return ret;
}
int SrsFlvSegment::write_audio(SrsSharedPtrMessage* shared_audio)
int SrsDvrFlvSegmenter::write_audio(SrsSharedPtrMessage* shared_audio)
{
int ret = ERROR_SUCCESS;
SrsSharedPtrMessage* audio = shared_audio->copy();
SrsAutoFree(SrsSharedPtrMessage, audio);
@ -283,7 +418,7 @@ int SrsFlvSegment::write_audio(SrsSharedPtrMessage* shared_audio)
if ((ret = enc->write_audio(timestamp, payload, size)) != ERROR_SUCCESS) {
return ret;
}
if ((ret = on_update_duration(audio)) != ERROR_SUCCESS) {
return ret;
}
@ -291,10 +426,10 @@ int SrsFlvSegment::write_audio(SrsSharedPtrMessage* shared_audio)
return ret;
}
int SrsFlvSegment::write_video(SrsSharedPtrMessage* shared_video)
int SrsDvrFlvSegmenter::write_video(SrsSharedPtrMessage* shared_video)
{
int ret = ERROR_SUCCESS;
SrsSharedPtrMessage* video = shared_video->copy();
SrsAutoFree(SrsSharedPtrMessage, video);
@ -302,7 +437,7 @@ int SrsFlvSegment::write_video(SrsSharedPtrMessage* shared_video)
int size = video->size;
bool is_sequence_header = SrsFlvCodec::video_is_sequence_header(payload, size);
bool is_key_frame = SrsFlvCodec::video_is_h264(payload, size)
bool is_key_frame = SrsFlvCodec::video_is_h264(payload, size)
&& SrsFlvCodec::video_is_keyframe(payload, size) && !is_sequence_header;
if (is_key_frame) {
has_keyframe = true;
@ -311,7 +446,7 @@ int SrsFlvSegment::write_video(SrsSharedPtrMessage* shared_video)
}
}
srs_verbose("dvr video is key: %d", is_key_frame);
// accept the sequence header here.
// when got no keyframe, ignore when should wait keyframe.
if (!has_keyframe && !is_sequence_header) {
@ -340,35 +475,35 @@ int SrsFlvSegment::write_video(SrsSharedPtrMessage* shared_video)
return ret;
}
int SrsFlvSegment::update_flv_metadata()
int SrsDvrFlvSegmenter::refresh_metadata()
{
int ret = ERROR_SUCCESS;
// no duration or filesize specified.
if (!duration_offset || !filesize_offset) {
return ret;
}
int64_t cur = fs->tellg();
// buffer to write the size.
char* buf = new char[SrsAmf0Size::number()];
SrsAutoFreeA(char, buf);
SrsBuffer stream;
if ((ret = stream.initialize(buf, SrsAmf0Size::number())) != ERROR_SUCCESS) {
return ret;
}
// filesize to buf.
SrsAmf0Any* size = SrsAmf0Any::number((double)cur);
SrsAutoFree(SrsAmf0Any, size);
stream.skip(-1 * stream.pos());
if ((ret = size->write(&stream)) != ERROR_SUCCESS) {
return ret;
}
// update the flesize.
fs->seek2(filesize_offset);
if ((ret = fs->write(buf, SrsAmf0Size::number(), NULL)) != ERROR_SUCCESS) {
@ -383,114 +518,62 @@ int SrsFlvSegment::update_flv_metadata()
if ((ret = dur->write(&stream)) != ERROR_SUCCESS) {
return ret;
}
// update the duration
fs->seek2(duration_offset);
if ((ret = fs->write(buf, SrsAmf0Size::number(), NULL)) != ERROR_SUCCESS) {
return ret;
}
// reset the offset.
fs->seek2(cur);
return ret;
}
string SrsFlvSegment::get_path()
{
return path;
}
string SrsFlvSegment::generate_path()
{
// the path in config, for example,
// /data/[vhost]/[app]/[stream]/[2006]/[01]/[02]/[15].[04].[05].[999].flv
std::string path_config = _srs_config->get_dvr_path(req->vhost);
// add [stream].[timestamp].flv as filename for dir
if (!srs_string_ends_with(path_config, ".flv")) {
path_config += "/[stream].[timestamp].flv";
}
// the flv file path
std::string flv_path = path_config;
flv_path = srs_path_build_stream(flv_path, req->vhost, req->app, req->stream);
flv_path = srs_path_build_timestamp(flv_path);
return flv_path;
}
int SrsFlvSegment::create_jitter(bool loads_from_flv)
{
int ret = ERROR_SUCCESS;
// when path exists, use exists jitter.
if (!loads_from_flv) {
// jitter when publish, ensure whole stream start from 0.
srs_freep(jitter);
jitter = new SrsRtmpJitter();
// fresh stream starting.
starttime = -1;
stream_previous_pkt_time = -1;
stream_starttime = srs_update_system_time_ms();
stream_duration = 0;
// fresh segment starting.
has_keyframe = false;
duration = 0;
return ret;
}
// when jitter ok, do nothing.
if (jitter) {
return ret;
}
// always ensure the jitter crote.
// for the first time, initialize jitter from exists file.
jitter = new SrsRtmpJitter();
// TODO: FIXME: implements it.
return ret;
}
int SrsFlvSegment::on_update_duration(SrsSharedPtrMessage* msg)
{
int ret = ERROR_SUCCESS;
// we must assumpt that the stream timestamp is monotonically increase,
// that is, always use time jitter to correct the timestamp.
// except the time jitter is disabled in config.
// set the segment starttime at first time
if (starttime < 0) {
starttime = msg->timestamp;
}
// no previous packet or timestamp overflow.
if (stream_previous_pkt_time < 0 || stream_previous_pkt_time > msg->timestamp) {
stream_previous_pkt_time = msg->timestamp;
}
// collect segment and stream duration, timestamp overflow is ok.
duration += msg->timestamp - stream_previous_pkt_time;
stream_duration += msg->timestamp - stream_previous_pkt_time;
// update previous packet time
stream_previous_pkt_time = msg->timestamp;
return ret;
}
int SrsFlvSegment::on_reload_vhost_dvr(std::string /*vhost*/)
SrsDvrMp4Segmenter::SrsDvrMp4Segmenter()
{
enc = new SrsMp4Encoder();
}
SrsDvrMp4Segmenter::~SrsDvrMp4Segmenter()
{
srs_freep(enc);
}
int SrsDvrMp4Segmenter::open_encoder()
{
int ret = ERROR_SUCCESS;
return ret;
}
int SrsDvrMp4Segmenter::close_encoder()
{
int ret = ERROR_SUCCESS;
return ret;
}
int SrsDvrMp4Segmenter::write_metadata(SrsSharedPtrMessage* metadata)
{
int ret = ERROR_SUCCESS;
return ret;
}
int SrsDvrMp4Segmenter::write_audio(SrsSharedPtrMessage* shared_audio)
{
int ret = ERROR_SUCCESS;
return ret;
}
int SrsDvrMp4Segmenter::write_video(SrsSharedPtrMessage* shared_video)
{
int ret = ERROR_SUCCESS;
return ret;
}
int SrsDvrMp4Segmenter::refresh_metadata()
{
int ret = ERROR_SUCCESS;
jitter_algorithm = (SrsRtmpJitterAlgorithm)_srs_config->get_dvr_time_jitter(req->vhost);
return ret;
}
@ -553,7 +636,7 @@ SrsDvrPlan::SrsDvrPlan()
req = NULL;
dvr_enabled = false;
segment = new SrsFlvSegment(this);
segment = NULL;
async = new SrsAsyncCallWorker();
}
@ -563,13 +646,14 @@ SrsDvrPlan::~SrsDvrPlan()
srs_freep(async);
}
int SrsDvrPlan::initialize(SrsRequest* r)
int SrsDvrPlan::initialize(SrsDvrSegmenter* s, SrsRequest* r)
{
int ret = ERROR_SUCCESS;
req = r;
segment = s;
if ((ret = segment->initialize(r)) != ERROR_SUCCESS) {
if ((ret = segment->initialize(this, r)) != ERROR_SUCCESS) {
return ret;
}
@ -643,19 +727,31 @@ int SrsDvrPlan::on_reap_segment()
return ret;
}
SrsDvrPlan* SrsDvrPlan::create_plan(string vhost)
int SrsDvrPlan::create_plan(string vhost, SrsDvrPlan** pplan)
{
int ret = ERROR_SUCCESS;
std::string plan = _srs_config->get_dvr_plan(vhost);
std::string path = _srs_config->get_dvr_path(vhost);
bool is_mp4 = srs_string_ends_with(path, ".mp4");
if (srs_config_dvr_is_plan_segment(plan)) {
return new SrsDvrSegmentPlan();
*pplan = new SrsDvrSegmentPlan();
} else if (srs_config_dvr_is_plan_session(plan)) {
return new SrsDvrSessionPlan();
*pplan = new SrsDvrSessionPlan();
} else if (srs_config_dvr_is_plan_append(plan)) {
return new SrsDvrAppendPlan();
if (is_mp4) {
ret = ERROR_DVR_ILLEGAL_PLAN;
srs_error("DVR plan append not support MP4. ret=%d", ret);
return ret;
}
*pplan = new SrsDvrAppendPlan();
} else {
srs_error("invalid dvr plan=%s, vhost=%s", plan.c_str(), vhost.c_str());
srs_assert(false);
}
return ret;
}
SrsDvrSessionPlan::SrsDvrSessionPlan()
@ -683,7 +779,7 @@ int SrsDvrSessionPlan::on_publish()
return ret;
}
if ((ret = segment->open()) != ERROR_SUCCESS) {
if ((ret = segment->open(true)) != ERROR_SUCCESS) {
return ret;
}
@ -793,7 +889,7 @@ int SrsDvrAppendPlan::update_duration(SrsSharedPtrMessage* msg)
last_update_time = msg->timestamp;
srs_assert(segment);
if (!segment->update_flv_metadata()) {
if ((ret = segment->refresh_metadata()) != ERROR_SUCCESS) {
return ret;
}
@ -813,11 +909,11 @@ SrsDvrSegmentPlan::~SrsDvrSegmentPlan()
srs_freep(metadata);
}
int SrsDvrSegmentPlan::initialize(SrsRequest* req)
int SrsDvrSegmentPlan::initialize(SrsDvrSegmenter* s, SrsRequest* req)
{
int ret = ERROR_SUCCESS;
if ((ret = SrsDvrPlan::initialize(req)) != ERROR_SUCCESS) {
if ((ret = SrsDvrPlan::initialize(s, req)) != ERROR_SUCCESS) {
return ret;
}
@ -845,7 +941,7 @@ int SrsDvrSegmentPlan::on_publish()
return ret;
}
if ((ret = segment->open()) != ERROR_SUCCESS) {
if ((ret = segment->open(true)) != ERROR_SUCCESS) {
return ret;
}
@ -946,7 +1042,7 @@ int SrsDvrSegmentPlan::update_duration(SrsSharedPtrMessage* msg)
}
// open new flv file
if ((ret = segment->open()) != ERROR_SUCCESS) {
if ((ret = segment->open(true)) != ERROR_SUCCESS) {
return ret;
}
@ -992,9 +1088,19 @@ int SrsDvr::initialize(SrsOriginHub* h, SrsRequest* r)
actived = srs_config_apply_filter(conf, r);
srs_freep(plan);
plan = SrsDvrPlan::create_plan(r->vhost);
if ((ret = plan->initialize(r)) != ERROR_SUCCESS) {
if ((ret = SrsDvrPlan::create_plan(r->vhost, &plan)) != ERROR_SUCCESS) {
return ret;
}
std::string path = _srs_config->get_dvr_path(r->vhost);
SrsDvrSegmenter* segmenter = NULL;
if (srs_string_ends_with(path, ".mp4")) {
segmenter = new SrsDvrMp4Segmenter();
} else {
segmenter = new SrsDvrFlvSegmenter();
}
if ((ret = plan->initialize(segmenter, r)) != ERROR_SUCCESS) {
return ret;
}