1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

SrsPacket supports converting to message, so can be sent by one API.

This commit is contained in:
winlin 2019-12-12 15:11:31 +08:00
parent 82c4f41975
commit 2731fe1f3e
7 changed files with 472 additions and 70 deletions

View file

@ -145,6 +145,7 @@ For previous versions, please read:
## V3 changes
* v3.0, 2019-12-11, SrsPacket supports converting to message, so can be sent by one API.
* v3.0, 2019-12-11, For [#1042][bug #1042], cover RTMP client/server protocol.
* v3.0, 2019-12-11, Fix [#1445][bug #1445], limit the createStream recursive depth. 3.0.70
* v3.0, 2019-12-11, For [#1042][bug #1042], cover RTMP handshake protocol.

View file

@ -95,6 +95,8 @@ private:
int nb_bytes;
public:
SrsBuffer();
// Initialize buffer with data b and size nb_b.
// @remark User must free the data b.
SrsBuffer(char* b, int nb_b);
virtual ~SrsBuffer();
// get the status of stream

View file

@ -37,6 +37,7 @@ class SrsBuffer;
class ISrsWriter;
class ISrsReader;
class SrsFileReader;
class SrsPacket;
#define SRS_FLV_TAG_HEADER_SIZE 11
#define SRS_FLV_PREVIOUS_TAG_SIZE 4
@ -231,8 +232,9 @@ public:
// The message header for shared ptr message.
// only the message for all msgs are same.
struct SrsSharedMessageHeader
class SrsSharedMessageHeader
{
public:
// 3bytes.
// Three-byte field that represents the size of the payload in bytes.
// It is set in big-endian format.
@ -245,7 +247,7 @@ struct SrsSharedMessageHeader
// set at decoding, and canbe used for directly send message,
// For example, dispatch to all connections.
int perfer_cid;
public:
SrsSharedMessageHeader();
virtual ~SrsSharedMessageHeader();
};
@ -309,6 +311,7 @@ public:
// copy header, manage the payload of msg,
// set the payload to NULL to prevent double free.
// @remark payload of msg set to NULL if success.
// @remark User should free the msg.
virtual srs_error_t create(SrsCommonMessage* msg);
// Create shared ptr message,
// from the header and payload.

View file

@ -64,9 +64,9 @@ void SrsSimpleStream::erase(int size)
void SrsSimpleStream::append(const char* bytes, int size)
{
srs_assert(size > 0);
if (size > 0) {
data.insert(data.end(), bytes, bytes + size);
}
}
void SrsSimpleStream::append(SrsSimpleStream* src)

View file

@ -137,6 +137,36 @@ SrsPacket::~SrsPacket()
{
}
srs_error_t SrsPacket::to_msg(SrsCommonMessage* msg, int stream_id)
{
srs_error_t err = srs_success;
int size = 0;
char* payload = NULL;
if ((err = encode(size, payload)) != srs_success) {
return srs_error_wrap(err, "encode packet");
}
// encode packet to payload and size.
if (size <= 0 || payload == NULL) {
srs_warn("packet is empty, ignore empty message.");
return err;
}
// to message
SrsMessageHeader header;
header.payload_length = size;
header.message_type = get_message_type();
header.stream_id = stream_id;
header.perfer_cid = get_prefer_cid();
if ((err = msg->create(&header, payload, size)) != srs_success) {
return srs_error_wrap(err, "create %dB message", size);
}
return err;
}
srs_error_t SrsPacket::encode(int& psize, char*& ppayload)
{
srs_error_t err = srs_success;
@ -571,74 +601,30 @@ srs_error_t SrsProtocol::do_send_and_free_packet(SrsPacket* packet, int stream_i
srs_assert(packet);
SrsAutoFree(SrsPacket, packet);
int size = 0;
char* payload = NULL;
if ((err = packet->encode(size, payload)) != srs_success) {
return srs_error_wrap(err, "encode packet");
SrsCommonMessage* msg = new SrsCommonMessage();
SrsAutoFree(SrsCommonMessage, msg);
if ((err = packet->to_msg(msg, stream_id)) != srs_success) {
return srs_error_wrap(err, "to message");
}
// encode packet to payload and size.
if (size <= 0 || payload == NULL) {
srs_warn("packet is empty, ignore empty message.");
return err;
SrsSharedPtrMessage* shared_msg = new SrsSharedPtrMessage();
if ((err = shared_msg->create(msg)) != srs_success) {
srs_freep(shared_msg);
return srs_error_wrap(err, "create message");
}
// to message
SrsMessageHeader header;
header.payload_length = size;
header.message_type = packet->get_message_type();
header.stream_id = stream_id;
header.perfer_cid = packet->get_prefer_cid();
err = do_simple_send(&header, payload, size);
srs_freepa(payload);
if (err != srs_success) {
return srs_error_wrap(err, "simple send");
if ((err = send_and_free_message(shared_msg, stream_id)) != srs_success) {
return srs_error_wrap(err, "send packet");
}
if ((err = on_send_packet(&header, packet)) != srs_success) {
if ((err = on_send_packet(&msg->header, packet)) != srs_success) {
return srs_error_wrap(err, "on send packet");
}
return err;
}
srs_error_t SrsProtocol::do_simple_send(SrsMessageHeader* mh, char* payload, int size)
{
srs_error_t err = srs_success;
// we directly send out the packet,
// use very simple algorithm, not very fast,
// but it's ok.
char* p = payload;
char* end = p + size;
char c0c3[SRS_CONSTS_RTMP_MAX_FMT0_HEADER_SIZE];
while (p < end) {
int nbh = 0;
if (p == payload) {
nbh = srs_chunk_header_c0(mh->perfer_cid, (uint32_t)mh->timestamp, mh->payload_length, mh->message_type, mh->stream_id, c0c3, sizeof(c0c3));
} else {
nbh = srs_chunk_header_c3(mh->perfer_cid, (uint32_t)mh->timestamp, c0c3, sizeof(c0c3));
}
srs_assert(nbh > 0);;
iovec iovs[2];
iovs[0].iov_base = c0c3;
iovs[0].iov_len = nbh;
int payload_size = srs_min((int)(end - p), out_chunk_size);
iovs[1].iov_base = p;
iovs[1].iov_len = payload_size;
p += payload_size;
if ((err = skt->writev(iovs, 2, NULL)) != srs_success) {
return srs_error_wrap(err, "writev packet");
}
}
return err;
}
srs_error_t SrsProtocol::do_decode_message(SrsMessageHeader& header, SrsBuffer* stream, SrsPacket** ppacket)
{
srs_error_t err = srs_success;

View file

@ -115,6 +115,9 @@ class SrsPacket
public:
SrsPacket();
virtual ~SrsPacket();
public:
// Covert packet to common message.
virtual srs_error_t to_msg(SrsCommonMessage* msg, int stream_id);
public:
// The subpacket can override this encode,
// For example, video and audio will directly set the payload withou memory copy,
@ -356,9 +359,6 @@ private:
virtual srs_error_t do_iovs_send(iovec* iovs, int size);
// The underlayer api for send and free packet.
virtual srs_error_t do_send_and_free_packet(SrsPacket* packet, int stream_id);
// Use simple algorithm to send the header and bytes.
// @remark, for do_send_and_free_packet to send.
virtual srs_error_t do_simple_send(SrsMessageHeader* mh, char* payload, int size);
// The imp for decode_message
virtual srs_error_t do_decode_message(SrsMessageHeader& header, SrsBuffer* stream, SrsPacket** ppacket);
// Recv bytes oriented RTMP message from protocol stack.

View file

@ -36,6 +36,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_service_http_conn.hpp>
#include <srs_kernel_buffer.hpp>
#define SRS_DEFAULT_RECV_BUFFER_SIZE 131072
using namespace std;
class MockPacket : public SrsPacket
@ -138,6 +140,18 @@ VOID TEST(ProtoStackTest, ManualFlush)
EXPECT_EQ(12+4, io.out_buffer.length());
}
if (true) {
MockBufferIO io;
SrsRtmpServer p(&io);
// Always response ACK message.
HELPER_EXPECT_SUCCESS(p.set_in_window_ack_size(1));
p.set_auto_response(true);
HELPER_EXPECT_SUCCESS(p.protocol->response_acknowledgement_message());
EXPECT_EQ(12+4, io.out_buffer.length());
}
if (true) {
MockBufferIO io;
SrsProtocol p(&io);
@ -1304,6 +1318,22 @@ VOID TEST(ProtoStackTest, HandshakeC0C1)
EXPECT_EQ(0x01020304, hs.proxy_real_ip);
}
// It's extended c0c1 prefixed with ip, which should be ok.
if (true) {
uint8_t buf[1537 + 7] = {
0xF3, 0x00, 0x04,
0x01, 0x02, 0x03, 0x04,
};
HELPER_ARRAY_INIT(buf+7, 1537, 0x00);
MockBufferIO io;
io.append(buf, sizeof(buf));
SrsRtmpServer r(&io);
HELPER_EXPECT_SUCCESS(r.hs_bytes->read_c0c1(&io));
EXPECT_EQ(0x01020304, r.proxy_real_ip());
}
// It seems a normal c0c1, but it's extended, so it fail.
if (true) {
uint8_t buf[1537] = {
@ -2396,7 +2426,14 @@ VOID TEST(ProtoStackTest, CoverAll)
MockBufferIO io;
SrsRtmpServer r(&io);
r.set_recv_timeout(100 * SRS_UTIME_MILLISECONDS);
EXPECT_EQ(100 * SRS_UTIME_MILLISECONDS, r.get_recv_timeout());
r.set_send_timeout(100 * SRS_UTIME_MILLISECONDS);
EXPECT_EQ(100 * SRS_UTIME_MILLISECONDS, r.get_send_timeout());
r.set_recv_buffer(SRS_DEFAULT_RECV_BUFFER_SIZE + 10);
EXPECT_EQ(SRS_DEFAULT_RECV_BUFFER_SIZE + 10, r.protocol->in_buffer->nb_buffer);
EXPECT_EQ(0, r.get_recv_bytes());
EXPECT_EQ(0, r.get_send_bytes());
@ -2407,6 +2444,107 @@ VOID TEST(ProtoStackTest, CoverAll)
HELPER_EXPECT_SUCCESS(r.send_and_free_packet(pkt, 0));
EXPECT_TRUE(r.get_send_bytes() > 0);
}
if (true) {
MockBufferIO io;
SrsRtmpServer r(&io);
HELPER_ASSERT_SUCCESS(r.set_peer_bandwidth(0, 0));
EXPECT_TRUE(r.get_send_bytes() > 0);
}
if (true) {
SrsBandwidthPacket p;
p.set_command("onSrsBandCheckStartPlayBytes");
EXPECT_TRUE(p.is_start_play());
p.command_name = "onSrsBandCheckStartPlayBytes";
EXPECT_TRUE(p.is_start_play());
}
if (true) {
SrsBandwidthPacket* p = SrsBandwidthPacket::create_start_play();
EXPECT_TRUE(p->is_start_play());
srs_freep(p);
}
if (true) {
SrsBandwidthPacket* p = SrsBandwidthPacket::create_starting_play();
EXPECT_TRUE(p->is_starting_play());
srs_freep(p);
}
if (true) {
SrsBandwidthPacket* p = SrsBandwidthPacket::create_playing();
srs_freep(p);
}
if (true) {
SrsBandwidthPacket* p = SrsBandwidthPacket::create_stop_play();
EXPECT_TRUE(p->is_stop_play());
srs_freep(p);
}
if (true) {
SrsBandwidthPacket* p = SrsBandwidthPacket::create_stopped_play();
EXPECT_TRUE(p->is_stopped_play());
srs_freep(p);
}
if (true) {
SrsBandwidthPacket* p = SrsBandwidthPacket::create_start_publish();
EXPECT_TRUE(p->is_start_publish());
srs_freep(p);
}
if (true) {
SrsBandwidthPacket* p = SrsBandwidthPacket::create_starting_publish();
EXPECT_TRUE(p->is_starting_publish());
srs_freep(p);
}
if (true) {
SrsBandwidthPacket* p = SrsBandwidthPacket::create_publishing();
srs_freep(p);
}
if (true) {
SrsBandwidthPacket* p = SrsBandwidthPacket::create_stop_publish();
EXPECT_TRUE(p->is_stop_publish());
srs_freep(p);
}
if (true) {
SrsBandwidthPacket* p = SrsBandwidthPacket::create_stopped_publish();
EXPECT_TRUE(p->is_stopped_publish());
srs_freep(p);
}
if (true) {
SrsBandwidthPacket* p = SrsBandwidthPacket::create_finish();
EXPECT_TRUE(p->is_finish());
srs_freep(p);
}
if (true) {
SrsBandwidthPacket* p = SrsBandwidthPacket::create_final();
EXPECT_TRUE(p->is_final());
srs_freep(p);
}
if (true) {
SrsAmf0Any* name = SrsAmf0Any::str("call");
SrsAmf0EcmaArray* arr = SrsAmf0Any::ecma_array();
arr->set("license", SrsAmf0Any::str("MIT"));
int nn = name->total_size() + arr->total_size();
char* b = new char[nn];
SrsAutoFreeA(char, b);
SrsBuffer buf(b, nn);
HELPER_ASSERT_SUCCESS(name->write(&buf));
HELPER_ASSERT_SUCCESS(arr->write(&buf));
SrsOnMetaDataPacket* p = new SrsOnMetaDataPacket();
SrsAutoFree(SrsOnMetaDataPacket, p);
buf.skip(-1 * buf.pos());
HELPER_ASSERT_SUCCESS(p->decode(&buf));
SrsAmf0Any* prop = p->metadata->get_property("license");
ASSERT_TRUE(prop && prop->is_string());
EXPECT_STREQ("MIT", prop->to_str().c_str());
}
}
VOID TEST(ProtoStackTest, ComplexToSimpleHandshake)
@ -2495,3 +2633,275 @@ VOID TEST(ProtoStackTest, ConnectAppWithArgs)
}
}
VOID TEST(ProtoStackTest, AgentMessageCodec)
{
srs_error_t err;
if (true) {
MockBufferIO io;
SrsRtmpClient p(&io);
if (true) {
SrsConnectAppPacket* res = new SrsConnectAppPacket();
HELPER_EXPECT_SUCCESS(p.send_and_free_packet(res, 0));
io.in_buffer.append(&io.out_buffer);
}
if (true) {
SrsCommonMessage* msg = NULL;
HELPER_EXPECT_SUCCESS(p.recv_message(&msg));
srs_freep(msg);
}
}
if (true) {
MockBufferIO io;
SrsRtmpClient p(&io);
if (true) {
SrsConnectAppPacket* res = new SrsConnectAppPacket();
HELPER_EXPECT_SUCCESS(p.send_and_free_packet(res, 0));
io.in_buffer.append(&io.out_buffer);
}
if (true) {
SrsCommonMessage* msg = NULL;
HELPER_ASSERT_SUCCESS(p.recv_message(&msg));
SrsPacket* pkt = NULL;
HELPER_EXPECT_SUCCESS(p.decode_message(msg, &pkt));
srs_freep(msg);
srs_freep(pkt);
}
}
if (true) {
MockBufferIO io;
SrsRtmpServer p(&io);
if (true) {
SrsConnectAppPacket* res = new SrsConnectAppPacket();
HELPER_EXPECT_SUCCESS(p.send_and_free_packet(res, 0));
io.in_buffer.append(&io.out_buffer);
}
if (true) {
SrsCommonMessage* msg = NULL;
HELPER_EXPECT_SUCCESS(p.recv_message(&msg));
srs_freep(msg);
}
}
if (true) {
MockBufferIO io;
SrsRtmpServer p(&io);
if (true) {
SrsConnectAppPacket* res = new SrsConnectAppPacket();
HELPER_EXPECT_SUCCESS(p.send_and_free_packet(res, 0));
io.in_buffer.append(&io.out_buffer);
}
if (true) {
SrsCommonMessage* msg = NULL;
HELPER_ASSERT_SUCCESS(p.recv_message(&msg));
SrsPacket* pkt = NULL;
HELPER_EXPECT_SUCCESS(p.decode_message(msg, &pkt));
srs_freep(msg);
srs_freep(pkt);
}
}
}
srs_error_t _mock_packet_to_shared_msg(SrsPacket* packet, int stream_id, SrsSharedPtrMessage* shared_msg)
{
srs_error_t err = srs_success;
SrsCommonMessage* msg = new SrsCommonMessage();
SrsAutoFree(SrsCommonMessage, msg);
if ((err = packet->to_msg(msg, stream_id)) != srs_success) {
srs_freep(msg);
return err;
}
if ((err = shared_msg->create(msg)) != srs_success) {
return err;
}
return err;
}
VOID TEST(ProtoStackTest, CheckStreamID)
{
srs_error_t err;
if (true) {
MockBufferIO io;
SrsRtmpClient p(&io);
if (true) {
SrsSharedPtrMessage* shared_msgs[2];
SrsConnectAppPacket* res = new SrsConnectAppPacket();
SrsAutoFree(SrsConnectAppPacket, res);
if (true) {
SrsSharedPtrMessage* shared_msg = new SrsSharedPtrMessage();
HELPER_ASSERT_SUCCESS(_mock_packet_to_shared_msg(res, 1, shared_msg));
shared_msgs[0] = shared_msg;
}
if (true) {
SrsSharedPtrMessage* shared_msg = new SrsSharedPtrMessage();
HELPER_ASSERT_SUCCESS(_mock_packet_to_shared_msg(res, 2, shared_msg));
shared_msgs[1] = shared_msg;
}
HELPER_EXPECT_SUCCESS(p.send_and_free_messages(shared_msgs, 2, 1));
io.in_buffer.append(&io.out_buffer);
}
if (true) {
SrsCommonMessage* msg = NULL;
HELPER_EXPECT_SUCCESS(p.recv_message(&msg));
EXPECT_EQ(1, msg->header.stream_id);
srs_freep(msg);
}
if (true) {
SrsCommonMessage* msg = NULL;
HELPER_EXPECT_SUCCESS(p.recv_message(&msg));
EXPECT_EQ(2, msg->header.stream_id);
srs_freep(msg);
}
}
}
VOID TEST(ProtoStackTest, AgentMessageTransform)
{
srs_error_t err;
if (true) {
MockBufferIO io;
SrsRtmpClient p(&io);
if (true) {
SrsSharedPtrMessage* shared_msg = new SrsSharedPtrMessage();
SrsConnectAppPacket* res = new SrsConnectAppPacket();
HELPER_ASSERT_SUCCESS(_mock_packet_to_shared_msg(res, 1, shared_msg));
srs_freep(res);
HELPER_EXPECT_SUCCESS(p.send_and_free_message(shared_msg, 0));
io.in_buffer.append(&io.out_buffer);
}
if (true) {
SrsCommonMessage* msg = NULL;
HELPER_EXPECT_SUCCESS(p.recv_message(&msg));
srs_freep(msg);
}
}
if (true) {
MockBufferIO io;
SrsRtmpClient p(&io);
if (true) {
SrsSharedPtrMessage* shared_msg = new SrsSharedPtrMessage();
SrsConnectAppPacket* res = new SrsConnectAppPacket();
HELPER_ASSERT_SUCCESS(_mock_packet_to_shared_msg(res, 1, shared_msg));
srs_freep(res);
HELPER_EXPECT_SUCCESS(p.send_and_free_messages(&shared_msg, 1, 0));
io.in_buffer.append(&io.out_buffer);
}
if (true) {
SrsCommonMessage* msg = NULL;
HELPER_EXPECT_SUCCESS(p.recv_message(&msg));
srs_freep(msg);
}
}
if (true) {
MockBufferIO io;
SrsRtmpServer p(&io);
if (true) {
SrsSharedPtrMessage* shared_msg = new SrsSharedPtrMessage();
SrsConnectAppPacket* res = new SrsConnectAppPacket();
HELPER_ASSERT_SUCCESS(_mock_packet_to_shared_msg(res, 1, shared_msg));
srs_freep(res);
HELPER_EXPECT_SUCCESS(p.send_and_free_message(shared_msg, 0));
io.in_buffer.append(&io.out_buffer);
}
if (true) {
SrsCommonMessage* msg = NULL;
HELPER_EXPECT_SUCCESS(p.recv_message(&msg));
srs_freep(msg);
}
}
if (true) {
MockBufferIO io;
SrsRtmpServer p(&io);
if (true) {
SrsSharedPtrMessage* shared_msg = new SrsSharedPtrMessage();
SrsConnectAppPacket* res = new SrsConnectAppPacket();
HELPER_ASSERT_SUCCESS(_mock_packet_to_shared_msg(res, 1, shared_msg));
srs_freep(res);
HELPER_EXPECT_SUCCESS(p.send_and_free_messages(&shared_msg, 1, 0));
io.in_buffer.append(&io.out_buffer);
}
if (true) {
SrsCommonMessage* msg = NULL;
HELPER_EXPECT_SUCCESS(p.recv_message(&msg));
srs_freep(msg);
}
}
}
class MockMRHandler : public IMergeReadHandler
{
public:
ssize_t nn;
MockMRHandler() : nn(0) {
}
virtual void on_read(ssize_t nread) {
nn += nread;
}
};
VOID TEST(ProtoStackTest, MergeReadHandler)
{
srs_error_t err;
MockBufferIO io;
SrsRtmpServer r(&io);
if (true) {
SrsConnectAppPacket* res = new SrsConnectAppPacket();
HELPER_EXPECT_SUCCESS(r.send_and_free_packet(res, 0));
io.in_buffer.append(&io.out_buffer);
}
MockMRHandler h;
r.set_merge_read(true, &h);
if (true) {
SrsCommonMessage* msg = NULL;
HELPER_EXPECT_SUCCESS(r.recv_message(&msg));
srs_freep(msg);
}
EXPECT_TRUE(h.nn > 0);
}