1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

for #179, refine dvr, support POST create dvr when publish not start. 2.0.126

This commit is contained in:
winlin 2015-02-27 20:39:36 +08:00
parent b903a7b436
commit 0213cc6466
11 changed files with 265 additions and 174 deletions

View file

@ -28,6 +28,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <fcntl.h>
#include <sstream>
#include <sys/time.h>
#include <algorithm>
using namespace std;
#include <srs_app_config.hpp>
@ -54,7 +55,6 @@ using namespace std;
SrsFlvSegment::SrsFlvSegment(SrsDvrPlan* p)
{
req = NULL;
source = NULL;
jitter = NULL;
plan = p;
@ -85,11 +85,10 @@ SrsFlvSegment::~SrsFlvSegment()
srs_freep(enc);
}
int SrsFlvSegment::initialize(SrsSource* s, SrsRequest* r)
int SrsFlvSegment::initialize(SrsRequest* r)
{
int ret = ERROR_SUCCESS;
source = s;
req = r;
jitter_algorithm = (SrsRtmpJitterAlgorithm)_srs_config->get_dvr_time_jitter(req->vhost);
@ -617,48 +616,6 @@ string SrsDvrAsyncCallOnDvr::to_string()
return ss.str();
}
SrsDvrAsyncCallOnSegment::SrsDvrAsyncCallOnSegment(SrsRequest* r, string c, string p)
{
req = r;
callback = c;
path = p;
}
SrsDvrAsyncCallOnSegment::~SrsDvrAsyncCallOnSegment()
{
}
int SrsDvrAsyncCallOnSegment::call()
{
int ret = ERROR_SUCCESS;
#ifdef SRS_AUTO_HTTP_CALLBACK
// HTTP: callback
if (callback.empty()) {
srs_warn("dvr: ignore for callback empty, vhost=%s", req->vhost.c_str());
return ret;
}
int connection_id = _srs_context->get_id();
std::string cwd = _srs_config->cwd();
std::string file = path;
std::string url = callback;
if ((ret = SrsHttpHooks::on_dvr_reap_segment(url, connection_id, req, cwd, file)) != ERROR_SUCCESS) {
srs_error("hook client on_dvr_reap_segment failed. url=%s, ret=%d", url.c_str(), ret);
return ret;
}
#endif
return ret;
}
string SrsDvrAsyncCallOnSegment::to_string()
{
std::stringstream ss;
ss << "vhost=" << req->vhost << ", file=" << path << "callback=" << callback;
return ss.str();
}
SrsDvrAsyncCallThread::SrsDvrAsyncCallThread()
{
pthread = new SrsThread("async", this, SRS_AUTO_ASYNC_CALLBACL_SLEEP_US, true);
@ -717,7 +674,6 @@ int SrsDvrAsyncCallThread::cycle()
SrsDvrPlan::SrsDvrPlan()
{
source = NULL;
req = NULL;
dvr_enabled = false;
@ -731,14 +687,13 @@ SrsDvrPlan::~SrsDvrPlan()
srs_freep(async);
}
int SrsDvrPlan::initialize(SrsSource* s, SrsRequest* r)
int SrsDvrPlan::initialize(SrsRequest* r)
{
int ret = ERROR_SUCCESS;
source = s;
req = r;
if ((ret = segment->initialize(s, r)) != ERROR_SUCCESS) {
if ((ret = segment->initialize(r)) != ERROR_SUCCESS) {
return ret;
}
@ -749,18 +704,6 @@ int SrsDvrPlan::initialize(SrsSource* s, SrsRequest* r)
return ret;
}
int SrsDvrPlan::on_dvr_request_sh()
{
int ret = ERROR_SUCCESS;
// the dvr is enabled, notice the source to push the data.
if ((ret = source->on_dvr_request_sh()) != ERROR_SUCCESS) {
return ret;
}
return ret;
}
int SrsDvrPlan::on_video_keyframe()
{
return ERROR_SUCCESS;
@ -833,6 +776,15 @@ SrsDvrPlan* SrsDvrPlan::create_plan(string vhost)
} else if (plan == SRS_CONF_DEFAULT_DVR_PLAN_APPEND) {
return new SrsDvrAppendPlan();
} else if (plan == SRS_CONF_DEFAULT_DVR_PLAN_API) {
/**
* @remark the api plan maybe create by publish event or http api post create dvr event.
* so when we got from pool first when create it.
*/
SrsApiDvrPool* pool = SrsApiDvrPool::instance();
SrsDvrApiPlan* plan = pool->get_dvr(vhost);
if (plan) {
return plan;
}
return new SrsDvrApiPlan();
} else {
srs_error("invalid dvr plan=%s, vhost=%s", plan.c_str(), vhost.c_str());
@ -900,16 +852,19 @@ SrsDvrApiPlan::SrsDvrApiPlan()
SrsDvrApiPlan::~SrsDvrApiPlan()
{
SrsApiDvrPool* pool = SrsApiDvrPool::instance();
pool->detach_dvr(this);
srs_freep(metadata);
srs_freep(sh_audio);
srs_freep(sh_video);
}
int SrsDvrApiPlan::initialize(SrsSource* s, SrsRequest* r)
int SrsDvrApiPlan::initialize(SrsRequest* r)
{
int ret = ERROR_SUCCESS;
if ((ret = SrsDvrPlan::initialize(s, r)) != ERROR_SUCCESS) {
if ((ret = SrsDvrPlan::initialize(r)) != ERROR_SUCCESS) {
return ret;
}
@ -1017,6 +972,12 @@ int SrsDvrApiPlan::on_video(SrsSharedPtrMessage* __video)
return ret;
}
int SrsDvrApiPlan::set_plan()
{
_srs_config->set_dvr_plan(req->vhost, SRS_CONF_DEFAULT_DVR_PLAN_API);
return ERROR_SUCCESS;
}
int SrsDvrApiPlan::set_path_tmpl(string path_tmpl)
{
_srs_config->set_dvr_path(req->vhost, path_tmpl);
@ -1025,7 +986,8 @@ int SrsDvrApiPlan::set_path_tmpl(string path_tmpl)
int SrsDvrApiPlan::set_callback(string value)
{
callback = value;
_srs_config->set_vhost_http_hooks_enabled(req->vhost, true);
_srs_config->set_vhost_on_dvr(req->vhost, value);
return ERROR_SUCCESS;
}
@ -1072,6 +1034,7 @@ int SrsDvrApiPlan::dumps(stringstream& ss)
bool wait_keyframe = _srs_config->get_dvr_wait_keyframe(req->vhost);
std::string path_template = _srs_config->get_dvr_path(req->vhost);
SrsConfDirective* callbacks = _srs_config->get_vhost_on_dvr(req->vhost);
ss << __SRS_JOBJECT_START
<< __SRS_JFIELD_STR("path_tmpl", path_template) << __SRS_JFIELD_CONT
@ -1080,7 +1043,7 @@ int SrsDvrApiPlan::dumps(stringstream& ss)
<< __SRS_JFIELD_STR("vhost", req->vhost) << __SRS_JFIELD_CONT
<< __SRS_JFIELD_STR("app", req->app) << __SRS_JFIELD_CONT
<< __SRS_JFIELD_STR("stream", req->stream) << __SRS_JFIELD_CONT
<< __SRS_JFIELD_STR("callback", callback) << __SRS_JFIELD_CONT
<< __SRS_JFIELD_STR("callback", callbacks->arg0()) << __SRS_JFIELD_CONT
<< __SRS_JFIELD_STR("status", (dvr_enabled? "start":"stop"))
<< __SRS_JOBJECT_END;
@ -1133,21 +1096,6 @@ int SrsDvrApiPlan::rpc(SrsJsonObject* obj)
return ret;
}
int SrsDvrApiPlan::on_reap_segment()
{
int ret = ERROR_SUCCESS;
if ((ret = SrsDvrPlan::on_reap_segment()) != ERROR_SUCCESS) {
return ret;
}
if ((ret = async->call(new SrsDvrAsyncCallOnSegment(req, callback, segment->get_path()))) != ERROR_SUCCESS) {
return ret;
}
return ret;
}
int SrsDvrApiPlan::check_user_actions(SrsSharedPtrMessage* msg)
{
int ret = ERROR_SUCCESS;
@ -1311,11 +1259,11 @@ SrsDvrSegmentPlan::~SrsDvrSegmentPlan()
srs_freep(metadata);
}
int SrsDvrSegmentPlan::initialize(SrsSource* source, SrsRequest* req)
int SrsDvrSegmentPlan::initialize(SrsRequest* req)
{
int ret = ERROR_SUCCESS;
if ((ret = SrsDvrPlan::initialize(source, req)) != ERROR_SUCCESS) {
if ((ret = SrsDvrPlan::initialize(req)) != ERROR_SUCCESS) {
return ret;
}
@ -1478,12 +1426,35 @@ SrsApiDvrPool::~SrsApiDvrPool()
dvrs.clear();
}
SrsDvrApiPlan* SrsApiDvrPool::get_dvr(string vhost)
{
std::vector<SrsDvrApiPlan*>::iterator it;
for (it = dvrs.begin(); it != dvrs.end(); ++it) {
SrsDvrApiPlan* plan = *it;
if (plan->req->vhost == vhost) {
return plan;
}
}
return NULL;
}
int SrsApiDvrPool::add_dvr(SrsDvrApiPlan* dvr)
{
dvrs.push_back(dvr);
return ERROR_SUCCESS;
}
void SrsApiDvrPool::detach_dvr(SrsDvrApiPlan* dvr)
{
std::vector<SrsDvrApiPlan*>::iterator it;
it = ::find(dvrs.begin(), dvrs.end(), dvr);
if (it != dvrs.end()) {
dvrs.erase(it);
}
}
int SrsApiDvrPool::dumps(string vhost, string app, string stream, stringstream& ss)
{
int ret = ERROR_SUCCESS;
@ -1540,39 +1511,69 @@ int SrsApiDvrPool::create(SrsJsonAny* json)
srs_error("dvr: api create dvr request requires vhost. ret=%d", ret);
return ret;
}
std::string vhost = prop->to_str();
std::string app, stream;
if ((prop = obj->ensure_property_string("app")) != NULL) {
app = prop->to_str();
if ((prop = obj->ensure_property_string("app")) == NULL) {
ret = ERROR_HTTP_DVR_CREATE_REQUEST;
srs_error("dvr: api create dvr request requires app. ret=%d", ret);
return ret;
}
if ((prop = obj->ensure_property_string("stream")) != NULL) {
stream = prop->to_str();
std::string app = prop->to_str();
if ((prop = obj->ensure_property_string("stream")) == NULL) {
ret = ERROR_HTTP_DVR_CREATE_REQUEST;
srs_error("dvr: api create dvr request requires stream. ret=%d", ret);
return ret;
}
std::string stream = prop->to_str();
if (vhost.empty() || app.empty() || stream.empty()) {
ret = ERROR_HTTP_DVR_CREATE_REQUEST;
srs_error("dvr: api create dvr request requires vhost/app/stream. ret=%d", ret);
return ret;
}
SrsDvrApiPlan* dvr = NULL;
for (int i = 0; i < (int)dvrs.size(); i++) {
SrsDvrApiPlan* plan = dvrs.at(i);
if (!vhost.empty() && plan->req->vhost != vhost) {
continue;
}
if (!app.empty() && plan->req->app != app) {
continue;
}
if (!stream.empty() && plan->req->stream != stream) {
if (plan->req->vhost != vhost || plan->req->app != app || plan->req->stream != stream) {
continue;
}
dvr = plan;
break;
}
// mock the client request for dvr.
SrsRequest* req = new SrsRequest();
SrsAutoFree(SrsRequest, req);
// should notice the source to reload dvr when already publishing.
SrsSource* source = NULL;
// create if not exists
if (!dvr) {
ret = ERROR_HTTP_DVR_NO_TAEGET;
srs_error("dvr: create not found for url=%s/%s/%s, ret=%d", vhost.c_str(), app.c_str(), stream.c_str(), ret);
return ret;
dvr = new SrsDvrApiPlan();
req->vhost = vhost;
req->app = app;
req->stream = stream;
req->tcUrl = "rtmp://" + vhost + "/" + app + "/" + stream;
// fetch source from pool.
// NULL, create without source, ignore.
// start dvr when already publishing.
source = SrsSource::fetch(req);
// initialize for dvr pool to create it.
if ((ret = dvr->initialize(req)) != ERROR_SUCCESS) {
return ret;
}
}
// update optional parameters for plan.
if ((ret = dvr->set_plan()) != ERROR_SUCCESS) {
return ret;
}
if ((prop = obj->ensure_property_string("path_tmpl")) != NULL) {
if ((ret = dvr->set_path_tmpl(prop->to_str())) != ERROR_SUCCESS) {
return ret;
@ -1589,7 +1590,19 @@ int SrsApiDvrPool::create(SrsJsonAny* json)
}
}
return dvr->start();
if ((ret = dvr->start()) != ERROR_SUCCESS) {
return ret;
}
// do reload for source when already publishing.
// when reload, the source will use the request instead.
if (source) {
if ((ret = source->on_reload_vhost_dvr(vhost)) != ERROR_SUCCESS) {
return ret;
}
}
return ret;
}
int SrsApiDvrPool::stop(string vhost, string app, string stream)
@ -1681,9 +1694,9 @@ int SrsApiDvrPool::rpc(SrsJsonAny* json)
return ret;
}
SrsDvr::SrsDvr(SrsSource* s)
SrsDvr::SrsDvr()
{
source = s;
source = NULL;
plan = NULL;
}
@ -1692,14 +1705,20 @@ SrsDvr::~SrsDvr()
srs_freep(plan);
}
int SrsDvr::initialize(SrsRequest* r)
int SrsDvr::initialize(SrsSource* s, SrsRequest* r)
{
int ret = ERROR_SUCCESS;
source = s;
srs_freep(plan);
plan = SrsDvrPlan::create_plan(r->vhost);
if ((ret = plan->initialize(source, r)) != ERROR_SUCCESS) {
if ((ret = plan->initialize(r)) != ERROR_SUCCESS) {
return ret;
}
if ((ret = source->on_dvr_request_sh()) != ERROR_SUCCESS) {
return ret;
}
@ -1722,6 +1741,7 @@ void SrsDvr::on_unpublish()
plan->on_unpublish();
}
// TODO: FIXME: source should use shared message instead.
int SrsDvr::on_meta_data(SrsOnMetaDataPacket* m)
{
int ret = ERROR_SUCCESS;