diff --git a/trunk/src/app/srs_app_bandwidth.cpp b/trunk/src/app/srs_app_bandwidth.cpp index 439d42803..80af21a66 100644 --- a/trunk/src/app/srs_app_bandwidth.cpp +++ b/trunk/src/app/srs_app_bandwidth.cpp @@ -189,22 +189,22 @@ int SrsBandwidth::do_bandwidth_check() pkt->data->set("publish_bytes", SrsAmf0Any::number(publish_bytes)); pkt->data->set("publish_time", SrsAmf0Any::number(publish_actual_duration_ms)); - SrsCommonMessage* msg = (new SrsCommonMessage())->set_packet(pkt, 0); - if ((ret = rtmp->send_message(msg)) != ERROR_SUCCESS) { + if ((ret = rtmp->__send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) { srs_error("send bandwidth check finish message failed. ret=%d", ret); return ret; } // if flash, we notice the result, and expect a final packet. while (true) { - SrsCommonMessage* msg = NULL; + __SrsMessage* msg = NULL; SrsBandwidthPacket* pkt = NULL; - if ((ret = srs_rtmp_expect_message(protocol, &msg, &pkt)) != ERROR_SUCCESS) { + if ((ret = __srs_rtmp_expect_message(protocol, &msg, &pkt)) != ERROR_SUCCESS) { // info level to ignore and return success. srs_info("expect final message failed. ret=%d", ret); return ERROR_SUCCESS; } - SrsAutoFree(SrsCommonMessage, msg, false); + SrsAutoFree(__SrsMessage, msg, false); + SrsAutoFree(SrsBandwidthPacket, pkt, false); srs_info("get final message success."); if (pkt->is_flash_final()) { @@ -233,8 +233,7 @@ int SrsBandwidth::check_play( pkt->data->set("duration_ms", SrsAmf0Any::number(duration_ms)); pkt->data->set("interval_ms", SrsAmf0Any::number(interval_ms)); - SrsCommonMessage* msg = (new SrsCommonMessage())->set_packet(pkt, 0); - if ((ret = rtmp->send_message(msg)) != ERROR_SUCCESS) { + if ((ret = rtmp->__send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) { srs_error("send bandwidth check start play message failed. ret=%d", ret); return ret; } @@ -243,13 +242,14 @@ int SrsBandwidth::check_play( while (true) { // recv client's starting play response - SrsCommonMessage* msg = NULL; + __SrsMessage* msg = NULL; SrsBandwidthPacket* pkt = NULL; - if ((ret = srs_rtmp_expect_message(protocol, &msg, &pkt)) != ERROR_SUCCESS) { + if ((ret = __srs_rtmp_expect_message(protocol, &msg, &pkt)) != ERROR_SUCCESS) { srs_error("expect bandwidth message failed. ret=%d", ret); return ret; } - SrsAutoFree(SrsCommonMessage, msg, false); + SrsAutoFree(__SrsMessage, msg, false); + SrsAutoFree(SrsBandwidthPacket, pkt, false); srs_info("get bandwidth message succes."); if (pkt->is_starting_play()) { @@ -284,8 +284,7 @@ int SrsBandwidth::check_play( // TODO: FIXME: get length from the rtmp protocol stack. play_bytes += pkt->get_payload_length(); - SrsCommonMessage* msg = (new SrsCommonMessage())->set_packet(pkt, 0); - if ((ret = rtmp->send_message(msg)) != ERROR_SUCCESS) { + if ((ret = rtmp->__send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) { srs_error("send bandwidth check play messages failed. ret=%d", ret); return ret; } @@ -309,13 +308,13 @@ int SrsBandwidth::check_play( if (true) { // notify client to stop play SrsBandwidthPacket* pkt = SrsBandwidthPacket::create_stop_play(); + pkt->data->set("duration_ms", SrsAmf0Any::number(duration_ms)); pkt->data->set("interval_ms", SrsAmf0Any::number(interval_ms)); pkt->data->set("duration_delta", SrsAmf0Any::number(actual_duration_ms)); pkt->data->set("bytes_delta", SrsAmf0Any::number(play_bytes)); - SrsCommonMessage* msg = (new SrsCommonMessage())->set_packet(pkt, 0); - if ((ret = rtmp->send_message(msg)) != ERROR_SUCCESS) { + if ((ret = rtmp->__send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) { srs_error("send bandwidth check stop play message failed. ret=%d", ret); return ret; } @@ -324,13 +323,14 @@ int SrsBandwidth::check_play( while (true) { // recv client's stop play response. - SrsCommonMessage* msg = NULL; + __SrsMessage* msg = NULL; SrsBandwidthPacket* pkt = NULL; - if ((ret = srs_rtmp_expect_message(protocol, &msg, &pkt)) != ERROR_SUCCESS) { + if ((ret = __srs_rtmp_expect_message(protocol, &msg, &pkt)) != ERROR_SUCCESS) { srs_error("expect bandwidth message failed. ret=%d", ret); return ret; } - SrsAutoFree(SrsCommonMessage, msg, false); + SrsAutoFree(__SrsMessage, msg, false); + SrsAutoFree(SrsBandwidthPacket, pkt, false); srs_info("get bandwidth message succes."); if (pkt->is_stopped_play()) { @@ -357,8 +357,7 @@ int SrsBandwidth::check_publish( pkt->data->set("duration_ms", SrsAmf0Any::number(duration_ms)); pkt->data->set("interval_ms", SrsAmf0Any::number(interval_ms)); - SrsCommonMessage* msg = (new SrsCommonMessage())->set_packet(pkt, 0); - if ((ret = rtmp->send_message(msg)) != ERROR_SUCCESS) { + if ((ret = rtmp->__send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) { srs_error("send bandwidth check start publish message failed. ret=%d", ret); return ret; } @@ -367,13 +366,14 @@ int SrsBandwidth::check_publish( while (true) { // read client's notification of starting publish - SrsCommonMessage* msg = NULL; + __SrsMessage* msg = NULL; SrsBandwidthPacket* pkt = NULL; - if ((ret = srs_rtmp_expect_message(protocol, &msg, &pkt)) != ERROR_SUCCESS) { + if ((ret = __srs_rtmp_expect_message(protocol, &msg, &pkt)) != ERROR_SUCCESS) { srs_error("expect bandwidth message failed. ret=%d", ret); return ret; } - SrsAutoFree(SrsCommonMessage, msg, false); + SrsAutoFree(__SrsMessage, msg, false); + SrsAutoFree(SrsBandwidthPacket, pkt, false); srs_info("get bandwidth message succes."); if (pkt->is_starting_publish()) { @@ -387,12 +387,12 @@ int SrsBandwidth::check_publish( while ( (srs_get_system_time_ms() - current_time) < duration_ms ) { st_usleep(0); - SrsCommonMessage* msg = NULL; - if ((ret = rtmp->recv_message(&msg)) != ERROR_SUCCESS) { + __SrsMessage* msg = NULL; + if ((ret = rtmp->__recv_message(&msg)) != ERROR_SUCCESS) { srs_error("recv message failed. ret=%d", ret); return ret; } - SrsAutoFree(SrsCommonMessage, msg, false); + SrsAutoFree(__SrsMessage, msg, false); // TODO: FIXME. publish_bytes += msg->header.payload_length; @@ -420,8 +420,7 @@ int SrsBandwidth::check_publish( pkt->data->set("duration_delta", SrsAmf0Any::number(actual_duration_ms)); pkt->data->set("bytes_delta", SrsAmf0Any::number(publish_bytes)); - SrsCommonMessage* msg = (new SrsCommonMessage())->set_packet(pkt, 0); - if ((ret = rtmp->send_message(msg)) != ERROR_SUCCESS) { + if ((ret = rtmp->__send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) { srs_error("send bandwidth check stop publish message failed. ret=%d", ret); return ret; } @@ -436,13 +435,14 @@ int SrsBandwidth::check_publish( // TODO: FIXME: check whether flash client. while (false) { // recv client's stop publish response. - SrsCommonMessage* msg = NULL; + __SrsMessage* msg = NULL; SrsBandwidthPacket* pkt = NULL; - if ((ret = srs_rtmp_expect_message(protocol, &msg, &pkt)) != ERROR_SUCCESS) { + if ((ret = __srs_rtmp_expect_message(protocol, &msg, &pkt)) != ERROR_SUCCESS) { srs_error("expect bandwidth message failed. ret=%d", ret); return ret; } - SrsAutoFree(SrsCommonMessage, msg, false); + SrsAutoFree(__SrsMessage, msg, false); + SrsAutoFree(SrsBandwidthPacket, pkt, false); srs_info("get bandwidth message succes."); if (pkt->is_stopped_publish()) { diff --git a/trunk/src/app/srs_app_forward.cpp b/trunk/src/app/srs_app_forward.cpp index 56cefb9aa..337fa6cd6 100644 --- a/trunk/src/app/srs_app_forward.cpp +++ b/trunk/src/app/srs_app_forward.cpp @@ -315,8 +315,8 @@ int SrsForwarder::forward() // read from client. if (true) { - SrsCommonMessage* msg = NULL; - ret = client->recv_message(&msg); + __SrsMessage* msg = NULL; + ret = client->__recv_message(&msg); srs_verbose("play loop recv message. ret=%d", ret); if (ret != ERROR_SUCCESS && ret != ERROR_SOCKET_TIMEOUT) { diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index 91612304e..a69eb9b86 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -713,6 +713,7 @@ int SrsRtmpConn::process_publish_message(SrsSource* source, __SrsMessage* msg, b srs_error("source process audio message failed. ret=%d", ret); return ret; } + return ret; } // process video packet if (msg->header.is_video()) { @@ -720,6 +721,7 @@ int SrsRtmpConn::process_publish_message(SrsSource* source, __SrsMessage* msg, b srs_error("source process video message failed. ret=%d", ret); return ret; } + return ret; } // process onMetaData diff --git a/trunk/src/libs/srs_librtmp.cpp b/trunk/src/libs/srs_librtmp.cpp index 0b1122f8b..38e498dfe 100644 --- a/trunk/src/libs/srs_librtmp.cpp +++ b/trunk/src/libs/srs_librtmp.cpp @@ -283,15 +283,15 @@ int srs_read_packet(srs_rtmp_t rtmp, int* type, u_int32_t* timestamp, char** dat Context* context = (Context*)rtmp; for (;;) { - SrsCommonMessage* msg = NULL; - if ((ret = context->rtmp->recv_message(&msg)) != ERROR_SUCCESS) { + __SrsMessage* msg = NULL; + if ((ret = context->rtmp->__recv_message(&msg)) != ERROR_SUCCESS) { return ret; } if (!msg) { continue; } - SrsAutoFree(SrsCommonMessage, msg, false); + SrsAutoFree(__SrsMessage, msg, false); if (msg->header.is_audio()) { *type = SRS_RTMP_TYPE_AUDIO; @@ -332,13 +332,13 @@ int srs_write_packet(srs_rtmp_t rtmp, int type, u_int32_t timestamp, char* data, srs_assert(rtmp != NULL); Context* context = (Context*)rtmp; - SrsSharedPtrMessage* msg = NULL; + __SrsSharedPtrMessage* msg = NULL; if (type == SRS_RTMP_TYPE_AUDIO) { SrsMessageHeader header; header.initialize_audio(size, timestamp, context->stream_id); - msg = new SrsSharedPtrMessage(); + msg = new __SrsSharedPtrMessage(); if ((ret = msg->initialize(&header, data, size)) != ERROR_SUCCESS) { srs_freepa(data); return ret; @@ -347,7 +347,7 @@ int srs_write_packet(srs_rtmp_t rtmp, int type, u_int32_t timestamp, char* data, SrsMessageHeader header; header.initialize_video(size, timestamp, context->stream_id); - msg = new SrsSharedPtrMessage(); + msg = new __SrsSharedPtrMessage(); if ((ret = msg->initialize(&header, data, size)) != ERROR_SUCCESS) { srs_freepa(data); return ret; @@ -356,7 +356,7 @@ int srs_write_packet(srs_rtmp_t rtmp, int type, u_int32_t timestamp, char* data, SrsMessageHeader header; header.initialize_amf0_script(size, context->stream_id); - msg = new SrsSharedPtrMessage(); + msg = new __SrsSharedPtrMessage(); if ((ret = msg->initialize(&header, data, size)) != ERROR_SUCCESS) { srs_freepa(data); return ret; @@ -365,7 +365,7 @@ int srs_write_packet(srs_rtmp_t rtmp, int type, u_int32_t timestamp, char* data, if (msg) { // send out encoded msg. - if ((ret = context->rtmp->send_message(msg)) != ERROR_SUCCESS) { + if ((ret = context->rtmp->__send_and_free_message(msg)) != ERROR_SUCCESS) { return ret; } } else { diff --git a/trunk/src/main/srs_main_bandcheck.cpp b/trunk/src/main/srs_main_bandcheck.cpp index 7450fe640..2574a9a30 100644 --- a/trunk/src/main/srs_main_bandcheck.cpp +++ b/trunk/src/main/srs_main_bandcheck.cpp @@ -341,13 +341,14 @@ int SrsBandCheckClient::expect_start_play() int ret = ERROR_SUCCESS; // expect connect _result - SrsCommonMessage* msg = NULL; + __SrsMessage* msg = NULL; SrsBandwidthPacket* pkt = NULL; - if ((ret = srs_rtmp_expect_message(protocol, &msg, &pkt)) != ERROR_SUCCESS) { + if ((ret = __srs_rtmp_expect_message(protocol, &msg, &pkt)) != ERROR_SUCCESS) { srs_error("expect bandcheck start play message failed. ret=%d", ret); return ret; } - SrsAutoFree(SrsCommonMessage, msg, false); + SrsAutoFree(__SrsMessage, msg, false); + SrsAutoFree(SrsBandwidthPacket, pkt, false); srs_info("get bandcheck start play message"); if (pkt->command_name != SRS_BW_CHECK_START_PLAY) { @@ -362,12 +363,9 @@ int SrsBandCheckClient::send_starting_play() { int ret = ERROR_SUCCESS; - SrsCommonMessage* msg = new SrsCommonMessage; SrsBandwidthPacket* pkt = new SrsBandwidthPacket; pkt->command_name = SRS_BW_CHECK_STARTING_PLAY; - msg->set_packet(pkt, 0); - - if ((ret = send_message(msg)) != ERROR_SUCCESS) { + if ((ret = __send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) { srs_error("send starting play msg failed. ret=%d", ret); return ret; } @@ -380,13 +378,14 @@ int SrsBandCheckClient::expect_stop_play() int ret = ERROR_SUCCESS; while (true) { - SrsCommonMessage* msg = NULL; + __SrsMessage* msg = NULL; SrsBandwidthPacket* pkt = NULL; - if ((ret = srs_rtmp_expect_message(protocol, &msg, &pkt)) != ERROR_SUCCESS) { + if ((ret = __srs_rtmp_expect_message(protocol, &msg, &pkt)) != ERROR_SUCCESS) { srs_error("expect stop play message failed. ret=%d", ret); return ret; } - SrsAutoFree(SrsCommonMessage, msg, false); + SrsAutoFree(__SrsMessage, msg, false); + SrsAutoFree(SrsBandwidthPacket, pkt, false); srs_info("get bandcheck stop play message"); if (pkt->command_name == SRS_BW_CHECK_STOP_PLAY) { @@ -401,12 +400,9 @@ int SrsBandCheckClient::send_stopped_play() { int ret = ERROR_SUCCESS; - SrsCommonMessage* msg = new SrsCommonMessage; SrsBandwidthPacket* pkt = new SrsBandwidthPacket; pkt->command_name = SRS_BW_CHECK_STOPPED_PLAY; - msg->set_packet(pkt, 0); - - if ((ret = send_message(msg)) != ERROR_SUCCESS) { + if ((ret = __send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) { srs_error("send stopped play msg failed. ret=%d", ret); return ret; } @@ -419,13 +415,14 @@ int SrsBandCheckClient::expect_start_pub() int ret = ERROR_SUCCESS; while (true) { - SrsCommonMessage* msg = NULL; + __SrsMessage* msg = NULL; SrsBandwidthPacket* pkt = NULL; - if ((ret = srs_rtmp_expect_message(protocol, &msg, &pkt)) != ERROR_SUCCESS) { + if ((ret = __srs_rtmp_expect_message(protocol, &msg, &pkt)) != ERROR_SUCCESS) { srs_error("expect start pub message failed. ret=%d", ret); return ret; } - SrsAutoFree(SrsCommonMessage, msg, false); + SrsAutoFree(__SrsMessage, msg, false); + SrsAutoFree(SrsBandwidthPacket, pkt, false); srs_info("get bandcheck start pub message"); if (pkt->command_name == SRS_BW_CHECK_START_PUBLISH) { @@ -440,12 +437,9 @@ int SrsBandCheckClient::send_starting_pub() { int ret = ERROR_SUCCESS; - SrsCommonMessage* msg = new SrsCommonMessage; SrsBandwidthPacket* pkt = new SrsBandwidthPacket; pkt->command_name = SRS_BW_CHECK_STARTING_PUBLISH; - msg->set_packet(pkt, 0); - - if ((ret = send_message(msg)) != ERROR_SUCCESS) { + if ((ret = __send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) { srs_error("send starting play msg failed. ret=%d", ret); return ret; } @@ -460,10 +454,8 @@ int SrsBandCheckClient::send_pub_data() int data_count = 100; while (true) { - SrsCommonMessage* msg = new SrsCommonMessage; SrsBandwidthPacket* pkt = new SrsBandwidthPacket; pkt->command_name = SRS_BW_CHECK_PUBLISHING; - msg->set_packet(pkt, 0); for (int i = 0; i < data_count; ++i) { std::stringstream seq; @@ -473,7 +465,7 @@ int SrsBandCheckClient::send_pub_data() } data_count += 100; - if ((ret = send_message(msg)) != ERROR_SUCCESS) { + if ((ret = __send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) { srs_error("send publish message failed.ret=%d", ret); return ret; } @@ -493,11 +485,13 @@ int SrsBandCheckClient::expect_stop_pub() this->set_recv_timeout(1000 * 1000); this->set_send_timeout(1000 * 1000); - SrsCommonMessage* msg; + __SrsMessage* msg; SrsBandwidthPacket* pkt; - if ((ret = srs_rtmp_expect_message(this->protocol, &msg, &pkt)) != ERROR_SUCCESS) { + if ((ret = __srs_rtmp_expect_message(this->protocol, &msg, &pkt)) != ERROR_SUCCESS) { return ret; } + SrsAutoFree(__SrsMessage, msg, false); + SrsAutoFree(SrsBandwidthPacket, pkt, false); if (pkt->command_name == SRS_BW_CHECK_STOP_PUBLISH) { return ret; } @@ -510,13 +504,14 @@ int SrsBandCheckClient::expect_finished() int ret = ERROR_SUCCESS; while (true) { - SrsCommonMessage* msg = NULL; + __SrsMessage* msg = NULL; SrsBandwidthPacket* pkt = NULL; - if ((ret = srs_rtmp_expect_message(protocol, &msg, &pkt)) != ERROR_SUCCESS) { + if ((ret = __srs_rtmp_expect_message(protocol, &msg, &pkt)) != ERROR_SUCCESS) { srs_error("expect finished message failed. ret=%d", ret); return ret; } - SrsAutoFree(SrsCommonMessage, msg, false); + SrsAutoFree(__SrsMessage, msg, false); + SrsAutoFree(SrsBandwidthPacket, pkt, false); srs_info("get bandcheck finished message"); if (pkt->command_name == SRS_BW_CHECK_FINISHED) { @@ -596,12 +591,9 @@ int SrsBandCheckClient::send_stopped_pub() { int ret = ERROR_SUCCESS; - SrsCommonMessage* msg = new SrsCommonMessage; SrsBandwidthPacket* pkt = new SrsBandwidthPacket; pkt->command_name = SRS_BW_CHECK_STOPPED_PUBLISH; - msg->set_packet(pkt, 0); - - if ((ret = send_message(msg)) != ERROR_SUCCESS) { + if ((ret = __send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) { srs_error("send stopped pub msg failed. ret=%d", ret); return ret; } @@ -614,12 +606,9 @@ int SrsBandCheckClient::send_final() { int ret = ERROR_SUCCESS; - SrsCommonMessage* msg = new SrsCommonMessage; SrsBandwidthPacket* pkt = new SrsBandwidthPacket; pkt->command_name = SRS_BW_CHECK_FLASH_FINAL; - msg->set_packet(pkt, 0); - - if ((ret = send_message(msg)) != ERROR_SUCCESS) { + if ((ret = __send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) { srs_error("send final msg failed. ret=%d", ret); return ret; } diff --git a/trunk/src/rtmp/srs_protocol_rtmp.cpp b/trunk/src/rtmp/srs_protocol_rtmp.cpp index 4ebddaa0c..ed28a8cee 100644 --- a/trunk/src/rtmp/srs_protocol_rtmp.cpp +++ b/trunk/src/rtmp/srs_protocol_rtmp.cpp @@ -372,16 +372,6 @@ int SrsRtmpClient::get_send_kbps() return protocol->get_send_kbps(); } -int SrsRtmpClient::recv_message(SrsCommonMessage** pmsg) -{ - return protocol->recv_message(pmsg); -} - -int SrsRtmpClient::send_message(ISrsMessage* msg) -{ - return protocol->send_message(msg); -} - int SrsRtmpClient::__recv_message(__SrsMessage** pmsg) { return protocol->__recv_message(pmsg); @@ -462,9 +452,7 @@ int SrsRtmpClient::connect_app(string app, string tc_url) // Connect(vhost, app) if (true) { - SrsCommonMessage* msg = new SrsCommonMessage(); SrsConnectAppPacket* pkt = new SrsConnectAppPacket(); - msg->set_packet(pkt, 0); pkt->command_object = SrsAmf0Any::object(); pkt->command_object->set("app", SrsAmf0Any::str(app.c_str())); @@ -478,32 +466,29 @@ int SrsRtmpClient::connect_app(string app, string tc_url) pkt->command_object->set("pageUrl", SrsAmf0Any::str()); pkt->command_object->set("objectEncoding", SrsAmf0Any::number(0)); - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { + if ((ret = protocol->__send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) { return ret; } } // Set Window Acknowledgement size(2500000) if (true) { - SrsCommonMessage* msg = new SrsCommonMessage(); SrsSetWindowAckSizePacket* pkt = new SrsSetWindowAckSizePacket(); - pkt->ackowledgement_window_size = 2500000; - msg->set_packet(pkt, 0); - - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { + if ((ret = protocol->__send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) { return ret; } } // expect connect _result - SrsCommonMessage* msg = NULL; + __SrsMessage* msg = NULL; SrsConnectAppResPacket* pkt = NULL; - if ((ret = srs_rtmp_expect_message(protocol, &msg, &pkt)) != ERROR_SUCCESS) { + if ((ret = __srs_rtmp_expect_message(protocol, &msg, &pkt)) != ERROR_SUCCESS) { srs_error("expect connect app response message failed. ret=%d", ret); return ret; } - SrsAutoFree(SrsCommonMessage, msg, false); + SrsAutoFree(__SrsMessage, msg, false); + SrsAutoFree(SrsConnectAppResPacket, pkt, false); srs_info("get connect app response message"); return ret; @@ -515,25 +500,22 @@ int SrsRtmpClient::create_stream(int& stream_id) // CreateStream if (true) { - SrsCommonMessage* msg = new SrsCommonMessage(); SrsCreateStreamPacket* pkt = new SrsCreateStreamPacket(); - - msg->set_packet(pkt, 0); - - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { + if ((ret = protocol->__send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) { return ret; } } // CreateStream _result. if (true) { - SrsCommonMessage* msg = NULL; + __SrsMessage* msg = NULL; SrsCreateStreamResPacket* pkt = NULL; - if ((ret = srs_rtmp_expect_message(protocol, &msg, &pkt)) != ERROR_SUCCESS) { + if ((ret = __srs_rtmp_expect_message(protocol, &msg, &pkt)) != ERROR_SUCCESS) { srs_error("expect create stream response message failed. ret=%d", ret); return ret; } - SrsAutoFree(SrsCommonMessage, msg, false); + SrsAutoFree(__SrsMessage, msg, false); + SrsAutoFree(SrsCreateStreamResPacket, pkt, false); srs_info("get create stream response message"); stream_id = (int)pkt->stream_id; @@ -548,13 +530,9 @@ int SrsRtmpClient::play(string stream, int stream_id) // Play(stream) if (true) { - SrsCommonMessage* msg = new SrsCommonMessage(); SrsPlayPacket* pkt = new SrsPlayPacket(); - pkt->stream_name = stream; - msg->set_packet(pkt, stream_id); - - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { + if ((ret = protocol->__send_and_free_packet(pkt, stream_id)) != ERROR_SUCCESS) { srs_error("send play stream failed. " "stream=%s, stream_id=%d, ret=%d", stream.c_str(), stream_id, ret); @@ -565,15 +543,13 @@ int SrsRtmpClient::play(string stream, int stream_id) // SetBufferLength(1000ms) int buffer_length_ms = 1000; if (true) { - SrsCommonMessage* msg = new SrsCommonMessage(); SrsUserControlPacket* pkt = new SrsUserControlPacket(); pkt->event_type = SrcPCUCSetBufferLength; pkt->event_data = stream_id; pkt->extra_data = buffer_length_ms; - msg->set_packet(pkt, 0); - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { + if ((ret = protocol->__send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) { srs_error("send set buffer length failed. " "stream=%s, stream_id=%d, bufferLength=%d, ret=%d", stream.c_str(), stream_id, buffer_length_ms, ret); @@ -583,13 +559,9 @@ int SrsRtmpClient::play(string stream, int stream_id) // SetChunkSize if (true) { - SrsCommonMessage* msg = new SrsCommonMessage(); SrsSetChunkSizePacket* pkt = new SrsSetChunkSizePacket(); - pkt->chunk_size = SRS_CONF_DEFAULT_CHUNK_SIZE; - msg->set_packet(pkt, 0); - - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { + if ((ret = protocol->__send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) { srs_error("send set chunk size failed. " "stream=%s, chunk_size=%d, ret=%d", stream.c_str(), SRS_CONF_DEFAULT_CHUNK_SIZE, ret); @@ -606,13 +578,9 @@ int SrsRtmpClient::publish(string stream, int stream_id) // SetChunkSize if (true) { - SrsCommonMessage* msg = new SrsCommonMessage(); SrsSetChunkSizePacket* pkt = new SrsSetChunkSizePacket(); - pkt->chunk_size = SRS_CONF_DEFAULT_CHUNK_SIZE; - msg->set_packet(pkt, 0); - - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { + if ((ret = protocol->__send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) { srs_error("send set chunk size failed. " "stream=%s, chunk_size=%d, ret=%d", stream.c_str(), SRS_CONF_DEFAULT_CHUNK_SIZE, ret); @@ -622,13 +590,9 @@ int SrsRtmpClient::publish(string stream, int stream_id) // publish(stream) if (true) { - SrsCommonMessage* msg = new SrsCommonMessage(); SrsPublishPacket* pkt = new SrsPublishPacket(); - pkt->stream_name = stream; - msg->set_packet(pkt, stream_id); - - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { + if ((ret = protocol->__send_and_free_packet(pkt, stream_id)) != ERROR_SUCCESS) { srs_error("send publish message failed. " "stream=%s, stream_id=%d, ret=%d", stream.c_str(), stream_id, ret); @@ -647,12 +611,8 @@ int SrsRtmpClient::fmle_publish(string stream, int& stream_id) // SrsFMLEStartPacket if (true) { - SrsCommonMessage* msg = new SrsCommonMessage(); SrsFMLEStartPacket* pkt = SrsFMLEStartPacket::create_release_stream(stream); - - msg->set_packet(pkt, 0); - - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { + if ((ret = protocol->__send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) { srs_error("send FMLE publish " "release stream failed. stream=%s, ret=%d", stream.c_str(), ret); return ret; @@ -661,12 +621,8 @@ int SrsRtmpClient::fmle_publish(string stream, int& stream_id) // FCPublish if (true) { - SrsCommonMessage* msg = new SrsCommonMessage(); SrsFMLEStartPacket* pkt = SrsFMLEStartPacket::create_FC_publish(stream); - - msg->set_packet(pkt, 0); - - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { + if ((ret = protocol->__send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) { srs_error("send FMLE publish " "FCPublish failed. stream=%s, ret=%d", stream.c_str(), ret); return ret; @@ -675,13 +631,9 @@ int SrsRtmpClient::fmle_publish(string stream, int& stream_id) // CreateStream if (true) { - SrsCommonMessage* msg = new SrsCommonMessage(); SrsCreateStreamPacket* pkt = new SrsCreateStreamPacket(); - pkt->transaction_id = 4; - msg->set_packet(pkt, 0); - - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { + if ((ret = protocol->__send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) { srs_error("send FMLE publish " "createStream failed. stream=%s, ret=%d", stream.c_str(), ret); return ret; @@ -690,13 +642,14 @@ int SrsRtmpClient::fmle_publish(string stream, int& stream_id) // expect result of CreateStream if (true) { - SrsCommonMessage* msg = NULL; + __SrsMessage* msg = NULL; SrsCreateStreamResPacket* pkt = NULL; - if ((ret = srs_rtmp_expect_message(protocol, &msg, &pkt)) != ERROR_SUCCESS) { + if ((ret = __srs_rtmp_expect_message(protocol, &msg, &pkt)) != ERROR_SUCCESS) { srs_error("expect create stream response message failed. ret=%d", ret); return ret; } - SrsAutoFree(SrsCommonMessage, msg, false); + SrsAutoFree(__SrsMessage, msg, false); + SrsAutoFree(SrsCreateStreamResPacket, pkt, false); srs_info("get create stream response message"); stream_id = (int)pkt->stream_id; @@ -704,13 +657,9 @@ int SrsRtmpClient::fmle_publish(string stream, int& stream_id) // publish(stream) if (true) { - SrsCommonMessage* msg = new SrsCommonMessage(); SrsPublishPacket* pkt = new SrsPublishPacket(); - pkt->stream_name = stream; - msg->set_packet(pkt, stream_id); - - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { + if ((ret = protocol->__send_and_free_packet(pkt, stream_id)) != ERROR_SUCCESS) { srs_error("send FMLE publish publish failed. " "stream=%s, stream_id=%d, ret=%d", stream.c_str(), stream_id, ret); return ret; @@ -778,16 +727,6 @@ int SrsRtmpServer::get_send_kbps() return protocol->get_send_kbps(); } -int SrsRtmpServer::recv_message(SrsCommonMessage** pmsg) -{ - return protocol->recv_message(pmsg); -} - -int SrsRtmpServer::send_message(ISrsMessage* msg) -{ - return protocol->send_message(msg); -} - int SrsRtmpServer::__recv_message(__SrsMessage** pmsg) { return protocol->__recv_message(pmsg); @@ -841,6 +780,7 @@ int SrsRtmpServer::connect_app(SrsRequest* req) return ret; } SrsAutoFree(__SrsMessage, msg, false); + SrsAutoFree(SrsConnectAppPacket, pkt, false); srs_info("get connect app message"); SrsAmf0Any* prop = NULL; @@ -954,8 +894,7 @@ void SrsRtmpServer::response_connect_reject(SrsRequest *req, const char* desc) pkt->props->set(StatusDescription, SrsAmf0Any::str(desc)); //pkt->props->set("objectEncoding", SrsAmf0Any::number(req->objectEncoding)); - SrsCommonMessage* msg = (new SrsCommonMessage())->set_packet(pkt, 0); - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { + if ((ret = protocol->__send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) { srs_error("send connect app response rejected message failed. ret=%d", ret); return; } @@ -1197,6 +1136,8 @@ int SrsRtmpServer::start_fmle_publish(int stream_id) srs_info("recv FCPublish request message success."); SrsAutoFree(__SrsMessage, msg, false); + SrsAutoFree(SrsFMLEStartPacket, pkt, false); + fc_publish_tid = pkt->transaction_id; } // FCPublish response @@ -1221,6 +1162,8 @@ int SrsRtmpServer::start_fmle_publish(int stream_id) srs_info("recv createStream request message success."); SrsAutoFree(__SrsMessage, msg, false); + SrsAutoFree(SrsCreateStreamPacket, pkt, false); + create_stream_tid = pkt->transaction_id; } // createStream response @@ -1244,6 +1187,7 @@ int SrsRtmpServer::start_fmle_publish(int stream_id) srs_info("recv publish request message success."); SrsAutoFree(__SrsMessage, msg, false); + SrsAutoFree(SrsPublishPacket, pkt, false); } // publish response onFCPublish(NetStream.Publish.Start) if (true) { diff --git a/trunk/src/rtmp/srs_protocol_rtmp.hpp b/trunk/src/rtmp/srs_protocol_rtmp.hpp index 3a1d9b558..2c2381b5b 100644 --- a/trunk/src/rtmp/srs_protocol_rtmp.hpp +++ b/trunk/src/rtmp/srs_protocol_rtmp.hpp @@ -164,8 +164,6 @@ public: virtual int64_t get_send_bytes(); virtual int get_recv_kbps(); virtual int get_send_kbps(); - virtual int recv_message(SrsCommonMessage** pmsg); - virtual int send_message(ISrsMessage* msg); virtual int __recv_message(__SrsMessage** pmsg); virtual int __decode_message(__SrsMessage* msg, SrsPacket** ppacket); virtual int __send_and_free_message(__SrsMessage* msg); @@ -213,8 +211,6 @@ public: virtual int64_t get_send_bytes(); virtual int get_recv_kbps(); virtual int get_send_kbps(); - virtual int recv_message(SrsCommonMessage** pmsg); - virtual int send_message(ISrsMessage* msg); virtual int __recv_message(__SrsMessage** pmsg); virtual int __decode_message(__SrsMessage* msg, SrsPacket** ppacket); virtual int __send_and_free_message(__SrsMessage* msg); diff --git a/trunk/src/rtmp/srs_protocol_rtmp_stack.cpp b/trunk/src/rtmp/srs_protocol_rtmp_stack.cpp index 1e7859865..48b9c7f7c 100644 --- a/trunk/src/rtmp/srs_protocol_rtmp_stack.cpp +++ b/trunk/src/rtmp/srs_protocol_rtmp_stack.cpp @@ -305,16 +305,6 @@ SrsProtocol::SrsProtocol(ISrsProtocolReaderWriter* io) SrsProtocol::~SrsProtocol() { - if (true) { - std::map::iterator it; - - for (it = chunk_streams.begin(); it != chunk_streams.end(); ++it) { - SrsChunkStream* stream = it->second; - srs_freep(stream); - } - - chunk_streams.clear(); - } if (true) { std::map::iterator it; @@ -819,6 +809,9 @@ int SrsProtocol::__recv_interlaced_message(__SrsMessage** pmsg) if (__chunk_streams.find(cid) == __chunk_streams.end()) { chunk = __chunk_streams[cid] = new __SrsChunkStream(cid); + // set the perfer cid of chunk, + // which will copy to the message received. + chunk->header.perfer_cid = cid; srs_verbose("cache new chunk stream: fmt=%d, cid=%d", fmt, cid); } else { chunk = __chunk_streams[cid]; @@ -979,7 +972,6 @@ int SrsProtocol::__read_message_header(__SrsChunkStream* chunk, char fmt, int bh // create msg when new chunk stream start if (!chunk->msg) { chunk->msg = new __SrsMessage(); - chunk->msg->header.perfer_cid = chunk->cid; srs_verbose("create message for new chunk, fmt=%d, cid=%d", fmt, chunk->cid); } @@ -1355,187 +1347,13 @@ int SrsProtocol::__on_send_message(__SrsMessage* msg, SrsPacket* packet) return ret; } -int SrsProtocol::recv_message(SrsCommonMessage** pmsg) -{ - *pmsg = NULL; - - int ret = ERROR_SUCCESS; - - while (true) { - SrsCommonMessage* msg = NULL; - - if ((ret = recv_interlaced_message(&msg)) != ERROR_SUCCESS) { - if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) { - srs_error("recv interlaced message failed. ret=%d", ret); - } - return ret; - } - srs_verbose("entire msg received"); - - if (!msg) { - continue; - } - - if (msg->size <= 0 || msg->header.payload_length <= 0) { - srs_trace("ignore empty message(type=%d, size=%d, time=%"PRId64", sid=%d).", - msg->header.message_type, msg->header.payload_length, - msg->header.timestamp, msg->header.stream_id); - srs_freep(msg); - continue; - } - - if ((ret = on_recv_message(msg)) != ERROR_SUCCESS) { - srs_error("hook the received msg failed. ret=%d", ret); - srs_freep(msg); - return ret; - } - - srs_verbose("get a msg with raw/undecoded payload"); - *pmsg = msg; - break; - } - - return ret; -} - -int SrsProtocol::send_message(ISrsMessage* msg) -{ - int ret = ERROR_SUCCESS; - - // free msg whatever return value. - SrsAutoFree(ISrsMessage, msg, false); - - if ((ret = msg->encode_packet()) != ERROR_SUCCESS) { - srs_error("encode packet to message payload failed. ret=%d", ret); - return ret; - } - srs_info("encode packet to message payload success"); - - // p set to current write position, - // it's ok when payload is NULL and size is 0. - char* p = (char*)msg->payload; - - // always write the header event payload is empty. - do { - // generate the header. - char* pheader = out_header_cache; - - if (p == (char*)msg->payload) { - // write new chunk stream header, fmt is 0 - *pheader++ = 0x00 | (msg->get_perfer_cid() & 0x3F); - - // chunk message header, 11 bytes - // timestamp, 3bytes, big-endian - u_int32_t timestamp = (u_int32_t)msg->header.timestamp; - if (timestamp >= RTMP_EXTENDED_TIMESTAMP) { - *pheader++ = 0xFF; - *pheader++ = 0xFF; - *pheader++ = 0xFF; - } else { - pp = (char*)×tamp; - *pheader++ = pp[2]; - *pheader++ = pp[1]; - *pheader++ = pp[0]; - } - - // message_length, 3bytes, big-endian - pp = (char*)&msg->header.payload_length; - *pheader++ = pp[2]; - *pheader++ = pp[1]; - *pheader++ = pp[0]; - - // message_type, 1bytes - *pheader++ = msg->header.message_type; - - // message_length, 3bytes, little-endian - pp = (char*)&msg->header.stream_id; - *pheader++ = pp[0]; - *pheader++ = pp[1]; - *pheader++ = pp[2]; - *pheader++ = pp[3]; - - // chunk extended timestamp header, 0 or 4 bytes, big-endian - if(timestamp >= RTMP_EXTENDED_TIMESTAMP){ - pp = (char*)×tamp; - *pheader++ = pp[3]; - *pheader++ = pp[2]; - *pheader++ = pp[1]; - *pheader++ = pp[0]; - } - } else { - // write no message header chunk stream, fmt is 3 - *pheader++ = 0xC0 | (msg->get_perfer_cid() & 0x3F); - - // chunk extended timestamp header, 0 or 4 bytes, big-endian - // 6.1.3. Extended Timestamp - // This field is transmitted only when the normal time stamp in the - // chunk message header is set to 0x00ffffff. If normal time stamp is - // set to any value less than 0x00ffffff, this field MUST NOT be - // present. This field MUST NOT be present if the timestamp field is not - // present. Type 3 chunks MUST NOT have this field. - // adobe changed for Type3 chunk: - // FMLE always sendout the extended-timestamp, - // must send the extended-timestamp to FMS, - // must send the extended-timestamp to flash-player. - // @see: ngx_rtmp_prepare_message - // @see: http://blog.csdn.net/win_lin/article/details/13363699 - u_int32_t timestamp = (u_int32_t)msg->header.timestamp; - if(timestamp >= RTMP_EXTENDED_TIMESTAMP){ - pp = (char*)×tamp; - *pheader++ = pp[3]; - *pheader++ = pp[2]; - *pheader++ = pp[1]; - *pheader++ = pp[0]; - } - } - - // sendout header and payload by writev. - // decrease the sys invoke count to get higher performance. - int payload_size = msg->size - (p - (char*)msg->payload); - payload_size = srs_min(payload_size, out_chunk_size); - - // always has header - int header_size = pheader - out_header_cache; - srs_assert(header_size > 0); - - // send by writev - iovec iov[2]; - iov[0].iov_base = out_header_cache; - iov[0].iov_len = header_size; - iov[1].iov_base = p; - iov[1].iov_len = payload_size; - - ssize_t nwrite; - if ((ret = skt->writev(iov, 2, &nwrite)) != ERROR_SUCCESS) { - srs_error("send with writev failed. ret=%d", ret); - return ret; - } - - // consume sendout bytes when not empty packet. - if (msg->payload && msg->size > 0) { - p += payload_size; - } - } while (p < (char*)msg->payload + msg->size); - - if ((ret = on_send_message(msg)) != ERROR_SUCCESS) { - srs_error("hook the send message failed. ret=%d", ret); - return ret; - } - - return ret; -} - int SrsProtocol::response_acknowledgement_message() { int ret = ERROR_SUCCESS; - SrsCommonMessage* msg = new SrsCommonMessage(); SrsAcknowledgementPacket* pkt = new SrsAcknowledgementPacket(); - in_ack_size.acked_size = pkt->sequence_number = skt->get_recv_bytes(); - msg->set_packet(pkt, 0); - - if ((ret = send_message(msg)) != ERROR_SUCCESS) { + if ((ret = __send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) { srs_error("send acknowledgement failed. ret=%d", ret); return ret; } @@ -1550,14 +1368,12 @@ int SrsProtocol::response_ping_message(int32_t timestamp) srs_trace("get a ping request, response it. timestamp=%d", timestamp); - SrsCommonMessage* msg = new SrsCommonMessage(); SrsUserControlPacket* pkt = new SrsUserControlPacket(); pkt->event_type = SrcPCUCPingResponse; pkt->event_data = timestamp; - msg->set_packet(pkt, 0); - if ((ret = send_message(msg)) != ERROR_SUCCESS) { + if ((ret = __send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) { srs_error("send ping response failed. ret=%d", ret); return ret; } @@ -1566,583 +1382,6 @@ int SrsProtocol::response_ping_message(int32_t timestamp) return ret; } -int SrsProtocol::on_recv_message(SrsCommonMessage* msg) -{ - int ret = ERROR_SUCCESS; - - srs_assert(msg != NULL); - - // acknowledgement - if (in_ack_size.ack_window_size > 0 && skt->get_recv_bytes() - in_ack_size.acked_size > in_ack_size.ack_window_size) { - if ((ret = response_acknowledgement_message()) != ERROR_SUCCESS) { - return ret; - } - } - - switch (msg->header.message_type) { - case RTMP_MSG_SetChunkSize: - case RTMP_MSG_UserControlMessage: - case RTMP_MSG_WindowAcknowledgementSize: - if ((ret = msg->decode_packet(this)) != ERROR_SUCCESS) { - srs_error("decode packet from message payload failed. ret=%d", ret); - return ret; - } - srs_verbose("decode packet from message payload success."); - break; - } - - switch (msg->header.message_type) { - case RTMP_MSG_WindowAcknowledgementSize: { - SrsSetWindowAckSizePacket* pkt = dynamic_cast(msg->get_packet()); - srs_assert(pkt != NULL); - - if (pkt->ackowledgement_window_size > 0) { - in_ack_size.ack_window_size = pkt->ackowledgement_window_size; - srs_trace("set ack window size to %d", pkt->ackowledgement_window_size); - } else { - srs_warn("ignored. set ack window size is %d", pkt->ackowledgement_window_size); - } - break; - } - case RTMP_MSG_SetChunkSize: { - SrsSetChunkSizePacket* pkt = dynamic_cast(msg->get_packet()); - srs_assert(pkt != NULL); - - in_chunk_size = pkt->chunk_size; - - srs_trace("set input chunk size to %d", pkt->chunk_size); - break; - } - case RTMP_MSG_UserControlMessage: { - SrsUserControlPacket* pkt = dynamic_cast(msg->get_packet()); - srs_assert(pkt != NULL); - - if (pkt->event_type == SrcPCUCSetBufferLength) { - srs_trace("ignored. set buffer length to %d", pkt->extra_data); - } - if (pkt->event_type == SrcPCUCPingRequest) { - if ((ret = response_ping_message(pkt->event_data)) != ERROR_SUCCESS) { - return ret; - } - } - break; - } - } - - return ret; -} - -int SrsProtocol::on_send_message(ISrsMessage* msg) -{ - int ret = ERROR_SUCCESS; - - if (!msg->can_decode()) { - srs_verbose("ignore the un-decodable message."); - return ret; - } - - SrsCommonMessage* common_msg = dynamic_cast(msg); - if (!common_msg) { - srs_verbose("ignore the shared ptr message."); - return ret; - } - - // for proxy, the common msg is not decoded, ignore. - if (!common_msg->has_packet()) { - srs_verbose("ignore the proxy common message."); - return ret; - } - - srs_assert(common_msg != NULL); - - switch (common_msg->header.message_type) { - case RTMP_MSG_SetChunkSize: { - SrsSetChunkSizePacket* pkt = dynamic_cast(common_msg->get_packet()); - srs_assert(pkt != NULL); - - out_chunk_size = pkt->chunk_size; - - srs_trace("set output chunk size to %d", pkt->chunk_size); - break; - } - case RTMP_MSG_AMF0CommandMessage: - case RTMP_MSG_AMF3CommandMessage: { - if (true) { - SrsConnectAppPacket* pkt = NULL; - pkt = dynamic_cast(common_msg->get_packet()); - if (pkt) { - requests[pkt->transaction_id] = pkt->command_name; - break; - } - } - if (true) { - SrsCreateStreamPacket* pkt = NULL; - pkt = dynamic_cast(common_msg->get_packet()); - if (pkt) { - requests[pkt->transaction_id] = pkt->command_name; - break; - } - } - if (true) { - SrsFMLEStartPacket* pkt = NULL; - pkt = dynamic_cast(common_msg->get_packet()); - if (pkt) { - requests[pkt->transaction_id] = pkt->command_name; - break; - } - } - break; - } - } - - return ret; -} - -int SrsProtocol::recv_interlaced_message(SrsCommonMessage** pmsg) -{ - int ret = ERROR_SUCCESS; - - // chunk stream basic header. - char fmt = 0; - int cid = 0; - int bh_size = 0; - if ((ret = read_basic_header(fmt, cid, bh_size)) != ERROR_SUCCESS) { - if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) { - srs_error("read basic header failed. ret=%d", ret); - } - return ret; - } - srs_verbose("read basic header success. fmt=%d, cid=%d, bh_size=%d", fmt, cid, bh_size); - - // once we got the chunk message header, - // that is there is a real message in cache, - // increase the timeout to got it. - // For example, in the play loop, we set timeout to 100ms, - // when we got a chunk header, we should increase the timeout, - // or we maybe timeout and disconnect the client. - int64_t timeout_us = skt->get_recv_timeout(); - if (!skt->is_never_timeout(timeout_us)) { - int64_t pkt_timeout_us = srs_max(timeout_us, SRS_MIN_RECV_TIMEOUT_US); - skt->set_recv_timeout(pkt_timeout_us); - srs_verbose("change recv timeout_us " - "from %"PRId64" to %"PRId64"", timeout_us, pkt_timeout_us); - } - - // get the cached chunk stream. - SrsChunkStream* chunk = NULL; - - if (chunk_streams.find(cid) == chunk_streams.end()) { - chunk = chunk_streams[cid] = new SrsChunkStream(cid); - srs_verbose("cache new chunk stream: fmt=%d, cid=%d", fmt, cid); - } else { - chunk = chunk_streams[cid]; - srs_verbose("cached chunk stream: fmt=%d, cid=%d, size=%d, message(type=%d, size=%d, time=%"PRId64", sid=%d)", - chunk->fmt, chunk->cid, (chunk->msg? chunk->msg->size : 0), chunk->header.message_type, chunk->header.payload_length, - chunk->header.timestamp, chunk->header.stream_id); - } - - // chunk stream message header - int mh_size = 0; - if ((ret = read_message_header(chunk, fmt, bh_size, mh_size)) != ERROR_SUCCESS) { - if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) { - srs_error("read message header failed. ret=%d", ret); - } - return ret; - } - srs_verbose("read message header success. " - "fmt=%d, mh_size=%d, ext_time=%d, size=%d, message(type=%d, size=%d, time=%"PRId64", sid=%d)", - fmt, mh_size, chunk->extended_timestamp, (chunk->msg? chunk->msg->size : 0), chunk->header.message_type, - chunk->header.payload_length, chunk->header.timestamp, chunk->header.stream_id); - - // read msg payload from chunk stream. - SrsCommonMessage* msg = NULL; - int payload_size = 0; - if ((ret = read_message_payload(chunk, bh_size, mh_size, payload_size, &msg)) != ERROR_SUCCESS) { - if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) { - srs_error("read message payload failed. ret=%d", ret); - } - return ret; - } - - // reset the recv timeout - if (!skt->is_never_timeout(timeout_us)) { - skt->set_recv_timeout(timeout_us); - srs_verbose("reset recv timeout_us to %"PRId64"", timeout_us); - } - - // not got an entire RTMP message, try next chunk. - if (!msg) { - srs_verbose("get partial message success. chunk_payload_size=%d, size=%d, message(type=%d, size=%d, time=%"PRId64", sid=%d)", - payload_size, (msg? msg->size : (chunk->msg? chunk->msg->size : 0)), chunk->header.message_type, chunk->header.payload_length, - chunk->header.timestamp, chunk->header.stream_id); - return ret; - } - - *pmsg = msg; - srs_info("get entire message success. chunk_payload_size=%d, size=%d, message(type=%d, size=%d, time=%"PRId64", sid=%d)", - payload_size, (msg? msg->size : (chunk->msg? chunk->msg->size : 0)), chunk->header.message_type, chunk->header.payload_length, - chunk->header.timestamp, chunk->header.stream_id); - - return ret; -} - -int SrsProtocol::read_basic_header(char& fmt, int& cid, int& bh_size) -{ - int ret = ERROR_SUCCESS; - - int required_size = 1; - if ((ret = buffer->ensure_buffer_bytes(skt, required_size)) != ERROR_SUCCESS) { - if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) { - srs_error("read 1bytes basic header failed. required_size=%d, ret=%d", required_size, ret); - } - return ret; - } - - char* p = buffer->bytes(); - - fmt = (*p >> 6) & 0x03; - cid = *p & 0x3f; - bh_size = 1; - - if (cid > 1) { - srs_verbose("%dbytes basic header parsed. fmt=%d, cid=%d", bh_size, fmt, cid); - return ret; - } - - if (cid == 0) { - required_size = 2; - if ((ret = buffer->ensure_buffer_bytes(skt, required_size)) != ERROR_SUCCESS) { - if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) { - srs_error("read 2bytes basic header failed. required_size=%d, ret=%d", required_size, ret); - } - return ret; - } - - cid = 64; - cid += *(++p); - bh_size = 2; - srs_verbose("%dbytes basic header parsed. fmt=%d, cid=%d", bh_size, fmt, cid); - } else if (cid == 1) { - required_size = 3; - if ((ret = buffer->ensure_buffer_bytes(skt, 3)) != ERROR_SUCCESS) { - if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) { - srs_error("read 3bytes basic header failed. required_size=%d, ret=%d", required_size, ret); - } - return ret; - } - - cid = 64; - cid += *(++p); - cid += *(++p) * 256; - bh_size = 3; - srs_verbose("%dbytes basic header parsed. fmt=%d, cid=%d", bh_size, fmt, cid); - } else { - srs_error("invalid path, impossible basic header."); - srs_assert(false); - } - - return ret; -} - -int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_size, int& mh_size) -{ - int ret = ERROR_SUCCESS; - - /** - * we should not assert anything about fmt, for the first packet. - * (when first packet, the chunk->msg is NULL). - * the fmt maybe 0/1/2/3, the FMLE will send a 0xC4 for some audio packet. - * the previous packet is: - * 04 // fmt=0, cid=4 - * 00 00 1a // timestamp=26 - * 00 00 9d // payload_length=157 - * 08 // message_type=8(audio) - * 01 00 00 00 // stream_id=1 - * the current packet maybe: - * c4 // fmt=3, cid=4 - * it's ok, for the packet is audio, and timestamp delta is 26. - * the current packet must be parsed as: - * fmt=0, cid=4 - * timestamp=26+26=52 - * payload_length=157 - * message_type=8(audio) - * stream_id=1 - * so we must update the timestamp even fmt=3 for first packet. - */ - // fresh packet used to update the timestamp even fmt=3 for first packet. - bool is_fresh_packet = !chunk->msg; - - // but, we can ensure that when a chunk stream is fresh, - // the fmt must be 0, a new stream. - if (chunk->msg_count == 0 && fmt != RTMP_FMT_TYPE0) { - ret = ERROR_RTMP_CHUNK_START; - srs_error("chunk stream is fresh, " - "fmt must be %d, actual is %d. ret=%d", RTMP_FMT_TYPE0, fmt, ret); - return ret; - } - - // when exists cache msg, means got an partial message, - // the fmt must not be type0 which means new message. - if (chunk->msg && fmt == RTMP_FMT_TYPE0) { - ret = ERROR_RTMP_CHUNK_START; - srs_error("chunk stream exists, " - "fmt must not be %d, actual is %d. ret=%d", RTMP_FMT_TYPE0, fmt, ret); - return ret; - } - - // create msg when new chunk stream start - if (!chunk->msg) { - chunk->msg = new SrsCommonMessage(); - srs_verbose("create message for new chunk, fmt=%d, cid=%d", fmt, chunk->cid); - } - - // read message header from socket to buffer. - static char mh_sizes[] = {11, 7, 3, 0}; - mh_size = mh_sizes[(int)fmt]; - srs_verbose("calc chunk message header size. fmt=%d, mh_size=%d", fmt, mh_size); - - int required_size = bh_size + mh_size; - if ((ret = buffer->ensure_buffer_bytes(skt, required_size)) != ERROR_SUCCESS) { - if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) { - srs_error("read %dbytes message header failed. required_size=%d, ret=%d", mh_size, required_size, ret); - } - return ret; - } - char* p = buffer->bytes() + bh_size; - - // parse the message header. - // see also: ngx_rtmp_recv - if (fmt <= RTMP_FMT_TYPE2) { - char* pp = (char*)&chunk->header.timestamp_delta; - pp[2] = *p++; - pp[1] = *p++; - pp[0] = *p++; - pp[3] = 0; - - // fmt: 0 - // timestamp: 3 bytes - // If the timestamp is greater than or equal to 16777215 - // (hexadecimal 0x00ffffff), this value MUST be 16777215, and the - // ‘extended timestamp header’ MUST be present. Otherwise, this value - // SHOULD be the entire timestamp. - // - // fmt: 1 or 2 - // timestamp delta: 3 bytes - // If the delta is greater than or equal to 16777215 (hexadecimal - // 0x00ffffff), this value MUST be 16777215, and the ‘extended - // timestamp header’ MUST be present. Otherwise, this value SHOULD be - // the entire delta. - chunk->extended_timestamp = (chunk->header.timestamp_delta >= RTMP_EXTENDED_TIMESTAMP); - if (chunk->extended_timestamp) { - // Extended timestamp: 0 or 4 bytes - // This field MUST be sent when the normal timsestamp is set to - // 0xffffff, it MUST NOT be sent if the normal timestamp is set to - // anything else. So for values less than 0xffffff the normal - // timestamp field SHOULD be used in which case the extended timestamp - // MUST NOT be present. For values greater than or equal to 0xffffff - // the normal timestamp field MUST NOT be used and MUST be set to - // 0xffffff and the extended timestamp MUST be sent. - // - // if extended timestamp, the timestamp must >= RTMP_EXTENDED_TIMESTAMP - // we set the timestamp to RTMP_EXTENDED_TIMESTAMP to identify we - // got an extended timestamp. - chunk->header.timestamp = RTMP_EXTENDED_TIMESTAMP; - } else { - if (fmt == RTMP_FMT_TYPE0) { - // 6.1.2.1. Type 0 - // For a type-0 chunk, the absolute timestamp of the message is sent - // here. - chunk->header.timestamp = chunk->header.timestamp_delta; - } else { - // 6.1.2.2. Type 1 - // 6.1.2.3. Type 2 - // For a type-1 or type-2 chunk, the difference between the previous - // chunk's timestamp and the current chunk's timestamp is sent here. - chunk->header.timestamp += chunk->header.timestamp_delta; - } - } - - if (fmt <= RTMP_FMT_TYPE1) { - pp = (char*)&chunk->header.payload_length; - pp[2] = *p++; - pp[1] = *p++; - pp[0] = *p++; - pp[3] = 0; - - // if msg exists in cache, the size must not changed. - if (chunk->msg->size > 0 && chunk->msg->size != chunk->header.payload_length) { - ret = ERROR_RTMP_PACKET_SIZE; - srs_error("msg exists in chunk cache, " - "size=%d cannot change to %d, ret=%d", - chunk->msg->size, chunk->header.payload_length, ret); - return ret; - } - - chunk->header.message_type = *p++; - - if (fmt == RTMP_FMT_TYPE0) { - pp = (char*)&chunk->header.stream_id; - pp[0] = *p++; - pp[1] = *p++; - pp[2] = *p++; - pp[3] = *p++; - srs_verbose("header read completed. fmt=%d, mh_size=%d, ext_time=%d, time=%"PRId64", payload=%d, type=%d, sid=%d", - fmt, mh_size, chunk->extended_timestamp, chunk->header.timestamp, chunk->header.payload_length, - chunk->header.message_type, chunk->header.stream_id); - } else { - srs_verbose("header read completed. fmt=%d, mh_size=%d, ext_time=%d, time=%"PRId64", payload=%d, type=%d", - fmt, mh_size, chunk->extended_timestamp, chunk->header.timestamp, chunk->header.payload_length, - chunk->header.message_type); - } - } else { - srs_verbose("header read completed. fmt=%d, mh_size=%d, ext_time=%d, time=%"PRId64"", - fmt, mh_size, chunk->extended_timestamp, chunk->header.timestamp); - } - } else { - // update the timestamp even fmt=3 for first stream - if (is_fresh_packet && !chunk->extended_timestamp) { - chunk->header.timestamp += chunk->header.timestamp_delta; - } - srs_verbose("header read completed. fmt=%d, size=%d, ext_time=%d", - fmt, mh_size, chunk->extended_timestamp); - } - - if (chunk->extended_timestamp) { - mh_size += 4; - required_size = bh_size + mh_size; - srs_verbose("read header ext time. fmt=%d, ext_time=%d, mh_size=%d", fmt, chunk->extended_timestamp, mh_size); - if ((ret = buffer->ensure_buffer_bytes(skt, required_size)) != ERROR_SUCCESS) { - if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) { - srs_error("read %dbytes message header failed. required_size=%d, ret=%d", mh_size, required_size, ret); - } - return ret; - } - - u_int32_t timestamp = 0x00; - char* pp = (char*)×tamp; - pp[3] = *p++; - pp[2] = *p++; - pp[1] = *p++; - pp[0] = *p++; - - // ffmpeg/librtmp may donot send this filed, need to detect the value. - // @see also: http://blog.csdn.net/win_lin/article/details/13363699 - // compare to the chunk timestamp, which is set by chunk message header - // type 0,1 or 2. - u_int32_t chunk_timestamp = chunk->header.timestamp; - if (chunk_timestamp > RTMP_EXTENDED_TIMESTAMP && chunk_timestamp != timestamp) { - mh_size -= 4; - srs_verbose("ignore the 4bytes extended timestamp. mh_size=%d", mh_size); - } else { - chunk->header.timestamp = timestamp; - } - srs_verbose("header read ext_time completed. time=%"PRId64"", chunk->header.timestamp); - } - - // the extended-timestamp must be unsigned-int, - // 24bits timestamp: 0xffffff = 16777215ms = 16777.215s = 4.66h - // 32bits timestamp: 0xffffffff = 4294967295ms = 4294967.295s = 1193.046h = 49.71d - // because the rtmp protocol says the 32bits timestamp is about "50 days": - // 3. Byte Order, Alignment, and Time Format - // Because timestamps are generally only 32 bits long, they will roll - // over after fewer than 50 days. - // - // but, its sample says the timestamp is 31bits: - // An application could assume, for example, that all - // adjacent timestamps are within 2^31 milliseconds of each other, so - // 10000 comes after 4000000000, while 3000000000 comes before - // 4000000000. - // and flv specification says timestamp is 31bits: - // Extension of the Timestamp field to form a SI32 value. This - // field represents the upper 8 bits, while the previous - // Timestamp field represents the lower 24 bits of the time in - // milliseconds. - // in a word, 31bits timestamp is ok. - // convert extended timestamp to 31bits. - chunk->header.timestamp &= 0x7fffffff; - - // valid message - if (chunk->header.payload_length < 0) { - ret = ERROR_RTMP_MSG_INVLIAD_SIZE; - srs_error("RTMP message size must not be negative. size=%d, ret=%d", - chunk->header.payload_length, ret); - return ret; - } - - // copy header to msg - chunk->msg->header = chunk->header; - - // increase the msg count, the chunk stream can accept fmt=1/2/3 message now. - chunk->msg_count++; - - return ret; -} - -int SrsProtocol::read_message_payload(SrsChunkStream* chunk, int bh_size, int mh_size, int& payload_size, SrsCommonMessage** pmsg) -{ - int ret = ERROR_SUCCESS; - - // empty message - if (chunk->header.payload_length <= 0) { - // need erase the header in buffer. - buffer->erase(bh_size + mh_size); - - srs_trace("get an empty RTMP " - "message(type=%d, size=%d, time=%"PRId64", sid=%d)", chunk->header.message_type, - chunk->header.payload_length, chunk->header.timestamp, chunk->header.stream_id); - - *pmsg = chunk->msg; - chunk->msg = NULL; - - return ret; - } - srs_assert(chunk->header.payload_length > 0); - - // the chunk payload size. - payload_size = chunk->header.payload_length - chunk->msg->size; - payload_size = srs_min(payload_size, in_chunk_size); - srs_verbose("chunk payload size is %d, message_size=%d, received_size=%d, in_chunk_size=%d", - payload_size, chunk->header.payload_length, chunk->msg->size, in_chunk_size); - - // create msg payload if not initialized - if (!chunk->msg->payload) { - chunk->msg->payload = new int8_t[chunk->header.payload_length]; - memset(chunk->msg->payload, 0, chunk->header.payload_length); - srs_verbose("create empty payload for RTMP message. size=%d", chunk->header.payload_length); - } - - // read payload to buffer - int required_size = bh_size + mh_size + payload_size; - if ((ret = buffer->ensure_buffer_bytes(skt, required_size)) != ERROR_SUCCESS) { - if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) { - srs_error("read payload failed. required_size=%d, ret=%d", required_size, ret); - } - return ret; - } - memcpy(chunk->msg->payload + chunk->msg->size, buffer->bytes() + bh_size + mh_size, payload_size); - buffer->erase(bh_size + mh_size + payload_size); - chunk->msg->size += payload_size; - - srs_verbose("chunk payload read completed. bh_size=%d, mh_size=%d, payload_size=%d", bh_size, mh_size, payload_size); - - // got entire RTMP message? - if (chunk->header.payload_length == chunk->msg->size) { - *pmsg = chunk->msg; - chunk->msg = NULL; - srs_verbose("get entire RTMP message(type=%d, size=%d, time=%"PRId64", sid=%d)", - chunk->header.message_type, chunk->header.payload_length, - chunk->header.timestamp, chunk->header.stream_id); - return ret; - } - - srs_verbose("get partial RTMP message(type=%d, size=%d, time=%"PRId64", sid=%d), partial size=%d", - chunk->header.message_type, chunk->header.payload_length, - chunk->header.timestamp, chunk->header.stream_id, - chunk->msg->size); - - return ret; -} - SrsMessageHeader::SrsMessageHeader() { message_type = 0; @@ -2235,20 +1474,6 @@ void SrsMessageHeader::initialize_video(int size, u_int32_t time, int stream) stream_id = (int32_t)stream; } -SrsChunkStream::SrsChunkStream(int _cid) -{ - fmt = 0; - cid = _cid; - extended_timestamp = false; - msg = NULL; - msg_count = 0; -} - -SrsChunkStream::~SrsChunkStream() -{ - srs_freep(msg); -} - __SrsChunkStream::__SrsChunkStream(int _cid) { fmt = 0; @@ -2365,411 +1590,6 @@ __SrsSharedPtrMessage* __SrsSharedPtrMessage::copy() return copy; } -ISrsMessage::ISrsMessage() -{ - payload = NULL; - size = 0; -} - -ISrsMessage::~ISrsMessage() -{ -} - -SrsCommonMessage::SrsCommonMessage() -{ - stream = NULL; - packet = NULL; -} - -SrsCommonMessage::~SrsCommonMessage() -{ - // we must directly free the ptrs, - // nevery use the virtual functions to delete, - // for in the destructor, the virtual functions is disabled. - - srs_freepa(payload); - srs_freep(packet); - srs_freep(stream); -} - -bool SrsCommonMessage::can_decode() -{ - return true; -} - -int SrsCommonMessage::decode_packet(SrsProtocol* protocol) -{ - int ret = ERROR_SUCCESS; - - srs_assert(payload != NULL); - srs_assert(size > 0); - - if (packet) { - srs_verbose("msg already decoded"); - return ret; - } - - if (!stream) { - srs_verbose("create decode stream for message."); - stream = new SrsStream(); - } - - // initialize the decode stream for all message, - // it's ok for the initialize if fast and without memory copy. - if ((ret = stream->initialize((char*)payload, size)) != ERROR_SUCCESS) { - srs_error("initialize stream failed. ret=%d", ret); - return ret; - } - srs_verbose("decode stream initialized success"); - - // decode specified packet type - if (header.is_amf0_command() || header.is_amf3_command() || header.is_amf0_data() || header.is_amf3_data()) { - srs_verbose("start to decode AMF0/AMF3 command message."); - - // skip 1bytes to decode the amf3 command. - if (header.is_amf3_command() && stream->require(1)) { - srs_verbose("skip 1bytes to decode AMF3 command"); - stream->skip(1); - } - - // amf0 command message. - // need to read the command name. - std::string command; - if ((ret = srs_amf0_read_string(stream, command)) != ERROR_SUCCESS) { - srs_error("decode AMF0/AMF3 command name failed. ret=%d", ret); - return ret; - } - srs_verbose("AMF0/AMF3 command message, command_name=%s", command.c_str()); - - // result/error packet - if (command == RTMP_AMF0_COMMAND_RESULT || command == RTMP_AMF0_COMMAND_ERROR) { - double transactionId = 0.0; - if ((ret = srs_amf0_read_number(stream, transactionId)) != ERROR_SUCCESS) { - srs_error("decode AMF0/AMF3 transcationId failed. ret=%d", ret); - return ret; - } - srs_verbose("AMF0/AMF3 command id, transcationId=%.2f", transactionId); - - // reset stream, for header read completed. - stream->reset(); - if (header.is_amf3_command()) { - stream->skip(1); - } - - std::string request_name = protocol->get_request_name(transactionId); - if (request_name.empty()) { - ret = ERROR_RTMP_NO_REQUEST; - srs_error("decode AMF0/AMF3 request failed. ret=%d", ret); - return ret; - } - srs_verbose("AMF0/AMF3 request parsed. request_name=%s", request_name.c_str()); - - if (request_name == RTMP_AMF0_COMMAND_CONNECT) { - srs_info("decode the AMF0/AMF3 response command(%s message).", request_name.c_str()); - packet = new SrsConnectAppResPacket(); - return packet->decode(stream); - } else if (request_name == RTMP_AMF0_COMMAND_CREATE_STREAM) { - srs_info("decode the AMF0/AMF3 response command(%s message).", request_name.c_str()); - packet = new SrsCreateStreamResPacket(0, 0); - return packet->decode(stream); - } else if (request_name == RTMP_AMF0_COMMAND_RELEASE_STREAM - || request_name == RTMP_AMF0_COMMAND_FC_PUBLISH - || request_name == RTMP_AMF0_COMMAND_UNPUBLISH) { - srs_info("decode the AMF0/AMF3 response command(%s message).", request_name.c_str()); - packet = new SrsFMLEStartResPacket(0); - return packet->decode(stream); - } else { - ret = ERROR_RTMP_NO_REQUEST; - srs_error("decode AMF0/AMF3 request failed. " - "request_name=%s, transactionId=%.2f, ret=%d", - request_name.c_str(), transactionId, ret); - return ret; - } - } - - // reset to zero(amf3 to 1) to restart decode. - stream->reset(); - if (header.is_amf3_command()) { - stream->skip(1); - } - - // decode command object. - if (command == RTMP_AMF0_COMMAND_CONNECT) { - srs_info("decode the AMF0/AMF3 command(connect vhost/app message)."); - packet = new SrsConnectAppPacket(); - return packet->decode(stream); - } else if(command == RTMP_AMF0_COMMAND_CREATE_STREAM) { - srs_info("decode the AMF0/AMF3 command(createStream message)."); - packet = new SrsCreateStreamPacket(); - return packet->decode(stream); - } else if(command == RTMP_AMF0_COMMAND_PLAY) { - srs_info("decode the AMF0/AMF3 command(paly message)."); - packet = new SrsPlayPacket(); - return packet->decode(stream); - } else if(command == RTMP_AMF0_COMMAND_PAUSE) { - srs_info("decode the AMF0/AMF3 command(pause message)."); - packet = new SrsPausePacket(); - return packet->decode(stream); - } else if(command == RTMP_AMF0_COMMAND_RELEASE_STREAM) { - srs_info("decode the AMF0/AMF3 command(FMLE releaseStream message)."); - packet = new SrsFMLEStartPacket(); - return packet->decode(stream); - } else if(command == RTMP_AMF0_COMMAND_FC_PUBLISH) { - srs_info("decode the AMF0/AMF3 command(FMLE FCPublish message)."); - packet = new SrsFMLEStartPacket(); - return packet->decode(stream); - } else if(command == RTMP_AMF0_COMMAND_PUBLISH) { - srs_info("decode the AMF0/AMF3 command(publish message)."); - packet = new SrsPublishPacket(); - return packet->decode(stream); - } else if(command == RTMP_AMF0_COMMAND_UNPUBLISH) { - srs_info("decode the AMF0/AMF3 command(unpublish message)."); - packet = new SrsFMLEStartPacket(); - return packet->decode(stream); - } else if(command == RTMP_AMF0_DATA_SET_DATAFRAME || command == RTMP_AMF0_DATA_ON_METADATA) { - srs_info("decode the AMF0/AMF3 data(onMetaData message)."); - packet = new SrsOnMetaDataPacket(); - return packet->decode(stream); - } else if(command == SRS_BW_CHECK_FINISHED - || command == SRS_BW_CHECK_PLAYING - || command == SRS_BW_CHECK_PUBLISHING - || command == SRS_BW_CHECK_STARTING_PLAY - || command == SRS_BW_CHECK_STARTING_PUBLISH - || command == SRS_BW_CHECK_START_PLAY - || command == SRS_BW_CHECK_START_PUBLISH - || command == SRS_BW_CHECK_STOPPED_PLAY - || command == SRS_BW_CHECK_STOP_PLAY - || command == SRS_BW_CHECK_STOP_PUBLISH - || command == SRS_BW_CHECK_STOPPED_PUBLISH - || command == SRS_BW_CHECK_FLASH_FINAL) - { - srs_info("decode the AMF0/AMF3 band width check message."); - packet = new SrsBandwidthPacket(); - return packet->decode(stream); - } else if (command == RTMP_AMF0_COMMAND_CLOSE_STREAM) { - srs_info("decode the AMF0/AMF3 closeStream message."); - packet = new SrsCloseStreamPacket(); - return packet->decode(stream); - } - - // default packet to drop message. - srs_trace("drop the AMF0/AMF3 command message, command_name=%s", command.c_str()); - packet = new SrsPacket(); - return ret; - } else if(header.is_user_control_message()) { - srs_verbose("start to decode user control message."); - packet = new SrsUserControlPacket(); - return packet->decode(stream); - } else if(header.is_window_ackledgement_size()) { - srs_verbose("start to decode set ack window size message."); - packet = new SrsSetWindowAckSizePacket(); - return packet->decode(stream); - } else if(header.is_set_chunk_size()) { - srs_verbose("start to decode set chunk size message."); - packet = new SrsSetChunkSizePacket(); - return packet->decode(stream); - } else { - // default packet to drop message. - srs_trace("drop the unknown message, type=%d", header.message_type); - packet = new SrsPacket(); - } - - return ret; -} - -bool SrsCommonMessage::has_packet() -{ - return packet != NULL; -} - -SrsPacket* SrsCommonMessage::get_packet() -{ - if (!packet) { - srs_error("the payload is raw/undecoded, invoke decode_packet to decode it."); - } - srs_assert(packet != NULL); - - return packet; -} - -int SrsCommonMessage::get_perfer_cid() -{ - if (!packet) { - return RTMP_CID_ProtocolControl; - } - - // we donot use the complex basic header, - // ensure the basic header is 1bytes. - if (packet->get_perfer_cid() < 2) { - return packet->get_perfer_cid(); - } - - return packet->get_perfer_cid(); -} - -SrsCommonMessage* SrsCommonMessage::set_packet(SrsPacket* pkt, int stream_id) -{ - srs_freep(packet); - - packet = pkt; - - header.message_type = packet->get_message_type(); - header.payload_length = packet->get_payload_length(); - header.stream_id = stream_id; - - return this; -} - -int SrsCommonMessage::encode_packet() -{ - int ret = ERROR_SUCCESS; - - // sometimes, for example, the edge proxy, - // the payload is not decoded, so directly sent out. - if (payload != NULL) { - header.payload_length = size; - return ret; - } - - // encode packet to payload and size. - if (packet == NULL) { - srs_warn("packet is empty, send out empty message."); - return ret; - } - // realloc the payload. - size = 0; - srs_freepa(payload); - - if ((ret = packet->encode(size, (char*&)payload)) != ERROR_SUCCESS) { - return ret; - } - - header.payload_length = size; - - return ret; -} - -SrsSharedPtrMessage::SrsSharedPtr::SrsSharedPtr() -{ - payload = NULL; - size = 0; - perfer_cid = 0; - shared_count = 0; -} - -SrsSharedPtrMessage::SrsSharedPtr::~SrsSharedPtr() -{ - srs_freepa(payload); -} - -SrsSharedPtrMessage::SrsSharedPtrMessage() -{ - ptr = NULL; -} - -SrsSharedPtrMessage::~SrsSharedPtrMessage() -{ - if (ptr) { - if (ptr->shared_count == 0) { - srs_freep(ptr); - } else { - ptr->shared_count--; - } - } -} - -bool SrsSharedPtrMessage::can_decode() -{ - return false; -} - -int SrsSharedPtrMessage::initialize(SrsCommonMessage* source) -{ - int ret = ERROR_SUCCESS; - - if ((ret = initialize(&source->header, (char*)source->payload, source->size)) != ERROR_SUCCESS) { - return ret; - } - - // detach the payload from source - source->payload = NULL; - source->size = 0; - - return ret; -} - -int SrsSharedPtrMessage::initialize(SrsMessageHeader* source, char* payload, int size) -{ - int ret = ERROR_SUCCESS; - - srs_assert(source != NULL); - if (ptr) { - ret = ERROR_SYSTEM_ASSERT_FAILED; - srs_error("should not set the payload twice. ret=%d", ret); - srs_assert(false); - - return ret; - } - - header = *source; - header.payload_length = size; - - ptr = new SrsSharedPtr(); - - // direct attach the data of common message. - ptr->payload = payload; - ptr->size = size; - - if (source->is_video()) { - ptr->perfer_cid = RTMP_CID_Video; - } else if (source->is_audio()) { - ptr->perfer_cid = RTMP_CID_Audio; - } else { - ptr->perfer_cid = RTMP_CID_OverConnection2; - } - - ISrsMessage::payload = (int8_t*)ptr->payload; - ISrsMessage::size = ptr->size; - - return ret; -} - -SrsSharedPtrMessage* SrsSharedPtrMessage::copy() -{ - if (!ptr) { - srs_error("invoke initialize to initialize the ptr."); - srs_assert(false); - return NULL; - } - - SrsSharedPtrMessage* copy = new SrsSharedPtrMessage(); - - copy->header = header; - - copy->ptr = ptr; - ptr->shared_count++; - - copy->payload = (int8_t*)ptr->payload; - copy->size = ptr->size; - - return copy; -} - -int SrsSharedPtrMessage::get_perfer_cid() -{ - if (!ptr) { - return 0; - } - - return ptr->perfer_cid; -} - -int SrsSharedPtrMessage::encode_packet() -{ - srs_verbose("shared message ignore the encode method."); - return ERROR_SUCCESS; -} - SrsPacket::SrsPacket() { } diff --git a/trunk/src/rtmp/srs_protocol_rtmp_stack.hpp b/trunk/src/rtmp/srs_protocol_rtmp_stack.hpp index bf80348ea..d2ec51e09 100644 --- a/trunk/src/rtmp/srs_protocol_rtmp_stack.hpp +++ b/trunk/src/rtmp/srs_protocol_rtmp_stack.hpp @@ -40,11 +40,8 @@ class ISrsProtocolReaderWriter; class SrsBuffer; class SrsPacket; class SrsStream; -class SrsCommonMessage; -class SrsChunkStream; class SrsAmf0Object; class SrsAmf0Any; -class ISrsMessage; class SrsMessageHeader; class __SrsMessage; class __SrsChunkStream; @@ -112,7 +109,6 @@ private: std::map requests; // peer in private: - std::map chunk_streams; // TODO: FIXME: rename to chunk_streams std::map __chunk_streams; SrsStream* decode_stream; @@ -220,58 +216,9 @@ private: * when message sentout, update the context. */ virtual int __on_send_message(__SrsMessage* msg, SrsPacket* packet); -public: - /** - * recv a message with raw/undecoded payload from peer. - * the payload is not decoded, use srs_rtmp_expect_message if requires - * specifies message. - * @pmsg, user must free it. NULL if not success. - * @remark, only when success, user can use and must free the pmsg. - */ - virtual int recv_message(SrsCommonMessage** pmsg); - /** - * send out message with encoded payload to peer. - * use the message encode method to encode to payload, - * then sendout over socket. - * @msg this method will free it whatever return value. - */ - virtual int send_message(ISrsMessage* msg); private: - /** - * when recv message, update the context. - */ - virtual int on_recv_message(SrsCommonMessage* msg); virtual int response_acknowledgement_message(); virtual int response_ping_message(int32_t timestamp); - /** - * when message sentout, update the context. - */ - virtual int on_send_message(ISrsMessage* msg); - /** - * try to recv interlaced message from peer, - * return error if error occur and nerver set the pmsg, - * return success and pmsg set to NULL if no entire message got, - * return success and pmsg set to entire message if got one. - */ - virtual int recv_interlaced_message(SrsCommonMessage** pmsg); - /** - * read the chunk basic header(fmt, cid) from chunk stream. - * user can discovery a SrsChunkStream by cid. - * @bh_size return the chunk basic header size, to remove the used bytes when finished. - */ - virtual int read_basic_header(char& fmt, int& cid, int& bh_size); - /** - * read the chunk message header(timestamp, payload_length, message_type, stream_id) - * from chunk stream and save to SrsChunkStream. - * @mh_size return the chunk message header size, to remove the used bytes when finished. - */ - virtual int read_message_header(SrsChunkStream* chunk, char fmt, int bh_size, int& mh_size); - /** - * read the chunk payload, remove the used bytes in buffer, - * if got entire message, set the pmsg. - * @payload_size read size in this roundtrip, generally a chunk size or left message size. - */ - virtual int read_message_payload(SrsChunkStream* chunk, int bh_size, int mh_size, int& payload_size, SrsCommonMessage** pmsg); }; /** @@ -338,44 +285,6 @@ public: void initialize_video(int size, u_int32_t time, int stream); }; -/** -* incoming chunk stream maybe interlaced, -* use the chunk stream to cache the input RTMP chunk streams. -*/ -class SrsChunkStream -{ -public: - /** - * represents the basic header fmt, - * which used to identify the variant message header type. - */ - char fmt; - /** - * represents the basic header cid, - * which is the chunk stream id. - */ - int cid; - /** - * cached message header - */ - SrsMessageHeader header; - /** - * whether the chunk message header has extended timestamp. - */ - bool extended_timestamp; - /** - * partially read message. - */ - SrsCommonMessage* msg; - /** - * decoded msg count, to identify whether the chunk stream is fresh. - */ - int64_t msg_count; -public: - SrsChunkStream(int _cid); - virtual ~SrsChunkStream(); -}; - /** * incoming chunk stream maybe interlaced, * use the chunk stream to cache the input RTMP chunk streams. @@ -479,156 +388,6 @@ public: virtual __SrsSharedPtrMessage* copy(); }; -/** -* message to output. -*/ -class ISrsMessage -{ -// 4.1. Message Header -public: - SrsMessageHeader header; -// 4.2. Message Payload -public: - /** - * The other part which is the payload is the actual data that is - * contained in the message. For example, it could be some audio samples - * or compressed video data. The payload format and interpretation are - * beyond the scope of this document. - */ - int32_t size; - int8_t* payload; -public: - ISrsMessage(); - virtual ~ISrsMessage(); -public: - /** - * whether message canbe decoded. - * only update the context when message canbe decoded. - */ - virtual bool can_decode() = 0; -/** -* encode functions. -*/ -public: - /** - * get the perfered cid(chunk stream id) which sendout over. - */ - virtual int get_perfer_cid() = 0; - /** - * encode the packet to message payload bytes. - * @remark there exists empty packet, so maybe the payload is NULL. - */ - virtual int encode_packet() = 0; -}; - -/** -* common RTMP message defines in rtmp.part2.Message-Formats.pdf. -* cannbe parse and decode. -*/ -class SrsCommonMessage : public ISrsMessage -{ -private: - disable_default_copy(SrsCommonMessage); -// decoded message payload. -private: - SrsStream* stream; - SrsPacket* packet; -public: - SrsCommonMessage(); - virtual ~SrsCommonMessage(); -public: - virtual bool can_decode(); -/** -* decode functions. -*/ -public: - /** - * decode packet from message payload. - */ - // TODO: use protocol to decode it. - virtual int decode_packet(SrsProtocol* protocol); - /** - * whether msg has decoded packet. - */ - virtual bool has_packet(); - /** - * get the decoded packet which decoded by decode_packet(). - * @remark, user never free the pkt, the message will auto free it. - */ - virtual SrsPacket* get_packet(); -/** -* encode functions. -*/ -public: - /** - * get the perfered cid(chunk stream id) which sendout over. - */ - virtual int get_perfer_cid(); - /** - * set the encoded packet to encode_packet() to payload. - * @stream_id, the id of stream which is created by createStream. - * @remark, user never free the pkt, the message will auto free it. - * @return message itself. - */ - // TODO: refine the send methods. - virtual SrsCommonMessage* set_packet(SrsPacket* pkt, int stream_id); - /** - * encode the packet to message payload bytes. - * @remark there exists empty packet, so maybe the payload is NULL. - */ - virtual int encode_packet(); -}; - -/** -* shared ptr message. -* for audio/video/data message that need less memory copy. -* and only for output. -*/ -class SrsSharedPtrMessage : public ISrsMessage -{ -private: - struct SrsSharedPtr - { - char* payload; - int size; - int perfer_cid; - int shared_count; - - SrsSharedPtr(); - virtual ~SrsSharedPtr(); - }; - SrsSharedPtr* ptr; -public: - SrsSharedPtrMessage(); - virtual ~SrsSharedPtrMessage(); -public: - virtual bool can_decode(); -public: - /** - * set the shared payload. - * we will detach the payload of source, - * so ensure donot use it before. - */ - virtual int initialize(SrsCommonMessage* source); - /** - * set the shared payload. - * use source header, and specified param payload. - */ - virtual int initialize(SrsMessageHeader* source, char* payload, int size); - virtual SrsSharedPtrMessage* copy(); -public: - /** - * get the perfered cid(chunk stream id) which sendout over. - */ - virtual int get_perfer_cid(); - /** - * ignored. - * for shared message, nothing should be done. - * use initialize() to set the data. - */ - virtual int encode_packet(); -}; - /** * the decoded message payload. * @remark we seperate the packet from message, @@ -1374,44 +1133,6 @@ protected: * if need to set timeout, use set timeout of SrsProtocol. */ template -int srs_rtmp_expect_message(SrsProtocol* protocol, SrsCommonMessage** pmsg, T** ppacket) -{ - *pmsg = NULL; - *ppacket = NULL; - - int ret = ERROR_SUCCESS; - - while (true) { - SrsCommonMessage* msg = NULL; - if ((ret = protocol->recv_message(&msg)) != ERROR_SUCCESS) { - srs_error("recv message failed. ret=%d", ret); - return ret; - } - srs_verbose("recv message success."); - - if ((ret = msg->decode_packet(protocol)) != ERROR_SUCCESS) { - delete msg; - srs_error("decode message failed. ret=%d", ret); - return ret; - } - - T* pkt = dynamic_cast(msg->get_packet()); - if (!pkt) { - delete msg; - srs_trace("drop message(type=%d, size=%d, time=%"PRId64", sid=%d).", - msg->header.message_type, msg->header.payload_length, - msg->header.timestamp, msg->header.stream_id); - continue; - } - - *pmsg = msg; - *ppacket = pkt; - break; - } - - return ret; -} -template int __srs_rtmp_expect_message(SrsProtocol* protocol, __SrsMessage** pmsg, T** ppacket) { *pmsg = NULL;