From 2731fe1f3e079bc3762c0774c7f7ae0abc4b35cf Mon Sep 17 00:00:00 2001 From: winlin Date: Thu, 12 Dec 2019 15:11:31 +0800 Subject: [PATCH] SrsPacket supports converting to message, so can be sent by one API. --- README.md | 1 + trunk/src/kernel/srs_kernel_buffer.hpp | 2 + trunk/src/kernel/srs_kernel_flv.hpp | 7 +- trunk/src/kernel/srs_kernel_stream.cpp | 6 +- trunk/src/protocol/srs_rtmp_stack.cpp | 110 +++--- trunk/src/protocol/srs_rtmp_stack.hpp | 6 +- trunk/src/utest/srs_utest_protostack.cpp | 410 +++++++++++++++++++++++ 7 files changed, 472 insertions(+), 70 deletions(-) diff --git a/README.md b/README.md index b2aba30a6..01db71424 100755 --- a/README.md +++ b/README.md @@ -145,6 +145,7 @@ For previous versions, please read: ## V3 changes +* v3.0, 2019-12-11, SrsPacket supports converting to message, so can be sent by one API. * v3.0, 2019-12-11, For [#1042][bug #1042], cover RTMP client/server protocol. * v3.0, 2019-12-11, Fix [#1445][bug #1445], limit the createStream recursive depth. 3.0.70 * v3.0, 2019-12-11, For [#1042][bug #1042], cover RTMP handshake protocol. diff --git a/trunk/src/kernel/srs_kernel_buffer.hpp b/trunk/src/kernel/srs_kernel_buffer.hpp index 45a5542ce..5bbc420c4 100644 --- a/trunk/src/kernel/srs_kernel_buffer.hpp +++ b/trunk/src/kernel/srs_kernel_buffer.hpp @@ -95,6 +95,8 @@ private: int nb_bytes; public: SrsBuffer(); + // Initialize buffer with data b and size nb_b. + // @remark User must free the data b. SrsBuffer(char* b, int nb_b); virtual ~SrsBuffer(); // get the status of stream diff --git a/trunk/src/kernel/srs_kernel_flv.hpp b/trunk/src/kernel/srs_kernel_flv.hpp index bc2ad6ed7..97e1c7e32 100644 --- a/trunk/src/kernel/srs_kernel_flv.hpp +++ b/trunk/src/kernel/srs_kernel_flv.hpp @@ -37,6 +37,7 @@ class SrsBuffer; class ISrsWriter; class ISrsReader; class SrsFileReader; +class SrsPacket; #define SRS_FLV_TAG_HEADER_SIZE 11 #define SRS_FLV_PREVIOUS_TAG_SIZE 4 @@ -231,8 +232,9 @@ public: // The message header for shared ptr message. // only the message for all msgs are same. -struct SrsSharedMessageHeader +class SrsSharedMessageHeader { +public: // 3bytes. // Three-byte field that represents the size of the payload in bytes. // It is set in big-endian format. @@ -245,7 +247,7 @@ struct SrsSharedMessageHeader // set at decoding, and canbe used for directly send message, // For example, dispatch to all connections. int perfer_cid; - +public: SrsSharedMessageHeader(); virtual ~SrsSharedMessageHeader(); }; @@ -309,6 +311,7 @@ public: // copy header, manage the payload of msg, // set the payload to NULL to prevent double free. // @remark payload of msg set to NULL if success. + // @remark User should free the msg. virtual srs_error_t create(SrsCommonMessage* msg); // Create shared ptr message, // from the header and payload. diff --git a/trunk/src/kernel/srs_kernel_stream.cpp b/trunk/src/kernel/srs_kernel_stream.cpp index cec9f3d9a..97cc7d9d3 100755 --- a/trunk/src/kernel/srs_kernel_stream.cpp +++ b/trunk/src/kernel/srs_kernel_stream.cpp @@ -64,9 +64,9 @@ void SrsSimpleStream::erase(int size) void SrsSimpleStream::append(const char* bytes, int size) { - srs_assert(size > 0); - - data.insert(data.end(), bytes, bytes + size); + if (size > 0) { + data.insert(data.end(), bytes, bytes + size); + } } void SrsSimpleStream::append(SrsSimpleStream* src) diff --git a/trunk/src/protocol/srs_rtmp_stack.cpp b/trunk/src/protocol/srs_rtmp_stack.cpp index 309421d29..03d88c9a6 100644 --- a/trunk/src/protocol/srs_rtmp_stack.cpp +++ b/trunk/src/protocol/srs_rtmp_stack.cpp @@ -137,6 +137,36 @@ SrsPacket::~SrsPacket() { } +srs_error_t SrsPacket::to_msg(SrsCommonMessage* msg, int stream_id) +{ + srs_error_t err = srs_success; + + int size = 0; + char* payload = NULL; + if ((err = encode(size, payload)) != srs_success) { + return srs_error_wrap(err, "encode packet"); + } + + // encode packet to payload and size. + if (size <= 0 || payload == NULL) { + srs_warn("packet is empty, ignore empty message."); + return err; + } + + // to message + SrsMessageHeader header; + header.payload_length = size; + header.message_type = get_message_type(); + header.stream_id = stream_id; + header.perfer_cid = get_prefer_cid(); + + if ((err = msg->create(&header, payload, size)) != srs_success) { + return srs_error_wrap(err, "create %dB message", size); + } + + return err; +} + srs_error_t SrsPacket::encode(int& psize, char*& ppayload) { srs_error_t err = srs_success; @@ -570,70 +600,26 @@ srs_error_t SrsProtocol::do_send_and_free_packet(SrsPacket* packet, int stream_i srs_assert(packet); SrsAutoFree(SrsPacket, packet); - - int size = 0; - char* payload = NULL; - if ((err = packet->encode(size, payload)) != srs_success) { - return srs_error_wrap(err, "encode packet"); - } - - // encode packet to payload and size. - if (size <= 0 || payload == NULL) { - srs_warn("packet is empty, ignore empty message."); - return err; - } - - // to message - SrsMessageHeader header; - header.payload_length = size; - header.message_type = packet->get_message_type(); - header.stream_id = stream_id; - header.perfer_cid = packet->get_prefer_cid(); - - err = do_simple_send(&header, payload, size); - srs_freepa(payload); - if (err != srs_success) { - return srs_error_wrap(err, "simple send"); - } - - if ((err = on_send_packet(&header, packet)) != srs_success) { - return srs_error_wrap(err, "on send packet"); - } - - return err; -} -srs_error_t SrsProtocol::do_simple_send(SrsMessageHeader* mh, char* payload, int size) -{ - srs_error_t err = srs_success; + SrsCommonMessage* msg = new SrsCommonMessage(); + SrsAutoFree(SrsCommonMessage, msg); + + if ((err = packet->to_msg(msg, stream_id)) != srs_success) { + return srs_error_wrap(err, "to message"); + } + + SrsSharedPtrMessage* shared_msg = new SrsSharedPtrMessage(); + if ((err = shared_msg->create(msg)) != srs_success) { + srs_freep(shared_msg); + return srs_error_wrap(err, "create message"); + } + + if ((err = send_and_free_message(shared_msg, stream_id)) != srs_success) { + return srs_error_wrap(err, "send packet"); + } - // we directly send out the packet, - // use very simple algorithm, not very fast, - // but it's ok. - char* p = payload; - char* end = p + size; - char c0c3[SRS_CONSTS_RTMP_MAX_FMT0_HEADER_SIZE]; - while (p < end) { - int nbh = 0; - if (p == payload) { - nbh = srs_chunk_header_c0(mh->perfer_cid, (uint32_t)mh->timestamp, mh->payload_length, mh->message_type, mh->stream_id, c0c3, sizeof(c0c3)); - } else { - nbh = srs_chunk_header_c3(mh->perfer_cid, (uint32_t)mh->timestamp, c0c3, sizeof(c0c3)); - } - srs_assert(nbh > 0);; - - iovec iovs[2]; - iovs[0].iov_base = c0c3; - iovs[0].iov_len = nbh; - - int payload_size = srs_min((int)(end - p), out_chunk_size); - iovs[1].iov_base = p; - iovs[1].iov_len = payload_size; - p += payload_size; - - if ((err = skt->writev(iovs, 2, NULL)) != srs_success) { - return srs_error_wrap(err, "writev packet"); - } + if ((err = on_send_packet(&msg->header, packet)) != srs_success) { + return srs_error_wrap(err, "on send packet"); } return err; diff --git a/trunk/src/protocol/srs_rtmp_stack.hpp b/trunk/src/protocol/srs_rtmp_stack.hpp index 2db39b521..79122f3d9 100644 --- a/trunk/src/protocol/srs_rtmp_stack.hpp +++ b/trunk/src/protocol/srs_rtmp_stack.hpp @@ -115,6 +115,9 @@ class SrsPacket public: SrsPacket(); virtual ~SrsPacket(); +public: + // Covert packet to common message. + virtual srs_error_t to_msg(SrsCommonMessage* msg, int stream_id); public: // The subpacket can override this encode, // For example, video and audio will directly set the payload withou memory copy, @@ -356,9 +359,6 @@ private: virtual srs_error_t do_iovs_send(iovec* iovs, int size); // The underlayer api for send and free packet. virtual srs_error_t do_send_and_free_packet(SrsPacket* packet, int stream_id); - // Use simple algorithm to send the header and bytes. - // @remark, for do_send_and_free_packet to send. - virtual srs_error_t do_simple_send(SrsMessageHeader* mh, char* payload, int size); // The imp for decode_message virtual srs_error_t do_decode_message(SrsMessageHeader& header, SrsBuffer* stream, SrsPacket** ppacket); // Recv bytes oriented RTMP message from protocol stack. diff --git a/trunk/src/utest/srs_utest_protostack.cpp b/trunk/src/utest/srs_utest_protostack.cpp index 9a976bb21..804345f9b 100644 --- a/trunk/src/utest/srs_utest_protostack.cpp +++ b/trunk/src/utest/srs_utest_protostack.cpp @@ -36,6 +36,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include +#define SRS_DEFAULT_RECV_BUFFER_SIZE 131072 + using namespace std; class MockPacket : public SrsPacket @@ -138,6 +140,18 @@ VOID TEST(ProtoStackTest, ManualFlush) EXPECT_EQ(12+4, io.out_buffer.length()); } + if (true) { + MockBufferIO io; + SrsRtmpServer p(&io); + + // Always response ACK message. + HELPER_EXPECT_SUCCESS(p.set_in_window_ack_size(1)); + + p.set_auto_response(true); + HELPER_EXPECT_SUCCESS(p.protocol->response_acknowledgement_message()); + EXPECT_EQ(12+4, io.out_buffer.length()); + } + if (true) { MockBufferIO io; SrsProtocol p(&io); @@ -1304,6 +1318,22 @@ VOID TEST(ProtoStackTest, HandshakeC0C1) EXPECT_EQ(0x01020304, hs.proxy_real_ip); } + // It's extended c0c1 prefixed with ip, which should be ok. + if (true) { + uint8_t buf[1537 + 7] = { + 0xF3, 0x00, 0x04, + 0x01, 0x02, 0x03, 0x04, + }; + HELPER_ARRAY_INIT(buf+7, 1537, 0x00); + + MockBufferIO io; + io.append(buf, sizeof(buf)); + + SrsRtmpServer r(&io); + HELPER_EXPECT_SUCCESS(r.hs_bytes->read_c0c1(&io)); + EXPECT_EQ(0x01020304, r.proxy_real_ip()); + } + // It seems a normal c0c1, but it's extended, so it fail. if (true) { uint8_t buf[1537] = { @@ -2396,7 +2426,14 @@ VOID TEST(ProtoStackTest, CoverAll) MockBufferIO io; SrsRtmpServer r(&io); r.set_recv_timeout(100 * SRS_UTIME_MILLISECONDS); + EXPECT_EQ(100 * SRS_UTIME_MILLISECONDS, r.get_recv_timeout()); + r.set_send_timeout(100 * SRS_UTIME_MILLISECONDS); + EXPECT_EQ(100 * SRS_UTIME_MILLISECONDS, r.get_send_timeout()); + + r.set_recv_buffer(SRS_DEFAULT_RECV_BUFFER_SIZE + 10); + EXPECT_EQ(SRS_DEFAULT_RECV_BUFFER_SIZE + 10, r.protocol->in_buffer->nb_buffer); + EXPECT_EQ(0, r.get_recv_bytes()); EXPECT_EQ(0, r.get_send_bytes()); @@ -2407,6 +2444,107 @@ VOID TEST(ProtoStackTest, CoverAll) HELPER_EXPECT_SUCCESS(r.send_and_free_packet(pkt, 0)); EXPECT_TRUE(r.get_send_bytes() > 0); } + + if (true) { + MockBufferIO io; + SrsRtmpServer r(&io); + HELPER_ASSERT_SUCCESS(r.set_peer_bandwidth(0, 0)); + EXPECT_TRUE(r.get_send_bytes() > 0); + } + + if (true) { + SrsBandwidthPacket p; + + p.set_command("onSrsBandCheckStartPlayBytes"); + EXPECT_TRUE(p.is_start_play()); + + p.command_name = "onSrsBandCheckStartPlayBytes"; + EXPECT_TRUE(p.is_start_play()); + } + + if (true) { + SrsBandwidthPacket* p = SrsBandwidthPacket::create_start_play(); + EXPECT_TRUE(p->is_start_play()); + srs_freep(p); + } + if (true) { + SrsBandwidthPacket* p = SrsBandwidthPacket::create_starting_play(); + EXPECT_TRUE(p->is_starting_play()); + srs_freep(p); + } + if (true) { + SrsBandwidthPacket* p = SrsBandwidthPacket::create_playing(); + srs_freep(p); + } + if (true) { + SrsBandwidthPacket* p = SrsBandwidthPacket::create_stop_play(); + EXPECT_TRUE(p->is_stop_play()); + srs_freep(p); + } + if (true) { + SrsBandwidthPacket* p = SrsBandwidthPacket::create_stopped_play(); + EXPECT_TRUE(p->is_stopped_play()); + srs_freep(p); + } + if (true) { + SrsBandwidthPacket* p = SrsBandwidthPacket::create_start_publish(); + EXPECT_TRUE(p->is_start_publish()); + srs_freep(p); + } + if (true) { + SrsBandwidthPacket* p = SrsBandwidthPacket::create_starting_publish(); + EXPECT_TRUE(p->is_starting_publish()); + srs_freep(p); + } + if (true) { + SrsBandwidthPacket* p = SrsBandwidthPacket::create_publishing(); + srs_freep(p); + } + if (true) { + SrsBandwidthPacket* p = SrsBandwidthPacket::create_stop_publish(); + EXPECT_TRUE(p->is_stop_publish()); + srs_freep(p); + } + if (true) { + SrsBandwidthPacket* p = SrsBandwidthPacket::create_stopped_publish(); + EXPECT_TRUE(p->is_stopped_publish()); + srs_freep(p); + } + if (true) { + SrsBandwidthPacket* p = SrsBandwidthPacket::create_finish(); + EXPECT_TRUE(p->is_finish()); + srs_freep(p); + } + if (true) { + SrsBandwidthPacket* p = SrsBandwidthPacket::create_final(); + EXPECT_TRUE(p->is_final()); + srs_freep(p); + } + + if (true) { + SrsAmf0Any* name = SrsAmf0Any::str("call"); + + SrsAmf0EcmaArray* arr = SrsAmf0Any::ecma_array(); + arr->set("license", SrsAmf0Any::str("MIT")); + + int nn = name->total_size() + arr->total_size(); + char* b = new char[nn]; + SrsAutoFreeA(char, b); + + SrsBuffer buf(b, nn); + HELPER_ASSERT_SUCCESS(name->write(&buf)); + HELPER_ASSERT_SUCCESS(arr->write(&buf)); + + SrsOnMetaDataPacket* p = new SrsOnMetaDataPacket(); + SrsAutoFree(SrsOnMetaDataPacket, p); + + buf.skip(-1 * buf.pos()); + HELPER_ASSERT_SUCCESS(p->decode(&buf)); + + SrsAmf0Any* prop = p->metadata->get_property("license"); + ASSERT_TRUE(prop && prop->is_string()); + EXPECT_STREQ("MIT", prop->to_str().c_str()); + } } VOID TEST(ProtoStackTest, ComplexToSimpleHandshake) @@ -2495,3 +2633,275 @@ VOID TEST(ProtoStackTest, ConnectAppWithArgs) } } +VOID TEST(ProtoStackTest, AgentMessageCodec) +{ + srs_error_t err; + + if (true) { + MockBufferIO io; + SrsRtmpClient p(&io); + + if (true) { + SrsConnectAppPacket* res = new SrsConnectAppPacket(); + HELPER_EXPECT_SUCCESS(p.send_and_free_packet(res, 0)); + io.in_buffer.append(&io.out_buffer); + } + + if (true) { + SrsCommonMessage* msg = NULL; + HELPER_EXPECT_SUCCESS(p.recv_message(&msg)); + srs_freep(msg); + } + } + + if (true) { + MockBufferIO io; + SrsRtmpClient p(&io); + + if (true) { + SrsConnectAppPacket* res = new SrsConnectAppPacket(); + HELPER_EXPECT_SUCCESS(p.send_and_free_packet(res, 0)); + io.in_buffer.append(&io.out_buffer); + } + + if (true) { + SrsCommonMessage* msg = NULL; + HELPER_ASSERT_SUCCESS(p.recv_message(&msg)); + + SrsPacket* pkt = NULL; + HELPER_EXPECT_SUCCESS(p.decode_message(msg, &pkt)); + + srs_freep(msg); + srs_freep(pkt); + } + } + + if (true) { + MockBufferIO io; + SrsRtmpServer p(&io); + + if (true) { + SrsConnectAppPacket* res = new SrsConnectAppPacket(); + HELPER_EXPECT_SUCCESS(p.send_and_free_packet(res, 0)); + io.in_buffer.append(&io.out_buffer); + } + + if (true) { + SrsCommonMessage* msg = NULL; + HELPER_EXPECT_SUCCESS(p.recv_message(&msg)); + srs_freep(msg); + } + } + + if (true) { + MockBufferIO io; + SrsRtmpServer p(&io); + + if (true) { + SrsConnectAppPacket* res = new SrsConnectAppPacket(); + HELPER_EXPECT_SUCCESS(p.send_and_free_packet(res, 0)); + io.in_buffer.append(&io.out_buffer); + } + + if (true) { + SrsCommonMessage* msg = NULL; + HELPER_ASSERT_SUCCESS(p.recv_message(&msg)); + + SrsPacket* pkt = NULL; + HELPER_EXPECT_SUCCESS(p.decode_message(msg, &pkt)); + + srs_freep(msg); + srs_freep(pkt); + } + } +} + +srs_error_t _mock_packet_to_shared_msg(SrsPacket* packet, int stream_id, SrsSharedPtrMessage* shared_msg) +{ + srs_error_t err = srs_success; + + SrsCommonMessage* msg = new SrsCommonMessage(); + SrsAutoFree(SrsCommonMessage, msg); + + if ((err = packet->to_msg(msg, stream_id)) != srs_success) { + srs_freep(msg); + return err; + } + + if ((err = shared_msg->create(msg)) != srs_success) { + return err; + } + + return err; +} + +VOID TEST(ProtoStackTest, CheckStreamID) +{ + srs_error_t err; + + if (true) { + MockBufferIO io; + SrsRtmpClient p(&io); + + if (true) { + SrsSharedPtrMessage* shared_msgs[2]; + SrsConnectAppPacket* res = new SrsConnectAppPacket(); + SrsAutoFree(SrsConnectAppPacket, res); + + if (true) { + SrsSharedPtrMessage* shared_msg = new SrsSharedPtrMessage(); + HELPER_ASSERT_SUCCESS(_mock_packet_to_shared_msg(res, 1, shared_msg)); + shared_msgs[0] = shared_msg; + } + + if (true) { + SrsSharedPtrMessage* shared_msg = new SrsSharedPtrMessage(); + HELPER_ASSERT_SUCCESS(_mock_packet_to_shared_msg(res, 2, shared_msg)); + shared_msgs[1] = shared_msg; + } + + HELPER_EXPECT_SUCCESS(p.send_and_free_messages(shared_msgs, 2, 1)); + io.in_buffer.append(&io.out_buffer); + } + + if (true) { + SrsCommonMessage* msg = NULL; + HELPER_EXPECT_SUCCESS(p.recv_message(&msg)); + EXPECT_EQ(1, msg->header.stream_id); + srs_freep(msg); + } + + if (true) { + SrsCommonMessage* msg = NULL; + HELPER_EXPECT_SUCCESS(p.recv_message(&msg)); + EXPECT_EQ(2, msg->header.stream_id); + srs_freep(msg); + } + } +} + +VOID TEST(ProtoStackTest, AgentMessageTransform) +{ + srs_error_t err; + + if (true) { + MockBufferIO io; + SrsRtmpClient p(&io); + + if (true) { + SrsSharedPtrMessage* shared_msg = new SrsSharedPtrMessage(); + SrsConnectAppPacket* res = new SrsConnectAppPacket(); + HELPER_ASSERT_SUCCESS(_mock_packet_to_shared_msg(res, 1, shared_msg)); + srs_freep(res); + + HELPER_EXPECT_SUCCESS(p.send_and_free_message(shared_msg, 0)); + io.in_buffer.append(&io.out_buffer); + } + + if (true) { + SrsCommonMessage* msg = NULL; + HELPER_EXPECT_SUCCESS(p.recv_message(&msg)); + srs_freep(msg); + } + } + + if (true) { + MockBufferIO io; + SrsRtmpClient p(&io); + + if (true) { + SrsSharedPtrMessage* shared_msg = new SrsSharedPtrMessage(); + SrsConnectAppPacket* res = new SrsConnectAppPacket(); + HELPER_ASSERT_SUCCESS(_mock_packet_to_shared_msg(res, 1, shared_msg)); + srs_freep(res); + + HELPER_EXPECT_SUCCESS(p.send_and_free_messages(&shared_msg, 1, 0)); + io.in_buffer.append(&io.out_buffer); + } + + if (true) { + SrsCommonMessage* msg = NULL; + HELPER_EXPECT_SUCCESS(p.recv_message(&msg)); + srs_freep(msg); + } + } + + if (true) { + MockBufferIO io; + SrsRtmpServer p(&io); + + if (true) { + SrsSharedPtrMessage* shared_msg = new SrsSharedPtrMessage(); + SrsConnectAppPacket* res = new SrsConnectAppPacket(); + HELPER_ASSERT_SUCCESS(_mock_packet_to_shared_msg(res, 1, shared_msg)); + srs_freep(res); + + HELPER_EXPECT_SUCCESS(p.send_and_free_message(shared_msg, 0)); + io.in_buffer.append(&io.out_buffer); + } + + if (true) { + SrsCommonMessage* msg = NULL; + HELPER_EXPECT_SUCCESS(p.recv_message(&msg)); + srs_freep(msg); + } + } + + if (true) { + MockBufferIO io; + SrsRtmpServer p(&io); + + if (true) { + SrsSharedPtrMessage* shared_msg = new SrsSharedPtrMessage(); + SrsConnectAppPacket* res = new SrsConnectAppPacket(); + HELPER_ASSERT_SUCCESS(_mock_packet_to_shared_msg(res, 1, shared_msg)); + srs_freep(res); + + HELPER_EXPECT_SUCCESS(p.send_and_free_messages(&shared_msg, 1, 0)); + io.in_buffer.append(&io.out_buffer); + } + + if (true) { + SrsCommonMessage* msg = NULL; + HELPER_EXPECT_SUCCESS(p.recv_message(&msg)); + srs_freep(msg); + } + } +} + +class MockMRHandler : public IMergeReadHandler +{ +public: + ssize_t nn; + MockMRHandler() : nn(0) { + } + virtual void on_read(ssize_t nread) { + nn += nread; + } +}; + +VOID TEST(ProtoStackTest, MergeReadHandler) +{ + srs_error_t err; + + MockBufferIO io; + SrsRtmpServer r(&io); + + if (true) { + SrsConnectAppPacket* res = new SrsConnectAppPacket(); + HELPER_EXPECT_SUCCESS(r.send_and_free_packet(res, 0)); + io.in_buffer.append(&io.out_buffer); + } + + MockMRHandler h; + r.set_merge_read(true, &h); + + if (true) { + SrsCommonMessage* msg = NULL; + HELPER_EXPECT_SUCCESS(r.recv_message(&msg)); + srs_freep(msg); + } + + EXPECT_TRUE(h.nn > 0); +} +