1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

For #913, source cycle error

This commit is contained in:
winlin 2017-06-11 09:40:07 +08:00
parent 7275fd5397
commit 7cac35a289
15 changed files with 161 additions and 152 deletions

View file

@ -172,10 +172,8 @@ SrsMpdWriter::~SrsMpdWriter()
{ {
} }
int SrsMpdWriter::initialize(SrsRequest* r) srs_error_t SrsMpdWriter::initialize(SrsRequest* r)
{ {
int ret = ERROR_SUCCESS;
req = r; req = r;
fragment = _srs_config->get_dash_fragment(r->vhost); fragment = _srs_config->get_dash_fragment(r->vhost);
update_period = _srs_config->get_dash_update_period(r->vhost); update_period = _srs_config->get_dash_update_period(r->vhost);
@ -187,8 +185,7 @@ int SrsMpdWriter::initialize(SrsRequest* r)
fragment_home = srs_path_dirname(mpd_path) + "/" + req->stream; fragment_home = srs_path_dirname(mpd_path) + "/" + req->stream;
srs_trace("DASH: Config fragment=%d, period=%d", fragment, update_period); srs_trace("DASH: Config fragment=%d, period=%d", fragment, update_period);
return srs_success;
return ret;
} }
int SrsMpdWriter::write(SrsFormat* format) int SrsMpdWriter::write(SrsFormat* format)
@ -309,16 +306,17 @@ SrsDashController::~SrsDashController()
srs_freep(afragments); srs_freep(afragments);
} }
int SrsDashController::initialize(SrsRequest* r) srs_error_t SrsDashController::initialize(SrsRequest* r)
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
srs_error_t err = srs_success;
req = r; req = r;
fragment = _srs_config->get_dash_fragment(r->vhost); fragment = _srs_config->get_dash_fragment(r->vhost);
home = _srs_config->get_dash_path(r->vhost); home = _srs_config->get_dash_path(r->vhost);
if ((ret = mpd->initialize(r)) != ERROR_SUCCESS) { if ((err = mpd->initialize(r)) != srs_success) {
return ret; return srs_error_wrap(err, "mpd");
} }
string home, path; string home, path;
@ -326,18 +324,16 @@ int SrsDashController::initialize(SrsRequest* r)
srs_freep(vcurrent); srs_freep(vcurrent);
vcurrent = new SrsFragmentedMp4(); vcurrent = new SrsFragmentedMp4();
if ((ret = vcurrent->initialize(req, true, mpd, video_tack_id)) != ERROR_SUCCESS) { if ((ret = vcurrent->initialize(req, true, mpd, video_tack_id)) != ERROR_SUCCESS) {
srs_error("DASH: Initialize the video fragment failed, ret=%d", ret); return srs_error_new(ret, "video fragment");
return ret;
} }
srs_freep(acurrent); srs_freep(acurrent);
acurrent = new SrsFragmentedMp4(); acurrent = new SrsFragmentedMp4();
if ((ret = acurrent->initialize(req, false, mpd, audio_track_id)) != ERROR_SUCCESS) { if ((ret = acurrent->initialize(req, false, mpd, audio_track_id)) != ERROR_SUCCESS) {
srs_error("DASH: Initialize the audio fragment failed, ret=%d", ret); return srs_error_new(ret, "audio fragment");
return ret;
} }
return ret; return err;
} }
int SrsDashController::on_audio(SrsSharedPtrMessage* shared_audio, SrsFormat* format) int SrsDashController::on_audio(SrsSharedPtrMessage* shared_audio, SrsFormat* format)
@ -484,19 +480,18 @@ SrsDash::~SrsDash()
srs_freep(controller); srs_freep(controller);
} }
int SrsDash::initialize(SrsOriginHub* h, SrsRequest* r) srs_error_t SrsDash::initialize(SrsOriginHub* h, SrsRequest* r)
{ {
int ret = ERROR_SUCCESS; srs_error_t err = srs_success;
hub = h; hub = h;
req = r; req = r;
if ((ret = controller->initialize(req)) != ERROR_SUCCESS) { if ((err = controller->initialize(req)) != srs_success) {
srs_error("DASH: Initialize controller failed. ret=%d", ret); return srs_error_wrap(err, "controller");
return ret;
} }
return ret; return err;
} }
int SrsDash::on_publish() int SrsDash::on_publish()

View file

@ -102,7 +102,7 @@ public:
SrsMpdWriter(); SrsMpdWriter();
virtual ~SrsMpdWriter(); virtual ~SrsMpdWriter();
public: public:
virtual int initialize(SrsRequest* r); virtual srs_error_t initialize(SrsRequest* r);
// Write MPD according to parsed format of stream. // Write MPD according to parsed format of stream.
virtual int write(SrsFormat* format); virtual int write(SrsFormat* format);
public: public:
@ -137,7 +137,7 @@ public:
SrsDashController(); SrsDashController();
virtual ~SrsDashController(); virtual ~SrsDashController();
public: public:
virtual int initialize(SrsRequest* r); virtual srs_error_t initialize(SrsRequest* r);
virtual int on_audio(SrsSharedPtrMessage* shared_audio, SrsFormat* format); virtual int on_audio(SrsSharedPtrMessage* shared_audio, SrsFormat* format);
virtual int on_video(SrsSharedPtrMessage* shared_video, SrsFormat* format); virtual int on_video(SrsSharedPtrMessage* shared_video, SrsFormat* format);
private: private:
@ -161,7 +161,7 @@ public:
virtual ~SrsDash(); virtual ~SrsDash();
public: public:
// Initalize the encoder. // Initalize the encoder.
virtual int initialize(SrsOriginHub* h, SrsRequest* r); virtual srs_error_t initialize(SrsOriginHub* h, SrsRequest* r);
// When stream start publishing. // When stream start publishing.
virtual int on_publish(); virtual int on_publish();
// When got an shared audio message. // When got an shared audio message.

View file

@ -65,17 +65,15 @@ SrsDvrSegmenter::~SrsDvrSegmenter()
srs_freep(fs); srs_freep(fs);
} }
int SrsDvrSegmenter::initialize(SrsDvrPlan* p, SrsRequest* r) srs_error_t SrsDvrSegmenter::initialize(SrsDvrPlan* p, SrsRequest* r)
{ {
int ret = ERROR_SUCCESS;
req = r; req = r;
plan = p; plan = p;
jitter_algorithm = (SrsRtmpJitterAlgorithm)_srs_config->get_dvr_time_jitter(req->vhost); jitter_algorithm = (SrsRtmpJitterAlgorithm)_srs_config->get_dvr_time_jitter(req->vhost);
wait_keyframe = _srs_config->get_dvr_wait_keyframe(req->vhost); wait_keyframe = _srs_config->get_dvr_wait_keyframe(req->vhost);
return ret; return srs_success;
} }
SrsFragment* SrsDvrSegmenter::current() SrsFragment* SrsDvrSegmenter::current()
@ -612,23 +610,24 @@ SrsDvrPlan::~SrsDvrPlan()
srs_freep(async); srs_freep(async);
} }
int SrsDvrPlan::initialize(SrsOriginHub* h, SrsDvrSegmenter* s, SrsRequest* r) srs_error_t SrsDvrPlan::initialize(SrsOriginHub* h, SrsDvrSegmenter* s, SrsRequest* r)
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
srs_error_t err = srs_success;
hub = h; hub = h;
req = r; req = r;
segment = s; segment = s;
if ((ret = segment->initialize(this, r)) != ERROR_SUCCESS) { if ((err = segment->initialize(this, r)) != srs_success) {
return ret; return srs_error_wrap(err, "segmenter");
} }
if ((ret = async->start()) != ERROR_SUCCESS) { if ((ret = async->start()) != ERROR_SUCCESS) {
return ret; return srs_error_new(ret, "async");
} }
return ret; return err;
} }
int SrsDvrPlan::on_meta_data(SrsSharedPtrMessage* shared_metadata) int SrsDvrPlan::on_meta_data(SrsSharedPtrMessage* shared_metadata)
@ -688,22 +687,19 @@ int SrsDvrPlan::on_reap_segment()
return ret; return ret;
} }
int SrsDvrPlan::create_plan(string vhost, SrsDvrPlan** pplan) srs_error_t SrsDvrPlan::create_plan(string vhost, SrsDvrPlan** pplan)
{ {
int ret = ERROR_SUCCESS;
std::string plan = _srs_config->get_dvr_plan(vhost); std::string plan = _srs_config->get_dvr_plan(vhost);
if (srs_config_dvr_is_plan_segment(plan)) { if (srs_config_dvr_is_plan_segment(plan)) {
*pplan = new SrsDvrSegmentPlan(); *pplan = new SrsDvrSegmentPlan();
} else if (srs_config_dvr_is_plan_session(plan)) { } else if (srs_config_dvr_is_plan_session(plan)) {
*pplan = new SrsDvrSessionPlan(); *pplan = new SrsDvrSessionPlan();
} else { } else {
ret = ERROR_DVR_ILLEGAL_PLAN; return srs_error_new(ERROR_DVR_ILLEGAL_PLAN, "illegal plan=%s, vhost=%s",
srs_error("DVR illegal plan=%s, vhost=%s. ret=%d", plan.c_str(), vhost.c_str(), ret); plan.c_str(), vhost.c_str());
return ret;
} }
return ret; return srs_success;
} }
SrsDvrSessionPlan::SrsDvrSessionPlan() SrsDvrSessionPlan::SrsDvrSessionPlan()
@ -766,12 +762,12 @@ SrsDvrSegmentPlan::~SrsDvrSegmentPlan()
{ {
} }
int SrsDvrSegmentPlan::initialize(SrsOriginHub* h, SrsDvrSegmenter* s, SrsRequest* r) srs_error_t SrsDvrSegmentPlan::initialize(SrsOriginHub* h, SrsDvrSegmenter* s, SrsRequest* r)
{ {
int ret = ERROR_SUCCESS; srs_error_t err = srs_success;
if ((ret = SrsDvrPlan::initialize(h, s, r)) != ERROR_SUCCESS) { if ((err = SrsDvrPlan::initialize(h, s, r)) != srs_success) {
return ret; return srs_error_wrap(err, "segment plan");
} }
wait_keyframe = _srs_config->get_dvr_wait_keyframe(req->vhost); wait_keyframe = _srs_config->get_dvr_wait_keyframe(req->vhost);
@ -780,7 +776,7 @@ int SrsDvrSegmentPlan::initialize(SrsOriginHub* h, SrsDvrSegmenter* s, SrsReques
// to ms // to ms
cduration *= 1000; cduration *= 1000;
return ret; return srs_success;
} }
int SrsDvrSegmentPlan::on_publish() int SrsDvrSegmentPlan::on_publish()
@ -924,9 +920,9 @@ SrsDvr::~SrsDvr()
srs_freep(plan); srs_freep(plan);
} }
int SrsDvr::initialize(SrsOriginHub* h, SrsRequest* r) srs_error_t SrsDvr::initialize(SrsOriginHub* h, SrsRequest* r)
{ {
int ret = ERROR_SUCCESS; srs_error_t err = srs_success;
req = r; req = r;
hub = h; hub = h;
@ -935,8 +931,8 @@ int SrsDvr::initialize(SrsOriginHub* h, SrsRequest* r)
actived = srs_config_apply_filter(conf, r); actived = srs_config_apply_filter(conf, r);
srs_freep(plan); srs_freep(plan);
if ((ret = SrsDvrPlan::create_plan(r->vhost, &plan)) != ERROR_SUCCESS) { if ((err = SrsDvrPlan::create_plan(r->vhost, &plan)) != srs_success) {
return ret; return srs_error_wrap(err, "create plan");
} }
std::string path = _srs_config->get_dvr_path(r->vhost); std::string path = _srs_config->get_dvr_path(r->vhost);
@ -947,11 +943,11 @@ int SrsDvr::initialize(SrsOriginHub* h, SrsRequest* r)
segmenter = new SrsDvrFlvSegmenter(); segmenter = new SrsDvrFlvSegmenter();
} }
if ((ret = plan->initialize(hub, segmenter, r)) != ERROR_SUCCESS) { if ((err = plan->initialize(hub, segmenter, r)) != srs_success) {
return ret; return srs_error_wrap(err, "plan initialize");
} }
return ret; return err;
} }
int SrsDvr::on_publish() int SrsDvr::on_publish()

View file

@ -72,7 +72,7 @@ public:
virtual ~SrsDvrSegmenter(); virtual ~SrsDvrSegmenter();
public: public:
// Initialize the segment. // Initialize the segment.
virtual int initialize(SrsDvrPlan* p, SrsRequest* r); virtual srs_error_t initialize(SrsDvrPlan* p, SrsRequest* r);
// Get the current framgnet. // Get the current framgnet.
virtual SrsFragment* current(); virtual SrsFragment* current();
// Open new segment file. // Open new segment file.
@ -194,7 +194,7 @@ public:
SrsDvrPlan(); SrsDvrPlan();
virtual ~SrsDvrPlan(); virtual ~SrsDvrPlan();
public: public:
virtual int initialize(SrsOriginHub* h, SrsDvrSegmenter* s, SrsRequest* r); virtual srs_error_t initialize(SrsOriginHub* h, SrsDvrSegmenter* s, SrsRequest* r);
virtual int on_publish() = 0; virtual int on_publish() = 0;
virtual void on_unpublish() = 0; virtual void on_unpublish() = 0;
virtual int on_meta_data(SrsSharedPtrMessage* shared_metadata); virtual int on_meta_data(SrsSharedPtrMessage* shared_metadata);
@ -205,7 +205,7 @@ public:
// When segmenter close a segment. // When segmenter close a segment.
virtual int on_reap_segment(); virtual int on_reap_segment();
public: public:
static int create_plan(std::string vhost, SrsDvrPlan** pplan); static srs_error_t create_plan(std::string vhost, SrsDvrPlan** pplan);
}; };
/** /**
@ -234,7 +234,7 @@ public:
SrsDvrSegmentPlan(); SrsDvrSegmentPlan();
virtual ~SrsDvrSegmentPlan(); virtual ~SrsDvrSegmentPlan();
public: public:
virtual int initialize(SrsOriginHub* h, SrsDvrSegmenter* s, SrsRequest* r); virtual srs_error_t initialize(SrsOriginHub* h, SrsDvrSegmenter* s, SrsRequest* r);
virtual int on_publish(); virtual int on_publish();
virtual void on_unpublish(); virtual void on_unpublish();
virtual int on_audio(SrsSharedPtrMessage* shared_audio, SrsFormat* format); virtual int on_audio(SrsSharedPtrMessage* shared_audio, SrsFormat* format);
@ -269,7 +269,7 @@ public:
* when system initialize(encoder publish at first time, or reload), * when system initialize(encoder publish at first time, or reload),
* initialize the dvr will reinitialize the plan, the whole dvr framework. * initialize the dvr will reinitialize the plan, the whole dvr framework.
*/ */
virtual int initialize(SrsOriginHub* h, SrsRequest* r); virtual srs_error_t initialize(SrsOriginHub* h, SrsRequest* r);
/** /**
* publish stream event, * publish stream event,
* when encoder start to publish RTMP stream. * when encoder start to publish RTMP stream.

View file

@ -178,15 +178,13 @@ SrsEdgeIngester::~SrsEdgeIngester()
srs_freep(trd); srs_freep(trd);
} }
int SrsEdgeIngester::initialize(SrsSource* s, SrsPlayEdge* e, SrsRequest* r) srs_error_t SrsEdgeIngester::initialize(SrsSource* s, SrsPlayEdge* e, SrsRequest* r)
{ {
int ret = ERROR_SUCCESS;
source = s; source = s;
edge = e; edge = e;
req = r; req = r;
return ret; return srs_success;
} }
int SrsEdgeIngester::start() int SrsEdgeIngester::start()
@ -441,15 +439,13 @@ void SrsEdgeForwarder::set_queue_size(double queue_size)
return queue->set_queue_size(queue_size); return queue->set_queue_size(queue_size);
} }
int SrsEdgeForwarder::initialize(SrsSource* s, SrsPublishEdge* e, SrsRequest* r) srs_error_t SrsEdgeForwarder::initialize(SrsSource* s, SrsPublishEdge* e, SrsRequest* r)
{ {
int ret = ERROR_SUCCESS;
source = s; source = s;
edge = e; edge = e;
req = r; req = r;
return ret; return srs_success;
} }
int SrsEdgeForwarder::start() int SrsEdgeForwarder::start()
@ -634,15 +630,15 @@ SrsPlayEdge::~SrsPlayEdge()
srs_freep(ingester); srs_freep(ingester);
} }
int SrsPlayEdge::initialize(SrsSource* source, SrsRequest* req) srs_error_t SrsPlayEdge::initialize(SrsSource* source, SrsRequest* req)
{ {
int ret = ERROR_SUCCESS; srs_error_t err = srs_success;
if ((ret = ingester->initialize(source, this, req)) != ERROR_SUCCESS) { if ((err = ingester->initialize(source, this, req)) != srs_success) {
return ret; return srs_error_wrap(err, "ingester(pull)");
} }
return ret; return err;
} }
int SrsPlayEdge::on_client_play() int SrsPlayEdge::on_client_play()
@ -712,15 +708,15 @@ void SrsPublishEdge::set_queue_size(double queue_size)
return forwarder->set_queue_size(queue_size); return forwarder->set_queue_size(queue_size);
} }
int SrsPublishEdge::initialize(SrsSource* source, SrsRequest* req) srs_error_t SrsPublishEdge::initialize(SrsSource* source, SrsRequest* req)
{ {
int ret = ERROR_SUCCESS; srs_error_t err = srs_success;
if ((ret = forwarder->initialize(source, this, req)) != ERROR_SUCCESS) { if ((err = forwarder->initialize(source, this, req)) != srs_success) {
return ret; return srs_error_wrap(err, "forwarder(push)");
} }
return ret; return err;
} }
bool SrsPublishEdge::can_publish() bool SrsPublishEdge::can_publish()

View file

@ -129,7 +129,7 @@ public:
SrsEdgeIngester(); SrsEdgeIngester();
virtual ~SrsEdgeIngester(); virtual ~SrsEdgeIngester();
public: public:
virtual int initialize(SrsSource* s, SrsPlayEdge* e, SrsRequest* r); virtual srs_error_t initialize(SrsSource* s, SrsPlayEdge* e, SrsRequest* r);
virtual int start(); virtual int start();
virtual void stop(); virtual void stop();
virtual std::string get_curr_origin(); virtual std::string get_curr_origin();
@ -172,7 +172,7 @@ public:
public: public:
virtual void set_queue_size(double queue_size); virtual void set_queue_size(double queue_size);
public: public:
virtual int initialize(SrsSource* s, SrsPublishEdge* e, SrsRequest* r); virtual srs_error_t initialize(SrsSource* s, SrsPublishEdge* e, SrsRequest* r);
virtual int start(); virtual int start();
virtual void stop(); virtual void stop();
// interface ISrsReusableThread2Handler // interface ISrsReusableThread2Handler
@ -202,7 +202,7 @@ public:
* for we assume all client to edge is invalid, * for we assume all client to edge is invalid,
* if auth open, edge must valid it from origin, then service it. * if auth open, edge must valid it from origin, then service it.
*/ */
virtual int initialize(SrsSource* source, SrsRequest* req); virtual srs_error_t initialize(SrsSource* source, SrsRequest* req);
/** /**
* when client play stream on edge. * when client play stream on edge.
*/ */
@ -234,7 +234,7 @@ public:
public: public:
virtual void set_queue_size(double queue_size); virtual void set_queue_size(double queue_size);
public: public:
virtual int initialize(SrsSource* source, SrsRequest* req); virtual srs_error_t initialize(SrsSource* source, SrsRequest* req);
virtual bool can_publish(); virtual bool can_publish();
/** /**
* when client publish stream on edge. * when client publish stream on edge.

View file

@ -255,15 +255,15 @@ int SrsHlsMuxer::deviation()
return deviation_ts; return deviation_ts;
} }
int SrsHlsMuxer::initialize() srs_error_t SrsHlsMuxer::initialize()
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
if ((ret = async->start()) != ERROR_SUCCESS) { if ((ret = async->start()) != ERROR_SUCCESS) {
return ret; return srs_error_new(ret, "async start");
} }
return ret; return srs_success;
} }
int SrsHlsMuxer::update_config(SrsRequest* r, string entry_prefix, int SrsHlsMuxer::update_config(SrsRequest* r, string entry_prefix,
@ -745,9 +745,13 @@ SrsHlsController::~SrsHlsController()
srs_freep(tsmc); srs_freep(tsmc);
} }
int SrsHlsController::initialize() srs_error_t SrsHlsController::initialize()
{ {
return muxer->initialize(); srs_error_t err = muxer->initialize();
if (err != srs_success) {
return srs_error_wrap(err, "hls muxer initialize");
}
return srs_success;
} }
void SrsHlsController::dispose() void SrsHlsController::dispose()
@ -1002,50 +1006,50 @@ void SrsHls::dispose()
controller->dispose(); controller->dispose();
} }
int SrsHls::cycle() srs_error_t SrsHls::cycle()
{ {
int ret = ERROR_SUCCESS; srs_error_t err = srs_success;
if (last_update_time <= 0) { if (last_update_time <= 0) {
last_update_time = srs_get_system_time_ms(); last_update_time = srs_get_system_time_ms();
} }
if (!req) { if (!req) {
return ret; return err;
} }
int hls_dispose = _srs_config->get_hls_dispose(req->vhost) * 1000; int hls_dispose = _srs_config->get_hls_dispose(req->vhost) * 1000;
if (hls_dispose <= 0) { if (hls_dispose <= 0) {
return ret; return err;
} }
if (srs_get_system_time_ms() - last_update_time <= hls_dispose) { if (srs_get_system_time_ms() - last_update_time <= hls_dispose) {
return ret; return err;
} }
last_update_time = srs_get_system_time_ms(); last_update_time = srs_get_system_time_ms();
if (!disposable) { if (!disposable) {
return ret; return err;
} }
disposable = false; disposable = false;
srs_trace("hls cycle to dispose hls %s, timeout=%dms", req->get_stream_url().c_str(), hls_dispose); srs_trace("hls cycle to dispose hls %s, timeout=%dms", req->get_stream_url().c_str(), hls_dispose);
dispose(); dispose();
return ret; return err;
} }
int SrsHls::initialize(SrsOriginHub* h, SrsRequest* r) srs_error_t SrsHls::initialize(SrsOriginHub* h, SrsRequest* r)
{ {
int ret = ERROR_SUCCESS; srs_error_t err = srs_success;
hub = h; hub = h;
req = r; req = r;
if ((ret = controller->initialize()) != ERROR_SUCCESS) { if ((err = controller->initialize()) != srs_success) {
return ret; return srs_error_wrap(err, "controller initialize");
} }
return ret; return err;
} }
int SrsHls::on_publish() int SrsHls::on_publish()

View file

@ -175,7 +175,7 @@ public:
/** /**
* initialize the hls muxer. * initialize the hls muxer.
*/ */
virtual int initialize(); virtual srs_error_t initialize();
/** /**
* when publish, update the config for muxer. * when publish, update the config for muxer.
*/ */
@ -248,7 +248,7 @@ public:
SrsHlsController(); SrsHlsController();
virtual ~SrsHlsController(); virtual ~SrsHlsController();
public: public:
virtual int initialize(); virtual srs_error_t initialize();
virtual void dispose(); virtual void dispose();
virtual int sequence_no(); virtual int sequence_no();
virtual std::string ts_url(); virtual std::string ts_url();
@ -313,12 +313,12 @@ public:
virtual ~SrsHls(); virtual ~SrsHls();
public: public:
virtual void dispose(); virtual void dispose();
virtual int cycle(); virtual srs_error_t cycle();
public: public:
/** /**
* initialize the hls by handler and source. * initialize the hls by handler and source.
*/ */
virtual int initialize(SrsOriginHub* h, SrsRequest* r); virtual srs_error_t initialize(SrsOriginHub* h, SrsRequest* r);
/** /**
* publish stream event, continue to write the m3u8, * publish stream event, continue to write the m3u8,
* for the muxer object not destroyed. * for the muxer object not destroyed.

View file

@ -920,8 +920,8 @@ srs_error_t SrsServer::do_cycle()
// the deamon thread, update the time cache // the deamon thread, update the time cache
// TODO: FIXME: use SrsHourGlass. // TODO: FIXME: use SrsHourGlass.
while (true) { while (true) {
if (handler && (ret = handler->on_cycle()) != ERROR_SUCCESS) { if (handler && (err = handler->on_cycle()) != srs_success) {
return srs_error_new(ret, "handle callback"); return srs_error_wrap(err, "handle callback");
} }
// the interval in config. // the interval in config.
@ -979,8 +979,8 @@ srs_error_t SrsServer::do_cycle()
} }
// notice the stream sources to cycle. // notice the stream sources to cycle.
if ((ret = SrsSource::cycle_all()) != ERROR_SUCCESS) { if ((err = SrsSource::cycle_all()) != srs_success) {
return srs_error_new(ret, "source cycle"); return srs_error_wrap(err, "source cycle");
} }
// update the cache time // update the cache time

View file

@ -223,7 +223,7 @@ public:
/** /**
* do on_cycle while server doing cycle. * do on_cycle while server doing cycle.
*/ */
virtual int on_cycle() = 0; virtual srs_error_t on_cycle() = 0;
/** /**
* callback the handler when got client. * callback the handler when got client.
*/ */

View file

@ -885,30 +885,30 @@ SrsOriginHub::~SrsOriginHub()
#endif #endif
} }
int SrsOriginHub::initialize(SrsSource* s, SrsRequest* r) srs_error_t SrsOriginHub::initialize(SrsSource* s, SrsRequest* r)
{ {
int ret = ERROR_SUCCESS; srs_error_t err = srs_success;
req = r; req = r;
source = s; source = s;
if ((ret = format->initialize()) != ERROR_SUCCESS) { if ((err = format->initialize()) != srs_success) {
return ret; return srs_error_wrap(err, "format initialize");
} }
if ((ret = hls->initialize(this, req)) != ERROR_SUCCESS) { if ((err = hls->initialize(this, req)) != srs_success) {
return ret; return srs_error_wrap(err, "hls initialize");
} }
if ((ret = dash->initialize(this, req)) != ERROR_SUCCESS) { if ((err = dash->initialize(this, req)) != srs_success) {
return ret; return srs_error_wrap(err, "dash initialize");
} }
if ((ret = dvr->initialize(this, req)) != ERROR_SUCCESS) { if ((err = dvr->initialize(this, req)) != srs_success) {
return ret; return srs_error_wrap(err, "dvr initialize");
} }
return ret; return err;
} }
void SrsOriginHub::dispose() void SrsOriginHub::dispose()
@ -918,17 +918,17 @@ void SrsOriginHub::dispose()
// TODO: Support dispose DASH. // TODO: Support dispose DASH.
} }
int SrsOriginHub::cycle() srs_error_t SrsOriginHub::cycle()
{ {
int ret = ERROR_SUCCESS; srs_error_t err = srs_success;
if ((ret = hls->cycle()) != ERROR_SUCCESS) { if ((err = hls->cycle()) != srs_success) {
return ret; return srs_error_wrap(err, "hls cycle");
} }
// TODO: Support cycle DASH. // TODO: Support cycle DASH.
return ret; return err;
} }
int SrsOriginHub::on_meta_data(SrsSharedPtrMessage* shared_metadata, SrsOnMetaDataPacket* packet) int SrsOriginHub::on_meta_data(SrsSharedPtrMessage* shared_metadata, SrsOnMetaDataPacket* packet)
@ -1452,6 +1452,7 @@ int SrsOriginHub::on_reload_vhost_hds(string vhost)
int SrsOriginHub::on_reload_vhost_dvr(string vhost) int SrsOriginHub::on_reload_vhost_dvr(string vhost)
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
srs_error_t err = srs_success;
if (req->vhost != vhost) { if (req->vhost != vhost) {
return ret; return ret;
@ -1468,7 +1469,11 @@ int SrsOriginHub::on_reload_vhost_dvr(string vhost)
} }
// reinitialize the dvr, update plan. // reinitialize the dvr, update plan.
if ((ret = dvr->initialize(this, req)) != ERROR_SUCCESS) { if ((err = dvr->initialize(this, req)) != srs_success) {
// TODO: FIXME: Use error.
ret = srs_error_code(err);
srs_freep(err);
return ret; return ret;
} }
@ -1749,6 +1754,7 @@ std::map<std::string, SrsSource*> SrsSource::pool;
int SrsSource::fetch_or_create(SrsRequest* r, ISrsSourceHandler* h, SrsSource** pps) int SrsSource::fetch_or_create(SrsRequest* r, ISrsSourceHandler* h, SrsSource** pps)
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
srs_error_t err = srs_success;
SrsSource* source = NULL; SrsSource* source = NULL;
if ((source = fetch(r)) != NULL) { if ((source = fetch(r)) != NULL) {
@ -1763,7 +1769,11 @@ int SrsSource::fetch_or_create(SrsRequest* r, ISrsSourceHandler* h, SrsSource**
srs_assert (pool.find(stream_url) == pool.end()); srs_assert (pool.find(stream_url) == pool.end());
source = new SrsSource(); source = new SrsSource();
if ((ret = source->initialize(r, h)) != ERROR_SUCCESS) { if ((err = source->initialize(r, h)) != srs_success) {
// TODO: FIXME: Use error.
ret = srs_error_code(err);
srs_freep(err);
srs_freep(source); srs_freep(source);
return ret; return ret;
} }
@ -1805,28 +1815,26 @@ void SrsSource::dispose_all()
return; return;
} }
int SrsSource::cycle_all() srs_error_t SrsSource::cycle_all()
{ {
int ret = ERROR_SUCCESS;
int cid = _srs_context->get_id(); int cid = _srs_context->get_id();
ret = do_cycle_all(); srs_error_t err = do_cycle_all();
_srs_context->set_id(cid); _srs_context->set_id(cid);
return ret; return err;
} }
int SrsSource::do_cycle_all() srs_error_t SrsSource::do_cycle_all()
{ {
int ret = ERROR_SUCCESS; srs_error_t err = srs_success;
std::map<std::string, SrsSource*>::iterator it; std::map<std::string, SrsSource*>::iterator it;
for (it = pool.begin(); it != pool.end();) { for (it = pool.begin(); it != pool.end();) {
SrsSource* source = it->second; SrsSource* source = it->second;
// Do cycle source to cleanup components, such as hls dispose. // Do cycle source to cleanup components, such as hls dispose.
if ((ret = source->cycle()) != ERROR_SUCCESS) { if ((err = source->cycle()) != srs_success) {
return ret; return srs_error_wrap(err, "source=%d/%d cycle", source->source_id(), source->pre_source_id());
} }
// TODO: FIXME: support source cleanup. // TODO: FIXME: support source cleanup.
@ -1854,7 +1862,7 @@ int SrsSource::do_cycle_all()
#endif #endif
} }
return ret; return err;
} }
void SrsSource::destroy() void SrsSource::destroy()
@ -1919,9 +1927,14 @@ void SrsSource::dispose()
gop_cache->dispose(); gop_cache->dispose();
} }
int SrsSource::cycle() srs_error_t SrsSource::cycle()
{ {
return hub->cycle(); srs_error_t err = hub->cycle();
if (err != srs_success) {
return srs_error_wrap(err, "hub cycle");
}
return srs_success;
} }
bool SrsSource::expired() bool SrsSource::expired()
@ -1949,9 +1962,9 @@ bool SrsSource::expired()
return false; return false;
} }
int SrsSource::initialize(SrsRequest* r, ISrsSourceHandler* h) srs_error_t SrsSource::initialize(SrsRequest* r, ISrsSourceHandler* h)
{ {
int ret = ERROR_SUCCESS; srs_error_t err = srs_success;
srs_assert(h); srs_assert(h);
srs_assert(!req); srs_assert(!req);
@ -1960,15 +1973,15 @@ int SrsSource::initialize(SrsRequest* r, ISrsSourceHandler* h)
req = r->copy(); req = r->copy();
atc = _srs_config->get_atc(req->vhost); atc = _srs_config->get_atc(req->vhost);
if ((ret = hub->initialize(this, req)) != ERROR_SUCCESS) { if ((err = hub->initialize(this, req)) != srs_success) {
return ret; return srs_error_wrap(err, "hub");
} }
if ((ret = play_edge->initialize(this, req)) != ERROR_SUCCESS) { if ((err = play_edge->initialize(this, req)) != srs_success) {
return ret; return srs_error_wrap(err, "edge(play)");
} }
if ((ret = publish_edge->initialize(this, req)) != ERROR_SUCCESS) { if ((err = publish_edge->initialize(this, req)) != srs_success) {
return ret; return srs_error_wrap(err, "edge(publish)");
} }
double queue_size = _srs_config->get_queue_length(req->vhost); double queue_size = _srs_config->get_queue_length(req->vhost);
@ -1977,7 +1990,7 @@ int SrsSource::initialize(SrsRequest* r, ISrsSourceHandler* h)
jitter_algorithm = (SrsRtmpJitterAlgorithm)_srs_config->get_time_jitter(req->vhost); jitter_algorithm = (SrsRtmpJitterAlgorithm)_srs_config->get_time_jitter(req->vhost);
mix_correct = _srs_config->get_mix_correct(req->vhost); mix_correct = _srs_config->get_mix_correct(req->vhost);
return ret; return err;
} }
int SrsSource::on_reload_vhost_play(string vhost) int SrsSource::on_reload_vhost_play(string vhost)

View file

@ -445,13 +445,13 @@ public:
public: public:
// Initialize the hub with source and request. // Initialize the hub with source and request.
// @param r The request object, managed by source. // @param r The request object, managed by source.
virtual int initialize(SrsSource* s, SrsRequest* r); virtual srs_error_t initialize(SrsSource* s, SrsRequest* r);
// Dispose the hub, release utilities resource, // Dispose the hub, release utilities resource,
// for example, delete all HLS pieces. // for example, delete all HLS pieces.
virtual void dispose(); virtual void dispose();
// Cycle the hub, process some regular events, // Cycle the hub, process some regular events,
// for example, dispose hls in cycle. // for example, dispose hls in cycle.
virtual int cycle(); virtual srs_error_t cycle();
public: public:
// When got a parsed metadata. // When got a parsed metadata.
virtual int on_meta_data(SrsSharedPtrMessage* shared_metadata, SrsOnMetaDataPacket* packet); virtual int on_meta_data(SrsSharedPtrMessage* shared_metadata, SrsOnMetaDataPacket* packet);
@ -555,9 +555,9 @@ public:
* dispose and cycle all sources. * dispose and cycle all sources.
*/ */
static void dispose_all(); static void dispose_all();
static int cycle_all(); static srs_error_t cycle_all();
private: private:
static int do_cycle_all(); static srs_error_t do_cycle_all();
public: public:
/** /**
* when system exit, destroy the sources, * when system exit, destroy the sources,
@ -620,7 +620,7 @@ public:
virtual ~SrsSource(); virtual ~SrsSource();
public: public:
virtual void dispose(); virtual void dispose();
virtual int cycle(); virtual srs_error_t cycle();
// remove source when expired. // remove source when expired.
virtual bool expired(); virtual bool expired();
// initialize, get and setter. // initialize, get and setter.
@ -628,7 +628,7 @@ public:
/** /**
* initialize the hls with handlers. * initialize the hls with handlers.
*/ */
virtual int initialize(SrsRequest* r, ISrsSourceHandler* h); virtual srs_error_t initialize(SrsRequest* r, ISrsSourceHandler* h);
// interface ISrsReloadHandler // interface ISrsReloadHandler
public: public:
virtual int on_reload_vhost_play(std::string vhost); virtual int on_reload_vhost_play(std::string vhost);

View file

@ -524,9 +524,9 @@ SrsFormat::~SrsFormat()
srs_freep(buffer); srs_freep(buffer);
} }
int SrsFormat::initialize() srs_error_t SrsFormat::initialize()
{ {
return ERROR_SUCCESS; return srs_success;
} }
int SrsFormat::on_audio(int64_t timestamp, char* data, int size) int SrsFormat::on_audio(int64_t timestamp, char* data, int size)

View file

@ -693,7 +693,7 @@ public:
virtual ~SrsFormat(); virtual ~SrsFormat();
public: public:
// Initialize the format. // Initialize the format.
virtual int initialize(); virtual srs_error_t initialize();
// When got a parsed audio packet. // When got a parsed audio packet.
// @param data The data in FLV format. // @param data The data in FLV format.
virtual int on_audio(int64_t timestamp, char* data, int size); virtual int on_audio(int64_t timestamp, char* data, int size);

View file

@ -3102,8 +3102,13 @@ SrsTsTransmuxer::~SrsTsTransmuxer()
int SrsTsTransmuxer::initialize(SrsFileWriter* fw) int SrsTsTransmuxer::initialize(SrsFileWriter* fw)
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
srs_error_t err = srs_success;
if ((err = format->initialize()) != srs_success) {
// TODO: FIXME: Use error.
ret = srs_error_code(err);
srs_freep(err);
if ((ret = format->initialize()) != ERROR_SUCCESS) {
return ret; return ret;
} }