diff --git a/trunk/src/app/srs_app_dvr.cpp b/trunk/src/app/srs_app_dvr.cpp index 001b14ff8..a206f23fa 100644 --- a/trunk/src/app/srs_app_dvr.cpp +++ b/trunk/src/app/srs_app_dvr.cpp @@ -455,7 +455,7 @@ int SrsDvrPlan::on_meta_data(SrsOnMetaDataPacket* metadata) return ret; } -int SrsDvrPlan::on_audio(SrsSharedPtrMessage* audio) +int SrsDvrPlan::on_audio(__SrsSharedPtrMessage* audio) { int ret = ERROR_SUCCESS; @@ -481,7 +481,7 @@ int SrsDvrPlan::on_audio(SrsSharedPtrMessage* audio) return ret; } -int SrsDvrPlan::on_video(SrsSharedPtrMessage* video) +int SrsDvrPlan::on_video(__SrsSharedPtrMessage* video) { int ret = ERROR_SUCCESS; @@ -573,7 +573,7 @@ int SrsDvrPlan::flv_close() return ret; } -int SrsDvrPlan::update_duration(SrsSharedPtrMessage* msg) +int SrsDvrPlan::update_duration(__SrsSharedPtrMessage* msg) { int ret = ERROR_SUCCESS; @@ -720,7 +720,7 @@ void SrsDvrSegmentPlan::on_unpublish() dvr_enabled = false; } -int SrsDvrSegmentPlan::update_duration(SrsSharedPtrMessage* msg) +int SrsDvrSegmentPlan::update_duration(__SrsSharedPtrMessage* msg) { int ret = ERROR_SUCCESS; @@ -903,7 +903,7 @@ int SrsDvrHssPlan::on_dvr_reap_flv_header(string path) return ret; } -int SrsDvrHssPlan::update_duration(SrsSharedPtrMessage* msg) +int SrsDvrHssPlan::update_duration(__SrsSharedPtrMessage* msg) { int ret = ERROR_SUCCESS; @@ -998,11 +998,11 @@ int SrsDvr::on_meta_data(SrsOnMetaDataPacket* metadata) return ret; } -int SrsDvr::on_audio(SrsSharedPtrMessage* audio) +int SrsDvr::on_audio(__SrsSharedPtrMessage* audio) { int ret = ERROR_SUCCESS; - SrsAutoFree(SrsSharedPtrMessage, audio, false); + SrsAutoFree(__SrsSharedPtrMessage, audio, false); if ((ret = plan->on_audio(audio)) != ERROR_SUCCESS) { return ret; @@ -1011,11 +1011,11 @@ int SrsDvr::on_audio(SrsSharedPtrMessage* audio) return ret; } -int SrsDvr::on_video(SrsSharedPtrMessage* video) +int SrsDvr::on_video(__SrsSharedPtrMessage* video) { int ret = ERROR_SUCCESS; - SrsAutoFree(SrsSharedPtrMessage, video, false); + SrsAutoFree(__SrsSharedPtrMessage, video, false); if ((ret = plan->on_video(video)) != ERROR_SUCCESS) { return ret; diff --git a/trunk/src/app/srs_app_dvr.hpp b/trunk/src/app/srs_app_dvr.hpp index bc6b7f081..55db7ce3f 100644 --- a/trunk/src/app/srs_app_dvr.hpp +++ b/trunk/src/app/srs_app_dvr.hpp @@ -36,7 +36,7 @@ class SrsRequest; class SrsStream; class SrsRtmpJitter; class SrsOnMetaDataPacket; -class SrsSharedPtrMessage; +class __SrsSharedPtrMessage; /** * file stream to read/write file. @@ -185,13 +185,13 @@ public: virtual int on_publish(); virtual void on_unpublish() = 0; virtual int on_meta_data(SrsOnMetaDataPacket* metadata); - virtual int on_audio(SrsSharedPtrMessage* audio); - virtual int on_video(SrsSharedPtrMessage* video); + virtual int on_audio(__SrsSharedPtrMessage* audio); + virtual int on_video(__SrsSharedPtrMessage* video); protected: virtual int flv_open(std::string stream, std::string path); virtual int flv_close(); virtual int open_new_segment(); - virtual int update_duration(SrsSharedPtrMessage* msg); + virtual int update_duration(__SrsSharedPtrMessage* msg); virtual int write_flv_header(); virtual int on_dvr_request_sh(); virtual int on_video_keyframe(); @@ -233,7 +233,7 @@ public: virtual int on_publish(); virtual void on_unpublish(); private: - virtual int update_duration(SrsSharedPtrMessage* msg); + virtual int update_duration(__SrsSharedPtrMessage* msg); }; /** @@ -260,7 +260,7 @@ protected: virtual int64_t filter_timestamp(int64_t timestamp); private: virtual int on_dvr_reap_flv_header(std::string path); - virtual int update_duration(SrsSharedPtrMessage* msg); + virtual int update_duration(__SrsSharedPtrMessage* msg); }; /** @@ -300,11 +300,11 @@ public: /** * mux the audio packets to dvr. */ - virtual int on_audio(SrsSharedPtrMessage* audio); + virtual int on_audio(__SrsSharedPtrMessage* audio); /** * mux the video packets to dvr. */ - virtual int on_video(SrsSharedPtrMessage* video); + virtual int on_video(__SrsSharedPtrMessage* video); }; #endif diff --git a/trunk/src/app/srs_app_edge.cpp b/trunk/src/app/srs_app_edge.cpp index b38b58642..09bb3289b 100644 --- a/trunk/src/app/srs_app_edge.cpp +++ b/trunk/src/app/srs_app_edge.cpp @@ -175,15 +175,15 @@ int SrsEdgeIngester::ingest() } // read from client. - SrsCommonMessage* msg = NULL; - if ((ret = client->recv_message(&msg)) != ERROR_SUCCESS) { + __SrsMessage* msg = NULL; + if ((ret = client->__recv_message(&msg)) != ERROR_SUCCESS) { srs_error("ingest recv origin server message failed. ret=%d", ret); return ret; } srs_verbose("edge loop recv message. ret=%d", ret); srs_assert(msg); - SrsAutoFree(SrsCommonMessage, msg, false); + SrsAutoFree(__SrsMessage, msg, false); if ((ret = process_publish_message(msg)) != ERROR_SUCCESS) { return ret; @@ -193,7 +193,7 @@ int SrsEdgeIngester::ingest() return ret; } -int SrsEdgeIngester::process_publish_message(SrsCommonMessage* msg) +int SrsEdgeIngester::process_publish_message(__SrsMessage* msg) { int ret = ERROR_SUCCESS; @@ -217,12 +217,13 @@ int SrsEdgeIngester::process_publish_message(SrsCommonMessage* msg) // process onMetaData if (msg->header.is_amf0_data() || msg->header.is_amf3_data()) { - if ((ret = msg->decode_packet(client->get_protocol())) != ERROR_SUCCESS) { + SrsPacket* pkt = NULL; + if ((ret = client->__decode_message(msg, &pkt)) != ERROR_SUCCESS) { srs_error("decode onMetaData message failed. ret=%d", ret); return ret; } + SrsAutoFree(SrsPacket, pkt, false); - SrsPacket* pkt = msg->get_packet(); if (dynamic_cast(pkt)) { SrsOnMetaDataPacket* metadata = dynamic_cast(pkt); if ((ret = source->on_meta_data(msg, metadata)) != ERROR_SUCCESS) { @@ -419,8 +420,8 @@ int SrsEdgeForwarder::cycle() // read from client. if (true) { - SrsCommonMessage* msg = NULL; - ret = client->recv_message(&msg); + __SrsMessage* msg = NULL; + ret = client->__recv_message(&msg); srs_verbose("edge loop recv message. ret=%d", ret); if (ret != ERROR_SUCCESS && ret != ERROR_SOCKET_TIMEOUT) { @@ -434,7 +435,7 @@ int SrsEdgeForwarder::cycle() // forward all messages. int count = 0; - SrsSharedPtrMessage** msgs = NULL; + __SrsSharedPtrMessage** msgs = NULL; if ((ret = queue->get_packets(0, msgs, count)) != ERROR_SUCCESS) { srs_error("get message to forward to origin failed. ret=%d", ret); return ret; @@ -455,16 +456,16 @@ int SrsEdgeForwarder::cycle() srs_verbose("no packets to forward."); continue; } - SrsAutoFree(SrsSharedPtrMessage*, msgs, true); + SrsAutoFree(__SrsSharedPtrMessage*, msgs, true); // all msgs to forward. for (int i = 0; i < count; i++) { - SrsSharedPtrMessage* msg = msgs[i]; + __SrsSharedPtrMessage* msg = msgs[i]; srs_assert(msg); msgs[i] = NULL; - if ((ret = client->send_message(msg)) != ERROR_SUCCESS) { + if ((ret = client->__send_and_free_message(msg)) != ERROR_SUCCESS) { srs_error("edge publish forwarder send message to server failed. ret=%d", ret); return ret; } @@ -474,7 +475,7 @@ int SrsEdgeForwarder::cycle() return ret; } -int SrsEdgeForwarder::proxy(SrsCommonMessage* msg) +int SrsEdgeForwarder::proxy(__SrsMessage* msg) { int ret = ERROR_SUCCESS; @@ -494,8 +495,8 @@ int SrsEdgeForwarder::proxy(SrsCommonMessage* msg) } // TODO: FIXME: use utility to copy msg to shared ptr msg. - SrsSharedPtrMessage* copy = new SrsSharedPtrMessage(); - SrsAutoFree(SrsSharedPtrMessage, copy, false); + __SrsSharedPtrMessage* copy = new __SrsSharedPtrMessage(); + SrsAutoFree(__SrsSharedPtrMessage, copy, false); if ((ret = copy->initialize(msg)) != ERROR_SUCCESS) { srs_error("initialize the msg failed. ret=%d", ret); return ret; @@ -723,7 +724,7 @@ int SrsPublishEdge::on_client_publish() return ret; } -int SrsPublishEdge::on_proxy_publish(SrsCommonMessage* msg) +int SrsPublishEdge::on_proxy_publish(__SrsMessage* msg) { return forwarder->proxy(msg); } diff --git a/trunk/src/app/srs_app_edge.hpp b/trunk/src/app/srs_app_edge.hpp index d0e227e82..f4a872bd8 100644 --- a/trunk/src/app/srs_app_edge.hpp +++ b/trunk/src/app/srs_app_edge.hpp @@ -40,7 +40,7 @@ class SrsRequest; class SrsPlayEdge; class SrsPublishEdge; class SrsRtmpClient; -class SrsCommonMessage; +class __SrsMessage; class SrsMessageQueue; class ISrsProtocolReaderWriter; @@ -99,7 +99,7 @@ private: virtual int ingest(); virtual void close_underlayer_socket(); virtual int connect_server(); - virtual int process_publish_message(SrsCommonMessage* msg); + virtual int process_publish_message(__SrsMessage* msg); }; /** @@ -142,7 +142,7 @@ public: public: virtual int cycle(); public: - virtual int proxy(SrsCommonMessage* msg); + virtual int proxy(__SrsMessage* msg); private: virtual void close_underlayer_socket(); virtual int connect_server(); @@ -202,7 +202,7 @@ public: /** * proxy publish stream to edge */ - virtual int on_proxy_publish(SrsCommonMessage* msg); + virtual int on_proxy_publish(__SrsMessage* msg); /** * proxy unpublish stream to edge. */ diff --git a/trunk/src/app/srs_app_forward.cpp b/trunk/src/app/srs_app_forward.cpp index 5ce651315..56cefb9aa 100644 --- a/trunk/src/app/srs_app_forward.cpp +++ b/trunk/src/app/srs_app_forward.cpp @@ -148,7 +148,7 @@ void SrsForwarder::on_unpublish() srs_freep(io); } -int SrsForwarder::on_meta_data(SrsSharedPtrMessage* metadata) +int SrsForwarder::on_meta_data(__SrsSharedPtrMessage* metadata) { int ret = ERROR_SUCCESS; @@ -164,7 +164,7 @@ int SrsForwarder::on_meta_data(SrsSharedPtrMessage* metadata) return ret; } -int SrsForwarder::on_audio(SrsSharedPtrMessage* msg) +int SrsForwarder::on_audio(__SrsSharedPtrMessage* msg) { int ret = ERROR_SUCCESS; @@ -180,7 +180,7 @@ int SrsForwarder::on_audio(SrsSharedPtrMessage* msg) return ret; } -int SrsForwarder::on_video(SrsSharedPtrMessage* msg) +int SrsForwarder::on_video(__SrsSharedPtrMessage* msg) { int ret = ERROR_SUCCESS; @@ -329,7 +329,7 @@ int SrsForwarder::forward() // forward all messages. int count = 0; - SrsSharedPtrMessage** msgs = NULL; + __SrsSharedPtrMessage** msgs = NULL; if ((ret = queue->get_packets(0, msgs, count)) != ERROR_SUCCESS) { srs_error("get message to forward failed. ret=%d", ret); return ret; @@ -348,16 +348,16 @@ int SrsForwarder::forward() srs_verbose("no packets to forward."); continue; } - SrsAutoFree(SrsSharedPtrMessage*, msgs, true); + SrsAutoFree(__SrsSharedPtrMessage*, msgs, true); // all msgs to forward. for (int i = 0; i < count; i++) { - SrsSharedPtrMessage* msg = msgs[i]; + __SrsSharedPtrMessage* msg = msgs[i]; srs_assert(msg); msgs[i] = NULL; - if ((ret = client->send_message(msg)) != ERROR_SUCCESS) { + if ((ret = client->__send_and_free_message(msg)) != ERROR_SUCCESS) { srs_error("forwarder send message to server failed. ret=%d", ret); return ret; } diff --git a/trunk/src/app/srs_app_forward.hpp b/trunk/src/app/srs_app_forward.hpp index 6907022cd..d4fdaac8b 100644 --- a/trunk/src/app/srs_app_forward.hpp +++ b/trunk/src/app/srs_app_forward.hpp @@ -35,7 +35,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include class ISrsProtocolReaderWriter; -class SrsSharedPtrMessage; +class __SrsSharedPtrMessage; class SrsOnMetaDataPacket; class SrsMessageQueue; class SrsRtmpJitter; @@ -72,9 +72,9 @@ public: public: virtual int on_publish(SrsRequest* req, std::string forward_server); virtual void on_unpublish(); - virtual int on_meta_data(SrsSharedPtrMessage* metadata); - virtual int on_audio(SrsSharedPtrMessage* msg); - virtual int on_video(SrsSharedPtrMessage* msg); + virtual int on_meta_data(__SrsSharedPtrMessage* metadata); + virtual int on_audio(__SrsSharedPtrMessage* msg); + virtual int on_video(__SrsSharedPtrMessage* msg); // interface ISrsThreadHandler. public: virtual int cycle(); diff --git a/trunk/src/app/srs_app_hls.cpp b/trunk/src/app/srs_app_hls.cpp index 2980462be..68ddabfd8 100644 --- a/trunk/src/app/srs_app_hls.cpp +++ b/trunk/src/app/srs_app_hls.cpp @@ -1379,11 +1379,11 @@ int SrsHls::on_meta_data(SrsAmf0Object* metadata) return ret; } -int SrsHls::on_audio(SrsSharedPtrMessage* audio) +int SrsHls::on_audio(__SrsSharedPtrMessage* audio) { int ret = ERROR_SUCCESS; - SrsAutoFree(SrsSharedPtrMessage, audio, false); + SrsAutoFree(__SrsSharedPtrMessage, audio, false); if (!hls_enabled) { return ret; @@ -1423,11 +1423,11 @@ int SrsHls::on_audio(SrsSharedPtrMessage* audio) return ret; } -int SrsHls::on_video(SrsSharedPtrMessage* video) +int SrsHls::on_video(__SrsSharedPtrMessage* video) { int ret = ERROR_SUCCESS; - SrsAutoFree(SrsSharedPtrMessage, video, false); + SrsAutoFree(__SrsSharedPtrMessage, video, false); if (!hls_enabled) { return ret; diff --git a/trunk/src/app/srs_app_hls.hpp b/trunk/src/app/srs_app_hls.hpp index 577abdedc..6b5ed3518 100644 --- a/trunk/src/app/srs_app_hls.hpp +++ b/trunk/src/app/srs_app_hls.hpp @@ -34,7 +34,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include -class SrsSharedPtrMessage; +class __SrsSharedPtrMessage; class SrsCodecSample; class SrsCodecBuffer; class SrsMpegtsFrame; @@ -314,11 +314,11 @@ public: /** * mux the audio packets to ts. */ - virtual int on_audio(SrsSharedPtrMessage* audio); + virtual int on_audio(__SrsSharedPtrMessage* audio); /** * mux the video packets to ts. */ - virtual int on_video(SrsSharedPtrMessage* video); + virtual int on_video(__SrsSharedPtrMessage* video); private: virtual void hls_mux(); }; diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index f524e5daf..91612304e 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -472,8 +472,8 @@ int SrsRtmpConn::playing(SrsSource* source) // read from client. int ctl_msg_ret = ERROR_SUCCESS; if (true) { - SrsCommonMessage* msg = NULL; - ctl_msg_ret = ret = rtmp->recv_message(&msg); + __SrsMessage* msg = NULL; + ctl_msg_ret = ret = rtmp->__recv_message(&msg); srs_verbose("play loop recv message. ret=%d", ret); if (ret != ERROR_SUCCESS && ret != ERROR_SOCKET_TIMEOUT) { @@ -491,7 +491,7 @@ int SrsRtmpConn::playing(SrsSource* source) } // get messages from consumer. - SrsSharedPtrMessage** msgs = NULL; + __SrsSharedPtrMessage** msgs = NULL; int count = 0; if ((ret = consumer->get_packets(0, msgs, count)) != ERROR_SUCCESS) { srs_error("get messages from consumer failed. ret=%d", ret); @@ -510,11 +510,11 @@ int SrsRtmpConn::playing(SrsSource* source) srs_verbose("no packets in queue."); continue; } - SrsAutoFree(SrsSharedPtrMessage*, msgs, true); + SrsAutoFree(__SrsSharedPtrMessage*, msgs, true); // sendout messages for (int i = 0; i < count; i++) { - SrsSharedPtrMessage* msg = msgs[i]; + __SrsSharedPtrMessage* msg = msgs[i]; // the send_message will free the msg, // so set the msgs[i] to NULL. @@ -530,7 +530,7 @@ int SrsRtmpConn::playing(SrsSource* source) duration += msg->header.timestamp - starttime; starttime = msg->header.timestamp; - if ((ret = rtmp->send_message(msg)) != ERROR_SUCCESS) { + if ((ret = rtmp->__send_and_free_message(msg)) != ERROR_SUCCESS) { srs_error("send message to client failed. ret=%d", ret); return ret; } @@ -573,14 +573,13 @@ int SrsRtmpConn::fmle_publish(SrsSource* source) // switch to other st-threads. st_usleep(0); - SrsCommonMessage* msg = NULL; - if ((ret = rtmp->recv_message(&msg)) != ERROR_SUCCESS) { + __SrsMessage* msg = NULL; + if ((ret = rtmp->__recv_message(&msg)) != ERROR_SUCCESS) { srs_error("fmle recv identify client message failed. ret=%d", ret); return ret; } - srs_assert(msg); - SrsAutoFree(SrsCommonMessage, msg, false); + SrsAutoFree(__SrsMessage, msg, false); pithy_print.elapse(); @@ -594,12 +593,14 @@ int SrsRtmpConn::fmle_publish(SrsSource* source) // process UnPublish event. if (msg->header.is_amf0_command() || msg->header.is_amf3_command()) { - if ((ret = msg->decode_packet(rtmp->get_protocol())) != ERROR_SUCCESS) { + SrsPacket* pkt = NULL; + if ((ret = rtmp->__decode_message(msg, &pkt)) != ERROR_SUCCESS) { srs_error("fmle decode unpublish message failed. ret=%d", ret); return ret; } + + SrsAutoFree(SrsPacket, pkt, false); - SrsPacket* pkt = msg->get_packet(); if (dynamic_cast(pkt)) { SrsFMLEStartPacket* unpublish = dynamic_cast(pkt); if ((ret = rtmp->fmle_unpublish(res->stream_id, unpublish->transaction_id)) != ERROR_SUCCESS) { @@ -647,15 +648,15 @@ int SrsRtmpConn::flash_publish(SrsSource* source) // switch to other st-threads. st_usleep(0); - SrsCommonMessage* msg = NULL; - if ((ret = rtmp->recv_message(&msg)) != ERROR_SUCCESS) { + __SrsMessage* msg = NULL; + if ((ret = rtmp->__recv_message(&msg)) != ERROR_SUCCESS) { if (!srs_is_client_gracefully_close(ret)) { srs_error("flash recv identify client message failed. ret=%d", ret); } return ret; } - SrsAutoFree(SrsCommonMessage, msg, false); + SrsAutoFree(__SrsMessage, msg, false); pithy_print.elapse(); @@ -669,11 +670,14 @@ int SrsRtmpConn::flash_publish(SrsSource* source) // process UnPublish event. if (msg->header.is_amf0_command() || msg->header.is_amf3_command()) { - if ((ret = msg->decode_packet(rtmp->get_protocol())) != ERROR_SUCCESS) { + SrsPacket* pkt = NULL; + if ((ret = rtmp->__decode_message(msg, &pkt)) != ERROR_SUCCESS) { srs_error("flash decode unpublish message failed. ret=%d", ret); return ret; } + SrsAutoFree(SrsPacket, pkt, false); + // flash unpublish. // TODO: maybe need to support republish. srs_trace("flash flash publish finished."); @@ -690,7 +694,7 @@ int SrsRtmpConn::flash_publish(SrsSource* source) return ret; } -int SrsRtmpConn::process_publish_message(SrsSource* source, SrsCommonMessage* msg, bool vhost_is_edge) +int SrsRtmpConn::process_publish_message(SrsSource* source, __SrsMessage* msg, bool vhost_is_edge) { int ret = ERROR_SUCCESS; @@ -720,12 +724,13 @@ int SrsRtmpConn::process_publish_message(SrsSource* source, SrsCommonMessage* ms // process onMetaData if (msg->header.is_amf0_data() || msg->header.is_amf3_data()) { - if ((ret = msg->decode_packet(rtmp->get_protocol())) != ERROR_SUCCESS) { + SrsPacket* pkt = NULL; + if ((ret = rtmp->__decode_message(msg, &pkt)) != ERROR_SUCCESS) { srs_error("decode onMetaData message failed. ret=%d", ret); return ret; } + SrsAutoFree(SrsPacket, pkt, false); - SrsPacket* pkt = msg->get_packet(); if (dynamic_cast(pkt)) { SrsOnMetaDataPacket* metadata = dynamic_cast(pkt); if ((ret = source->on_meta_data(msg, metadata)) != ERROR_SUCCESS) { @@ -743,7 +748,7 @@ int SrsRtmpConn::process_publish_message(SrsSource* source, SrsCommonMessage* ms return ret; } -int SrsRtmpConn::process_play_control_msg(SrsConsumer* consumer, SrsCommonMessage* msg) +int SrsRtmpConn::process_play_control_msg(SrsConsumer* consumer, __SrsMessage* msg) { int ret = ERROR_SUCCESS; @@ -751,29 +756,32 @@ int SrsRtmpConn::process_play_control_msg(SrsConsumer* consumer, SrsCommonMessag srs_verbose("ignore all empty message."); return ret; } - SrsAutoFree(SrsCommonMessage, msg, false); + SrsAutoFree(__SrsMessage, msg, false); if (!msg->header.is_amf0_command() && !msg->header.is_amf3_command()) { srs_info("ignore all message except amf0/amf3 command."); return ret; } - if ((ret = msg->decode_packet(rtmp->get_protocol())) != ERROR_SUCCESS) { + SrsPacket* pkt = NULL; + if ((ret = rtmp->__decode_message(msg, &pkt)) != ERROR_SUCCESS) { srs_error("decode the amf0/amf3 command packet failed. ret=%d", ret); return ret; } srs_info("decode the amf0/amf3 command packet success."); + SrsAutoFree(SrsPacket, pkt, false); + // for jwplayer/flowplayer, which send close as pause message. // @see https://github.com/winlinvip/simple-rtmp-server/issues/6 - SrsCloseStreamPacket* close = dynamic_cast(msg->get_packet()); + SrsCloseStreamPacket* close = dynamic_cast(pkt); if (close) { ret = ERROR_CONTROL_RTMP_CLOSE; srs_trace("system control message: rtmp close stream. ret=%d", ret); return ret; } - SrsPausePacket* pause = dynamic_cast(msg->get_packet()); + SrsPausePacket* pause = dynamic_cast(pkt); if (!pause) { srs_info("ignore all amf0/amf3 command except pause."); return ret; diff --git a/trunk/src/app/srs_app_rtmp_conn.hpp b/trunk/src/app/srs_app_rtmp_conn.hpp index 802daf6fd..17c3c32af 100644 --- a/trunk/src/app/srs_app_rtmp_conn.hpp +++ b/trunk/src/app/srs_app_rtmp_conn.hpp @@ -40,7 +40,7 @@ class SrsResponse; class SrsSource; class SrsRefer; class SrsConsumer; -class SrsCommonMessage; +class __SrsMessage; class SrsSocket; #ifdef SRS_AUTO_HTTP_CALLBACK class SrsHttpHooks; @@ -80,8 +80,8 @@ private: virtual int playing(SrsSource* source); virtual int fmle_publish(SrsSource* source); virtual int flash_publish(SrsSource* source); - virtual int process_publish_message(SrsSource* source, SrsCommonMessage* msg, bool vhost_is_edge); - virtual int process_play_control_msg(SrsConsumer* consumer, SrsCommonMessage* msg); + virtual int process_publish_message(SrsSource* source, __SrsMessage* msg, bool vhost_is_edge); + virtual int process_play_control_msg(SrsConsumer* consumer, __SrsMessage* msg); private: virtual int http_hooks_on_connect(); virtual void http_hooks_on_close(); diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index 8eb20ec79..83c8e4d6d 100644 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -52,7 +52,7 @@ SrsRtmpJitter::~SrsRtmpJitter() { } -int SrsRtmpJitter::correct(SrsSharedPtrMessage* msg, int tba, int tbv) +int SrsRtmpJitter::correct(__SrsSharedPtrMessage* msg, int tba, int tbv) { int ret = ERROR_SUCCESS; @@ -130,7 +130,7 @@ void SrsMessageQueue::set_queue_size(double queue_size) queue_size_ms = (int)(queue_size * 1000); } -int SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg) +int SrsMessageQueue::enqueue(__SrsSharedPtrMessage* msg) { int ret = ERROR_SUCCESS; @@ -151,7 +151,7 @@ int SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg) return ret; } -int SrsMessageQueue::get_packets(int max_count, SrsSharedPtrMessage**& pmsgs, int& count) +int SrsMessageQueue::get_packets(int max_count, __SrsSharedPtrMessage**& pmsgs, int& count) { int ret = ERROR_SUCCESS; @@ -169,13 +169,13 @@ int SrsMessageQueue::get_packets(int max_count, SrsSharedPtrMessage**& pmsgs, in return ret; } - pmsgs = new SrsSharedPtrMessage*[count]; + pmsgs = new __SrsSharedPtrMessage*[count]; for (int i = 0; i < count; i++) { pmsgs[i] = msgs[i]; } - SrsSharedPtrMessage* last = msgs[count - 1]; + __SrsSharedPtrMessage* last = msgs[count - 1]; av_start_time = last->header.timestamp; if (count == (int)msgs.size()) { @@ -196,7 +196,7 @@ void SrsMessageQueue::shrink() // for when we shrinked, the first is the iframe, // we will directly remove the gop next time. for (int i = 1; i < (int)msgs.size(); i++) { - SrsSharedPtrMessage* msg = msgs[i]; + __SrsSharedPtrMessage* msg = msgs[i]; if (msg->header.is_video()) { if (SrsCodec::video_is_keyframe(msg->payload, msg->size)) { @@ -222,7 +222,7 @@ void SrsMessageQueue::shrink() // remove the first gop from the front for (int i = 0; i < iframe_index; i++) { - SrsSharedPtrMessage* msg = msgs[i]; + __SrsSharedPtrMessage* msg = msgs[i]; srs_freep(msg); } msgs.erase(msgs.begin(), msgs.begin() + iframe_index); @@ -230,10 +230,10 @@ void SrsMessageQueue::shrink() void SrsMessageQueue::clear() { - std::vector::iterator it; + std::vector<__SrsSharedPtrMessage*>::iterator it; for (it = msgs.begin(); it != msgs.end(); ++it) { - SrsSharedPtrMessage* msg = *it; + __SrsSharedPtrMessage* msg = *it; srs_freep(msg); } @@ -267,7 +267,7 @@ int SrsConsumer::get_time() return jitter->get_time(); } -int SrsConsumer::enqueue(SrsSharedPtrMessage* msg, int tba, int tbv) +int SrsConsumer::enqueue(__SrsSharedPtrMessage* msg, int tba, int tbv) { int ret = ERROR_SUCCESS; @@ -285,7 +285,7 @@ int SrsConsumer::enqueue(SrsSharedPtrMessage* msg, int tba, int tbv) return ret; } -int SrsConsumer::get_packets(int max_count, SrsSharedPtrMessage**& pmsgs, int& count) +int SrsConsumer::get_packets(int max_count, __SrsSharedPtrMessage**& pmsgs, int& count) { // paused, return nothing. if (paused) { @@ -329,7 +329,7 @@ void SrsGopCache::set(bool enabled) srs_info("enable gop cache"); } -int SrsGopCache::cache(SrsSharedPtrMessage* msg) +int SrsGopCache::cache(__SrsSharedPtrMessage* msg) { int ret = ERROR_SUCCESS; @@ -368,9 +368,9 @@ int SrsGopCache::cache(SrsSharedPtrMessage* msg) void SrsGopCache::clear() { - std::vector::iterator it; + std::vector<__SrsSharedPtrMessage*>::iterator it; for (it = gop_cache.begin(); it != gop_cache.end(); ++it) { - SrsSharedPtrMessage* msg = *it; + __SrsSharedPtrMessage* msg = *it; srs_freep(msg); } gop_cache.clear(); @@ -382,9 +382,9 @@ int SrsGopCache::dump(SrsConsumer* consumer, int tba, int tbv) { int ret = ERROR_SUCCESS; - std::vector::iterator it; + std::vector<__SrsSharedPtrMessage*>::iterator it; for (it = gop_cache.begin(); it != gop_cache.end(); ++it) { - SrsSharedPtrMessage* msg = *it; + __SrsSharedPtrMessage* msg = *it; if ((ret = consumer->enqueue(msg->copy(), tba, tbv)) != ERROR_SUCCESS) { srs_error("dispatch cached gop failed. ret=%d", ret); return ret; @@ -406,7 +406,7 @@ int64_t SrsGopCache::get_start_time() return 0; } - SrsSharedPtrMessage* msg = gop_cache[0]; + __SrsSharedPtrMessage* msg = gop_cache[0]; srs_assert(msg); return msg->header.timestamp; @@ -789,7 +789,7 @@ bool SrsSource::can_publish() return _can_publish; } -int SrsSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata) +int SrsSource::on_meta_data(__SrsMessage* msg, SrsOnMetaDataPacket* metadata) { int ret = ERROR_SUCCESS; @@ -840,7 +840,7 @@ int SrsSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata // create a shared ptr message. srs_freep(cache_metadata); - cache_metadata = new SrsSharedPtrMessage(); + cache_metadata = new __SrsSharedPtrMessage(); // dump message to shared ptr message. if ((ret = cache_metadata->initialize(&msg->header, payload, size)) != ERROR_SUCCESS) { @@ -877,12 +877,12 @@ int SrsSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata return ret; } -int SrsSource::on_audio(SrsCommonMessage* audio) +int SrsSource::on_audio(__SrsMessage* audio) { int ret = ERROR_SUCCESS; - SrsSharedPtrMessage* msg = new SrsSharedPtrMessage(); - SrsAutoFree(SrsSharedPtrMessage, msg, false); + __SrsSharedPtrMessage* msg = new __SrsSharedPtrMessage(); + SrsAutoFree(__SrsSharedPtrMessage, msg, false); if ((ret = msg->initialize(audio)) != ERROR_SUCCESS) { srs_error("initialize the audio failed. ret=%d", ret); return ret; @@ -966,12 +966,12 @@ int SrsSource::on_audio(SrsCommonMessage* audio) return ret; } -int SrsSource::on_video(SrsCommonMessage* video) +int SrsSource::on_video(__SrsMessage* video) { int ret = ERROR_SUCCESS; - SrsSharedPtrMessage* msg = new SrsSharedPtrMessage(); - SrsAutoFree(SrsSharedPtrMessage, msg, false); + __SrsSharedPtrMessage* msg = new __SrsSharedPtrMessage(); + SrsAutoFree(__SrsSharedPtrMessage, msg, false); if ((ret = msg->initialize(video)) != ERROR_SUCCESS) { srs_error("initialize the video failed. ret=%d", ret); return ret; @@ -1207,7 +1207,7 @@ int SrsSource::on_edge_start_publish() return publish_edge->on_client_publish(); } -int SrsSource::on_edge_proxy_publish(SrsCommonMessage* msg) +int SrsSource::on_edge_proxy_publish(__SrsMessage* msg) { return publish_edge->on_proxy_publish(msg); } diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index 25badf2a2..aec2765be 100644 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -40,9 +40,9 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. class SrsPlayEdge; class SrsPublishEdge; class SrsSource; -class SrsCommonMessage; +class __SrsMessage; class SrsOnMetaDataPacket; -class SrsSharedPtrMessage; +class __SrsSharedPtrMessage; class SrsForwarder; class SrsRequest; class SrsSocket; @@ -74,7 +74,7 @@ public: /** * detect the time jitter and correct it. */ - virtual int correct(SrsSharedPtrMessage* msg, int tba, int tbv); + virtual int correct(__SrsSharedPtrMessage* msg, int tba, int tbv); /** * get current client time, the last packet time. */ @@ -91,7 +91,7 @@ private: int64_t av_start_time; int64_t av_end_time; int queue_size_ms; - std::vector msgs; + std::vector<__SrsSharedPtrMessage*> msgs; public: SrsMessageQueue(); virtual ~SrsMessageQueue(); @@ -106,14 +106,14 @@ public: * enqueue the message, the timestamp always monotonically. * @param msg, the msg to enqueue, user never free it whatever the return code. */ - virtual int enqueue(SrsSharedPtrMessage* msg); + virtual int enqueue(__SrsSharedPtrMessage* msg); /** * get packets in consumer queue. * @pmsgs SrsMessages*[], output the prt array. * @count the count in array. * @max_count the max count to dequeue, 0 to dequeue all. */ - virtual int get_packets(int max_count, SrsSharedPtrMessage**& pmsgs, int& count); + virtual int get_packets(int max_count, __SrsSharedPtrMessage**& pmsgs, int& count); private: /** * remove a gop from the front. @@ -150,14 +150,14 @@ public: * @param tbv timebase of video. * used to calc the video time delta if time-jitter detected. */ - virtual int enqueue(SrsSharedPtrMessage* msg, int tba, int tbv); + virtual int enqueue(__SrsSharedPtrMessage* msg, int tba, int tbv); /** * get packets in consumer queue. * @pmsgs SrsMessages*[], output the prt array. * @count the count in array. * @max_count the max count to dequeue, 0 to dequeue all. */ - virtual int get_packets(int max_count, SrsSharedPtrMessage**& pmsgs, int& count); + virtual int get_packets(int max_count, __SrsSharedPtrMessage**& pmsgs, int& count); /** * when client send the pause message. */ @@ -185,7 +185,7 @@ private: /** * cached gop. */ - std::vector gop_cache; + std::vector<__SrsSharedPtrMessage*> gop_cache; public: SrsGopCache(); virtual ~SrsGopCache(); @@ -196,7 +196,7 @@ public: * 1. cache the gop when got h264 video packet. * 2. clear gop when got keyframe. */ - virtual int cache(SrsSharedPtrMessage* msg); + virtual int cache(__SrsSharedPtrMessage* msg); virtual void clear(); virtual int dump(SrsConsumer* consumer, int tba, int tbv); /** @@ -267,11 +267,11 @@ private: // TODO: FIXME: to support reload atc. bool atc; private: - SrsSharedPtrMessage* cache_metadata; + __SrsSharedPtrMessage* cache_metadata; // the cached video sequence header. - SrsSharedPtrMessage* cache_sh_video; + __SrsSharedPtrMessage* cache_sh_video; // the cached audio sequence header. - SrsSharedPtrMessage* cache_sh_audio; + __SrsSharedPtrMessage* cache_sh_audio; public: /** * @param _req the client request object, @@ -299,9 +299,9 @@ public: virtual int on_dvr_request_sh(); public: virtual bool can_publish(); - virtual int on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata); - virtual int on_audio(SrsCommonMessage* audio); - virtual int on_video(SrsCommonMessage* video); + virtual int on_meta_data(__SrsMessage* msg, SrsOnMetaDataPacket* metadata); + virtual int on_audio(__SrsMessage* audio); + virtual int on_video(__SrsMessage* video); /** * publish stream event notify. * @param _req the request from client, the source will deep copy it, @@ -322,7 +322,7 @@ public: // for edge, when publish edge stream, check the state virtual int on_edge_start_publish(); // for edge, proxy the publish - virtual int on_edge_proxy_publish(SrsCommonMessage* msg); + virtual int on_edge_proxy_publish(__SrsMessage* msg); // for edge, proxy stop publish virtual void on_edge_proxy_unpublish(); private: diff --git a/trunk/src/core/srs_core.hpp b/trunk/src/core/srs_core.hpp index 6e8fbf4b1..7c3d6418c 100644 --- a/trunk/src/core/srs_core.hpp +++ b/trunk/src/core/srs_core.hpp @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. // current release version #define VERSION_MAJOR "0" #define VERSION_MINOR "9" -#define VERSION_REVISION "79" +#define VERSION_REVISION "80" #define RTMP_SIG_SRS_VERSION VERSION_MAJOR"."VERSION_MINOR"."VERSION_REVISION // server info. #define RTMP_SIG_SRS_KEY "srs" diff --git a/trunk/src/rtmp/srs_protocol_rtmp.cpp b/trunk/src/rtmp/srs_protocol_rtmp.cpp index 982cd4df6..4ebddaa0c 100644 --- a/trunk/src/rtmp/srs_protocol_rtmp.cpp +++ b/trunk/src/rtmp/srs_protocol_rtmp.cpp @@ -382,6 +382,26 @@ int SrsRtmpClient::send_message(ISrsMessage* msg) return protocol->send_message(msg); } +int SrsRtmpClient::__recv_message(__SrsMessage** pmsg) +{ + return protocol->__recv_message(pmsg); +} + +int SrsRtmpClient::__decode_message(__SrsMessage* msg, SrsPacket** ppacket) +{ + return protocol->__decode_message(msg, ppacket); +} + +int SrsRtmpClient::__send_and_free_message(__SrsMessage* msg) +{ + return protocol->__send_and_free_message(msg); +} + +int SrsRtmpClient::__send_and_free_packet(SrsPacket* packet, int stream_id) +{ + return protocol->__send_and_free_packet(packet, stream_id); +} + int SrsRtmpClient::handshake() { int ret = ERROR_SUCCESS; @@ -768,6 +788,26 @@ int SrsRtmpServer::send_message(ISrsMessage* msg) return protocol->send_message(msg); } +int SrsRtmpServer::__recv_message(__SrsMessage** pmsg) +{ + return protocol->__recv_message(pmsg); +} + +int SrsRtmpServer::__decode_message(__SrsMessage* msg, SrsPacket** ppacket) +{ + return protocol->__decode_message(msg, ppacket); +} + +int SrsRtmpServer::__send_and_free_message(__SrsMessage* msg) +{ + return protocol->__send_and_free_message(msg); +} + +int SrsRtmpServer::__send_and_free_packet(SrsPacket* packet, int stream_id) +{ + return protocol->__send_and_free_packet(packet, stream_id); +} + int SrsRtmpServer::handshake() { int ret = ERROR_SUCCESS; @@ -794,13 +834,13 @@ int SrsRtmpServer::connect_app(SrsRequest* req) { int ret = ERROR_SUCCESS; - SrsCommonMessage* msg = NULL; + __SrsMessage* msg = NULL; SrsConnectAppPacket* pkt = NULL; - if ((ret = srs_rtmp_expect_message(protocol, &msg, &pkt)) != ERROR_SUCCESS) { + if ((ret = __srs_rtmp_expect_message(protocol, &msg, &pkt)) != ERROR_SUCCESS) { srs_error("expect connect app message failed. ret=%d", ret); return ret; } - SrsAutoFree(SrsCommonMessage, msg, false); + SrsAutoFree(__SrsMessage, msg, false); srs_info("get connect app message"); SrsAmf0Any* prop = NULL; @@ -833,13 +873,9 @@ int SrsRtmpServer::set_window_ack_size(int ack_size) { int ret = ERROR_SUCCESS; - SrsCommonMessage* msg = new SrsCommonMessage(); SrsSetWindowAckSizePacket* pkt = new SrsSetWindowAckSizePacket(); - pkt->ackowledgement_window_size = ack_size; - msg->set_packet(pkt, 0); - - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { + if ((ret = protocol->__send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) { srs_error("send ack size message failed. ret=%d", ret); return ret; } @@ -852,14 +888,10 @@ int SrsRtmpServer::set_peer_bandwidth(int bandwidth, int type) { int ret = ERROR_SUCCESS; - SrsCommonMessage* msg = new SrsCommonMessage(); SrsSetPeerBandwidthPacket* pkt = new SrsSetPeerBandwidthPacket(); - pkt->bandwidth = bandwidth; pkt->type = type; - msg->set_packet(pkt, 0); - - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { + if ((ret = protocol->__send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) { srs_error("send set bandwidth message failed. ret=%d", ret); return ret; } @@ -873,7 +905,6 @@ int SrsRtmpServer::response_connect_app(SrsRequest *req, const char* server_ip) { int ret = ERROR_SUCCESS; - SrsCommonMessage* msg = new SrsCommonMessage(); SrsConnectAppResPacket* pkt = new SrsConnectAppResPacket(); pkt->props->set("fmsVer", SrsAmf0Any::str("FMS/"RTMP_SIG_FMS_VER)); @@ -903,9 +934,7 @@ int SrsRtmpServer::response_connect_app(SrsRequest *req, const char* server_ip) data->set("srs_server_ip", SrsAmf0Any::str(server_ip)); } - msg->set_packet(pkt, 0); - - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { + if ((ret = protocol->__send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) { srs_error("send connect app response message failed. ret=%d", ret); return ret; } @@ -939,12 +968,8 @@ int SrsRtmpServer::on_bw_done() { int ret = ERROR_SUCCESS; - SrsCommonMessage* msg = new SrsCommonMessage(); SrsOnBWDonePacket* pkt = new SrsOnBWDonePacket(); - - msg->set_packet(pkt, 0); - - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { + if ((ret = protocol->__send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) { srs_error("send onBWDone message failed. ret=%d", ret); return ret; } @@ -959,13 +984,13 @@ int SrsRtmpServer::identify_client(int stream_id, SrsRtmpConnType& type, string& int ret = ERROR_SUCCESS; while (true) { - SrsCommonMessage* msg = NULL; - if ((ret = protocol->recv_message(&msg)) != ERROR_SUCCESS) { + __SrsMessage* msg = NULL; + if ((ret = protocol->__recv_message(&msg)) != ERROR_SUCCESS) { srs_error("recv identify client message failed. ret=%d", ret); return ret; } - SrsAutoFree(SrsCommonMessage, msg, false); + SrsAutoFree(__SrsMessage, msg, false); if (!msg->header.is_amf0_command() && !msg->header.is_amf3_command()) { srs_trace("identify ignore messages except " @@ -973,12 +998,14 @@ int SrsRtmpServer::identify_client(int stream_id, SrsRtmpConnType& type, string& continue; } - if ((ret = msg->decode_packet(protocol)) != ERROR_SUCCESS) { + SrsPacket* pkt = NULL; + if ((ret = protocol->__decode_message(msg, &pkt)) != ERROR_SUCCESS) { srs_error("identify decode message failed. ret=%d", ret); return ret; } - SrsPacket* pkt = msg->get_packet(); + SrsAutoFree(SrsPacket, pkt, false); + if (dynamic_cast(pkt)) { srs_info("identify client by create stream, play or flash publish."); return identify_create_stream_client(dynamic_cast(pkt), stream_id, type, stream_name, duration); @@ -1002,13 +1029,9 @@ int SrsRtmpServer::set_chunk_size(int chunk_size) { int ret = ERROR_SUCCESS; - SrsCommonMessage* msg = new SrsCommonMessage(); SrsSetChunkSizePacket* pkt = new SrsSetChunkSizePacket(); - pkt->chunk_size = chunk_size; - msg->set_packet(pkt, 0); - - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { + if ((ret = protocol->__send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) { srs_error("send set chunk size message failed. ret=%d", ret); return ret; } @@ -1023,14 +1046,10 @@ int SrsRtmpServer::start_play(int stream_id) // StreamBegin if (true) { - SrsCommonMessage* msg = new SrsCommonMessage(); SrsUserControlPacket* pkt = new SrsUserControlPacket(); - pkt->event_type = SrcPCUCStreamBegin; pkt->event_data = stream_id; - msg->set_packet(pkt, 0); - - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { + if ((ret = protocol->__send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) { srs_error("send PCUC(StreamBegin) message failed. ret=%d", ret); return ret; } @@ -1039,7 +1058,6 @@ int SrsRtmpServer::start_play(int stream_id) // onStatus(NetStream.Play.Reset) if (true) { - SrsCommonMessage* msg = new SrsCommonMessage(); SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket(); pkt->data->set(StatusLevel, SrsAmf0Any::str(StatusLevelStatus)); @@ -1048,9 +1066,7 @@ int SrsRtmpServer::start_play(int stream_id) pkt->data->set(StatusDetails, SrsAmf0Any::str("stream")); pkt->data->set(StatusClientId, SrsAmf0Any::str(RTMP_SIG_CLIENT_ID)); - msg->set_packet(pkt, stream_id); - - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { + if ((ret = protocol->__send_and_free_packet(pkt, stream_id)) != ERROR_SUCCESS) { srs_error("send onStatus(NetStream.Play.Reset) message failed. ret=%d", ret); return ret; } @@ -1059,7 +1075,6 @@ int SrsRtmpServer::start_play(int stream_id) // onStatus(NetStream.Play.Start) if (true) { - SrsCommonMessage* msg = new SrsCommonMessage(); SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket(); pkt->data->set(StatusLevel, SrsAmf0Any::str(StatusLevelStatus)); @@ -1068,9 +1083,7 @@ int SrsRtmpServer::start_play(int stream_id) pkt->data->set(StatusDetails, SrsAmf0Any::str("stream")); pkt->data->set(StatusClientId, SrsAmf0Any::str(RTMP_SIG_CLIENT_ID)); - msg->set_packet(pkt, stream_id); - - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { + if ((ret = protocol->__send_and_free_packet(pkt, stream_id)) != ERROR_SUCCESS) { srs_error("send onStatus(NetStream.Play.Reset) message failed. ret=%d", ret); return ret; } @@ -1079,12 +1092,8 @@ int SrsRtmpServer::start_play(int stream_id) // |RtmpSampleAccess(false, false) if (true) { - SrsCommonMessage* msg = new SrsCommonMessage(); SrsSampleAccessPacket* pkt = new SrsSampleAccessPacket(); - - msg->set_packet(pkt, stream_id); - - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { + if ((ret = protocol->__send_and_free_packet(pkt, stream_id)) != ERROR_SUCCESS) { srs_error("send |RtmpSampleAccess(false, false) message failed. ret=%d", ret); return ret; } @@ -1093,14 +1102,9 @@ int SrsRtmpServer::start_play(int stream_id) // onStatus(NetStream.Data.Start) if (true) { - SrsCommonMessage* msg = new SrsCommonMessage(); SrsOnStatusDataPacket* pkt = new SrsOnStatusDataPacket(); - pkt->data->set(StatusCode, SrsAmf0Any::str(StatusCodeDataStart)); - - msg->set_packet(pkt, stream_id); - - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { + if ((ret = protocol->__send_and_free_packet(pkt, stream_id)) != ERROR_SUCCESS) { srs_error("send onStatus(NetStream.Data.Start) message failed. ret=%d", ret); return ret; } @@ -1119,16 +1123,13 @@ int SrsRtmpServer::on_play_client_pause(int stream_id, bool is_pause) if (is_pause) { // onStatus(NetStream.Pause.Notify) if (true) { - SrsCommonMessage* msg = new SrsCommonMessage(); SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket(); pkt->data->set(StatusLevel, SrsAmf0Any::str(StatusLevelStatus)); pkt->data->set(StatusCode, SrsAmf0Any::str(StatusCodeStreamPause)); pkt->data->set(StatusDescription, SrsAmf0Any::str("Paused stream.")); - msg->set_packet(pkt, stream_id); - - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { + if ((ret = protocol->__send_and_free_packet(pkt, stream_id)) != ERROR_SUCCESS) { srs_error("send onStatus(NetStream.Pause.Notify) message failed. ret=%d", ret); return ret; } @@ -1136,14 +1137,12 @@ int SrsRtmpServer::on_play_client_pause(int stream_id, bool is_pause) } // StreamEOF if (true) { - SrsCommonMessage* msg = new SrsCommonMessage(); SrsUserControlPacket* pkt = new SrsUserControlPacket(); pkt->event_type = SrcPCUCStreamEOF; pkt->event_data = stream_id; - msg->set_packet(pkt, 0); - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { + if ((ret = protocol->__send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) { srs_error("send PCUC(StreamEOF) message failed. ret=%d", ret); return ret; } @@ -1152,16 +1151,13 @@ int SrsRtmpServer::on_play_client_pause(int stream_id, bool is_pause) } else { // onStatus(NetStream.Unpause.Notify) if (true) { - SrsCommonMessage* msg = new SrsCommonMessage(); SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket(); pkt->data->set(StatusLevel, SrsAmf0Any::str(StatusLevelStatus)); pkt->data->set(StatusCode, SrsAmf0Any::str(StatusCodeStreamUnpause)); pkt->data->set(StatusDescription, SrsAmf0Any::str("Unpaused stream.")); - msg->set_packet(pkt, stream_id); - - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { + if ((ret = protocol->__send_and_free_packet(pkt, stream_id)) != ERROR_SUCCESS) { srs_error("send onStatus(NetStream.Unpause.Notify) message failed. ret=%d", ret); return ret; } @@ -1169,14 +1165,12 @@ int SrsRtmpServer::on_play_client_pause(int stream_id, bool is_pause) } // StreanBegin if (true) { - SrsCommonMessage* msg = new SrsCommonMessage(); SrsUserControlPacket* pkt = new SrsUserControlPacket(); pkt->event_type = SrcPCUCStreamBegin; pkt->event_data = stream_id; - msg->set_packet(pkt, 0); - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { + if ((ret = protocol->__send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) { srs_error("send PCUC(StreanBegin) message failed. ret=%d", ret); return ret; } @@ -1194,25 +1188,21 @@ int SrsRtmpServer::start_fmle_publish(int stream_id) // FCPublish double fc_publish_tid = 0; if (true) { - SrsCommonMessage* msg = NULL; + __SrsMessage* msg = NULL; SrsFMLEStartPacket* pkt = NULL; - if ((ret = srs_rtmp_expect_message(protocol, &msg, &pkt)) != ERROR_SUCCESS) { + if ((ret = __srs_rtmp_expect_message(protocol, &msg, &pkt)) != ERROR_SUCCESS) { srs_error("recv FCPublish message failed. ret=%d", ret); return ret; } srs_info("recv FCPublish request message success."); - SrsAutoFree(SrsCommonMessage, msg, false); + SrsAutoFree(__SrsMessage, msg, false); fc_publish_tid = pkt->transaction_id; } // FCPublish response if (true) { - SrsCommonMessage* msg = new SrsCommonMessage(); SrsFMLEStartResPacket* pkt = new SrsFMLEStartResPacket(fc_publish_tid); - - msg->set_packet(pkt, 0); - - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { + if ((ret = protocol->__send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) { srs_error("send FCPublish response message failed. ret=%d", ret); return ret; } @@ -1222,25 +1212,21 @@ int SrsRtmpServer::start_fmle_publish(int stream_id) // createStream double create_stream_tid = 0; if (true) { - SrsCommonMessage* msg = NULL; + __SrsMessage* msg = NULL; SrsCreateStreamPacket* pkt = NULL; - if ((ret = srs_rtmp_expect_message(protocol, &msg, &pkt)) != ERROR_SUCCESS) { + if ((ret = __srs_rtmp_expect_message(protocol, &msg, &pkt)) != ERROR_SUCCESS) { srs_error("recv createStream message failed. ret=%d", ret); return ret; } srs_info("recv createStream request message success."); - SrsAutoFree(SrsCommonMessage, msg, false); + SrsAutoFree(__SrsMessage, msg, false); create_stream_tid = pkt->transaction_id; } // createStream response if (true) { - SrsCommonMessage* msg = new SrsCommonMessage(); SrsCreateStreamResPacket* pkt = new SrsCreateStreamResPacket(create_stream_tid, stream_id); - - msg->set_packet(pkt, 0); - - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { + if ((ret = protocol->__send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) { srs_error("send createStream response message failed. ret=%d", ret); return ret; } @@ -1249,28 +1235,25 @@ int SrsRtmpServer::start_fmle_publish(int stream_id) // publish if (true) { - SrsCommonMessage* msg = NULL; + __SrsMessage* msg = NULL; SrsPublishPacket* pkt = NULL; - if ((ret = srs_rtmp_expect_message(protocol, &msg, &pkt)) != ERROR_SUCCESS) { + if ((ret = __srs_rtmp_expect_message(protocol, &msg, &pkt)) != ERROR_SUCCESS) { srs_error("recv publish message failed. ret=%d", ret); return ret; } srs_info("recv publish request message success."); - SrsAutoFree(SrsCommonMessage, msg, false); + SrsAutoFree(__SrsMessage, msg, false); } // publish response onFCPublish(NetStream.Publish.Start) if (true) { - SrsCommonMessage* msg = new SrsCommonMessage(); SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket(); pkt->command_name = RTMP_AMF0_COMMAND_ON_FC_PUBLISH; pkt->data->set(StatusCode, SrsAmf0Any::str(StatusCodePublishStart)); pkt->data->set(StatusDescription, SrsAmf0Any::str("Started publishing stream.")); - msg->set_packet(pkt, stream_id); - - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { + if ((ret = protocol->__send_and_free_packet(pkt, stream_id)) != ERROR_SUCCESS) { srs_error("send onFCPublish(NetStream.Publish.Start) message failed. ret=%d", ret); return ret; } @@ -1278,7 +1261,6 @@ int SrsRtmpServer::start_fmle_publish(int stream_id) } // publish response onStatus(NetStream.Publish.Start) if (true) { - SrsCommonMessage* msg = new SrsCommonMessage(); SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket(); pkt->data->set(StatusLevel, SrsAmf0Any::str(StatusLevelStatus)); @@ -1286,9 +1268,7 @@ int SrsRtmpServer::start_fmle_publish(int stream_id) pkt->data->set(StatusDescription, SrsAmf0Any::str("Started publishing stream.")); pkt->data->set(StatusClientId, SrsAmf0Any::str(RTMP_SIG_CLIENT_ID)); - msg->set_packet(pkt, stream_id); - - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { + if ((ret = protocol->__send_and_free_packet(pkt, stream_id)) != ERROR_SUCCESS) { srs_error("send onStatus(NetStream.Publish.Start) message failed. ret=%d", ret); return ret; } @@ -1306,16 +1286,13 @@ int SrsRtmpServer::fmle_unpublish(int stream_id, double unpublish_tid) // publish response onFCUnpublish(NetStream.unpublish.Success) if (true) { - SrsCommonMessage* msg = new SrsCommonMessage(); SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket(); pkt->command_name = RTMP_AMF0_COMMAND_ON_FC_UNPUBLISH; pkt->data->set(StatusCode, SrsAmf0Any::str(StatusCodeUnpublishSuccess)); pkt->data->set(StatusDescription, SrsAmf0Any::str("Stop publishing stream.")); - msg->set_packet(pkt, stream_id); - - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { + if ((ret = protocol->__send_and_free_packet(pkt, stream_id)) != ERROR_SUCCESS) { srs_error("send onFCUnpublish(NetStream.unpublish.Success) message failed. ret=%d", ret); return ret; } @@ -1323,12 +1300,8 @@ int SrsRtmpServer::fmle_unpublish(int stream_id, double unpublish_tid) } // FCUnpublish response if (true) { - SrsCommonMessage* msg = new SrsCommonMessage(); SrsFMLEStartResPacket* pkt = new SrsFMLEStartResPacket(unpublish_tid); - - msg->set_packet(pkt, stream_id); - - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { + if ((ret = protocol->__send_and_free_packet(pkt, stream_id)) != ERROR_SUCCESS) { srs_error("send FCUnpublish response message failed. ret=%d", ret); return ret; } @@ -1336,7 +1309,6 @@ int SrsRtmpServer::fmle_unpublish(int stream_id, double unpublish_tid) } // publish response onStatus(NetStream.Unpublish.Success) if (true) { - SrsCommonMessage* msg = new SrsCommonMessage(); SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket(); pkt->data->set(StatusLevel, SrsAmf0Any::str(StatusLevelStatus)); @@ -1344,9 +1316,7 @@ int SrsRtmpServer::fmle_unpublish(int stream_id, double unpublish_tid) pkt->data->set(StatusDescription, SrsAmf0Any::str("Stream is now unpublished")); pkt->data->set(StatusClientId, SrsAmf0Any::str(RTMP_SIG_CLIENT_ID)); - msg->set_packet(pkt, stream_id); - - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { + if ((ret = protocol->__send_and_free_packet(pkt, stream_id)) != ERROR_SUCCESS) { srs_error("send onStatus(NetStream.Unpublish.Success) message failed. ret=%d", ret); return ret; } @@ -1364,7 +1334,6 @@ int SrsRtmpServer::start_flash_publish(int stream_id) // publish response onStatus(NetStream.Publish.Start) if (true) { - SrsCommonMessage* msg = new SrsCommonMessage(); SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket(); pkt->data->set(StatusLevel, SrsAmf0Any::str(StatusLevelStatus)); @@ -1372,9 +1341,7 @@ int SrsRtmpServer::start_flash_publish(int stream_id) pkt->data->set(StatusDescription, SrsAmf0Any::str("Started publishing stream.")); pkt->data->set(StatusClientId, SrsAmf0Any::str(RTMP_SIG_CLIENT_ID)); - msg->set_packet(pkt, stream_id); - - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { + if ((ret = protocol->__send_and_free_packet(pkt, stream_id)) != ERROR_SUCCESS) { srs_error("send onStatus(NetStream.Publish.Start) message failed. ret=%d", ret); return ret; } @@ -1391,12 +1358,8 @@ int SrsRtmpServer::identify_create_stream_client(SrsCreateStreamPacket* req, int int ret = ERROR_SUCCESS; if (true) { - SrsCommonMessage* msg = new SrsCommonMessage(); SrsCreateStreamResPacket* pkt = new SrsCreateStreamResPacket(req->transaction_id, stream_id); - - msg->set_packet(pkt, 0); - - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { + if ((ret = protocol->__send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) { srs_error("send createStream response message failed. ret=%d", ret); return ret; } @@ -1404,13 +1367,13 @@ int SrsRtmpServer::identify_create_stream_client(SrsCreateStreamPacket* req, int } while (true) { - SrsCommonMessage* msg = NULL; - if ((ret = protocol->recv_message(&msg)) != ERROR_SUCCESS) { + __SrsMessage* msg = NULL; + if ((ret = protocol->__recv_message(&msg)) != ERROR_SUCCESS) { srs_error("recv identify client message failed. ret=%d", ret); return ret; } - SrsAutoFree(SrsCommonMessage, msg, false); + SrsAutoFree(__SrsMessage, msg, false); if (!msg->header.is_amf0_command() && !msg->header.is_amf3_command()) { srs_trace("identify ignore messages except " @@ -1418,12 +1381,14 @@ int SrsRtmpServer::identify_create_stream_client(SrsCreateStreamPacket* req, int continue; } - if ((ret = msg->decode_packet(protocol)) != ERROR_SUCCESS) { + SrsPacket* pkt = NULL; + if ((ret = protocol->__decode_message(msg, &pkt)) != ERROR_SUCCESS) { srs_error("identify decode message failed. ret=%d", ret); return ret; } + + SrsAutoFree(SrsPacket, pkt, false); - SrsPacket* pkt = msg->get_packet(); if (dynamic_cast(pkt)) { srs_info("level1 identify client by play."); return identify_play_client(dynamic_cast(pkt), type, stream_name, duration); @@ -1448,12 +1413,8 @@ int SrsRtmpServer::identify_fmle_publish_client(SrsFMLEStartPacket* req, SrsRtmp // releaseStream response if (true) { - SrsCommonMessage* msg = new SrsCommonMessage(); SrsFMLEStartResPacket* pkt = new SrsFMLEStartResPacket(req->transaction_id); - - msg->set_packet(pkt, 0); - - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { + if ((ret = protocol->__send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) { srs_error("send releaseStream response message failed. ret=%d", ret); return ret; } diff --git a/trunk/src/rtmp/srs_protocol_rtmp.hpp b/trunk/src/rtmp/srs_protocol_rtmp.hpp index d0079fdda..3a1d9b558 100644 --- a/trunk/src/rtmp/srs_protocol_rtmp.hpp +++ b/trunk/src/rtmp/srs_protocol_rtmp.hpp @@ -42,6 +42,8 @@ class SrsPublishPacket; class SrsSharedPtrMessage; class SrsOnMetaDataPacket; class SrsPlayPacket; +class __SrsMessage; +class SrsPacket; /** * the original request from client. @@ -164,6 +166,10 @@ public: virtual int get_send_kbps(); virtual int recv_message(SrsCommonMessage** pmsg); virtual int send_message(ISrsMessage* msg); + virtual int __recv_message(__SrsMessage** pmsg); + virtual int __decode_message(__SrsMessage* msg, SrsPacket** ppacket); + virtual int __send_and_free_message(__SrsMessage* msg); + virtual int __send_and_free_packet(SrsPacket* packet, int stream_id); public: // try complex, then simple handshake. virtual int handshake(); @@ -209,6 +215,10 @@ public: virtual int get_send_kbps(); virtual int recv_message(SrsCommonMessage** pmsg); virtual int send_message(ISrsMessage* msg); + virtual int __recv_message(__SrsMessage** pmsg); + virtual int __decode_message(__SrsMessage* msg, SrsPacket** ppacket); + virtual int __send_and_free_message(__SrsMessage* msg); + virtual int __send_and_free_packet(SrsPacket* packet, int stream_id); public: virtual int handshake(); virtual int connect_app(SrsRequest* req); diff --git a/trunk/src/rtmp/srs_protocol_rtmp_stack.cpp b/trunk/src/rtmp/srs_protocol_rtmp_stack.cpp index 9e4213d44..1e7859865 100644 --- a/trunk/src/rtmp/srs_protocol_rtmp_stack.cpp +++ b/trunk/src/rtmp/srs_protocol_rtmp_stack.cpp @@ -297,6 +297,7 @@ SrsProtocol::AckWindowSize::AckWindowSize() SrsProtocol::SrsProtocol(ISrsProtocolReaderWriter* io) { buffer = new SrsBuffer(); + decode_stream = new SrsStream(); skt = io; in_chunk_size = out_chunk_size = RTMP_DEFAULT_CHUNK_SIZE; @@ -304,15 +305,28 @@ SrsProtocol::SrsProtocol(ISrsProtocolReaderWriter* io) SrsProtocol::~SrsProtocol() { - std::map::iterator it; + if (true) { + std::map::iterator it; + + for (it = chunk_streams.begin(); it != chunk_streams.end(); ++it) { + SrsChunkStream* stream = it->second; + srs_freep(stream); + } - for (it = chunk_streams.begin(); it != chunk_streams.end(); ++it) { - SrsChunkStream* stream = it->second; - srs_freep(stream); + chunk_streams.clear(); } - - chunk_streams.clear(); + if (true) { + std::map::iterator it; + + for (it = __chunk_streams.begin(); it != __chunk_streams.end(); ++it) { + __SrsChunkStream* stream = it->second; + srs_freep(stream); + } + __chunk_streams.clear(); + } + + srs_freep(decode_stream); srs_freep(buffer); } @@ -365,6 +379,982 @@ int SrsProtocol::get_send_kbps() return skt->get_send_kbps(); } +int SrsProtocol::__recv_message(__SrsMessage** pmsg) +{ + *pmsg = NULL; + + int ret = ERROR_SUCCESS; + + while (true) { + __SrsMessage* msg = NULL; + + if ((ret = __recv_interlaced_message(&msg)) != ERROR_SUCCESS) { + if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) { + srs_error("recv interlaced message failed. ret=%d", ret); + } + return ret; + } + srs_verbose("entire msg received"); + + if (!msg) { + continue; + } + + if (msg->size <= 0 || msg->header.payload_length <= 0) { + srs_trace("ignore empty message(type=%d, size=%d, time=%"PRId64", sid=%d).", + msg->header.message_type, msg->header.payload_length, + msg->header.timestamp, msg->header.stream_id); + srs_freep(msg); + continue; + } + + if ((ret = __on_recv_message(msg)) != ERROR_SUCCESS) { + srs_error("hook the received msg failed. ret=%d", ret); + srs_freep(msg); + return ret; + } + + srs_verbose("get a msg with raw/undecoded payload"); + *pmsg = msg; + break; + } + + return ret; +} + +int SrsProtocol::__decode_message(__SrsMessage* msg, SrsPacket** ppacket) +{ + *ppacket = NULL; + + int ret = ERROR_SUCCESS; + + srs_assert(msg != NULL); + srs_assert(msg->payload != NULL); + srs_assert(msg->size > 0); + + // initialize the decode stream for all message, + // it's ok for the initialize if fast and without memory copy. + if ((ret = decode_stream->initialize((char*)(msg->payload), msg->size)) != ERROR_SUCCESS) { + srs_error("initialize stream failed. ret=%d", ret); + return ret; + } + srs_verbose("decode stream initialized success"); + + // decode the packet. + SrsPacket* packet = NULL; + if ((ret = __do_decode_message(msg->header, decode_stream, &packet)) != ERROR_SUCCESS) { + srs_freep(packet); + return ret; + } + + // set to output ppacket only when success. + *ppacket = packet; + + return ret; +} + +int SrsProtocol::__do_send_and_free_message(__SrsMessage* msg, SrsPacket* packet) +{ + int ret = ERROR_SUCCESS; + + // we donot use the complex basic header, + // ensure the basic header is 1bytes. + if (msg->header.perfer_cid < 2) { + srs_warn("change the chunk_id=%d to default=%d", + msg->header.perfer_cid, RTMP_CID_ProtocolControl); + msg->header.perfer_cid = RTMP_CID_ProtocolControl; + } + + // p set to current write position, + // it's ok when payload is NULL and size is 0. + char* p = (char*)msg->payload; + + // always write the header event payload is empty. + do { + // generate the header. + char* pheader = out_header_cache; + + if (p == (char*)msg->payload) { + // write new chunk stream header, fmt is 0 + *pheader++ = 0x00 | (msg->header.perfer_cid & 0x3F); + + // chunk message header, 11 bytes + // timestamp, 3bytes, big-endian + u_int32_t timestamp = (u_int32_t)msg->header.timestamp; + if (timestamp >= RTMP_EXTENDED_TIMESTAMP) { + *pheader++ = 0xFF; + *pheader++ = 0xFF; + *pheader++ = 0xFF; + } else { + pp = (char*)×tamp; + *pheader++ = pp[2]; + *pheader++ = pp[1]; + *pheader++ = pp[0]; + } + + // message_length, 3bytes, big-endian + pp = (char*)&msg->header.payload_length; + *pheader++ = pp[2]; + *pheader++ = pp[1]; + *pheader++ = pp[0]; + + // message_type, 1bytes + *pheader++ = msg->header.message_type; + + // message_length, 3bytes, little-endian + pp = (char*)&msg->header.stream_id; + *pheader++ = pp[0]; + *pheader++ = pp[1]; + *pheader++ = pp[2]; + *pheader++ = pp[3]; + + // chunk extended timestamp header, 0 or 4 bytes, big-endian + if(timestamp >= RTMP_EXTENDED_TIMESTAMP){ + pp = (char*)×tamp; + *pheader++ = pp[3]; + *pheader++ = pp[2]; + *pheader++ = pp[1]; + *pheader++ = pp[0]; + } + } else { + // write no message header chunk stream, fmt is 3 + *pheader++ = 0xC0 | (msg->header.perfer_cid & 0x3F); + + // chunk extended timestamp header, 0 or 4 bytes, big-endian + // 6.1.3. Extended Timestamp + // This field is transmitted only when the normal time stamp in the + // chunk message header is set to 0x00ffffff. If normal time stamp is + // set to any value less than 0x00ffffff, this field MUST NOT be + // present. This field MUST NOT be present if the timestamp field is not + // present. Type 3 chunks MUST NOT have this field. + // adobe changed for Type3 chunk: + // FMLE always sendout the extended-timestamp, + // must send the extended-timestamp to FMS, + // must send the extended-timestamp to flash-player. + // @see: ngx_rtmp_prepare_message + // @see: http://blog.csdn.net/win_lin/article/details/13363699 + u_int32_t timestamp = (u_int32_t)msg->header.timestamp; + if(timestamp >= RTMP_EXTENDED_TIMESTAMP){ + pp = (char*)×tamp; + *pheader++ = pp[3]; + *pheader++ = pp[2]; + *pheader++ = pp[1]; + *pheader++ = pp[0]; + } + } + + // sendout header and payload by writev. + // decrease the sys invoke count to get higher performance. + int payload_size = msg->size - (p - (char*)msg->payload); + payload_size = srs_min(payload_size, out_chunk_size); + + // always has header + int header_size = pheader - out_header_cache; + srs_assert(header_size > 0); + + // send by writev + iovec iov[2]; + iov[0].iov_base = out_header_cache; + iov[0].iov_len = header_size; + iov[1].iov_base = p; + iov[1].iov_len = payload_size; + + ssize_t nwrite; + if ((ret = skt->writev(iov, 2, &nwrite)) != ERROR_SUCCESS) { + srs_error("send with writev failed. ret=%d", ret); + return ret; + } + + // consume sendout bytes when not empty packet. + if (msg->payload && msg->size > 0) { + p += payload_size; + } + } while (p < (char*)msg->payload + msg->size); + + if ((ret = __on_send_message(msg, packet)) != ERROR_SUCCESS) { + srs_error("hook the send message failed. ret=%d", ret); + return ret; + } + + return ret; +} + +int SrsProtocol::__do_decode_message(SrsMessageHeader& header, SrsStream* stream, SrsPacket** ppacket) +{ + int ret = ERROR_SUCCESS; + + SrsPacket* packet = NULL; + + // decode specified packet type + if (header.is_amf0_command() || header.is_amf3_command() || header.is_amf0_data() || header.is_amf3_data()) { + srs_verbose("start to decode AMF0/AMF3 command message."); + + // skip 1bytes to decode the amf3 command. + if (header.is_amf3_command() && stream->require(1)) { + srs_verbose("skip 1bytes to decode AMF3 command"); + stream->skip(1); + } + + // amf0 command message. + // need to read the command name. + std::string command; + if ((ret = srs_amf0_read_string(stream, command)) != ERROR_SUCCESS) { + srs_error("decode AMF0/AMF3 command name failed. ret=%d", ret); + return ret; + } + srs_verbose("AMF0/AMF3 command message, command_name=%s", command.c_str()); + + // result/error packet + if (command == RTMP_AMF0_COMMAND_RESULT || command == RTMP_AMF0_COMMAND_ERROR) { + double transactionId = 0.0; + if ((ret = srs_amf0_read_number(stream, transactionId)) != ERROR_SUCCESS) { + srs_error("decode AMF0/AMF3 transcationId failed. ret=%d", ret); + return ret; + } + srs_verbose("AMF0/AMF3 command id, transcationId=%.2f", transactionId); + + // reset stream, for header read completed. + stream->reset(); + if (header.is_amf3_command()) { + stream->skip(1); + } + + std::string request_name = get_request_name(transactionId); + if (request_name.empty()) { + ret = ERROR_RTMP_NO_REQUEST; + srs_error("decode AMF0/AMF3 request failed. ret=%d", ret); + return ret; + } + srs_verbose("AMF0/AMF3 request parsed. request_name=%s", request_name.c_str()); + + if (request_name == RTMP_AMF0_COMMAND_CONNECT) { + srs_info("decode the AMF0/AMF3 response command(%s message).", request_name.c_str()); + *ppacket = packet = new SrsConnectAppResPacket(); + return packet->decode(stream); + } else if (request_name == RTMP_AMF0_COMMAND_CREATE_STREAM) { + srs_info("decode the AMF0/AMF3 response command(%s message).", request_name.c_str()); + *ppacket = packet = new SrsCreateStreamResPacket(0, 0); + return packet->decode(stream); + } else if (request_name == RTMP_AMF0_COMMAND_RELEASE_STREAM + || request_name == RTMP_AMF0_COMMAND_FC_PUBLISH + || request_name == RTMP_AMF0_COMMAND_UNPUBLISH) { + srs_info("decode the AMF0/AMF3 response command(%s message).", request_name.c_str()); + *ppacket = packet = new SrsFMLEStartResPacket(0); + return packet->decode(stream); + } else { + ret = ERROR_RTMP_NO_REQUEST; + srs_error("decode AMF0/AMF3 request failed. " + "request_name=%s, transactionId=%.2f, ret=%d", + request_name.c_str(), transactionId, ret); + return ret; + } + } + + // reset to zero(amf3 to 1) to restart decode. + stream->reset(); + if (header.is_amf3_command()) { + stream->skip(1); + } + + // decode command object. + if (command == RTMP_AMF0_COMMAND_CONNECT) { + srs_info("decode the AMF0/AMF3 command(connect vhost/app message)."); + *ppacket = packet = new SrsConnectAppPacket(); + return packet->decode(stream); + } else if(command == RTMP_AMF0_COMMAND_CREATE_STREAM) { + srs_info("decode the AMF0/AMF3 command(createStream message)."); + *ppacket = packet = new SrsCreateStreamPacket(); + return packet->decode(stream); + } else if(command == RTMP_AMF0_COMMAND_PLAY) { + srs_info("decode the AMF0/AMF3 command(paly message)."); + *ppacket = packet = new SrsPlayPacket(); + return packet->decode(stream); + } else if(command == RTMP_AMF0_COMMAND_PAUSE) { + srs_info("decode the AMF0/AMF3 command(pause message)."); + *ppacket = packet = new SrsPausePacket(); + return packet->decode(stream); + } else if(command == RTMP_AMF0_COMMAND_RELEASE_STREAM) { + srs_info("decode the AMF0/AMF3 command(FMLE releaseStream message)."); + *ppacket = packet = new SrsFMLEStartPacket(); + return packet->decode(stream); + } else if(command == RTMP_AMF0_COMMAND_FC_PUBLISH) { + srs_info("decode the AMF0/AMF3 command(FMLE FCPublish message)."); + *ppacket = packet = new SrsFMLEStartPacket(); + return packet->decode(stream); + } else if(command == RTMP_AMF0_COMMAND_PUBLISH) { + srs_info("decode the AMF0/AMF3 command(publish message)."); + *ppacket = packet = new SrsPublishPacket(); + return packet->decode(stream); + } else if(command == RTMP_AMF0_COMMAND_UNPUBLISH) { + srs_info("decode the AMF0/AMF3 command(unpublish message)."); + *ppacket = packet = new SrsFMLEStartPacket(); + return packet->decode(stream); + } else if(command == RTMP_AMF0_DATA_SET_DATAFRAME || command == RTMP_AMF0_DATA_ON_METADATA) { + srs_info("decode the AMF0/AMF3 data(onMetaData message)."); + *ppacket = packet = new SrsOnMetaDataPacket(); + return packet->decode(stream); + } else if(command == SRS_BW_CHECK_FINISHED + || command == SRS_BW_CHECK_PLAYING + || command == SRS_BW_CHECK_PUBLISHING + || command == SRS_BW_CHECK_STARTING_PLAY + || command == SRS_BW_CHECK_STARTING_PUBLISH + || command == SRS_BW_CHECK_START_PLAY + || command == SRS_BW_CHECK_START_PUBLISH + || command == SRS_BW_CHECK_STOPPED_PLAY + || command == SRS_BW_CHECK_STOP_PLAY + || command == SRS_BW_CHECK_STOP_PUBLISH + || command == SRS_BW_CHECK_STOPPED_PUBLISH + || command == SRS_BW_CHECK_FLASH_FINAL) + { + srs_info("decode the AMF0/AMF3 band width check message."); + *ppacket = packet = new SrsBandwidthPacket(); + return packet->decode(stream); + } else if (command == RTMP_AMF0_COMMAND_CLOSE_STREAM) { + srs_info("decode the AMF0/AMF3 closeStream message."); + *ppacket = packet = new SrsCloseStreamPacket(); + return packet->decode(stream); + } + + // default packet to drop message. + srs_trace("drop the AMF0/AMF3 command message, command_name=%s", command.c_str()); + *ppacket = packet = new SrsPacket(); + return ret; + } else if(header.is_user_control_message()) { + srs_verbose("start to decode user control message."); + *ppacket = packet = new SrsUserControlPacket(); + return packet->decode(stream); + } else if(header.is_window_ackledgement_size()) { + srs_verbose("start to decode set ack window size message."); + *ppacket = packet = new SrsSetWindowAckSizePacket(); + return packet->decode(stream); + } else if(header.is_set_chunk_size()) { + srs_verbose("start to decode set chunk size message."); + *ppacket = packet = new SrsSetChunkSizePacket(); + return packet->decode(stream); + } else { + srs_trace("drop unknown message, type=%d", header.message_type); + } + + return ret; +} + +int SrsProtocol::__send_and_free_message(__SrsMessage* msg) +{ + srs_assert(msg); + SrsAutoFree(__SrsMessage, msg, false); + + return __do_send_and_free_message(msg, NULL); +} + +int SrsProtocol::__send_and_free_packet(SrsPacket* packet, int stream_id) +{ + int ret = ERROR_SUCCESS; + + srs_assert(packet); + SrsAutoFree(SrsPacket, packet, false); + + int size = 0; + char* payload = NULL; + if ((ret = packet->encode(size, payload)) != ERROR_SUCCESS) { + srs_error("encode RTMP packet to bytes oriented RTMP message failed. ret=%d", ret); + return ret; + } + + // encode packet to payload and size. + if (size <= 0 || payload == NULL) { + srs_warn("packet is empty, ignore empty message."); + return ret; + } + + // to message + __SrsMessage* msg = new __SrsMessage(); + + msg->payload = (int8_t*)payload; + msg->size = (int32_t)size; + + msg->header.payload_length = size; + msg->header.message_type = packet->get_message_type(); + msg->header.stream_id = stream_id; + msg->header.perfer_cid = packet->get_perfer_cid(); + + if ((ret = __do_send_and_free_message(msg, packet)) != ERROR_SUCCESS) { + return ret; + } + + return ret; +} + +int SrsProtocol::__recv_interlaced_message(__SrsMessage** pmsg) +{ + int ret = ERROR_SUCCESS; + + // chunk stream basic header. + char fmt = 0; + int cid = 0; + int bh_size = 0; + if ((ret = __read_basic_header(fmt, cid, bh_size)) != ERROR_SUCCESS) { + if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) { + srs_error("read basic header failed. ret=%d", ret); + } + return ret; + } + srs_verbose("read basic header success. fmt=%d, cid=%d, bh_size=%d", fmt, cid, bh_size); + + // once we got the chunk message header, + // that is there is a real message in cache, + // increase the timeout to got it. + // For example, in the play loop, we set timeout to 100ms, + // when we got a chunk header, we should increase the timeout, + // or we maybe timeout and disconnect the client. + int64_t timeout_us = skt->get_recv_timeout(); + if (!skt->is_never_timeout(timeout_us)) { + int64_t pkt_timeout_us = srs_max(timeout_us, SRS_MIN_RECV_TIMEOUT_US); + skt->set_recv_timeout(pkt_timeout_us); + srs_verbose("change recv timeout_us " + "from %"PRId64" to %"PRId64"", timeout_us, pkt_timeout_us); + } + + // get the cached chunk stream. + __SrsChunkStream* chunk = NULL; + + if (__chunk_streams.find(cid) == __chunk_streams.end()) { + chunk = __chunk_streams[cid] = new __SrsChunkStream(cid); + srs_verbose("cache new chunk stream: fmt=%d, cid=%d", fmt, cid); + } else { + chunk = __chunk_streams[cid]; + srs_verbose("cached chunk stream: fmt=%d, cid=%d, size=%d, message(type=%d, size=%d, time=%"PRId64", sid=%d)", + chunk->fmt, chunk->cid, (chunk->msg? chunk->msg->size : 0), chunk->header.message_type, chunk->header.payload_length, + chunk->header.timestamp, chunk->header.stream_id); + } + + // chunk stream message header + int mh_size = 0; + if ((ret = __read_message_header(chunk, fmt, bh_size, mh_size)) != ERROR_SUCCESS) { + if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) { + srs_error("read message header failed. ret=%d", ret); + } + return ret; + } + srs_verbose("read message header success. " + "fmt=%d, mh_size=%d, ext_time=%d, size=%d, message(type=%d, size=%d, time=%"PRId64", sid=%d)", + fmt, mh_size, chunk->extended_timestamp, (chunk->msg? chunk->msg->size : 0), chunk->header.message_type, + chunk->header.payload_length, chunk->header.timestamp, chunk->header.stream_id); + + // read msg payload from chunk stream. + __SrsMessage* msg = NULL; + int payload_size = 0; + if ((ret = __read_message_payload(chunk, bh_size, mh_size, payload_size, &msg)) != ERROR_SUCCESS) { + if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) { + srs_error("read message payload failed. ret=%d", ret); + } + return ret; + } + + // reset the recv timeout + if (!skt->is_never_timeout(timeout_us)) { + skt->set_recv_timeout(timeout_us); + srs_verbose("reset recv timeout_us to %"PRId64"", timeout_us); + } + + // not got an entire RTMP message, try next chunk. + if (!msg) { + srs_verbose("get partial message success. chunk_payload_size=%d, size=%d, message(type=%d, size=%d, time=%"PRId64", sid=%d)", + payload_size, (msg? msg->size : (chunk->msg? chunk->msg->size : 0)), chunk->header.message_type, chunk->header.payload_length, + chunk->header.timestamp, chunk->header.stream_id); + return ret; + } + + *pmsg = msg; + srs_info("get entire message success. chunk_payload_size=%d, size=%d, message(type=%d, size=%d, time=%"PRId64", sid=%d)", + payload_size, (msg? msg->size : (chunk->msg? chunk->msg->size : 0)), chunk->header.message_type, chunk->header.payload_length, + chunk->header.timestamp, chunk->header.stream_id); + + return ret; +} + +int SrsProtocol::__read_basic_header(char& fmt, int& cid, int& bh_size) +{ + int ret = ERROR_SUCCESS; + + int required_size = 1; + if ((ret = buffer->ensure_buffer_bytes(skt, required_size)) != ERROR_SUCCESS) { + if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) { + srs_error("read 1bytes basic header failed. required_size=%d, ret=%d", required_size, ret); + } + return ret; + } + + char* p = buffer->bytes(); + + fmt = (*p >> 6) & 0x03; + cid = *p & 0x3f; + bh_size = 1; + + if (cid > 1) { + srs_verbose("%dbytes basic header parsed. fmt=%d, cid=%d", bh_size, fmt, cid); + return ret; + } + + if (cid == 0) { + required_size = 2; + if ((ret = buffer->ensure_buffer_bytes(skt, required_size)) != ERROR_SUCCESS) { + if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) { + srs_error("read 2bytes basic header failed. required_size=%d, ret=%d", required_size, ret); + } + return ret; + } + + cid = 64; + cid += *(++p); + bh_size = 2; + srs_verbose("%dbytes basic header parsed. fmt=%d, cid=%d", bh_size, fmt, cid); + } else if (cid == 1) { + required_size = 3; + if ((ret = buffer->ensure_buffer_bytes(skt, 3)) != ERROR_SUCCESS) { + if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) { + srs_error("read 3bytes basic header failed. required_size=%d, ret=%d", required_size, ret); + } + return ret; + } + + cid = 64; + cid += *(++p); + cid += *(++p) * 256; + bh_size = 3; + srs_verbose("%dbytes basic header parsed. fmt=%d, cid=%d", bh_size, fmt, cid); + } else { + srs_error("invalid path, impossible basic header."); + srs_assert(false); + } + + return ret; +} + +int SrsProtocol::__read_message_header(__SrsChunkStream* chunk, char fmt, int bh_size, int& mh_size) +{ + int ret = ERROR_SUCCESS; + + /** + * we should not assert anything about fmt, for the first packet. + * (when first packet, the chunk->msg is NULL). + * the fmt maybe 0/1/2/3, the FMLE will send a 0xC4 for some audio packet. + * the previous packet is: + * 04 // fmt=0, cid=4 + * 00 00 1a // timestamp=26 + * 00 00 9d // payload_length=157 + * 08 // message_type=8(audio) + * 01 00 00 00 // stream_id=1 + * the current packet maybe: + * c4 // fmt=3, cid=4 + * it's ok, for the packet is audio, and timestamp delta is 26. + * the current packet must be parsed as: + * fmt=0, cid=4 + * timestamp=26+26=52 + * payload_length=157 + * message_type=8(audio) + * stream_id=1 + * so we must update the timestamp even fmt=3 for first packet. + */ + // fresh packet used to update the timestamp even fmt=3 for first packet. + bool is_fresh_packet = !chunk->msg; + + // but, we can ensure that when a chunk stream is fresh, + // the fmt must be 0, a new stream. + if (chunk->msg_count == 0 && fmt != RTMP_FMT_TYPE0) { + ret = ERROR_RTMP_CHUNK_START; + srs_error("chunk stream is fresh, " + "fmt must be %d, actual is %d. ret=%d", RTMP_FMT_TYPE0, fmt, ret); + return ret; + } + + // when exists cache msg, means got an partial message, + // the fmt must not be type0 which means new message. + if (chunk->msg && fmt == RTMP_FMT_TYPE0) { + ret = ERROR_RTMP_CHUNK_START; + srs_error("chunk stream exists, " + "fmt must not be %d, actual is %d. ret=%d", RTMP_FMT_TYPE0, fmt, ret); + return ret; + } + + // create msg when new chunk stream start + if (!chunk->msg) { + chunk->msg = new __SrsMessage(); + chunk->msg->header.perfer_cid = chunk->cid; + srs_verbose("create message for new chunk, fmt=%d, cid=%d", fmt, chunk->cid); + } + + // read message header from socket to buffer. + static char mh_sizes[] = {11, 7, 3, 0}; + mh_size = mh_sizes[(int)fmt]; + srs_verbose("calc chunk message header size. fmt=%d, mh_size=%d", fmt, mh_size); + + int required_size = bh_size + mh_size; + if ((ret = buffer->ensure_buffer_bytes(skt, required_size)) != ERROR_SUCCESS) { + if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) { + srs_error("read %dbytes message header failed. required_size=%d, ret=%d", mh_size, required_size, ret); + } + return ret; + } + char* p = buffer->bytes() + bh_size; + + // parse the message header. + // see also: ngx_rtmp_recv + if (fmt <= RTMP_FMT_TYPE2) { + char* pp = (char*)&chunk->header.timestamp_delta; + pp[2] = *p++; + pp[1] = *p++; + pp[0] = *p++; + pp[3] = 0; + + // fmt: 0 + // timestamp: 3 bytes + // If the timestamp is greater than or equal to 16777215 + // (hexadecimal 0x00ffffff), this value MUST be 16777215, and the + // ‘extended timestamp header’ MUST be present. Otherwise, this value + // SHOULD be the entire timestamp. + // + // fmt: 1 or 2 + // timestamp delta: 3 bytes + // If the delta is greater than or equal to 16777215 (hexadecimal + // 0x00ffffff), this value MUST be 16777215, and the ‘extended + // timestamp header’ MUST be present. Otherwise, this value SHOULD be + // the entire delta. + chunk->extended_timestamp = (chunk->header.timestamp_delta >= RTMP_EXTENDED_TIMESTAMP); + if (chunk->extended_timestamp) { + // Extended timestamp: 0 or 4 bytes + // This field MUST be sent when the normal timsestamp is set to + // 0xffffff, it MUST NOT be sent if the normal timestamp is set to + // anything else. So for values less than 0xffffff the normal + // timestamp field SHOULD be used in which case the extended timestamp + // MUST NOT be present. For values greater than or equal to 0xffffff + // the normal timestamp field MUST NOT be used and MUST be set to + // 0xffffff and the extended timestamp MUST be sent. + // + // if extended timestamp, the timestamp must >= RTMP_EXTENDED_TIMESTAMP + // we set the timestamp to RTMP_EXTENDED_TIMESTAMP to identify we + // got an extended timestamp. + chunk->header.timestamp = RTMP_EXTENDED_TIMESTAMP; + } else { + if (fmt == RTMP_FMT_TYPE0) { + // 6.1.2.1. Type 0 + // For a type-0 chunk, the absolute timestamp of the message is sent + // here. + chunk->header.timestamp = chunk->header.timestamp_delta; + } else { + // 6.1.2.2. Type 1 + // 6.1.2.3. Type 2 + // For a type-1 or type-2 chunk, the difference between the previous + // chunk's timestamp and the current chunk's timestamp is sent here. + chunk->header.timestamp += chunk->header.timestamp_delta; + } + } + + if (fmt <= RTMP_FMT_TYPE1) { + pp = (char*)&chunk->header.payload_length; + pp[2] = *p++; + pp[1] = *p++; + pp[0] = *p++; + pp[3] = 0; + + // if msg exists in cache, the size must not changed. + if (chunk->msg->size > 0 && chunk->msg->size != chunk->header.payload_length) { + ret = ERROR_RTMP_PACKET_SIZE; + srs_error("msg exists in chunk cache, " + "size=%d cannot change to %d, ret=%d", + chunk->msg->size, chunk->header.payload_length, ret); + return ret; + } + + chunk->header.message_type = *p++; + + if (fmt == RTMP_FMT_TYPE0) { + pp = (char*)&chunk->header.stream_id; + pp[0] = *p++; + pp[1] = *p++; + pp[2] = *p++; + pp[3] = *p++; + srs_verbose("header read completed. fmt=%d, mh_size=%d, ext_time=%d, time=%"PRId64", payload=%d, type=%d, sid=%d", + fmt, mh_size, chunk->extended_timestamp, chunk->header.timestamp, chunk->header.payload_length, + chunk->header.message_type, chunk->header.stream_id); + } else { + srs_verbose("header read completed. fmt=%d, mh_size=%d, ext_time=%d, time=%"PRId64", payload=%d, type=%d", + fmt, mh_size, chunk->extended_timestamp, chunk->header.timestamp, chunk->header.payload_length, + chunk->header.message_type); + } + } else { + srs_verbose("header read completed. fmt=%d, mh_size=%d, ext_time=%d, time=%"PRId64"", + fmt, mh_size, chunk->extended_timestamp, chunk->header.timestamp); + } + } else { + // update the timestamp even fmt=3 for first stream + if (is_fresh_packet && !chunk->extended_timestamp) { + chunk->header.timestamp += chunk->header.timestamp_delta; + } + srs_verbose("header read completed. fmt=%d, size=%d, ext_time=%d", + fmt, mh_size, chunk->extended_timestamp); + } + + if (chunk->extended_timestamp) { + mh_size += 4; + required_size = bh_size + mh_size; + srs_verbose("read header ext time. fmt=%d, ext_time=%d, mh_size=%d", fmt, chunk->extended_timestamp, mh_size); + if ((ret = buffer->ensure_buffer_bytes(skt, required_size)) != ERROR_SUCCESS) { + if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) { + srs_error("read %dbytes message header failed. required_size=%d, ret=%d", mh_size, required_size, ret); + } + return ret; + } + + u_int32_t timestamp = 0x00; + char* pp = (char*)×tamp; + pp[3] = *p++; + pp[2] = *p++; + pp[1] = *p++; + pp[0] = *p++; + + // ffmpeg/librtmp may donot send this filed, need to detect the value. + // @see also: http://blog.csdn.net/win_lin/article/details/13363699 + // compare to the chunk timestamp, which is set by chunk message header + // type 0,1 or 2. + u_int32_t chunk_timestamp = chunk->header.timestamp; + if (chunk_timestamp > RTMP_EXTENDED_TIMESTAMP && chunk_timestamp != timestamp) { + mh_size -= 4; + srs_verbose("ignore the 4bytes extended timestamp. mh_size=%d", mh_size); + } else { + chunk->header.timestamp = timestamp; + } + srs_verbose("header read ext_time completed. time=%"PRId64"", chunk->header.timestamp); + } + + // the extended-timestamp must be unsigned-int, + // 24bits timestamp: 0xffffff = 16777215ms = 16777.215s = 4.66h + // 32bits timestamp: 0xffffffff = 4294967295ms = 4294967.295s = 1193.046h = 49.71d + // because the rtmp protocol says the 32bits timestamp is about "50 days": + // 3. Byte Order, Alignment, and Time Format + // Because timestamps are generally only 32 bits long, they will roll + // over after fewer than 50 days. + // + // but, its sample says the timestamp is 31bits: + // An application could assume, for example, that all + // adjacent timestamps are within 2^31 milliseconds of each other, so + // 10000 comes after 4000000000, while 3000000000 comes before + // 4000000000. + // and flv specification says timestamp is 31bits: + // Extension of the Timestamp field to form a SI32 value. This + // field represents the upper 8 bits, while the previous + // Timestamp field represents the lower 24 bits of the time in + // milliseconds. + // in a word, 31bits timestamp is ok. + // convert extended timestamp to 31bits. + chunk->header.timestamp &= 0x7fffffff; + + // valid message + if (chunk->header.payload_length < 0) { + ret = ERROR_RTMP_MSG_INVLIAD_SIZE; + srs_error("RTMP message size must not be negative. size=%d, ret=%d", + chunk->header.payload_length, ret); + return ret; + } + + // copy header to msg + chunk->msg->header = chunk->header; + + // increase the msg count, the chunk stream can accept fmt=1/2/3 message now. + chunk->msg_count++; + + return ret; +} + +int SrsProtocol::__read_message_payload(__SrsChunkStream* chunk, int bh_size, int mh_size, int& payload_size, __SrsMessage** pmsg) +{ + int ret = ERROR_SUCCESS; + + // empty message + if (chunk->header.payload_length <= 0) { + // need erase the header in buffer. + buffer->erase(bh_size + mh_size); + + srs_trace("get an empty RTMP " + "message(type=%d, size=%d, time=%"PRId64", sid=%d)", chunk->header.message_type, + chunk->header.payload_length, chunk->header.timestamp, chunk->header.stream_id); + + *pmsg = chunk->msg; + chunk->msg = NULL; + + return ret; + } + srs_assert(chunk->header.payload_length > 0); + + // the chunk payload size. + payload_size = chunk->header.payload_length - chunk->msg->size; + payload_size = srs_min(payload_size, in_chunk_size); + srs_verbose("chunk payload size is %d, message_size=%d, received_size=%d, in_chunk_size=%d", + payload_size, chunk->header.payload_length, chunk->msg->size, in_chunk_size); + + // create msg payload if not initialized + if (!chunk->msg->payload) { + chunk->msg->payload = new int8_t[chunk->header.payload_length]; + memset(chunk->msg->payload, 0, chunk->header.payload_length); + srs_verbose("create empty payload for RTMP message. size=%d", chunk->header.payload_length); + } + + // read payload to buffer + int required_size = bh_size + mh_size + payload_size; + if ((ret = buffer->ensure_buffer_bytes(skt, required_size)) != ERROR_SUCCESS) { + if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) { + srs_error("read payload failed. required_size=%d, ret=%d", required_size, ret); + } + return ret; + } + memcpy(chunk->msg->payload + chunk->msg->size, buffer->bytes() + bh_size + mh_size, payload_size); + buffer->erase(bh_size + mh_size + payload_size); + chunk->msg->size += payload_size; + + srs_verbose("chunk payload read completed. bh_size=%d, mh_size=%d, payload_size=%d", bh_size, mh_size, payload_size); + + // got entire RTMP message? + if (chunk->header.payload_length == chunk->msg->size) { + *pmsg = chunk->msg; + chunk->msg = NULL; + srs_verbose("get entire RTMP message(type=%d, size=%d, time=%"PRId64", sid=%d)", + chunk->header.message_type, chunk->header.payload_length, + chunk->header.timestamp, chunk->header.stream_id); + return ret; + } + + srs_verbose("get partial RTMP message(type=%d, size=%d, time=%"PRId64", sid=%d), partial size=%d", + chunk->header.message_type, chunk->header.payload_length, + chunk->header.timestamp, chunk->header.stream_id, + chunk->msg->size); + + return ret; +} + +int SrsProtocol::__on_recv_message(__SrsMessage* msg) +{ + int ret = ERROR_SUCCESS; + + srs_assert(msg != NULL); + + // acknowledgement + if (in_ack_size.ack_window_size > 0 + && skt->get_recv_bytes() - in_ack_size.acked_size > in_ack_size.ack_window_size + ) { + if ((ret = response_acknowledgement_message()) != ERROR_SUCCESS) { + return ret; + } + } + + SrsPacket* packet = NULL; + switch (msg->header.message_type) { + case RTMP_MSG_SetChunkSize: + case RTMP_MSG_UserControlMessage: + case RTMP_MSG_WindowAcknowledgementSize: + if ((ret = __decode_message(msg, &packet)) != ERROR_SUCCESS) { + srs_error("decode packet from message payload failed. ret=%d", ret); + return ret; + } + srs_verbose("decode packet from message payload success."); + break; + default: + return ret; + } + + srs_assert(packet); + + // always free the packet. + SrsAutoFree(SrsPacket, packet, false); + + switch (msg->header.message_type) { + case RTMP_MSG_WindowAcknowledgementSize: { + SrsSetWindowAckSizePacket* pkt = dynamic_cast(packet); + srs_assert(pkt != NULL); + + if (pkt->ackowledgement_window_size > 0) { + in_ack_size.ack_window_size = pkt->ackowledgement_window_size; + srs_trace("set ack window size to %d", pkt->ackowledgement_window_size); + } else { + srs_warn("ignored. set ack window size is %d", pkt->ackowledgement_window_size); + } + break; + } + case RTMP_MSG_SetChunkSize: { + SrsSetChunkSizePacket* pkt = dynamic_cast(packet); + srs_assert(pkt != NULL); + + in_chunk_size = pkt->chunk_size; + + srs_trace("set input chunk size to %d", pkt->chunk_size); + break; + } + case RTMP_MSG_UserControlMessage: { + SrsUserControlPacket* pkt = dynamic_cast(packet); + srs_assert(pkt != NULL); + + if (pkt->event_type == SrcPCUCSetBufferLength) { + srs_trace("ignored. set buffer length to %d", pkt->extra_data); + } + if (pkt->event_type == SrcPCUCPingRequest) { + if ((ret = response_ping_message(pkt->event_data)) != ERROR_SUCCESS) { + return ret; + } + } + break; + } + } + + return ret; +} + +int SrsProtocol::__on_send_message(__SrsMessage* msg, SrsPacket* packet) +{ + int ret = ERROR_SUCCESS; + + // ignore raw bytes oriented RTMP message. + if (!packet) { + return ret; + } + + switch (msg->header.message_type) { + case RTMP_MSG_SetChunkSize: { + SrsSetChunkSizePacket* pkt = dynamic_cast(packet); + srs_assert(pkt != NULL); + + out_chunk_size = pkt->chunk_size; + + srs_trace("set output chunk size to %d", pkt->chunk_size); + break; + } + case RTMP_MSG_AMF0CommandMessage: + case RTMP_MSG_AMF3CommandMessage: { + if (true) { + SrsConnectAppPacket* pkt = dynamic_cast(packet); + if (pkt) { + requests[pkt->transaction_id] = pkt->command_name; + break; + } + } + if (true) { + SrsCreateStreamPacket* pkt = dynamic_cast(packet); + if (pkt) { + requests[pkt->transaction_id] = pkt->command_name; + break; + } + } + if (true) { + SrsFMLEStartPacket* pkt = dynamic_cast(packet); + if (pkt) { + requests[pkt->transaction_id] = pkt->command_name; + break; + } + } + break; + } + } + + return ret; +} + int SrsProtocol::recv_message(SrsCommonMessage** pmsg) { *pmsg = NULL; @@ -1161,6 +2151,7 @@ SrsMessageHeader::SrsMessageHeader() stream_id = 0; timestamp = 0; + perfer_cid = RTMP_CID_ProtocolControl; } SrsMessageHeader::~SrsMessageHeader() @@ -1258,6 +2249,122 @@ SrsChunkStream::~SrsChunkStream() srs_freep(msg); } +__SrsChunkStream::__SrsChunkStream(int _cid) +{ + fmt = 0; + cid = _cid; + extended_timestamp = false; + msg = NULL; + msg_count = 0; +} + +__SrsChunkStream::~__SrsChunkStream() +{ + srs_freep(msg); +} + +__SrsMessage::__SrsMessage() +{ + payload = NULL; + size = 0; +} + +__SrsMessage::~__SrsMessage() +{ +} + +__SrsSharedPtrMessage::__SrsSharedPtr::__SrsSharedPtr() +{ + payload = NULL; + size = 0; + shared_count = 0; +} + +__SrsSharedPtrMessage::__SrsSharedPtr::~__SrsSharedPtr() +{ + srs_freepa(payload); +} + +__SrsSharedPtrMessage::__SrsSharedPtrMessage() +{ + ptr = NULL; +} + +__SrsSharedPtrMessage::~__SrsSharedPtrMessage() +{ + if (ptr) { + if (ptr->shared_count == 0) { + srs_freep(ptr); + } else { + ptr->shared_count--; + } + } +} + +int __SrsSharedPtrMessage::initialize(__SrsMessage* source) +{ + int ret = ERROR_SUCCESS; + + if ((ret = initialize(&source->header, (char*)source->payload, source->size)) != ERROR_SUCCESS) { + return ret; + } + + // detach the payload from source + source->payload = NULL; + source->size = 0; + + return ret; +} + +int __SrsSharedPtrMessage::initialize(SrsMessageHeader* source, char* payload, int size) +{ + int ret = ERROR_SUCCESS; + + srs_assert(source != NULL); + if (ptr) { + ret = ERROR_SYSTEM_ASSERT_FAILED; + srs_error("should not set the payload twice. ret=%d", ret); + srs_assert(false); + + return ret; + } + + header = *source; + header.payload_length = size; + + ptr = new __SrsSharedPtr(); + + // direct attach the data of common message. + ptr->payload = payload; + ptr->size = size; + + __SrsMessage::payload = (int8_t*)ptr->payload; + __SrsMessage::size = ptr->size; + + return ret; +} + +__SrsSharedPtrMessage* __SrsSharedPtrMessage::copy() +{ + if (!ptr) { + srs_error("invoke initialize to initialize the ptr."); + srs_assert(false); + return NULL; + } + + __SrsSharedPtrMessage* copy = new __SrsSharedPtrMessage(); + + copy->header = header; + + copy->ptr = ptr; + ptr->shared_count++; + + copy->payload = (int8_t*)ptr->payload; + copy->size = ptr->size; + + return copy; +} + ISrsMessage::ISrsMessage() { payload = NULL; diff --git a/trunk/src/rtmp/srs_protocol_rtmp_stack.hpp b/trunk/src/rtmp/srs_protocol_rtmp_stack.hpp index 3ca342f18..bf80348ea 100644 --- a/trunk/src/rtmp/srs_protocol_rtmp_stack.hpp +++ b/trunk/src/rtmp/srs_protocol_rtmp_stack.hpp @@ -45,6 +45,9 @@ class SrsChunkStream; class SrsAmf0Object; class SrsAmf0Any; class ISrsMessage; +class SrsMessageHeader; +class __SrsMessage; +class __SrsChunkStream; // the following is the timeout for rtmp protocol, // to avoid death connection. @@ -110,6 +113,9 @@ private: // peer in private: std::map chunk_streams; + // TODO: FIXME: rename to chunk_streams + std::map __chunk_streams; + SrsStream* decode_stream; SrsBuffer* buffer; int32_t in_chunk_size; AckWindowSize in_ack_size; @@ -125,6 +131,7 @@ public: SrsProtocol(ISrsProtocolReaderWriter* io); virtual ~SrsProtocol(); public: + // TODO: FIXME: to private. std::string get_request_name(double transcationId); /** * set the timeout in us. @@ -138,6 +145,82 @@ public: virtual int64_t get_send_bytes(); virtual int get_recv_kbps(); virtual int get_send_kbps(); +public: + /** + * recv a RTMP message, which is bytes oriented. + * user can use decode_message to get the decoded RTMP packet. + * @param pmsg, set the received message, + * always NULL if error, + * NULL for unknown packet but return success. + * never NULL if decode success. + */ + virtual int __recv_message(__SrsMessage** pmsg); + /** + * decode bytes oriented RTMP message to RTMP packet, + * @param ppacket, output decoded packet, + * always NULL if error, never NULL if success. + * @return error when unknown packet, error when decode failed. + */ + virtual int __decode_message(__SrsMessage* msg, SrsPacket** ppacket); + /** + * send the RTMP message and always free it. + * user must never free or use the msg after this method, + * for it will always free the msg. + * @param msg, the msg to send out, never be NULL. + */ + virtual int __send_and_free_message(__SrsMessage* msg); + /** + * send the RTMP packet and always free it. + * user must never free or use the packet after this method, + * for it will always free the packet. + * @param packet, the packet to send out, never be NULL. + * @param stream_id, the stream id of packet to send over, 0 for control message. + */ + virtual int __send_and_free_packet(SrsPacket* packet, int stream_id); +private: + /** + * imp for __send_and_free_message + * @param packet the packet of message, NULL for raw message. + */ + virtual int __do_send_and_free_message(__SrsMessage* msg, SrsPacket* packet); + /** + * imp for __decode_message + */ + virtual int __do_decode_message(SrsMessageHeader& header, SrsStream* stream, SrsPacket** ppacket); + /** + * recv bytes oriented RTMP message from protocol stack. + * return error if error occur and nerver set the pmsg, + * return success and pmsg set to NULL if no entire message got, + * return success and pmsg set to entire message if got one. + */ + virtual int __recv_interlaced_message(__SrsMessage** pmsg); + /** + * read the chunk basic header(fmt, cid) from chunk stream. + * user can discovery a SrsChunkStream by cid. + * @bh_size return the chunk basic header size, to remove the used bytes when finished. + */ + virtual int __read_basic_header(char& fmt, int& cid, int& bh_size); + /** + * read the chunk message header(timestamp, payload_length, message_type, stream_id) + * from chunk stream and save to SrsChunkStream. + * @mh_size return the chunk message header size, to remove the used bytes when finished. + */ + virtual int __read_message_header(__SrsChunkStream* chunk, char fmt, int bh_size, int& mh_size); + /** + * read the chunk payload, remove the used bytes in buffer, + * if got entire message, set the pmsg. + * @payload_size read size in this roundtrip, generally a chunk size or left message size. + */ + virtual int __read_message_payload(__SrsChunkStream* chunk, int bh_size, int mh_size, int& payload_size, __SrsMessage** pmsg); + /** + * when recv message, update the context. + */ + virtual int __on_recv_message(__SrsMessage* msg); + /** + * when message sentout, update the context. + */ + virtual int __on_send_message(__SrsMessage* msg, SrsPacket* packet); +public: /** * recv a message with raw/undecoded payload from peer. * the payload is not decoded, use srs_rtmp_expect_message if requires @@ -226,6 +309,16 @@ struct SrsMessageHeader */ int64_t timestamp; +public: + /** + * get the perfered cid(chunk stream id) which sendout over. + * set at decoding, and canbe used for directly send message, + * for example, dispatch to all connections. + * @see: SrsSharedPtrMessage.SrsSharedPtr.perfer_cid + */ + int perfer_cid; + +public: SrsMessageHeader(); virtual ~SrsMessageHeader(); @@ -283,6 +376,109 @@ public: virtual ~SrsChunkStream(); }; +/** +* incoming chunk stream maybe interlaced, +* use the chunk stream to cache the input RTMP chunk streams. +*/ +class __SrsChunkStream +{ +public: + /** + * represents the basic header fmt, + * which used to identify the variant message header type. + */ + char fmt; + /** + * represents the basic header cid, + * which is the chunk stream id. + */ + int cid; + /** + * cached message header + */ + SrsMessageHeader header; + /** + * whether the chunk message header has extended timestamp. + */ + bool extended_timestamp; + /** + * partially read message. + */ + __SrsMessage* msg; + /** + * decoded msg count, to identify whether the chunk stream is fresh. + */ + int64_t msg_count; +public: + __SrsChunkStream(int _cid); + virtual ~__SrsChunkStream(); +}; + +/** +* message is raw data RTMP message, bytes oriented, +* protcol always recv RTMP message, and can send RTMP message or RTMP packet. +* the shared-ptr message is a special RTMP message, use ref-count for performance issue. +*/ +class __SrsMessage +{ +// 4.1. Message Header +public: + SrsMessageHeader header; +// 4.2. Message Payload +public: + /** + * The other part which is the payload is the actual data that is + * contained in the message. For example, it could be some audio samples + * or compressed video data. The payload format and interpretation are + * beyond the scope of this document. + */ + int32_t size; + int8_t* payload; +public: + __SrsMessage(); + virtual ~__SrsMessage(); +}; + +/** +* shared ptr message. +* for audio/video/data message that need less memory copy. +* and only for output. +*/ +class __SrsSharedPtrMessage : public __SrsMessage +{ +private: + struct __SrsSharedPtr + { + char* payload; + int size; + int shared_count; + + __SrsSharedPtr(); + virtual ~__SrsSharedPtr(); + }; + __SrsSharedPtr* ptr; +public: + __SrsSharedPtrMessage(); + virtual ~__SrsSharedPtrMessage(); +public: + /** + * set the shared payload. + * we will detach the payload of source, + * so ensure donot use it before. + */ + virtual int initialize(__SrsMessage* source); + /** + * set the shared payload. + * use source header, and specified param payload. + */ + virtual int initialize(SrsMessageHeader* source, char* payload, int size); +public: + /** + * copy current shared ptr message, use ref-count. + */ + virtual __SrsSharedPtrMessage* copy(); +}; + /** * message to output. */ @@ -1215,5 +1411,44 @@ int srs_rtmp_expect_message(SrsProtocol* protocol, SrsCommonMessage** pmsg, T** return ret; } +template +int __srs_rtmp_expect_message(SrsProtocol* protocol, __SrsMessage** pmsg, T** ppacket) +{ + *pmsg = NULL; + *ppacket = NULL; + + int ret = ERROR_SUCCESS; + + while (true) { + __SrsMessage* msg = NULL; + if ((ret = protocol->__recv_message(&msg)) != ERROR_SUCCESS) { + srs_error("recv message failed. ret=%d", ret); + return ret; + } + srs_verbose("recv message success."); + + SrsPacket* packet = NULL; + if ((ret = protocol->__decode_message(msg, &packet)) != ERROR_SUCCESS) { + srs_error("decode message failed. ret=%d", ret); + srs_freep(msg); + return ret; + } + + T* pkt = dynamic_cast(packet); + if (!pkt) { + srs_trace("drop message(type=%d, size=%d, time=%"PRId64", sid=%d).", + msg->header.message_type, msg->header.payload_length, + msg->header.timestamp, msg->header.stream_id); + srs_freep(msg); + continue; + } + + *pmsg = msg; + *ppacket = pkt; + break; + } + + return ret; +} #endif