1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

refine code for bug #194, use send messages for all msg array.

This commit is contained in:
winlin 2014-11-14 11:24:49 +08:00
parent f11272e3ce
commit 47ed9e33dd
8 changed files with 38 additions and 52 deletions

View file

@ -499,7 +499,7 @@ int SrsEdgeForwarder::cycle()
// forward all messages. // forward all messages.
int count = 0; int count = 0;
if ((ret = queue->dump_packets(msgs.size, msgs.msgs, count)) != ERROR_SUCCESS) { if ((ret = queue->dump_packets(msgs.max, msgs.msgs, count)) != ERROR_SUCCESS) {
srs_error("get message to push to origin failed. ret=%d", ret); srs_error("get message to push to origin failed. ret=%d", ret);
return ret; return ret;
} }
@ -523,18 +523,9 @@ int SrsEdgeForwarder::cycle()
} }
// all msgs to forward to origin. // all msgs to forward to origin.
// @remark, becareful, all msgs must be free explicitly, if ((ret = client->send_and_free_messages(msgs.msgs, count, stream_id)) != ERROR_SUCCESS) {
// free by send_and_free_message or srs_freep. srs_error("edge publish push message to server failed. ret=%d", ret);
for (int i = 0; i < count; i++) { return ret;
SrsMessage* msg = msgs.msgs[i];
srs_assert(msg);
msgs.msgs[i] = NULL;
if ((ret = client->send_and_free_message(msg, stream_id)) != ERROR_SUCCESS) {
srs_error("edge publish push message to server failed. ret=%d", ret);
return ret;
}
} }
} }

View file

@ -417,7 +417,7 @@ int SrsForwarder::forward()
// forward all messages. // forward all messages.
int count = 0; int count = 0;
if ((ret = queue->dump_packets(msgs.size, msgs.msgs, count)) != ERROR_SUCCESS) { if ((ret = queue->dump_packets(msgs.max, msgs.msgs, count)) != ERROR_SUCCESS) {
srs_error("get message to forward failed. ret=%d", ret); srs_error("get message to forward failed. ret=%d", ret);
return ret; return ret;
} }
@ -439,18 +439,9 @@ int SrsForwarder::forward()
} }
// all msgs to forward. // all msgs to forward.
// @remark, becareful, all msgs must be free explicitly, if ((ret = client->send_and_free_messages(msgs.msgs, count, stream_id)) != ERROR_SUCCESS) {
// free by send_and_free_message or srs_freep. srs_error("forwarder messages to server failed. ret=%d", ret);
for (int i = 0; i < count; i++) { return ret;
SrsMessage* msg = msgs.msgs[i];
srs_assert(msg);
msgs.msgs[i] = NULL;
if ((ret = client->send_and_free_message(msg, stream_id)) != ERROR_SUCCESS) {
srs_error("forwarder send message to server failed. ret=%d", ret);
return ret;
}
} }
} }

View file

@ -559,7 +559,7 @@ int SrsRtmpConn::playing(SrsSource* source)
// get messages from consumer. // get messages from consumer.
int count = 0; int count = 0;
if ((ret = consumer->dump_packets(msgs.size, msgs.msgs, count)) != ERROR_SUCCESS) { if ((ret = consumer->dump_packets(msgs.max, msgs.msgs, count)) != ERROR_SUCCESS) {
srs_error("get messages from consumer failed. ret=%d", ret); srs_error("get messages from consumer failed. ret=%d", ret);
return ret; return ret;
} }
@ -596,16 +596,10 @@ int SrsRtmpConn::playing(SrsSource* source)
// free by send_and_free_message or srs_freep. // free by send_and_free_message or srs_freep.
if (count > 0) { if (count > 0) {
// no need to assert msg, for the rtmp will assert it. // no need to assert msg, for the rtmp will assert it.
ret = rtmp->send_and_free_messages(msgs.msgs, count, res->stream_id); if ((ret = rtmp->send_and_free_messages(msgs.msgs, count, res->stream_id)) != ERROR_SUCCESS) {
} srs_error("send messages to client failed. ret=%d", ret);
for (int i = 0; i < count; i++) { return ret;
// the send_message will free the msg, }
// so set the msgs[i] to NULL.
msgs.msgs[i] = NULL;
}
if (ret != ERROR_SUCCESS) {
srs_error("send messages to client failed. ret=%d", ret);
return ret;
} }
// if duration specified, and exceed it, stop play live. // if duration specified, and exceed it, stop play live.

View file

@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
// current release version // current release version
#define VERSION_MAJOR 2 #define VERSION_MAJOR 2
#define VERSION_MINOR 0 #define VERSION_MINOR 0
#define VERSION_REVISION 17 #define VERSION_REVISION 18
// server info. // server info.
#define RTMP_SIG_SRS_KEY "SRS" #define RTMP_SIG_SRS_KEY "SRS"
#define RTMP_SIG_SRS_ROLE "origin/edge server" #define RTMP_SIG_SRS_ROLE "origin/edge server"

View file

@ -25,27 +25,21 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_protocol_stack.hpp> #include <srs_protocol_stack.hpp>
SrsMessageArray::SrsMessageArray(int _size) SrsMessageArray::SrsMessageArray(int max_msgs)
{ {
srs_assert(_size > 0); srs_assert(max_msgs > 0);
msgs = new SrsMessage*[_size]; msgs = new SrsMessage*[max_msgs];
size = _size; max = max_msgs;
// initialize // initialize
for (int i = 0; i < _size; i++) { for (int i = 0; i < max_msgs; i++) {
msgs[i] = NULL; msgs[i] = NULL;
} }
} }
SrsMessageArray::~SrsMessageArray() SrsMessageArray::~SrsMessageArray()
{ {
// cleanup
for (int i = 0; i < size; i++) {
SrsMessage* msg = msgs[i];
srs_freep(msg);
}
srs_freep(msgs); srs_freep(msgs);
} }

View file

@ -37,7 +37,9 @@ class SrsMessage;
* when need to get some messages, for instance, from Consumer queue, * when need to get some messages, for instance, from Consumer queue,
* create a message array, whose msgs can used to accept the msgs, * create a message array, whose msgs can used to accept the msgs,
* then send each message and set to NULL. * then send each message and set to NULL.
* @remark: when error, the message array will free the msg not sent out. *
* @remark: user must free all msgs in array, for the SRS2.0 protocol stack
* provides an api to send messages, @see send_and_free_messages
*/ */
class SrsMessageArray class SrsMessageArray
{ {
@ -48,12 +50,12 @@ public:
* where send(msg) will always send and free it. * where send(msg) will always send and free it.
*/ */
SrsMessage** msgs; SrsMessage** msgs;
int size; int max;
public: public:
/** /**
* create msg array, initialize array to NULL ptrs. * create msg array, initialize array to NULL ptrs.
*/ */
SrsMessageArray(int _size); SrsMessageArray(int max_msgs);
/** /**
* free the msgs not sent out(not NULL). * free the msgs not sent out(not NULL).
*/ */

View file

@ -371,6 +371,11 @@ int SrsRtmpClient::send_and_free_message(SrsMessage* msg, int stream_id)
return protocol->send_and_free_message(msg, stream_id); return protocol->send_and_free_message(msg, stream_id);
} }
int SrsRtmpClient::send_and_free_messages(SrsMessage** msgs, int nb_msgs, int stream_id)
{
return protocol->send_and_free_messages(msgs, nb_msgs, stream_id);
}
int SrsRtmpClient::send_and_free_packet(SrsPacket* packet, int stream_id) int SrsRtmpClient::send_and_free_packet(SrsPacket* packet, int stream_id)
{ {
return protocol->send_and_free_packet(packet, stream_id); return protocol->send_and_free_packet(packet, stream_id);

View file

@ -222,6 +222,15 @@ public:
*/ */
virtual int send_and_free_message(SrsMessage* msg, int stream_id); virtual int send_and_free_message(SrsMessage* msg, int stream_id);
/** /**
* send the RTMP message and always free it.
* user must never free or use the msg after this method,
* for it will always free the msg.
* @param msgs, the msgs to send out, never be NULL.
* @param nb_msgs, the size of msgs to send out.
* @param stream_id, the stream id of packet to send over, 0 for control message.
*/
virtual int send_and_free_messages(SrsMessage** msgs, int nb_msgs, int stream_id);
/**
* send the RTMP packet and always free it. * send the RTMP packet and always free it.
* user must never free or use the packet after this method, * user must never free or use the packet after this method,
* for it will always free the packet. * for it will always free the packet.