1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-02-15 04:42:04 +00:00

for bug #194, refine code, remove the old duplicated code.

This commit is contained in:
winlin 2014-11-13 16:56:41 +08:00
parent 1f5c82ecc4
commit 8845bb7caf
12 changed files with 42 additions and 119 deletions

View file

@ -474,7 +474,7 @@ int SrsEdgeForwarder::cycle()
SrsPithyPrint pithy_print(SRS_CONSTS_STAGE_EDGE);
SrsSharedPtrMessageArray msgs(SYS_MAX_EDGE_SEND_MSGS);
SrsMessageArray msgs(SYS_MAX_EDGE_SEND_MSGS);
while (pthread->can_loop()) {
if (send_error_code != ERROR_SUCCESS) {
@ -526,7 +526,7 @@ int SrsEdgeForwarder::cycle()
// @remark, becareful, all msgs must be free explicitly,
// free by send_and_free_message or srs_freep.
for (int i = 0; i < count; i++) {
SrsSharedPtrMessage* msg = msgs.msgs[i];
SrsMessage* msg = msgs.msgs[i];
srs_assert(msg);
msgs.msgs[i] = NULL;

View file

@ -381,7 +381,7 @@ int SrsForwarder::forward()
SrsPithyPrint pithy_print(SRS_CONSTS_STAGE_FORWARDER);
SrsSharedPtrMessageArray msgs(SYS_MAX_FORWARD_SEND_MSGS);
SrsMessageArray msgs(SYS_MAX_FORWARD_SEND_MSGS);
// update sequence header
// TODO: FIXME: maybe need to zero the sequence header timestamp.
@ -442,7 +442,7 @@ int SrsForwarder::forward()
// @remark, becareful, all msgs must be free explicitly,
// free by send_and_free_message or srs_freep.
for (int i = 0; i < count; i++) {
SrsSharedPtrMessage* msg = msgs.msgs[i];
SrsMessage* msg = msgs.msgs[i];
srs_assert(msg);
msgs.msgs[i] = NULL;

View file

@ -517,7 +517,7 @@ int SrsRtmpConn::playing(SrsSource* source)
// initialize other components
SrsPithyPrint pithy_print(SRS_CONSTS_STAGE_PLAY_USER);
SrsSharedPtrMessageArray msgs(SYS_CONSTS_MAX_PLAY_SEND_MSGS);
SrsMessageArray msgs(SYS_CONSTS_MAX_PLAY_SEND_MSGS);
bool user_specified_duration_to_stop = (req->duration > 0);
int64_t starttime = -1;
@ -574,7 +574,7 @@ int SrsRtmpConn::playing(SrsSource* source)
// we start to collect the durations for each message.
if (user_specified_duration_to_stop) {
for (int i = 0; i < count; i++) {
SrsSharedPtrMessage* msg = msgs.msgs[i];
SrsMessage* msg = msgs.msgs[i];
// foreach msg, collect the duration.
// @remark: never use msg when sent it, for the protocol sdk will free it.

View file

@ -192,7 +192,7 @@ int SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg)
return ret;
}
int SrsMessageQueue::dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& count)
int SrsMessageQueue::dump_packets(int max_count, SrsMessage** pmsgs, int& count)
{
int ret = ERROR_SUCCESS;
@ -207,7 +207,7 @@ int SrsMessageQueue::dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, in
pmsgs[i] = msgs[i];
}
SrsSharedPtrMessage* last = msgs[count - 1];
SrsMessage* last = msgs[count - 1];
av_start_time = last->header.timestamp;
if (count == (int)msgs.size()) {
@ -332,7 +332,7 @@ int SrsConsumer::enqueue(SrsSharedPtrMessage* msg, bool atc, int tba, int tbv, S
return ret;
}
int SrsConsumer::dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& count)
int SrsConsumer::dump_packets(int max_count, SrsMessage** pmsgs, int& count)
{
srs_assert(max_count > 0);

View file

@ -132,7 +132,7 @@ public:
* @count the count in array, output param.
* @max_count the max count to dequeue, must be positive.
*/
virtual int dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& count);
virtual int dump_packets(int max_count, SrsMessage** pmsgs, int& count);
private:
/**
* remove a gop from the front.
@ -187,7 +187,7 @@ public:
* @count the count in array, output param.
* @max_count the max count to dequeue, must be positive.
*/
virtual int dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& count);
virtual int dump_packets(int max_count, SrsMessage** pmsgs, int& count);
/**
* when client send the pause message.
*/

View file

@ -25,11 +25,11 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_protocol_stack.hpp>
SrsSharedPtrMessageArray::SrsSharedPtrMessageArray(int _size)
SrsMessageArray::SrsMessageArray(int _size)
{
srs_assert(_size > 0);
msgs = new SrsSharedPtrMessage*[_size];
msgs = new SrsMessage*[_size];
size = _size;
// initialize
@ -38,11 +38,11 @@ SrsSharedPtrMessageArray::SrsSharedPtrMessageArray(int _size)
}
}
SrsSharedPtrMessageArray::~SrsSharedPtrMessageArray()
SrsMessageArray::~SrsMessageArray()
{
// cleanup
for (int i = 0; i < size; i++) {
SrsSharedPtrMessage* msg = msgs[i];
SrsMessage* msg = msgs[i];
srs_freep(msg);
}

View file

@ -30,7 +30,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_core.hpp>
class SrsSharedPtrMessage;
class SrsMessage;
/**
* the class to auto free the shared ptr message array.
@ -39,7 +39,7 @@ class SrsSharedPtrMessage;
* then send each message and set to NULL.
* @remark: when error, the message array will free the msg not sent out.
*/
class SrsSharedPtrMessageArray
class SrsMessageArray
{
public:
/**
@ -47,17 +47,17 @@ public:
* for instance, msg= msgs.msgs[i], msgs.msgs[i]=NULL, send(msg),
* where send(msg) will always send and free it.
*/
SrsSharedPtrMessage** msgs;
SrsMessage** msgs;
int size;
public:
/**
* create msg array, initialize array to NULL ptrs.
*/
SrsSharedPtrMessageArray(int _size);
SrsMessageArray(int _size);
/**
* free the msgs not sent out(not NULL).
*/
virtual ~SrsSharedPtrMessageArray();
virtual ~SrsMessageArray();
};
#endif

View file

@ -771,7 +771,7 @@ int SrsRtmpServer::send_and_free_message(SrsMessage* msg, int stream_id)
return protocol->send_and_free_message(msg, stream_id);
}
int SrsRtmpServer::send_and_free_messages(SrsSharedPtrMessage** msgs, int nb_msgs, int stream_id)
int SrsRtmpServer::send_and_free_messages(SrsMessage** msgs, int nb_msgs, int stream_id)
{
return protocol->send_and_free_messages(msgs, nb_msgs, stream_id);
}

View file

@ -378,7 +378,7 @@ public:
* @remark performance issue, to support 6k+ 250kbps client,
* @see https://github.com/winlinvip/simple-rtmp-server/issues/194
*/
virtual int send_and_free_messages(SrsSharedPtrMessage** msgs, int nb_msgs, int stream_id);
virtual int send_and_free_messages(SrsMessage** msgs, int nb_msgs, int stream_id);
/**
* send the RTMP packet and always free it.
* user must never free or use the packet after this method,

View file

@ -413,7 +413,7 @@ SrsProtocol::SrsProtocol(ISrsProtocolReaderWriter* io)
// each chunk consumers atleast 2 iovs
srs_assert(nb_out_iovs >= 2);
warned_c0c3_caches = false;
warned_c0c3_cry = false;
}
SrsProtocol::~SrsProtocol()
@ -547,65 +547,7 @@ int SrsProtocol::decode_message(SrsMessage* msg, SrsPacket** ppacket)
return ret;
}
int SrsProtocol::do_send_message(SrsMessage* msg)
{
int ret = ERROR_SUCCESS;
// ignore empty message.
if (!msg->payload || msg->size <= 0) {
srs_info("ignore empty message.");
return ret;
}
// we donot use the complex basic header,
// ensure the basic header is 1bytes.
if (msg->header.perfer_cid < 2) {
srs_warn("change the chunk_id=%d to default=%d",
msg->header.perfer_cid, RTMP_CID_ProtocolControl);
msg->header.perfer_cid = RTMP_CID_ProtocolControl;
}
// p set to current write position,
// it's ok when payload is NULL and size is 0.
char* p = msg->payload;
char* pend = msg->payload + msg->size;
// always write the header event payload is empty.
while (p < pend) {
// always has header
int nbh = 0;
char* header = NULL;
generate_chunk_header(out_c0c3_cache, &msg->header, p == msg->payload, &nbh, &header);
srs_assert(nbh > 0);
// header iov
out_iov[0].iov_base = header;
out_iov[0].iov_len = nbh;
// payload iov
int payload_size = pend - p;
if (payload_size > out_chunk_size) {
payload_size = out_chunk_size;
}
out_iov[1].iov_base = p;
out_iov[1].iov_len = payload_size;
// send by writev
// sendout header and payload by writev.
// decrease the sys invoke count to get higher performance.
if ((ret = skt->writev(out_iov, 2, NULL)) != ERROR_SUCCESS) {
srs_error("send with writev failed. ret=%d", ret);
return ret;
}
// consume sendout bytes.
p += payload_size;
}
return ret;
}
int SrsProtocol::do_send_messages(SrsSharedPtrMessage** msgs, int nb_msgs)
int SrsProtocol::do_send_messages(SrsMessage** msgs, int nb_msgs)
{
int ret = ERROR_SUCCESS;
@ -686,10 +628,10 @@ int SrsProtocol::do_send_messages(SrsSharedPtrMessage** msgs, int nb_msgs)
int c0c3_left = SRS_CONSTS_C0C3_HEADERS_MAX - c0c3_cache_index;
if (c0c3_left < SRS_CONSTS_RTMP_MAX_FMT0_HEADER_SIZE) {
// only warn once for a connection.
if (!warned_c0c3_caches) {
if (!warned_c0c3_cry) {
srs_warn("c0c3 cache header too small, recoment to %d",
SRS_CONSTS_C0C3_HEADERS_MAX + SRS_CONSTS_RTMP_MAX_FMT0_HEADER_SIZE);
warned_c0c3_caches = true;
warned_c0c3_cry = true;
}
// when c0c3 cache dry,
@ -977,21 +919,10 @@ int SrsProtocol::do_decode_message(SrsMessageHeader& header, SrsStream* stream,
int SrsProtocol::send_and_free_message(SrsMessage* msg, int stream_id)
{
// always not NULL msg.
srs_assert(msg);
// update the stream id in header.
msg->header.stream_id = stream_id;
// donot use the auto free to free the msg,
// for performance issue.
int ret = do_send_message(msg);
srs_freep(msg);
return ret;
return send_and_free_messages(&msg, 1, stream_id);
}
int SrsProtocol::send_and_free_messages(SrsSharedPtrMessage** msgs, int nb_msgs, int stream_id)
int SrsProtocol::send_and_free_messages(SrsMessage** msgs, int nb_msgs, int stream_id)
{
// always not NULL msg.
srs_assert(msgs);
@ -1052,7 +983,7 @@ int SrsProtocol::send_and_free_packet(SrsPacket* packet, int stream_id)
// donot use the auto free to free the msg,
// for performance issue.
ret = do_send_message(msg);
ret = do_send_messages(&msg, 1);
if (ret == ERROR_SUCCESS) {
ret = on_send_packet(msg, packet);
}

View file

@ -211,25 +211,22 @@ private:
AckWindowSize in_ack_size;
// peer out
private:
/**
* output header cache.
* used for type0, 11bytes(or 15bytes with extended timestamp) header.
* or for type3, 1bytes(or 5bytes with extended timestamp) header.
*/
char out_c0c3_cache[SRS_CONSTS_RTMP_MAX_FMT0_HEADER_SIZE];
/**
* output iovec cache.
*/
iovec out_iov[2];
/**
* cache for multiple messages send
*/
iovec* out_iovs;
int nb_out_iovs;
// the c0c3 cache cannot be realloc.
/**
* output header cache.
* used for type0, 11bytes(or 15bytes with extended timestamp) header.
* or for type3, 1bytes(or 5bytes with extended timestamp) header.
* the c0c3 caches must use unit SRS_CONSTS_RTMP_MAX_FMT0_HEADER_SIZE bytes.
*
* @remark, the c0c3 cache cannot be realloc.
*/
char out_c0c3_caches[SRS_CONSTS_C0C3_HEADERS_MAX];
// whether warned user to increase the c0c3 header cache.
bool warned_c0c3_caches;
bool warned_c0c3_cry;
/**
* output chunk size, default to 128, set by config.
*/
@ -293,7 +290,7 @@ public:
* @param nb_msgs, the size of msgs to send out.
* @param stream_id, the stream id of packet to send over, 0 for control message.
*/
virtual int send_and_free_messages(SrsSharedPtrMessage** msgs, int nb_msgs, int stream_id);
virtual int send_and_free_messages(SrsMessage** msgs, int nb_msgs, int stream_id);
/**
* send the RTMP packet and always free it.
* user must never free or use the packet after this method,
@ -362,16 +359,11 @@ public:
return ret;
}
private:
/**
* send out the message, donot free it,
* the caller must free the param msg.
*/
virtual int do_send_message(SrsMessage* msg);
/**
* send out the messages, donot free it,
* the caller must free the param msgs.
*/
virtual int do_send_messages(SrsSharedPtrMessage** msgs, int nb_msgs);
virtual int do_send_messages(SrsMessage** msgs, int nb_msgs);
/**
* generate the chunk header for msg.
* @param mh, the header of msg to send.

View file

@ -529,7 +529,7 @@ VOID TEST(ProtocolMsgArrayTest, MessageArray)
EXPECT_EQ(0, msg.count());
if (true) {
SrsSharedPtrMessageArray arr(3);
SrsMessageArray arr(3);
arr.msgs[0] = msg.copy();
EXPECT_EQ(1, msg.count());
@ -543,7 +543,7 @@ VOID TEST(ProtocolMsgArrayTest, MessageArray)
EXPECT_EQ(0, msg.count());
if (true) {
SrsSharedPtrMessageArray arr(3);
SrsMessageArray arr(3);
arr.msgs[0] = msg.copy();
EXPECT_EQ(1, msg.count());