1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-02-15 04:42:04 +00:00

for bug #199, refine the api of send message.

This commit is contained in:
winlin 2014-11-12 17:59:32 +08:00
parent 55580ca900
commit ab93506b01
2 changed files with 149 additions and 115 deletions

View file

@ -532,78 +532,121 @@ int SrsProtocol::decode_message(SrsMessage* msg, SrsPacket** ppacket)
return ret; return ret;
} }
int SrsProtocol::do_send_message(SrsMessage* msg, SrsPacket* packet) int SrsProtocol::do_send_message(SrsMessage* msg)
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
// always not NULL msg. // ignore empty message.
srs_assert(msg); if (!msg->payload || msg->size <= 0) {
srs_info("ignore empty message.");
return ret;
}
// we donot use the complex basic header, // we donot use the complex basic header,
// ensure the basic header is 1bytes. // ensure the basic header is 1bytes.
if (msg->header.perfer_cid < 2) { if (msg->header.perfer_cid < 2) {
srs_warn("change the chunk_id=%d to default=%d", msg->header.perfer_cid, RTMP_CID_ProtocolControl); srs_warn("change the chunk_id=%d to default=%d",
msg->header.perfer_cid, RTMP_CID_ProtocolControl);
msg->header.perfer_cid = RTMP_CID_ProtocolControl; msg->header.perfer_cid = RTMP_CID_ProtocolControl;
} }
// p set to current write position, // p set to current write position,
// it's ok when payload is NULL and size is 0. // it's ok when payload is NULL and size is 0.
char* p = msg->payload; char* p = msg->payload;
char* pend = msg->payload + msg->size;
// always write the header event payload is empty.
while (p < pend) {
// always has header
int nbh = 0;
char* header = NULL;
generate_chunk_header(&msg->header, p == msg->payload, &nbh, &header);
srs_assert(nbh > 0);
// header iov
iov[0].iov_base = header;
iov[0].iov_len = nbh;
// payload iov
int payload_size = pend - p;
if (payload_size > out_chunk_size) {
payload_size = out_chunk_size;
}
iov[1].iov_base = p;
iov[1].iov_len = payload_size;
// send by writev
// sendout header and payload by writev.
// decrease the sys invoke count to get higher performance.
if ((ret = skt->writev(iov, 2, NULL)) != ERROR_SUCCESS) {
srs_error("send with writev failed. ret=%d", ret);
return ret;
}
// consume sendout bytes.
p += payload_size;
}
return ret;
}
void SrsProtocol::generate_chunk_header(SrsMessageHeader* mh, bool c0, int* pnbh, char** ph)
{
char* cache = out_c0_cache;
// to directly set the field. // to directly set the field.
char* pp = NULL; char* pp = NULL;
// always write the header event payload is empty.
do {
// generate the header. // generate the header.
char* pheader = out_header_cache; char* p = cache;
if (p == msg->payload) { if (c0) {
// write new chunk stream header, fmt is 0 // write new chunk stream header, fmt is 0
*pheader++ = 0x00 | (msg->header.perfer_cid & 0x3F); *p++ = 0x00 | (mh->perfer_cid & 0x3F);
// chunk message header, 11 bytes // chunk message header, 11 bytes
// timestamp, 3bytes, big-endian // timestamp, 3bytes, big-endian
u_int32_t timestamp = (u_int32_t)msg->header.timestamp; u_int32_t timestamp = (u_int32_t)mh->timestamp;
if (timestamp < RTMP_EXTENDED_TIMESTAMP) { if (timestamp < RTMP_EXTENDED_TIMESTAMP) {
pp = (char*)&timestamp; pp = (char*)&timestamp;
*pheader++ = pp[2]; *p++ = pp[2];
*pheader++ = pp[1]; *p++ = pp[1];
*pheader++ = pp[0]; *p++ = pp[0];
} else { } else {
*pheader++ = 0xFF; *p++ = 0xFF;
*pheader++ = 0xFF; *p++ = 0xFF;
*pheader++ = 0xFF; *p++ = 0xFF;
} }
// message_length, 3bytes, big-endian // message_length, 3bytes, big-endian
pp = (char*)&msg->header.payload_length; pp = (char*)&mh->payload_length;
*pheader++ = pp[2]; *p++ = pp[2];
*pheader++ = pp[1]; *p++ = pp[1];
*pheader++ = pp[0]; *p++ = pp[0];
// message_type, 1bytes // message_type, 1bytes
*pheader++ = msg->header.message_type; *p++ = mh->message_type;
// message_length, 3bytes, little-endian // message_length, 3bytes, little-endian
pp = (char*)&msg->header.stream_id; pp = (char*)&mh->stream_id;
*pheader++ = pp[0]; *p++ = pp[0];
*pheader++ = pp[1]; *p++ = pp[1];
*pheader++ = pp[2]; *p++ = pp[2];
*pheader++ = pp[3]; *p++ = pp[3];
// chunk extended timestamp header, 0 or 4 bytes, big-endian // chunk extended timestamp header, 0 or 4 bytes, big-endian
if(timestamp >= RTMP_EXTENDED_TIMESTAMP) { if(timestamp >= RTMP_EXTENDED_TIMESTAMP) {
pp = (char*)&timestamp; pp = (char*)&timestamp;
*pheader++ = pp[3]; *p++ = pp[3];
*pheader++ = pp[2]; *p++ = pp[2];
*pheader++ = pp[1]; *p++ = pp[1];
*pheader++ = pp[0]; *p++ = pp[0];
} }
} else { } else {
// write no message header chunk stream, fmt is 3 // write no message header chunk stream, fmt is 3
// @remark, if perfer_cid > 0x3F, that is, use 2B/3B chunk header, // @remark, if perfer_cid > 0x3F, that is, use 2B/3B chunk header,
// SRS will rollback to 1B chunk header. // SRS will rollback to 1B chunk header.
*pheader++ = 0xC0 | (msg->header.perfer_cid & 0x3F); *p++ = 0xC0 | (mh->perfer_cid & 0x3F);
// chunk extended timestamp header, 0 or 4 bytes, big-endian // chunk extended timestamp header, 0 or 4 bytes, big-endian
// 6.1.3. Extended Timestamp // 6.1.3. Extended Timestamp
@ -618,51 +661,20 @@ int SrsProtocol::do_send_message(SrsMessage* msg, SrsPacket* packet)
// must send the extended-timestamp to flash-player. // must send the extended-timestamp to flash-player.
// @see: ngx_rtmp_prepare_message // @see: ngx_rtmp_prepare_message
// @see: http://blog.csdn.net/win_lin/article/details/13363699 // @see: http://blog.csdn.net/win_lin/article/details/13363699
u_int32_t timestamp = (u_int32_t)msg->header.timestamp; // TODO: FIXME: extract to outer.
u_int32_t timestamp = (u_int32_t)mh->timestamp;
if (timestamp >= RTMP_EXTENDED_TIMESTAMP) { if (timestamp >= RTMP_EXTENDED_TIMESTAMP) {
pp = (char*)&timestamp; pp = (char*)&timestamp;
*pheader++ = pp[3]; *p++ = pp[3];
*pheader++ = pp[2]; *p++ = pp[2];
*pheader++ = pp[1]; *p++ = pp[1];
*pheader++ = pp[0]; *p++ = pp[0];
} }
} }
// sendout header and payload by writev.
// decrease the sys invoke count to get higher performance.
int payload_size = msg->size - (p - msg->payload);
payload_size = srs_min(payload_size, out_chunk_size);
// always has header // always has header
int header_size = pheader - out_header_cache; *pnbh = p - cache;
srs_assert(header_size > 0); *ph = cache;
// send by writev
iovec iov[2];
iov[0].iov_base = out_header_cache;
iov[0].iov_len = header_size;
iov[1].iov_base = p;
iov[1].iov_len = payload_size;
ssize_t nwrite;
if ((ret = skt->writev(iov, 2, &nwrite)) != ERROR_SUCCESS) {
srs_error("send with writev failed. ret=%d", ret);
return ret;
}
// consume sendout bytes when not empty packet.
if (msg->payload && msg->size > 0) {
p += payload_size;
}
} while (p < msg->payload + msg->size);
// only process the callback event when with packet
if (packet && (ret = on_send_packet(msg, packet)) != ERROR_SUCCESS) {
srs_error("hook the send message failed. ret=%d", ret);
return ret;
}
return ret;
} }
int SrsProtocol::do_decode_message(SrsMessageHeader& header, SrsStream* stream, SrsPacket** ppacket) int SrsProtocol::do_decode_message(SrsMessageHeader& header, SrsStream* stream, SrsPacket** ppacket)
@ -834,14 +846,17 @@ int SrsProtocol::do_decode_message(SrsMessageHeader& header, SrsStream* stream,
int SrsProtocol::send_and_free_message(SrsMessage* msg, int stream_id) int SrsProtocol::send_and_free_message(SrsMessage* msg, int stream_id)
{ {
if (msg) { // always not NULL msg.
srs_assert(msg);
// update the stream id in header.
msg->header.stream_id = stream_id; msg->header.stream_id = stream_id;
}
// donot use the auto free to free the msg, // donot use the auto free to free the msg,
// for performance issue. // for performance issue.
int ret = do_send_message(msg, NULL); int ret = do_send_message(msg);
srs_freep(msg); srs_freep(msg);
return ret; return ret;
} }
@ -878,7 +893,10 @@ int SrsProtocol::send_and_free_packet(SrsPacket* packet, int stream_id)
// donot use the auto free to free the msg, // donot use the auto free to free the msg,
// for performance issue. // for performance issue.
ret = do_send_message(msg, packet); ret = do_send_message(msg);
if (ret == ERROR_SUCCESS) {
ret = on_send_packet(msg, packet);
}
srs_freep(msg); srs_freep(msg);
return ret; return ret;
@ -1535,8 +1553,10 @@ int SrsProtocol::on_send_packet(SrsMessage* msg, SrsPacket* packet)
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
// should never be raw bytes oriented RTMP message. // ignore raw bytes oriented RTMP message.
srs_assert(packet); if (packet == NULL) {
return ret;
}
switch (msg->header.message_type) { switch (msg->header.message_type) {
case RTMP_MSG_SetChunkSize: { case RTMP_MSG_SetChunkSize: {

View file

@ -32,6 +32,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <map> #include <map>
#include <string> #include <string>
#include <sys/uio.h>
#include <srs_kernel_log.hpp> #include <srs_kernel_log.hpp>
#include <srs_kernel_error.hpp> #include <srs_kernel_error.hpp>
@ -214,7 +215,11 @@ private:
* used for type0, 11bytes(or 15bytes with extended timestamp) header. * used for type0, 11bytes(or 15bytes with extended timestamp) header.
* or for type3, 1bytes(or 5bytes with extended timestamp) header. * or for type3, 1bytes(or 5bytes with extended timestamp) header.
*/ */
char out_header_cache[SRS_CONSTS_RTMP_MAX_FMT0_HEADER_SIZE]; char out_c0_cache[SRS_CONSTS_RTMP_MAX_FMT0_HEADER_SIZE];
/**
* output iovec cache.
*/
iovec iov[2];
/** /**
* output chunk size, default to 128, set by config. * output chunk size, default to 128, set by config.
*/ */
@ -339,10 +344,19 @@ public:
} }
private: private:
/** /**
* send out the message, donot free it, the caller must free the param msg. * send out the message, donot free it,
* @param packet the packet of message, NULL for raw message. * the caller must free the param msg.
*/ */
virtual int do_send_message(SrsMessage* msg, SrsPacket* packet); virtual int do_send_message(SrsMessage* msg);
/**
* generate the chunk header for msg.
* @param mh, the header of msg to send.
* @param c0, whether the first chunk, the c0 chunk.
* @param pnbh, output the size of header.
* @param ph, output the header cache.
* user should never free it, it's cached header.
*/
virtual void generate_chunk_header(SrsMessageHeader* mh, bool c0, int* pnbh, char** ph);
/** /**
* imp for decode_message * imp for decode_message
*/ */