mirror of
https://github.com/ossrs/srs.git
synced 2025-03-09 15:49:59 +00:00
fix #194, writev multiple msgs, support 6k+ 250kbps clients. 2.0.15.
This commit is contained in:
parent
7cf855f242
commit
cc6aca9ad5
8 changed files with 251 additions and 29 deletions
|
@ -29,6 +29,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
|||
#include <srs_core_autofree.hpp>
|
||||
#include <srs_kernel_utility.hpp>
|
||||
|
||||
#include <stdlib.h>
|
||||
using namespace std;
|
||||
|
||||
// when got a messae header, there must be some data,
|
||||
|
@ -404,7 +405,15 @@ SrsProtocol::SrsProtocol(ISrsProtocolReaderWriter* io)
|
|||
in_buffer = new SrsBuffer();
|
||||
skt = io;
|
||||
|
||||
in_chunk_size = out_chunk_size = SRS_CONSTS_RTMP_PROTOCOL_CHUNK_SIZE;
|
||||
in_chunk_size = SRS_CONSTS_RTMP_PROTOCOL_CHUNK_SIZE;
|
||||
out_chunk_size = SRS_CONSTS_RTMP_PROTOCOL_CHUNK_SIZE;
|
||||
|
||||
nb_out_iovs = SRS_CONSTS_IOVS_MAX;
|
||||
out_iovs = (iovec*)malloc(sizeof(iovec) * nb_out_iovs);
|
||||
// each chunk consumers atleast 2 iovs
|
||||
srs_assert(nb_out_iovs >= 2);
|
||||
|
||||
warned_c0c3_caches = false;
|
||||
}
|
||||
|
||||
SrsProtocol::~SrsProtocol()
|
||||
|
@ -421,6 +430,12 @@ SrsProtocol::~SrsProtocol()
|
|||
}
|
||||
|
||||
srs_freep(in_buffer);
|
||||
|
||||
// alloc by malloc, use free directly.
|
||||
if (out_iovs) {
|
||||
free(out_iovs);
|
||||
out_iovs = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
void SrsProtocol::set_recv_timeout(int64_t timeout_us)
|
||||
|
@ -560,7 +575,7 @@ int SrsProtocol::do_send_message(SrsMessage* msg)
|
|||
// always has header
|
||||
int nbh = 0;
|
||||
char* header = NULL;
|
||||
generate_chunk_header(&msg->header, p == msg->payload, &nbh, &header);
|
||||
generate_chunk_header(out_c0c3_cache, &msg->header, p == msg->payload, &nbh, &header);
|
||||
srs_assert(nbh > 0);
|
||||
|
||||
// header iov
|
||||
|
@ -590,10 +605,130 @@ int SrsProtocol::do_send_message(SrsMessage* msg)
|
|||
return ret;
|
||||
}
|
||||
|
||||
void SrsProtocol::generate_chunk_header(SrsMessageHeader* mh, bool c0, int* pnbh, char** ph)
|
||||
int SrsProtocol::do_send_messages(SrsSharedPtrMessage** msgs, int nb_msgs)
|
||||
{
|
||||
char* cache = out_c0c3_cache;
|
||||
int ret = ERROR_SUCCESS;
|
||||
|
||||
// TODO: FIXME: use cache system instead.
|
||||
int iov_index = 0;
|
||||
iovec* iov = out_iovs + iov_index;
|
||||
|
||||
int c0c3_cache_index = 0;
|
||||
char* c0c3_cache = out_c0c3_caches + c0c3_cache_index;
|
||||
|
||||
// try to send use the c0c3 header cache,
|
||||
// if cache is consumed, try another loop.
|
||||
for (int i = 0; i < nb_msgs; i++) {
|
||||
SrsMessage* msg = msgs[i];
|
||||
|
||||
// ignore empty message.
|
||||
if (!msg->payload || msg->size <= 0) {
|
||||
srs_info("ignore empty message.");
|
||||
continue;
|
||||
}
|
||||
|
||||
// we donot use the complex basic header,
|
||||
// ensure the basic header is 1bytes.
|
||||
if (msg->header.perfer_cid < 2) {
|
||||
srs_warn("change the chunk_id=%d to default=%d",
|
||||
msg->header.perfer_cid, RTMP_CID_ProtocolControl);
|
||||
msg->header.perfer_cid = RTMP_CID_ProtocolControl;
|
||||
}
|
||||
|
||||
// p set to current write position,
|
||||
// it's ok when payload is NULL and size is 0.
|
||||
char* p = msg->payload;
|
||||
char* pend = msg->payload + msg->size;
|
||||
|
||||
// always write the header event payload is empty.
|
||||
while (p < pend) {
|
||||
// always has header
|
||||
int nbh = 0;
|
||||
char* header = NULL;
|
||||
generate_chunk_header(c0c3_cache, &msg->header, p == msg->payload, &nbh, &header);
|
||||
srs_assert(nbh > 0);
|
||||
|
||||
// header iov
|
||||
iov[0].iov_base = header;
|
||||
iov[0].iov_len = nbh;
|
||||
|
||||
// payload iov
|
||||
int payload_size = pend - p;
|
||||
if (payload_size > out_chunk_size) {
|
||||
payload_size = out_chunk_size;
|
||||
}
|
||||
iov[1].iov_base = p;
|
||||
iov[1].iov_len = payload_size;
|
||||
|
||||
// consume sendout bytes.
|
||||
p += payload_size;
|
||||
|
||||
// realloc the iovs if exceed,
|
||||
// for we donot know how many messges maybe to send entirely,
|
||||
// we just alloc the iovs, it's ok.
|
||||
if (iov_index >= nb_out_iovs - 2) {
|
||||
nb_out_iovs += SRS_CONSTS_IOVS_MAX;
|
||||
int realloc_size = sizeof(iovec) * nb_out_iovs;
|
||||
out_iovs = (iovec*)realloc(out_iovs, realloc_size);
|
||||
}
|
||||
|
||||
// to next pair of iovs
|
||||
iov_index += 2;
|
||||
iov = out_iovs + iov_index;
|
||||
|
||||
// to next c0c3 header cache
|
||||
c0c3_cache_index += nbh;
|
||||
c0c3_cache = out_c0c3_caches + c0c3_cache_index;
|
||||
|
||||
// the cache header should never be realloc again,
|
||||
// for the ptr is set to iovs, so we just warn user to set larger
|
||||
// and use another loop to send again.
|
||||
int c0c3_left = SRS_CONSTS_C0C3_HEADERS_MAX - c0c3_cache_index;
|
||||
if (c0c3_left < SRS_CONSTS_RTMP_MAX_FMT0_HEADER_SIZE) {
|
||||
// only warn once for a connection.
|
||||
if (!warned_c0c3_caches) {
|
||||
srs_warn("c0c3 cache header too small, recoment to %d",
|
||||
SRS_CONSTS_C0C3_HEADERS_MAX + SRS_CONSTS_RTMP_MAX_FMT0_HEADER_SIZE);
|
||||
warned_c0c3_caches = true;
|
||||
}
|
||||
|
||||
// when c0c3 cache dry,
|
||||
// sendout all messages and reset the cache, then send again.
|
||||
if ((ret = skt->writev(out_iovs, iov_index, NULL)) != ERROR_SUCCESS) {
|
||||
srs_error("send with writev failed. ret=%d", ret);
|
||||
return ret;
|
||||
}
|
||||
|
||||
// reset caches, while these cache ensure
|
||||
// atleast we can sendout a chunk.
|
||||
iov_index = 0;
|
||||
iov = out_iovs + iov_index;
|
||||
|
||||
c0c3_cache_index = 0;
|
||||
c0c3_cache = out_c0c3_caches + c0c3_cache_index;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// maybe the iovs already sendout when c0c3 cache dry,
|
||||
// so just ignore when no iovs to send.
|
||||
if (iov_index <= 0) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
// send by writev
|
||||
// sendout header and payload by writev.
|
||||
// decrease the sys invoke count to get higher performance.
|
||||
if ((ret = skt->writev(out_iovs, iov_index, NULL)) != ERROR_SUCCESS) {
|
||||
srs_error("send with writev failed. ret=%d", ret);
|
||||
return ret;
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
void SrsProtocol::generate_chunk_header(char* cache, SrsMessageHeader* mh, bool c0, int* pnbh, char** ph)
|
||||
{
|
||||
// to directly set the field.
|
||||
char* pp = NULL;
|
||||
|
||||
|
@ -856,6 +991,34 @@ int SrsProtocol::send_and_free_message(SrsMessage* msg, int stream_id)
|
|||
return ret;
|
||||
}
|
||||
|
||||
int SrsProtocol::send_and_free_messages(SrsSharedPtrMessage** msgs, int nb_msgs, int stream_id)
|
||||
{
|
||||
// always not NULL msg.
|
||||
srs_assert(msgs);
|
||||
srs_assert(nb_msgs > 0);
|
||||
|
||||
// update the stream id in header.
|
||||
for (int i = 0; i < nb_msgs; i++) {
|
||||
SrsMessage* msg = msgs[i];
|
||||
// we assume that the stream_id in a group must be the same.
|
||||
if (msg->header.stream_id == stream_id) {
|
||||
break;
|
||||
}
|
||||
msg->header.stream_id = stream_id;
|
||||
}
|
||||
|
||||
// donot use the auto free to free the msg,
|
||||
// for performance issue.
|
||||
int ret = do_send_messages(msgs, nb_msgs);
|
||||
|
||||
for (int i = 0; i < nb_msgs; i++) {
|
||||
SrsMessage* msg = msgs[i];
|
||||
srs_freep(msg);
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int SrsProtocol::send_and_free_packet(SrsPacket* packet, int stream_id)
|
||||
{
|
||||
int ret = ERROR_SUCCESS;
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue