diff --git a/trunk/src/app/srs_app_edge.cpp b/trunk/src/app/srs_app_edge.cpp index d91e27700..d0dca20ec 100644 --- a/trunk/src/app/srs_app_edge.cpp +++ b/trunk/src/app/srs_app_edge.cpp @@ -474,7 +474,7 @@ int SrsEdgeForwarder::cycle() SrsPithyPrint pithy_print(SRS_CONSTS_STAGE_EDGE); - SrsSharedPtrMessageArray msgs(SYS_MAX_EDGE_SEND_MSGS); + SrsMessageArray msgs(SYS_MAX_EDGE_SEND_MSGS); while (pthread->can_loop()) { if (send_error_code != ERROR_SUCCESS) { @@ -526,7 +526,7 @@ int SrsEdgeForwarder::cycle() // @remark, becareful, all msgs must be free explicitly, // free by send_and_free_message or srs_freep. for (int i = 0; i < count; i++) { - SrsSharedPtrMessage* msg = msgs.msgs[i]; + SrsMessage* msg = msgs.msgs[i]; srs_assert(msg); msgs.msgs[i] = NULL; diff --git a/trunk/src/app/srs_app_forward.cpp b/trunk/src/app/srs_app_forward.cpp index 78360d6de..831d065d5 100644 --- a/trunk/src/app/srs_app_forward.cpp +++ b/trunk/src/app/srs_app_forward.cpp @@ -381,7 +381,7 @@ int SrsForwarder::forward() SrsPithyPrint pithy_print(SRS_CONSTS_STAGE_FORWARDER); - SrsSharedPtrMessageArray msgs(SYS_MAX_FORWARD_SEND_MSGS); + SrsMessageArray msgs(SYS_MAX_FORWARD_SEND_MSGS); // update sequence header // TODO: FIXME: maybe need to zero the sequence header timestamp. @@ -442,7 +442,7 @@ int SrsForwarder::forward() // @remark, becareful, all msgs must be free explicitly, // free by send_and_free_message or srs_freep. for (int i = 0; i < count; i++) { - SrsSharedPtrMessage* msg = msgs.msgs[i]; + SrsMessage* msg = msgs.msgs[i]; srs_assert(msg); msgs.msgs[i] = NULL; diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index 186760361..8fcc3625c 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -517,7 +517,7 @@ int SrsRtmpConn::playing(SrsSource* source) // initialize other components SrsPithyPrint pithy_print(SRS_CONSTS_STAGE_PLAY_USER); - SrsSharedPtrMessageArray msgs(SYS_CONSTS_MAX_PLAY_SEND_MSGS); + SrsMessageArray msgs(SYS_CONSTS_MAX_PLAY_SEND_MSGS); bool user_specified_duration_to_stop = (req->duration > 0); int64_t starttime = -1; @@ -574,7 +574,7 @@ int SrsRtmpConn::playing(SrsSource* source) // we start to collect the durations for each message. if (user_specified_duration_to_stop) { for (int i = 0; i < count; i++) { - SrsSharedPtrMessage* msg = msgs.msgs[i]; + SrsMessage* msg = msgs.msgs[i]; // foreach msg, collect the duration. // @remark: never use msg when sent it, for the protocol sdk will free it. diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index f25e64e3e..4f578f075 100644 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -192,7 +192,7 @@ int SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg) return ret; } -int SrsMessageQueue::dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& count) +int SrsMessageQueue::dump_packets(int max_count, SrsMessage** pmsgs, int& count) { int ret = ERROR_SUCCESS; @@ -207,7 +207,7 @@ int SrsMessageQueue::dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, in pmsgs[i] = msgs[i]; } - SrsSharedPtrMessage* last = msgs[count - 1]; + SrsMessage* last = msgs[count - 1]; av_start_time = last->header.timestamp; if (count == (int)msgs.size()) { @@ -332,7 +332,7 @@ int SrsConsumer::enqueue(SrsSharedPtrMessage* msg, bool atc, int tba, int tbv, S return ret; } -int SrsConsumer::dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& count) +int SrsConsumer::dump_packets(int max_count, SrsMessage** pmsgs, int& count) { srs_assert(max_count > 0); diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index 1759ebf7e..2ac9b7f9e 100644 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -132,7 +132,7 @@ public: * @count the count in array, output param. * @max_count the max count to dequeue, must be positive. */ - virtual int dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& count); + virtual int dump_packets(int max_count, SrsMessage** pmsgs, int& count); private: /** * remove a gop from the front. @@ -187,7 +187,7 @@ public: * @count the count in array, output param. * @max_count the max count to dequeue, must be positive. */ - virtual int dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& count); + virtual int dump_packets(int max_count, SrsMessage** pmsgs, int& count); /** * when client send the pause message. */ diff --git a/trunk/src/core/srs_core.hpp b/trunk/src/core/srs_core.hpp index 5c2ebd320..b0d342283 100644 --- a/trunk/src/core/srs_core.hpp +++ b/trunk/src/core/srs_core.hpp @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. // current release version #define VERSION_MAJOR 2 #define VERSION_MINOR 0 -#define VERSION_REVISION 16 +#define VERSION_REVISION 17 // server info. #define RTMP_SIG_SRS_KEY "SRS" #define RTMP_SIG_SRS_ROLE "origin/edge server" diff --git a/trunk/src/rtmp/srs_protocol_msg_array.cpp b/trunk/src/rtmp/srs_protocol_msg_array.cpp index c936fdb2c..54c96e93d 100644 --- a/trunk/src/rtmp/srs_protocol_msg_array.cpp +++ b/trunk/src/rtmp/srs_protocol_msg_array.cpp @@ -25,11 +25,11 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include -SrsSharedPtrMessageArray::SrsSharedPtrMessageArray(int _size) +SrsMessageArray::SrsMessageArray(int _size) { srs_assert(_size > 0); - msgs = new SrsSharedPtrMessage*[_size]; + msgs = new SrsMessage*[_size]; size = _size; // initialize @@ -38,11 +38,11 @@ SrsSharedPtrMessageArray::SrsSharedPtrMessageArray(int _size) } } -SrsSharedPtrMessageArray::~SrsSharedPtrMessageArray() +SrsMessageArray::~SrsMessageArray() { // cleanup for (int i = 0; i < size; i++) { - SrsSharedPtrMessage* msg = msgs[i]; + SrsMessage* msg = msgs[i]; srs_freep(msg); } diff --git a/trunk/src/rtmp/srs_protocol_msg_array.hpp b/trunk/src/rtmp/srs_protocol_msg_array.hpp index 5d3bf894e..c3a570a73 100644 --- a/trunk/src/rtmp/srs_protocol_msg_array.hpp +++ b/trunk/src/rtmp/srs_protocol_msg_array.hpp @@ -30,7 +30,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include -class SrsSharedPtrMessage; +class SrsMessage; /** * the class to auto free the shared ptr message array. @@ -39,7 +39,7 @@ class SrsSharedPtrMessage; * then send each message and set to NULL. * @remark: when error, the message array will free the msg not sent out. */ -class SrsSharedPtrMessageArray +class SrsMessageArray { public: /** @@ -47,17 +47,17 @@ public: * for instance, msg= msgs.msgs[i], msgs.msgs[i]=NULL, send(msg), * where send(msg) will always send and free it. */ - SrsSharedPtrMessage** msgs; + SrsMessage** msgs; int size; public: /** * create msg array, initialize array to NULL ptrs. */ - SrsSharedPtrMessageArray(int _size); + SrsMessageArray(int _size); /** * free the msgs not sent out(not NULL). */ - virtual ~SrsSharedPtrMessageArray(); + virtual ~SrsMessageArray(); }; #endif diff --git a/trunk/src/rtmp/srs_protocol_rtmp.cpp b/trunk/src/rtmp/srs_protocol_rtmp.cpp index 76a0b2cb1..27fae2a2f 100644 --- a/trunk/src/rtmp/srs_protocol_rtmp.cpp +++ b/trunk/src/rtmp/srs_protocol_rtmp.cpp @@ -771,7 +771,7 @@ int SrsRtmpServer::send_and_free_message(SrsMessage* msg, int stream_id) return protocol->send_and_free_message(msg, stream_id); } -int SrsRtmpServer::send_and_free_messages(SrsSharedPtrMessage** msgs, int nb_msgs, int stream_id) +int SrsRtmpServer::send_and_free_messages(SrsMessage** msgs, int nb_msgs, int stream_id) { return protocol->send_and_free_messages(msgs, nb_msgs, stream_id); } diff --git a/trunk/src/rtmp/srs_protocol_rtmp.hpp b/trunk/src/rtmp/srs_protocol_rtmp.hpp index 70fc8e8ca..ab7f43955 100644 --- a/trunk/src/rtmp/srs_protocol_rtmp.hpp +++ b/trunk/src/rtmp/srs_protocol_rtmp.hpp @@ -378,7 +378,7 @@ public: * @remark performance issue, to support 6k+ 250kbps client, * @see https://github.com/winlinvip/simple-rtmp-server/issues/194 */ - virtual int send_and_free_messages(SrsSharedPtrMessage** msgs, int nb_msgs, int stream_id); + virtual int send_and_free_messages(SrsMessage** msgs, int nb_msgs, int stream_id); /** * send the RTMP packet and always free it. * user must never free or use the packet after this method, diff --git a/trunk/src/rtmp/srs_protocol_stack.cpp b/trunk/src/rtmp/srs_protocol_stack.cpp index 8f2bb11a2..4394254fa 100644 --- a/trunk/src/rtmp/srs_protocol_stack.cpp +++ b/trunk/src/rtmp/srs_protocol_stack.cpp @@ -413,7 +413,7 @@ SrsProtocol::SrsProtocol(ISrsProtocolReaderWriter* io) // each chunk consumers atleast 2 iovs srs_assert(nb_out_iovs >= 2); - warned_c0c3_caches = false; + warned_c0c3_cry = false; } SrsProtocol::~SrsProtocol() @@ -547,65 +547,7 @@ int SrsProtocol::decode_message(SrsMessage* msg, SrsPacket** ppacket) return ret; } -int SrsProtocol::do_send_message(SrsMessage* msg) -{ - int ret = ERROR_SUCCESS; - - // ignore empty message. - if (!msg->payload || msg->size <= 0) { - srs_info("ignore empty message."); - return ret; - } - - // we donot use the complex basic header, - // ensure the basic header is 1bytes. - if (msg->header.perfer_cid < 2) { - srs_warn("change the chunk_id=%d to default=%d", - msg->header.perfer_cid, RTMP_CID_ProtocolControl); - msg->header.perfer_cid = RTMP_CID_ProtocolControl; - } - - // p set to current write position, - // it's ok when payload is NULL and size is 0. - char* p = msg->payload; - char* pend = msg->payload + msg->size; - - // always write the header event payload is empty. - while (p < pend) { - // always has header - int nbh = 0; - char* header = NULL; - generate_chunk_header(out_c0c3_cache, &msg->header, p == msg->payload, &nbh, &header); - srs_assert(nbh > 0); - - // header iov - out_iov[0].iov_base = header; - out_iov[0].iov_len = nbh; - - // payload iov - int payload_size = pend - p; - if (payload_size > out_chunk_size) { - payload_size = out_chunk_size; - } - out_iov[1].iov_base = p; - out_iov[1].iov_len = payload_size; - - // send by writev - // sendout header and payload by writev. - // decrease the sys invoke count to get higher performance. - if ((ret = skt->writev(out_iov, 2, NULL)) != ERROR_SUCCESS) { - srs_error("send with writev failed. ret=%d", ret); - return ret; - } - - // consume sendout bytes. - p += payload_size; - } - - return ret; -} - -int SrsProtocol::do_send_messages(SrsSharedPtrMessage** msgs, int nb_msgs) +int SrsProtocol::do_send_messages(SrsMessage** msgs, int nb_msgs) { int ret = ERROR_SUCCESS; @@ -686,10 +628,10 @@ int SrsProtocol::do_send_messages(SrsSharedPtrMessage** msgs, int nb_msgs) int c0c3_left = SRS_CONSTS_C0C3_HEADERS_MAX - c0c3_cache_index; if (c0c3_left < SRS_CONSTS_RTMP_MAX_FMT0_HEADER_SIZE) { // only warn once for a connection. - if (!warned_c0c3_caches) { + if (!warned_c0c3_cry) { srs_warn("c0c3 cache header too small, recoment to %d", SRS_CONSTS_C0C3_HEADERS_MAX + SRS_CONSTS_RTMP_MAX_FMT0_HEADER_SIZE); - warned_c0c3_caches = true; + warned_c0c3_cry = true; } // when c0c3 cache dry, @@ -977,21 +919,10 @@ int SrsProtocol::do_decode_message(SrsMessageHeader& header, SrsStream* stream, int SrsProtocol::send_and_free_message(SrsMessage* msg, int stream_id) { - // always not NULL msg. - srs_assert(msg); - - // update the stream id in header. - msg->header.stream_id = stream_id; - - // donot use the auto free to free the msg, - // for performance issue. - int ret = do_send_message(msg); - srs_freep(msg); - - return ret; + return send_and_free_messages(&msg, 1, stream_id); } -int SrsProtocol::send_and_free_messages(SrsSharedPtrMessage** msgs, int nb_msgs, int stream_id) +int SrsProtocol::send_and_free_messages(SrsMessage** msgs, int nb_msgs, int stream_id) { // always not NULL msg. srs_assert(msgs); @@ -1052,7 +983,7 @@ int SrsProtocol::send_and_free_packet(SrsPacket* packet, int stream_id) // donot use the auto free to free the msg, // for performance issue. - ret = do_send_message(msg); + ret = do_send_messages(&msg, 1); if (ret == ERROR_SUCCESS) { ret = on_send_packet(msg, packet); } diff --git a/trunk/src/rtmp/srs_protocol_stack.hpp b/trunk/src/rtmp/srs_protocol_stack.hpp index 2f4914d9c..a3f05b327 100644 --- a/trunk/src/rtmp/srs_protocol_stack.hpp +++ b/trunk/src/rtmp/srs_protocol_stack.hpp @@ -211,25 +211,22 @@ private: AckWindowSize in_ack_size; // peer out private: - /** - * output header cache. - * used for type0, 11bytes(or 15bytes with extended timestamp) header. - * or for type3, 1bytes(or 5bytes with extended timestamp) header. - */ - char out_c0c3_cache[SRS_CONSTS_RTMP_MAX_FMT0_HEADER_SIZE]; - /** - * output iovec cache. - */ - iovec out_iov[2]; /** * cache for multiple messages send */ iovec* out_iovs; int nb_out_iovs; - // the c0c3 cache cannot be realloc. + /** + * output header cache. + * used for type0, 11bytes(or 15bytes with extended timestamp) header. + * or for type3, 1bytes(or 5bytes with extended timestamp) header. + * the c0c3 caches must use unit SRS_CONSTS_RTMP_MAX_FMT0_HEADER_SIZE bytes. + * + * @remark, the c0c3 cache cannot be realloc. + */ char out_c0c3_caches[SRS_CONSTS_C0C3_HEADERS_MAX]; // whether warned user to increase the c0c3 header cache. - bool warned_c0c3_caches; + bool warned_c0c3_cry; /** * output chunk size, default to 128, set by config. */ @@ -293,7 +290,7 @@ public: * @param nb_msgs, the size of msgs to send out. * @param stream_id, the stream id of packet to send over, 0 for control message. */ - virtual int send_and_free_messages(SrsSharedPtrMessage** msgs, int nb_msgs, int stream_id); + virtual int send_and_free_messages(SrsMessage** msgs, int nb_msgs, int stream_id); /** * send the RTMP packet and always free it. * user must never free or use the packet after this method, @@ -362,16 +359,11 @@ public: return ret; } private: - /** - * send out the message, donot free it, - * the caller must free the param msg. - */ - virtual int do_send_message(SrsMessage* msg); /** * send out the messages, donot free it, * the caller must free the param msgs. */ - virtual int do_send_messages(SrsSharedPtrMessage** msgs, int nb_msgs); + virtual int do_send_messages(SrsMessage** msgs, int nb_msgs); /** * generate the chunk header for msg. * @param mh, the header of msg to send. diff --git a/trunk/src/utest/srs_utest_protocol.cpp b/trunk/src/utest/srs_utest_protocol.cpp index 0c1bd47c8..c6aabc7a6 100644 --- a/trunk/src/utest/srs_utest_protocol.cpp +++ b/trunk/src/utest/srs_utest_protocol.cpp @@ -529,7 +529,7 @@ VOID TEST(ProtocolMsgArrayTest, MessageArray) EXPECT_EQ(0, msg.count()); if (true) { - SrsSharedPtrMessageArray arr(3); + SrsMessageArray arr(3); arr.msgs[0] = msg.copy(); EXPECT_EQ(1, msg.count()); @@ -543,7 +543,7 @@ VOID TEST(ProtocolMsgArrayTest, MessageArray) EXPECT_EQ(0, msg.count()); if (true) { - SrsSharedPtrMessageArray arr(3); + SrsMessageArray arr(3); arr.msgs[0] = msg.copy(); EXPECT_EQ(1, msg.count());