From 6a4b1774752dfc8da55d2de657267c07077f4cd1 Mon Sep 17 00:00:00 2001 From: winlin Date: Sun, 6 Jul 2014 18:23:14 +0800 Subject: [PATCH] refine shared ptr message, rename initialize to create --- trunk/src/app/srs_app_edge.cpp | 10 ++-- trunk/src/app/srs_app_source.cpp | 65 +++++++++++----------- trunk/src/libs/srs_librtmp.cpp | 6 +- trunk/src/rtmp/srs_protocol_msg_array.hpp | 4 ++ trunk/src/rtmp/srs_protocol_rtmp_stack.cpp | 33 ++++++----- trunk/src/rtmp/srs_protocol_rtmp_stack.hpp | 32 ++++++++--- trunk/src/utest/srs_utest_protocol.cpp | 26 +++++++++ 7 files changed, 115 insertions(+), 61 deletions(-) diff --git a/trunk/src/app/srs_app_edge.cpp b/trunk/src/app/srs_app_edge.cpp index 292eb0065..611f199b1 100644 --- a/trunk/src/app/srs_app_edge.cpp +++ b/trunk/src/app/srs_app_edge.cpp @@ -504,17 +504,15 @@ int SrsEdgeForwarder::proxy(SrsMessage* msg) return ret; } - // TODO: FIXME: use utility to copy msg to shared ptr msg. - SrsSharedPtrMessage* copy = new SrsSharedPtrMessage(); - SrsAutoFree(SrsSharedPtrMessage, copy); - if ((ret = copy->initialize(msg)) != ERROR_SUCCESS) { + SrsSharedPtrMessage copy; + if ((ret = copy.create(msg)) != ERROR_SUCCESS) { srs_error("initialize the msg failed. ret=%d", ret); return ret; } srs_verbose("initialize shared ptr msg success."); - copy->header.stream_id = stream_id; - if ((ret = queue->enqueue(copy->copy())) != ERROR_SUCCESS) { + copy.header.stream_id = stream_id; + if ((ret = queue->enqueue(copy.copy())) != ERROR_SUCCESS) { srs_error("enqueue edge publish msg failed. ret=%d", ret); } diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index 33c09f58e..ea9fd819b 100644 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -972,7 +972,8 @@ int SrsSource::on_meta_data(SrsMessage* msg, SrsOnMetaDataPacket* metadata) cache_metadata = new SrsSharedPtrMessage(); // dump message to shared ptr message. - if ((ret = cache_metadata->initialize(&msg->header, payload, size)) != ERROR_SUCCESS) { + // the payload/size managed by cache_metadata, user should not free it. + if ((ret = cache_metadata->create(&msg->header, payload, size)) != ERROR_SUCCESS) { srs_error("initialize the cache metadata failed. ret=%d", ret); return ret; } @@ -1007,20 +1008,21 @@ int SrsSource::on_meta_data(SrsMessage* msg, SrsOnMetaDataPacket* metadata) return ret; } -int SrsSource::on_audio(SrsMessage* audio) +int SrsSource::on_audio(SrsMessage* __audio) { int ret = ERROR_SUCCESS; - SrsSharedPtrMessage* msg = new SrsSharedPtrMessage(); - SrsAutoFree(SrsSharedPtrMessage, msg); - if ((ret = msg->initialize(audio)) != ERROR_SUCCESS) { + // convert __audio to msg, user should not use __audio again. + // the payload is transfer to msg, and set to NULL in __audio. + SrsSharedPtrMessage msg; + if ((ret = msg.create(__audio)) != ERROR_SUCCESS) { srs_error("initialize the audio failed. ret=%d", ret); return ret; } srs_verbose("initialize shared ptr audio success."); #ifdef SRS_AUTO_HLS - if ((ret = hls->on_audio(msg->copy())) != ERROR_SUCCESS) { + if ((ret = hls->on_audio(msg.copy())) != ERROR_SUCCESS) { srs_warn("hls process audio message failed, ignore and disable hls. ret=%d", ret); // unpublish, ignore ret. @@ -1032,7 +1034,7 @@ int SrsSource::on_audio(SrsMessage* audio) #endif #ifdef SRS_AUTO_DVR - if ((ret = dvr->on_audio(msg->copy())) != ERROR_SUCCESS) { + if ((ret = dvr->on_audio(msg.copy())) != ERROR_SUCCESS) { srs_warn("dvr process audio message failed, ignore and disable dvr. ret=%d", ret); // unpublish, ignore ret. @@ -1047,7 +1049,7 @@ int SrsSource::on_audio(SrsMessage* audio) if (true) { for (int i = 0; i < (int)consumers.size(); i++) { SrsConsumer* consumer = consumers.at(i); - SrsSharedPtrMessage* copy = msg->copy(); + SrsSharedPtrMessage* copy = msg.copy(); if ((ret = consumer->enqueue(copy, atc, sample_rate, frame_rate, jitter_algorithm)) != ERROR_SUCCESS) { srs_error("dispatch the audio failed. ret=%d", ret); return ret; @@ -1061,7 +1063,7 @@ int SrsSource::on_audio(SrsMessage* audio) std::vector::iterator it; for (it = forwarders.begin(); it != forwarders.end(); ++it) { SrsForwarder* forwarder = *it; - if ((ret = forwarder->on_audio(msg->copy())) != ERROR_SUCCESS) { + if ((ret = forwarder->on_audio(msg.copy())) != ERROR_SUCCESS) { srs_error("forwarder process audio message failed. ret=%d", ret); return ret; } @@ -1070,14 +1072,14 @@ int SrsSource::on_audio(SrsMessage* audio) // cache the sequence header if h264 // donot cache the sequence header to gop_cache, return here. - if (SrsFlvCodec::audio_is_sequence_header(msg->payload, msg->size)) { + if (SrsFlvCodec::audio_is_sequence_header(msg.payload, msg.size)) { srs_freep(cache_sh_audio); - cache_sh_audio = msg->copy(); + cache_sh_audio = msg.copy(); // parse detail audio codec SrsAvcAacCodec codec; SrsCodecSample sample; - if ((ret = codec.audio_aac_demux(msg->payload, msg->size, &sample)) != ERROR_SUCCESS) { + if ((ret = codec.audio_aac_demux(msg.payload, msg.size, &sample)) != ERROR_SUCCESS) { srs_error("codec demux audio failed. ret=%d", ret); return ret; } @@ -1087,7 +1089,7 @@ int SrsSource::on_audio(SrsMessage* audio) srs_trace("%dB audio sh, " "codec(%d, profile=%d, %dchannels, %dkbps, %dHZ), " "flv(%dbits, %dchannels, %dHZ)", - msg->header.payload_length, codec.audio_codec_id, + msg.header.payload_length, codec.audio_codec_id, codec.aac_profile, codec.aac_channels, codec.audio_data_rate / 1000, aac_sample_rates[codec.aac_sample_rate], flv_sample_sizes[sample.sound_size], flv_sound_types[sample.sound_type], @@ -1096,7 +1098,7 @@ int SrsSource::on_audio(SrsMessage* audio) } // cache the last gop packets - if ((ret = gop_cache->cache(msg)) != ERROR_SUCCESS) { + if ((ret = gop_cache->cache(&msg)) != ERROR_SUCCESS) { srs_error("shrink gop cache failed. ret=%d", ret); return ret; } @@ -1105,30 +1107,31 @@ int SrsSource::on_audio(SrsMessage* audio) // if atc, update the sequence header to abs time. if (atc) { if (cache_sh_audio) { - cache_sh_audio->header.timestamp = msg->header.timestamp; + cache_sh_audio->header.timestamp = msg.header.timestamp; } if (cache_metadata) { - cache_metadata->header.timestamp = msg->header.timestamp; + cache_metadata->header.timestamp = msg.header.timestamp; } } return ret; } -int SrsSource::on_video(SrsMessage* video) +int SrsSource::on_video(SrsMessage* __video) { int ret = ERROR_SUCCESS; - SrsSharedPtrMessage* msg = new SrsSharedPtrMessage(); - SrsAutoFree(SrsSharedPtrMessage, msg); - if ((ret = msg->initialize(video)) != ERROR_SUCCESS) { + // convert __video to msg, user should not use __video again. + // the payload is transfer to msg, and set to NULL in __video. + SrsSharedPtrMessage msg; + if ((ret = msg.create(__video)) != ERROR_SUCCESS) { srs_error("initialize the video failed. ret=%d", ret); return ret; } srs_verbose("initialize shared ptr video success."); #ifdef SRS_AUTO_HLS - if ((ret = hls->on_video(msg->copy())) != ERROR_SUCCESS) { + if ((ret = hls->on_video(msg.copy())) != ERROR_SUCCESS) { srs_warn("hls process video message failed, ignore and disable hls. ret=%d", ret); // unpublish, ignore ret. @@ -1140,7 +1143,7 @@ int SrsSource::on_video(SrsMessage* video) #endif #ifdef SRS_AUTO_DVR - if ((ret = dvr->on_video(msg->copy())) != ERROR_SUCCESS) { + if ((ret = dvr->on_video(msg.copy())) != ERROR_SUCCESS) { srs_warn("dvr process video message failed, ignore and disable dvr. ret=%d", ret); // unpublish, ignore ret. @@ -1155,7 +1158,7 @@ int SrsSource::on_video(SrsMessage* video) if (true) { for (int i = 0; i < (int)consumers.size(); i++) { SrsConsumer* consumer = consumers.at(i); - SrsSharedPtrMessage* copy = msg->copy(); + SrsSharedPtrMessage* copy = msg.copy(); if ((ret = consumer->enqueue(copy, atc, sample_rate, frame_rate, jitter_algorithm)) != ERROR_SUCCESS) { srs_error("dispatch the video failed. ret=%d", ret); return ret; @@ -1169,7 +1172,7 @@ int SrsSource::on_video(SrsMessage* video) std::vector::iterator it; for (it = forwarders.begin(); it != forwarders.end(); ++it) { SrsForwarder* forwarder = *it; - if ((ret = forwarder->on_video(msg->copy())) != ERROR_SUCCESS) { + if ((ret = forwarder->on_video(msg.copy())) != ERROR_SUCCESS) { srs_error("forwarder process video message failed. ret=%d", ret); return ret; } @@ -1178,28 +1181,28 @@ int SrsSource::on_video(SrsMessage* video) // cache the sequence header if h264 // donot cache the sequence header to gop_cache, return here. - if (SrsFlvCodec::video_is_sequence_header(msg->payload, msg->size)) { + if (SrsFlvCodec::video_is_sequence_header(msg.payload, msg.size)) { srs_freep(cache_sh_video); - cache_sh_video = msg->copy(); + cache_sh_video = msg.copy(); // parse detail audio codec SrsAvcAacCodec codec; SrsCodecSample sample; - if ((ret = codec.video_avc_demux(msg->payload, msg->size, &sample)) != ERROR_SUCCESS) { + if ((ret = codec.video_avc_demux(msg.payload, msg.size, &sample)) != ERROR_SUCCESS) { srs_error("codec demux video failed. ret=%d", ret); return ret; } srs_trace("%dB video sh, " "codec(%d, profile=%d, level=%d, %dx%d, %dkbps, %dfps, %ds)", - msg->header.payload_length, codec.video_codec_id, + msg.header.payload_length, codec.video_codec_id, codec.avc_profile, codec.avc_level, codec.width, codec.height, codec.video_data_rate / 1000, codec.frame_rate, codec.duration); return ret; } // cache the last gop packets - if ((ret = gop_cache->cache(msg)) != ERROR_SUCCESS) { + if ((ret = gop_cache->cache(&msg)) != ERROR_SUCCESS) { srs_error("gop cache msg failed. ret=%d", ret); return ret; } @@ -1208,10 +1211,10 @@ int SrsSource::on_video(SrsMessage* video) // if atc, update the sequence header to abs time. if (atc) { if (cache_sh_video) { - cache_sh_video->header.timestamp = msg->header.timestamp; + cache_sh_video->header.timestamp = msg.header.timestamp; } if (cache_metadata) { - cache_metadata->header.timestamp = msg->header.timestamp; + cache_metadata->header.timestamp = msg.header.timestamp; } } diff --git a/trunk/src/libs/srs_librtmp.cpp b/trunk/src/libs/srs_librtmp.cpp index 46ec07aef..30643d964 100644 --- a/trunk/src/libs/srs_librtmp.cpp +++ b/trunk/src/libs/srs_librtmp.cpp @@ -388,7 +388,7 @@ int srs_write_packet(srs_rtmp_t rtmp, int type, u_int32_t timestamp, char* data, header.initialize_audio(size, timestamp, context->stream_id); msg = new SrsSharedPtrMessage(); - if ((ret = msg->initialize(&header, data, size)) != ERROR_SUCCESS) { + if ((ret = msg->create(&header, data, size)) != ERROR_SUCCESS) { srs_freep(data); return ret; } @@ -397,7 +397,7 @@ int srs_write_packet(srs_rtmp_t rtmp, int type, u_int32_t timestamp, char* data, header.initialize_video(size, timestamp, context->stream_id); msg = new SrsSharedPtrMessage(); - if ((ret = msg->initialize(&header, data, size)) != ERROR_SUCCESS) { + if ((ret = msg->create(&header, data, size)) != ERROR_SUCCESS) { srs_freep(data); return ret; } @@ -406,7 +406,7 @@ int srs_write_packet(srs_rtmp_t rtmp, int type, u_int32_t timestamp, char* data, header.initialize_amf0_script(size, context->stream_id); msg = new SrsSharedPtrMessage(); - if ((ret = msg->initialize(&header, data, size)) != ERROR_SUCCESS) { + if ((ret = msg->create(&header, data, size)) != ERROR_SUCCESS) { srs_freep(data); return ret; } diff --git a/trunk/src/rtmp/srs_protocol_msg_array.hpp b/trunk/src/rtmp/srs_protocol_msg_array.hpp index 5366bdbe5..53057e089 100644 --- a/trunk/src/rtmp/srs_protocol_msg_array.hpp +++ b/trunk/src/rtmp/srs_protocol_msg_array.hpp @@ -34,6 +34,10 @@ class SrsSharedPtrMessage; /** * the class to auto free the shared ptr message array. +* when need to get some messages, for instance, from Consumer queue, +* create a message array, whose msgs can used to accept the msgs, +* then send each message and set to NULL. +* @remark: when error, the message array will free the msg not sent out. */ class SrsSharedPtrMessageArray { diff --git a/trunk/src/rtmp/srs_protocol_rtmp_stack.cpp b/trunk/src/rtmp/srs_protocol_rtmp_stack.cpp index 678d4c061..1fa380c10 100644 --- a/trunk/src/rtmp/srs_protocol_rtmp_stack.cpp +++ b/trunk/src/rtmp/srs_protocol_rtmp_stack.cpp @@ -1612,26 +1612,28 @@ SrsSharedPtrMessage::~SrsSharedPtrMessage() } } -int SrsSharedPtrMessage::initialize(SrsMessage* source) +int SrsSharedPtrMessage::create(SrsMessage* msg) { int ret = ERROR_SUCCESS; - if ((ret = initialize(&source->header, (char*)source->payload, source->size)) != ERROR_SUCCESS) { + if ((ret = create(&msg->header, (char*)msg->payload, msg->size)) != ERROR_SUCCESS) { return ret; } - // detach the payload from source - source->payload = NULL; - source->size = 0; + // to prevent double free of payload: + // initialize already attach the payload of msg, + // detach the payload to transfer the owner to shared ptr. + msg->payload = NULL; + msg->size = 0; return ret; } -int SrsSharedPtrMessage::initialize(SrsMessageHeader* source, char* payload, int size) +int SrsSharedPtrMessage::create(SrsMessageHeader* pheader, char* payload, int size) { int ret = ERROR_SUCCESS; - srs_assert(source != NULL); + srs_assert(pheader != NULL); if (ptr) { ret = ERROR_SYSTEM_ASSERT_FAILED; srs_error("should not set the payload twice. ret=%d", ret); @@ -1640,28 +1642,31 @@ int SrsSharedPtrMessage::initialize(SrsMessageHeader* source, char* payload, int return ret; } - header = *source; + header = *pheader; header.payload_length = size; ptr = new __SrsSharedPtr(); - // direct attach the data of common message. + // direct attach the data. ptr->payload = payload; ptr->size = size; + // message can access it. SrsMessage::payload = (int8_t*)ptr->payload; SrsMessage::size = ptr->size; return ret; } +int SrsSharedPtrMessage::count() +{ + srs_assert(ptr); + return ptr->shared_count; +} + SrsSharedPtrMessage* SrsSharedPtrMessage::copy() { - if (!ptr) { - srs_error("invoke initialize to initialize the ptr."); - srs_assert(false); - return NULL; - } + srs_assert(ptr); SrsSharedPtrMessage* copy = new SrsSharedPtrMessage(); diff --git a/trunk/src/rtmp/srs_protocol_rtmp_stack.hpp b/trunk/src/rtmp/srs_protocol_rtmp_stack.hpp index d42448ea9..5d2a98eb5 100644 --- a/trunk/src/rtmp/srs_protocol_rtmp_stack.hpp +++ b/trunk/src/rtmp/srs_protocol_rtmp_stack.hpp @@ -370,6 +370,13 @@ public: * shared ptr message. * for audio/video/data message that need less memory copy. * and only for output. +* +* create first object by constructor and create(), +* use copy if need reference count message. +* +* Usage: +* SrsSharedPtrMessage msg; +* */ class SrsSharedPtrMessage : public SrsMessage { @@ -390,19 +397,30 @@ public: virtual ~SrsSharedPtrMessage(); public: /** - * set the shared payload. - * we will detach the payload of source, - * so ensure donot use it before. + * create shared ptr message, + * copy header, manage the payload of msg, + * set the payload to NULL to prevent double free. + * @remark payload of msg set to NULL if success. */ - virtual int initialize(SrsMessage* source); + virtual int create(SrsMessage* msg); /** - * set the shared payload. - * use source header, and specified param payload. + * create shared ptr message, + * from the header and payload. + * @remark user should never free the payload. */ - virtual int initialize(SrsMessageHeader* source, char* payload, int size); + virtual int create(SrsMessageHeader* pheader, char* payload, int size); + /** + * get current reference count. + * when this object created, count set to 0. + * if copy() this object, count increase 1. + * if this or copy deleted, free payload when count is 0, or count--. + * @remark, assert object is created. + */ + virtual int count(); public: /** * copy current shared ptr message, use ref-count. + * @remark, assert object is created. */ virtual SrsSharedPtrMessage* copy(); }; diff --git a/trunk/src/utest/srs_utest_protocol.cpp b/trunk/src/utest/srs_utest_protocol.cpp index e988d8594..309eab196 100644 --- a/trunk/src/utest/srs_utest_protocol.cpp +++ b/trunk/src/utest/srs_utest_protocol.cpp @@ -27,6 +27,8 @@ using namespace std; #include #include #include +#include +#include MockEmptyIO::MockEmptyIO() { @@ -389,3 +391,27 @@ VOID TEST(ProtocolUtilityTest, GenerateTcUrl) tcUrl = srs_generate_tc_url(ip, vhost, app, port); EXPECT_STREQ("rtmp://demo:19351/live", tcUrl.c_str()); } + +VOID TEST(ProtocolMsgArrayTest, MessageArray) +{ + SrsMessageHeader header; + SrsSharedPtrMessage msg; + char* payload = new char[1024]; + EXPECT_TRUE(ERROR_SUCCESS == msg.create(&header, payload, 1024)); + EXPECT_EQ(0, msg.count()); + + if (true) { + SrsSharedPtrMessageArray arr(3); + + arr.msgs[0] = msg.copy(); + EXPECT_EQ(1, msg.count()); + + arr.msgs[1] = msg.copy(); + EXPECT_EQ(2, msg.count()); + + arr.msgs[2] = msg.copy(); + EXPECT_EQ(3, msg.count()); + } + EXPECT_EQ(0, msg.count()); +} +