1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

for bug #251, refine the directly access ptrs for hls,dvr,forwarder,consumer.

This commit is contained in:
winlin 2014-12-05 23:49:53 +08:00
parent d3a103184a
commit de993b6465
8 changed files with 97 additions and 67 deletions

View file

@ -332,10 +332,12 @@ int SrsConsumer::get_time()
return jitter->get_time();
}
int SrsConsumer::enqueue(SrsSharedPtrMessage* msg, bool atc, int tba, int tbv, SrsRtmpJitterAlgorithm ag)
int SrsConsumer::enqueue(SrsSharedPtrMessage* __msg, bool atc, int tba, int tbv, SrsRtmpJitterAlgorithm ag)
{
int ret = ERROR_SUCCESS;
SrsSharedPtrMessage* msg = __msg->copy();
if (!atc) {
if ((ret = jitter->correct(msg, tba, tbv, ag)) != ERROR_SUCCESS) {
srs_freep(msg);
@ -409,7 +411,7 @@ void SrsGopCache::set(bool enabled)
srs_info("enable gop cache");
}
int SrsGopCache::cache(SrsSharedPtrMessage* msg)
int SrsGopCache::cache(SrsSharedPtrMessage* __msg)
{
int ret = ERROR_SUCCESS;
@ -418,6 +420,9 @@ int SrsGopCache::cache(SrsSharedPtrMessage* msg)
return ret;
}
// the gop cache know when to gop it.
SrsSharedPtrMessage* msg = __msg;
// disable gop cache when not h.264
if (!SrsFlvCodec::video_is_h264(msg->payload, msg->size)) {
srs_info("gop donot cache video for none h.264");
@ -840,15 +845,15 @@ int SrsSource::on_forwarder_start(SrsForwarder* forwarder)
// feed the forwarder the metadata/sequence header,
// when reload to enable the forwarder.
if (cache_metadata && (ret = forwarder->on_meta_data(cache_metadata->copy())) != ERROR_SUCCESS) {
if (cache_metadata && (ret = forwarder->on_meta_data(cache_metadata)) != ERROR_SUCCESS) {
srs_error("forwarder process onMetaData message failed. ret=%d", ret);
return ret;
}
if (cache_sh_video && (ret = forwarder->on_video(cache_sh_video->copy())) != ERROR_SUCCESS) {
if (cache_sh_video && (ret = forwarder->on_video(cache_sh_video)) != ERROR_SUCCESS) {
srs_error("forwarder process video sequence header message failed. ret=%d", ret);
return ret;
}
if (cache_sh_audio && (ret = forwarder->on_audio(cache_sh_audio->copy())) != ERROR_SUCCESS) {
if (cache_sh_audio && (ret = forwarder->on_audio(cache_sh_audio)) != ERROR_SUCCESS) {
srs_error("forwarder process audio sequence header message failed. ret=%d", ret);
return ret;
}
@ -865,11 +870,11 @@ int SrsSource::on_hls_start()
// when reload to start hls, hls will never get the sequence header in stream,
// use the SrsSource.on_hls_start to push the sequence header to HLS.
// TODO: maybe need to decode the metadata?
if (cache_sh_video && (ret = hls->on_video(cache_sh_video->copy())) != ERROR_SUCCESS) {
if (cache_sh_video && (ret = hls->on_video(cache_sh_video)) != ERROR_SUCCESS) {
srs_error("hls process video sequence header message failed. ret=%d", ret);
return ret;
}
if (cache_sh_audio && (ret = hls->on_audio(cache_sh_audio->copy())) != ERROR_SUCCESS) {
if (cache_sh_audio && (ret = hls->on_audio(cache_sh_audio)) != ERROR_SUCCESS) {
srs_error("hls process audio sequence header message failed. ret=%d", ret);
return ret;
}
@ -908,11 +913,11 @@ int SrsSource::on_dvr_request_sh()
}
}
if (cache_sh_video && (ret = dvr->on_video(cache_sh_video->copy())) != ERROR_SUCCESS) {
if (cache_sh_video && (ret = dvr->on_video(cache_sh_video)) != ERROR_SUCCESS) {
srs_error("dvr process video sequence header message failed. ret=%d", ret);
return ret;
}
if (cache_sh_audio && (ret = dvr->on_audio(cache_sh_audio->copy())) != ERROR_SUCCESS) {
if (cache_sh_audio && (ret = dvr->on_audio(cache_sh_audio)) != ERROR_SUCCESS) {
srs_error("dvr process audio sequence header message failed. ret=%d", ret);
return ret;
}
@ -1048,8 +1053,7 @@ int SrsSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata
std::vector<SrsConsumer*>::iterator it;
for (it = consumers.begin(); it != consumers.end(); ++it) {
SrsConsumer* consumer = *it;
SrsSharedPtrMessage* copy = cache_metadata->copy();
if ((ret = consumer->enqueue(copy, atc, sample_rate, frame_rate, jitter_algorithm)) != ERROR_SUCCESS) {
if ((ret = consumer->enqueue(cache_metadata, atc, sample_rate, frame_rate, jitter_algorithm)) != ERROR_SUCCESS) {
srs_error("dispatch the metadata failed. ret=%d", ret);
return ret;
}
@ -1062,7 +1066,7 @@ int SrsSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata
std::vector<SrsForwarder*>::iterator it;
for (it = forwarders.begin(); it != forwarders.end(); ++it) {
SrsForwarder* forwarder = *it;
if ((ret = forwarder->on_meta_data(cache_metadata->copy())) != ERROR_SUCCESS) {
if ((ret = forwarder->on_meta_data(cache_metadata)) != ERROR_SUCCESS) {
srs_error("forwarder process onMetaData message failed. ret=%d", ret);
return ret;
}
@ -1086,7 +1090,7 @@ int SrsSource::on_audio(SrsCommonMessage* __audio)
srs_verbose("initialize shared ptr audio success.");
#ifdef SRS_AUTO_HLS
if ((ret = hls->on_audio(msg.copy())) != ERROR_SUCCESS) {
if ((ret = hls->on_audio(&msg)) != ERROR_SUCCESS) {
srs_warn("hls process audio message failed, ignore and disable hls. ret=%d", ret);
// unpublish, ignore ret.
@ -1098,7 +1102,7 @@ int SrsSource::on_audio(SrsCommonMessage* __audio)
#endif
#ifdef SRS_AUTO_DVR
if ((ret = dvr->on_audio(msg.copy())) != ERROR_SUCCESS) {
if ((ret = dvr->on_audio(&msg)) != ERROR_SUCCESS) {
srs_warn("dvr process audio message failed, ignore and disable dvr. ret=%d", ret);
// unpublish, ignore ret.
@ -1115,8 +1119,7 @@ int SrsSource::on_audio(SrsCommonMessage* __audio)
SrsConsumer** pconsumer = consumers.data();
for (int i = 0; i < nb_consumers; i++) {
SrsConsumer* consumer = pconsumer[i];
SrsSharedPtrMessage* copy = msg.copy();
if ((ret = consumer->enqueue(copy, atc, sample_rate, frame_rate, jitter_algorithm)) != ERROR_SUCCESS) {
if ((ret = consumer->enqueue(&msg, atc, sample_rate, frame_rate, jitter_algorithm)) != ERROR_SUCCESS) {
srs_error("dispatch the audio failed. ret=%d", ret);
return ret;
}
@ -1197,7 +1200,7 @@ int SrsSource::on_video(SrsCommonMessage* __video)
srs_verbose("initialize shared ptr video success.");
#ifdef SRS_AUTO_HLS
if ((ret = hls->on_video(msg.copy())) != ERROR_SUCCESS) {
if ((ret = hls->on_video(&msg)) != ERROR_SUCCESS) {
srs_warn("hls process video message failed, ignore and disable hls. ret=%d", ret);
// unpublish, ignore ret.
@ -1209,7 +1212,7 @@ int SrsSource::on_video(SrsCommonMessage* __video)
#endif
#ifdef SRS_AUTO_DVR
if ((ret = dvr->on_video(msg.copy())) != ERROR_SUCCESS) {
if ((ret = dvr->on_video(&msg)) != ERROR_SUCCESS) {
srs_warn("dvr process video message failed, ignore and disable dvr. ret=%d", ret);
// unpublish, ignore ret.
@ -1224,8 +1227,7 @@ int SrsSource::on_video(SrsCommonMessage* __video)
if (true) {
for (int i = 0; i < (int)consumers.size(); i++) {
SrsConsumer* consumer = consumers.at(i);
SrsSharedPtrMessage* copy = msg.copy();
if ((ret = consumer->enqueue(copy, atc, sample_rate, frame_rate, jitter_algorithm)) != ERROR_SUCCESS) {
if ((ret = consumer->enqueue(&msg, atc, sample_rate, frame_rate, jitter_algorithm)) != ERROR_SUCCESS) {
srs_error("dispatch the video failed. ret=%d", ret);
return ret;
}
@ -1234,11 +1236,11 @@ int SrsSource::on_video(SrsCommonMessage* __video)
}
// copy to all forwarders.
if (true) {
if (!forwarders.empty()) {
std::vector<SrsForwarder*>::iterator it;
for (it = forwarders.begin(); it != forwarders.end(); ++it) {
SrsForwarder* forwarder = *it;
if ((ret = forwarder->on_video(msg.copy())) != ERROR_SUCCESS) {
if ((ret = forwarder->on_video(&msg)) != ERROR_SUCCESS) {
srs_error("forwarder process video message failed. ret=%d", ret);
return ret;
}
@ -1512,20 +1514,20 @@ void SrsSource::on_unpublish()
SrsRtmpJitterAlgorithm ag = jitter_algorithm;
// copy metadata.
if (cache_metadata && (ret = consumer->enqueue(cache_metadata->copy(), atc, tba, tbv, ag)) != ERROR_SUCCESS) {
if (cache_metadata && (ret = consumer->enqueue(cache_metadata, atc, tba, tbv, ag)) != ERROR_SUCCESS) {
srs_error("dispatch metadata failed. ret=%d", ret);
return ret;
}
srs_info("dispatch metadata success");
// copy sequence header
if (cache_sh_video && (ret = consumer->enqueue(cache_sh_video->copy(), atc, tba, tbv, ag)) != ERROR_SUCCESS) {
if (cache_sh_video && (ret = consumer->enqueue(cache_sh_video, atc, tba, tbv, ag)) != ERROR_SUCCESS) {
srs_error("dispatch video sequence header failed. ret=%d", ret);
return ret;
}
srs_info("dispatch video sequence header success");
if (cache_sh_audio && (ret = consumer->enqueue(cache_sh_audio->copy(), atc, tba, tbv, ag)) != ERROR_SUCCESS) {
if (cache_sh_audio && (ret = consumer->enqueue(cache_sh_audio, atc, tba, tbv, ag)) != ERROR_SUCCESS) {
srs_error("dispatch audio sequence header failed. ret=%d", ret);
return ret;
}