diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index 3e5424ba4..b27b3213c 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -598,6 +598,12 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsQueueRecvThread* trd) // collect elapse for pithy print. pithy_print.elapse(); +#ifdef SRS_PERF_QUEUE_COND_WAIT + // wait for message to incoming. + // @see https://github.com/winlinvip/simple-rtmp-server/issues/251 + consumer->wait(SRS_PERF_MW_MIN_MSGS, mw_sleep); +#endif + // get messages from consumer. // each msg in msgs.msgs must be free, for the SrsMessageArray never free them. int count = 0; @@ -605,12 +611,16 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsQueueRecvThread* trd) srs_error("get messages from consumer failed. ret=%d", ret); return ret; } - - // no messages, sleep for a while. + +#ifdef SRS_PERF_QUEUE_COND_WAIT + // we use wait to get messages, so the count must be positive. + srs_assert(count > 0); +#else if (count <= 0) { st_usleep(mw_sleep * 1000); } - srs_info("got %d msgs, mw=%d", count, mw_sleep); +#endif + srs_info("got %d msgs, min=%d, mw=%d", count, SRS_PERF_MW_MIN_MSGS, mw_sleep); // reportable if (pithy_print.can_print()) { @@ -995,6 +1005,13 @@ void SrsRtmpConn::change_mw_sleep(int sleep_ms) return; } + // get the sock buffer size. + int fd = st_netfd_fileno(stfd); + int onb_sbuf = 0; + socklen_t sock_buf_size = sizeof(int); + getsockopt(fd, SOL_SOCKET, SO_SNDBUF, &onb_sbuf, &sock_buf_size); + +#ifdef SRS_PERF_MW_SO_SNDBUF // the bytes: // 4KB=4096, 8KB=8192, 16KB=16384, 32KB=32768, 64KB=65536, // 128KB=131072, 256KB=262144, 512KB=524288 @@ -1007,11 +1024,6 @@ void SrsRtmpConn::change_mw_sleep(int sleep_ms) // 2000*5000/8=1250000B(about 1220KB). int kbps = 5000; int socket_buffer_size = sleep_ms * kbps / 8; - - int fd = st_netfd_fileno(stfd); - int onb_sbuf = 0; - socklen_t sock_buf_size = sizeof(int); - getsockopt(fd, SOL_SOCKET, SO_SNDBUF, &onb_sbuf, &sock_buf_size); // socket send buffer, system will double it. int nb_sbuf = socket_buffer_size / 2; @@ -1022,9 +1034,13 @@ void SrsRtmpConn::change_mw_sleep(int sleep_ms) } getsockopt(fd, SOL_SOCKET, SO_SNDBUF, &nb_sbuf, &sock_buf_size); - srs_trace("mw change sleep %d=>%d, max_msgs=%d, esbuf=%d, sbuf %d=>%d", + srs_trace("mw changed sleep %d=>%d, max_msgs=%d, esbuf=%d, sbuf %d=>%d", mw_sleep, sleep_ms, SRS_PERF_MW_MSGS, socket_buffer_size, onb_sbuf, nb_sbuf); +#else + srs_trace("mw changed sleep %d=>%d, max_msgs=%d, sbuf %d", + mw_sleep, sleep_ms, SRS_PERF_MW_MSGS, onb_sbuf); +#endif mw_sleep = sleep_ms; } diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index 21ea77f31..523dafc5a 100644 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -182,7 +182,7 @@ void SrsMessageQueue::set_queue_size(double queue_size) queue_size_ms = (int)(queue_size * 1000); } -int SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg, bool* is_overflow) +int SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg) { int ret = ERROR_SUCCESS; @@ -197,11 +197,6 @@ int SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg, bool* is_overflow) msgs.push_back(msg); while (av_end_time - av_start_time > queue_size_ms) { - // notice the caller queue already overflow and shrinked. - if (is_overflow) { - *is_overflow = true; - } - shrink(); } @@ -211,7 +206,7 @@ int SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg, bool* is_overflow) int SrsMessageQueue::dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& count) { int ret = ERROR_SUCCESS; - + int nb_msgs = (int)msgs.size(); if (nb_msgs <= 0) { return ret; @@ -308,6 +303,13 @@ SrsConsumer::SrsConsumer(SrsSource* _source) jitter = new SrsRtmpJitter(); queue = new SrsMessageQueue(); should_update_source_id = false; + +#ifdef SRS_PERF_QUEUE_COND_WAIT + mw_wait = st_cond_new(); + mw_min_msgs = 0; + mw_duration = 0; + mw_waiting = false; +#endif } SrsConsumer::~SrsConsumer() @@ -315,6 +317,10 @@ SrsConsumer::~SrsConsumer() source->on_consumer_destroy(this); srs_freep(jitter); srs_freep(queue); + +#ifdef SRS_PERF_QUEUE_COND_WAIT + st_cond_destroy(mw_wait); +#endif } void SrsConsumer::set_queue_size(double queue_size) @@ -344,11 +350,25 @@ int SrsConsumer::enqueue(SrsSharedPtrMessage* __msg, bool atc, int tba, int tbv, return ret; } } - - if ((ret = queue->enqueue(msg, NULL)) != ERROR_SUCCESS) { + + if ((ret = queue->enqueue(msg)) != ERROR_SUCCESS) { return ret; } + #ifdef SRS_PERF_QUEUE_COND_WAIT + // fire the mw when msgs is enough. + if (mw_waiting) { + int duration_ms = queue->duration(); + bool match_min_msgs = queue->size() > mw_min_msgs; + + // when duration ok, signal to flush. + if (match_min_msgs && duration_ms > mw_duration) { + st_cond_signal(mw_wait); + mw_waiting = false; + } + } + #endif + return ret; } @@ -376,6 +396,27 @@ int SrsConsumer::dump_packets(SrsMessageArray* msgs, int& count) return ret; } +#ifdef SRS_PERF_QUEUE_COND_WAIT +void SrsConsumer::wait(int nb_msgs, int duration) +{ + mw_min_msgs = nb_msgs; + mw_duration = duration; + + int duration_ms = queue->duration(); + bool match_min_msgs = queue->size() > mw_min_msgs; + + // when duration ok, signal to flush. + if (match_min_msgs && duration_ms > mw_duration) { + return; + } + + // the enqueue will notify this cond. + mw_waiting = true; + // wait for msgs to incoming. + st_cond_wait(mw_wait); +} +#endif + int SrsConsumer::on_play_client_pause(bool is_pause) { int ret = ERROR_SUCCESS; diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index 69da17a7d..d1ec5bae4 100644 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -134,12 +134,11 @@ public: /** * enqueue the message, the timestamp always monotonically. * @param msg, the msg to enqueue, user never free it whatever the return code. - * @param is_overflow, whether overflow and shrinked. NULL to ignore. */ - virtual int enqueue(SrsSharedPtrMessage* msg, bool* is_overflow = NULL); + virtual int enqueue(SrsSharedPtrMessage* msg); /** * get packets in consumer queue. - * @pmsgs SrsCommonMessages*[], used to store the msgs, user must alloc it. + * @pmsgs SrsSharedPtrMessage*[], used to store the msgs, user must alloc it. * @count the count in array, output param. * @max_count the max count to dequeue, must be positive. */ @@ -165,6 +164,14 @@ private: bool paused; // when source id changed, notice all consumers bool should_update_source_id; +#ifdef SRS_PERF_QUEUE_COND_WAIT + // the cond wait for mw. + // @see https://github.com/winlinvip/simple-rtmp-server/issues/251 + st_cond_t mw_wait; + bool mw_waiting; + int mw_min_msgs; + int mw_duration; +#endif public: SrsConsumer(SrsSource* _source); virtual ~SrsConsumer(); @@ -200,6 +207,14 @@ public: * @max_count the max count to dequeue, must be positive. */ virtual int dump_packets(SrsMessageArray* msgs, int& count); +#ifdef SRS_PERF_QUEUE_COND_WAIT + /** + * wait for messages incomming, atleast nb_msgs and in duration. + * @param nb_msgs the messages count to wait. + * @param duration the messgae duration to wait. + */ + virtual void wait(int nb_msgs, int duration); +#endif /** * when client send the pause message. */ diff --git a/trunk/src/core/srs_core_performance.hpp b/trunk/src/core/srs_core_performance.hpp index cf08a9d72..a3dc15ea2 100644 --- a/trunk/src/core/srs_core_performance.hpp +++ b/trunk/src/core/srs_core_performance.hpp @@ -103,6 +103,22 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. * @remark, recomment to 128. */ #define SRS_PERF_MW_MSGS 128 +/** +* whether set the socket send buffer size. +* @see https://github.com/winlinvip/simple-rtmp-server/issues/251 +*/ +#undef SRS_PERF_MW_SO_SNDBUF +/** +* whether set the socket recv buffer size. +* @see https://github.com/winlinvip/simple-rtmp-server/issues/251 +*/ +#undef SRS_PERF_MW_SO_RCVBUF +/** +* whether use cond wait to send messages. +* @remark this improve performance for large connectios. +* @see https://github.com/winlinvip/simple-rtmp-server/issues/251 +*/ +#undef SRS_PERF_QUEUE_COND_WAIT /** * how many chunk stream to cache, [0, N]. diff --git a/trunk/src/rtmp/srs_protocol_rtmp.hpp b/trunk/src/rtmp/srs_protocol_rtmp.hpp index 09b45e30e..6aa32eba5 100644 --- a/trunk/src/rtmp/srs_protocol_rtmp.hpp +++ b/trunk/src/rtmp/srs_protocol_rtmp.hpp @@ -37,7 +37,6 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. class SrsProtocol; class ISrsProtocolReaderWriter; -class ISrsCommonMessage; class SrsCommonMessage; class SrsCreateStreamPacket; class SrsFMLEStartPacket; diff --git a/trunk/src/rtmp/srs_protocol_stack.cpp b/trunk/src/rtmp/srs_protocol_stack.cpp index 5589ecd04..128253474 100644 --- a/trunk/src/rtmp/srs_protocol_stack.cpp +++ b/trunk/src/rtmp/srs_protocol_stack.cpp @@ -755,10 +755,17 @@ int SrsProtocol::do_send_messages(SrsSharedPtrMessage** msgs, int nb_msgs) // always write the header event payload is empty. while (p < pend) { - // header use iov[0]. - generate_chunk_header(c0c3_cache, &msg->header, p == msg->payload, iov); + // always has header + int nbh = 0; + char* header = NULL; + generate_chunk_header(c0c3_cache, &msg->header, p == msg->payload, &nbh, &header); + srs_assert(nbh > 0); - // payload use iov[1]. + // header iov + iov[0].iov_base = header; + iov[0].iov_len = nbh; + + // payload iov int payload_size = pend - p; if (payload_size > out_chunk_size) { payload_size = out_chunk_size; @@ -781,14 +788,14 @@ int SrsProtocol::do_send_messages(SrsSharedPtrMessage** msgs, int nb_msgs) int realloc_size = sizeof(iovec) * nb_out_iovs; out_iovs = (iovec*)realloc(out_iovs, realloc_size); } - - // to next c0c3 header cache - c0c3_cache_index += iov[0].iov_len; - c0c3_cache = out_c0c3_caches + c0c3_cache_index; // to next pair of iovs iov_index += 2; iov = out_iovs + iov_index; + + // to next c0c3 header cache + c0c3_cache_index += nbh; + c0c3_cache = out_c0c3_caches + c0c3_cache_index; // the cache header should never be realloc again, // for the ptr is set to iovs, so we just warn user to set larger @@ -898,7 +905,7 @@ int SrsProtocol::do_send_and_free_packet(SrsPacket* packet, int stream_id) return ret; } -void SrsProtocol::generate_chunk_header(char* cache, SrsMessageHeader* mh, bool c0, iovec* iov) +void SrsProtocol::generate_chunk_header(char* cache, SrsMessageHeader* mh, bool c0, int* pnbh, char** ph) { // to directly set the field. char* pp = NULL; @@ -975,8 +982,8 @@ void SrsProtocol::generate_chunk_header(char* cache, SrsMessageHeader* mh, bool } // always has header - iov->iov_base = cache; - iov->iov_len = p - cache; + *pnbh = p - cache; + *ph = cache; } int SrsProtocol::do_decode_message(SrsMessageHeader& header, SrsStream* stream, SrsPacket** ppacket) diff --git a/trunk/src/rtmp/srs_protocol_stack.hpp b/trunk/src/rtmp/srs_protocol_stack.hpp index e19a18c96..b99f359c3 100644 --- a/trunk/src/rtmp/srs_protocol_stack.hpp +++ b/trunk/src/rtmp/srs_protocol_stack.hpp @@ -496,9 +496,11 @@ private: * generate the chunk header for msg. * @param mh, the header of msg to send. * @param c0, whether the first chunk, the c0 chunk. - * @param iov, output the header and size to iovec. + * @param pnbh, output the size of header. + * @param ph, output the header cache. + * user should never free it, it's cached header. */ - virtual void generate_chunk_header(char* cache, SrsMessageHeader* mh, bool c0, iovec* iov); + virtual void generate_chunk_header(char* cache, SrsMessageHeader* mh, bool c0, int* pnbh, char** ph); /** * imp for decode_message */