diff --git a/README.md b/README.md index a92d7a27e..3a1a3f2ca 100755 --- a/README.md +++ b/README.md @@ -759,18 +759,6 @@ About the HLS overhead of SRS, we compare the overhead to FLV by remux the HLS t The HLS overhead is calc by: (HLS - FLV) / FLV * 100% -### Bytes Api - -The api provides bytes of vhost, stream and srs. - -| Connections | tcpdump | srs | deviation | -| ----------- | ---------- | ---------- | --------- | -| 1 | 201687968 | 201687968 | 0% | -| 100 | 12456042662 | 12458284053 | 0.018% | -| 500 | 11623083989 | 11633158616 | 0.087% | - -The tcpdump command is `tcpdump src 192.168.1.151 and port 1935 -i eth0 and tcp -ql`. - ## Architecture SRS always use the most simple architecture to support complex transaction. diff --git a/trunk/conf/full.conf b/trunk/conf/full.conf index 50a124db1..340ae4626 100644 --- a/trunk/conf/full.conf +++ b/trunk/conf/full.conf @@ -883,6 +883,12 @@ vhost stream.control.com { # while the sequence header is not changed yet. # default: off reduce_sequence_header on; + # the 1st packet timeout in ms for encoder. + # default: 20000 + publish_1stpkt_timeout 20000; + # the normal packet timeout in ms for encoder. + # default: 5000 + publish_normal_timeout 7000; } # the vhost for antisuck. diff --git a/trunk/research/librtmp/srs_rtmp_dump.c b/trunk/research/librtmp/srs_rtmp_dump.c index 49ea8edb9..3f3d030aa 100644 --- a/trunk/research/librtmp/srs_rtmp_dump.c +++ b/trunk/research/librtmp/srs_rtmp_dump.c @@ -258,8 +258,8 @@ int main(int argc, char** argv) int64_t nb_packets = 0; u_int32_t pre_timestamp = 0; - int64_t pre_now = srs_utils_time_ms(); - int64_t start_time = pre_now; + int64_t pre_now = -1; + int64_t start_time = -1; for (;;) { int size; char type; @@ -271,6 +271,13 @@ int main(int argc, char** argv) goto rtmp_destroy; } + if (pre_now == -1) { + pre_now = srs_utils_time_ms(); + } + if (start_time == -1) { + start_time = srs_utils_time_ms(); + } + if (srs_human_print_rtmp_packet4(type, timestamp, data, size, pre_timestamp, pre_now, start_time, nb_packets++) != 0) { srs_human_trace("print rtmp packet failed."); goto rtmp_destroy; diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index b4371d2aa..2c2f3e273 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -768,6 +768,28 @@ int SrsConfig::reload_vhost(SrsConfDirective* old_root) } srs_trace("vhost %s reload smi success.", vhost.c_str()); } + // publish_1stpkt_timeout, only one per vhost + if (!srs_directive_equals(new_vhost->get("publish_1stpkt_timeout"), old_vhost->get("publish_1stpkt_timeout"))) { + for (it = subscribes.begin(); it != subscribes.end(); ++it) { + ISrsReloadHandler* subscribe = *it; + if ((ret = subscribe->on_reload_vhost_p1stpt(vhost)) != ERROR_SUCCESS) { + srs_error("vhost %s notify subscribes p1stpt failed. ret=%d", vhost.c_str(), ret); + return ret; + } + } + srs_trace("vhost %s reload p1stpt success.", vhost.c_str()); + } + // publish_normal_timeout, only one per vhost + if (!srs_directive_equals(new_vhost->get("publish_normal_timeout"), old_vhost->get("publish_normal_timeout"))) { + for (it = subscribes.begin(); it != subscribes.end(); ++it) { + ISrsReloadHandler* subscribe = *it; + if ((ret = subscribe->on_reload_vhost_pnt(vhost)) != ERROR_SUCCESS) { + srs_error("vhost %s notify subscribes pnt failed. ret=%d", vhost.c_str(), ret); + return ret; + } + } + srs_trace("vhost %s reload pnt success.", vhost.c_str()); + } // min_latency, only one per vhost if (!srs_directive_equals(new_vhost->get("min_latency"), old_vhost->get("min_latency"))) { for (it = subscribes.begin(); it != subscribes.end(); ++it) { @@ -1763,6 +1785,7 @@ int SrsConfig::check_config() && n != "debug_srs_upnode" && n != "mr" && n != "mw_latency" && n != "min_latency" && n != "tcp_nodelay" && n != "send_min_interval" && n != "reduce_sequence_header" + && n != "publish_1stpkt_timeout" && n != "publish_normal_timeout" && n != "security" && n != "http_remux" && n != "http" && n != "http_static" && n != "hds" @@ -2552,6 +2575,44 @@ bool SrsConfig::get_reduce_sequence_header(string vhost) return SRS_CONF_PERFER_FALSE(conf->arg0()); } +int SrsConfig::get_publish_1stpkt_timeout(string vhost) +{ + // when no msg recevied for publisher, use larger timeout. + static int DEFAULT = 20000; + + SrsConfDirective* conf = get_vhost(vhost); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("publish_1stpkt_timeout"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + + return ::atoi(conf->arg0().c_str()); +} + +int SrsConfig::get_publish_normal_timeout(string vhost) +{ + // the timeout for publish recv. + // we must use more smaller timeout, for the recv never know the status + // of underlayer socket. + static int DEFAULT = 5000; + + SrsConfDirective* conf = get_vhost(vhost); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("publish_normal_timeout"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + + return ::atoi(conf->arg0().c_str()); +} + int SrsConfig::get_global_chunk_size() { SrsConfDirective* conf = root->get("chunk_size"); diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index a8c6f76de..4dd83c044 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -534,6 +534,14 @@ public: * whether reduce the sequence header. */ virtual bool get_reduce_sequence_header(std::string vhost); + /** + * the 1st packet timeout in ms for encoder. + */ + virtual int get_publish_1stpkt_timeout(std::string vhost); + /** + * the normal packet timeout in ms for encoder. + */ + virtual int get_publish_normal_timeout(std::string vhost); private: /** * get the global chunk size. diff --git a/trunk/src/app/srs_app_reload.cpp b/trunk/src/app/srs_app_reload.cpp index ca9732988..d660c7630 100644 --- a/trunk/src/app/srs_app_reload.cpp +++ b/trunk/src/app/srs_app_reload.cpp @@ -180,6 +180,16 @@ int ISrsReloadHandler::on_reload_vhost_realtime(string /*vhost*/) return ERROR_SUCCESS; } +int ISrsReloadHandler::on_reload_vhost_p1stpt(string /*vhost*/) +{ + return ERROR_SUCCESS; +} + +int ISrsReloadHandler::on_reload_vhost_pnt(string /*vhost*/) +{ + return ERROR_SUCCESS; +} + int ISrsReloadHandler::on_reload_vhost_chunk_size(string /*vhost*/) { return ERROR_SUCCESS; diff --git a/trunk/src/app/srs_app_reload.hpp b/trunk/src/app/srs_app_reload.hpp index 2e2f8c765..989b73433 100644 --- a/trunk/src/app/srs_app_reload.hpp +++ b/trunk/src/app/srs_app_reload.hpp @@ -75,6 +75,8 @@ public: virtual int on_reload_vhost_mw(std::string vhost); virtual int on_reload_vhost_smi(std::string vhost); virtual int on_reload_vhost_realtime(std::string vhost); + virtual int on_reload_vhost_p1stpt(std::string vhost); + virtual int on_reload_vhost_pnt(std::string vhost); virtual int on_reload_vhost_chunk_size(std::string vhost); virtual int on_reload_vhost_transcode(std::string vhost); virtual int on_reload_ingest_removed(std::string vhost, std::string ingest_id); diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index a24e98030..98b3d8732 100755 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -274,12 +274,48 @@ int SrsRtmpConn::on_reload_vhost_realtime(string vhost) } bool realtime_enabled = _srs_config->get_realtime_enabled(req->vhost); - srs_trace("realtime changed %d=>%d", realtime, realtime_enabled); - realtime = realtime_enabled; + if (realtime_enabled != realtime) { + srs_trace("realtime changed %d=>%d", realtime, realtime_enabled); + realtime = realtime_enabled; + } return ret; } +int SrsRtmpConn::on_reload_vhost_p1stpt(string vhost) +{ + int ret = ERROR_SUCCESS; + + if (req->vhost != vhost) { + return ret; + } + + int p1stpt = _srs_config->get_publish_1stpkt_timeout(req->vhost); + if (p1stpt != publish_1stpkt_timeout) { + srs_trace("p1stpt changed %d=>%d", publish_1stpkt_timeout, p1stpt); + publish_1stpkt_timeout = p1stpt; + } + + return ret; +} + +int SrsRtmpConn::on_reload_vhost_pnt(string vhost) +{ + int ret = ERROR_SUCCESS; + + if (req->vhost != vhost) { + return ret; + } + + int pnt = _srs_config->get_publish_normal_timeout(req->vhost); + if (pnt != publish_normal_timeout) { + srs_trace("p1stpt changed %d=>%d", publish_normal_timeout, pnt); + publish_normal_timeout = pnt; + } + + return ret; +} + void SrsRtmpConn::resample() { kbps->resample(); @@ -803,6 +839,14 @@ int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd) // set the sock options. set_sock_options(); + + if (true) { + bool mr = _srs_config->get_mr_enabled(req->vhost); + int mr_sleep = _srs_config->get_mr_sleep_ms(req->vhost); + publish_1stpkt_timeout = _srs_config->get_publish_1stpkt_timeout(req->vhost); + publish_normal_timeout = _srs_config->get_publish_1stpkt_timeout(req->vhost); + srs_trace("start publish mr=%d/%d, p1stpt=%d, pnt=%d", mr, mr_sleep, publish_1stpkt_timeout, publish_normal_timeout); + } int64_t nb_msgs = 0; while (!disposed) { @@ -819,9 +863,9 @@ int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd) if (nb_msgs == 0) { // when not got msgs, wait for a larger timeout. // @see https://github.com/simple-rtmp-server/srs/issues/441 - trd->wait(SRS_CONSTS_RTMP_PUBLISHER_NO_MSG_RECV_TIMEOUT_US / 1000); + trd->wait(publish_1stpkt_timeout); } else { - trd->wait(SRS_CONSTS_RTMP_PUBLISHER_RECV_TIMEOUT_US / 1000); + trd->wait(publish_normal_timeout); } // check the thread error code. @@ -835,8 +879,8 @@ int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd) // when not got any messages, timeout. if (trd->nb_msgs() <= nb_msgs) { ret = ERROR_SOCKET_TIMEOUT; - srs_warn("publish timeout %"PRId64"us, nb_msgs=%"PRId64", ret=%d", - SRS_CONSTS_RTMP_RECV_TIMEOUT_US, nb_msgs, ret); + srs_warn("publish timeout %dms, nb_msgs=%"PRId64", ret=%d", + nb_msgs? publish_normal_timeout : publish_1stpkt_timeout, nb_msgs, ret); break; } nb_msgs = trd->nb_msgs(); @@ -847,10 +891,10 @@ int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd) bool mr = _srs_config->get_mr_enabled(req->vhost); int mr_sleep = _srs_config->get_mr_sleep_ms(req->vhost); srs_trace("<- "SRS_CONSTS_LOG_CLIENT_PUBLISH - " time=%"PRId64", okbps=%d,%d,%d, ikbps=%d,%d,%d, mr=%d/%d", pprint->age(), + " time=%"PRId64", okbps=%d,%d,%d, ikbps=%d,%d,%d, mr=%d/%d, p1stpt=%d, pnt=%d", pprint->age(), kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(), kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m(), - mr, mr_sleep + mr, mr_sleep, publish_1stpkt_timeout, publish_normal_timeout ); } } diff --git a/trunk/src/app/srs_app_rtmp_conn.hpp b/trunk/src/app/srs_app_rtmp_conn.hpp index 869efde6b..983d1fae0 100755 --- a/trunk/src/app/srs_app_rtmp_conn.hpp +++ b/trunk/src/app/srs_app_rtmp_conn.hpp @@ -87,6 +87,10 @@ private: bool realtime; // the minimal interval in ms for delivery stream. double send_min_interval; + // publish 1st packet timeout in ms + int publish_1stpkt_timeout; + // publish normal packet timeout in ms + int publish_normal_timeout; public: SrsRtmpConn(SrsServer* svr, st_netfd_t c); virtual ~SrsRtmpConn(); @@ -100,6 +104,8 @@ public: virtual int on_reload_vhost_mw(std::string vhost); virtual int on_reload_vhost_smi(std::string vhost); virtual int on_reload_vhost_realtime(std::string vhost); + virtual int on_reload_vhost_p1stpt(std::string vhost); + virtual int on_reload_vhost_pnt(std::string vhost); // interface IKbpsDelta public: virtual void resample(); diff --git a/trunk/src/kernel/srs_kernel_consts.hpp b/trunk/src/kernel/srs_kernel_consts.hpp index 1cf9b5b99..ee1142dc6 100644 --- a/trunk/src/kernel/srs_kernel_consts.hpp +++ b/trunk/src/kernel/srs_kernel_consts.hpp @@ -74,12 +74,6 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. // the timeout to wait client data, // if timeout, close the connection. #define SRS_CONSTS_RTMP_RECV_TIMEOUT_US (int64_t)(30*1000*1000LL) -// the timeout for publish recv. -// we must use more smaller timeout, for the recv never know the status -// of underlayer socket. -#define SRS_CONSTS_RTMP_PUBLISHER_RECV_TIMEOUT_US (int64_t)(3*1000*1000LL) -// when no msg recevied for publisher, use larger timeout. -#define SRS_CONSTS_RTMP_PUBLISHER_NO_MSG_RECV_TIMEOUT_US 10*SRS_CONSTS_RTMP_PUBLISHER_RECV_TIMEOUT_US // the timeout to wait for client control message, // if timeout, we generally ignore and send the data to client, diff --git a/trunk/src/libs/srs_librtmp.cpp b/trunk/src/libs/srs_librtmp.cpp index 78b253e1f..db2536ca9 100644 --- a/trunk/src/libs/srs_librtmp.cpp +++ b/trunk/src/libs/srs_librtmp.cpp @@ -2338,6 +2338,12 @@ int srs_human_print_rtmp_packet4(char type, u_int32_t timestamp, char* data, int pi = (pre_now - starttime) / (double)nb_packets; } + // global fps(video and audio mixed fps). + double gfps = 0; + if (pi > 0) { + gfps = 1000 / pi; + } + int diff = 0; if (pre_timestamp > 0) { diff = (int)timestamp - (int)pre_timestamp; @@ -2350,22 +2356,22 @@ int srs_human_print_rtmp_packet4(char type, u_int32_t timestamp, char* data, int u_int32_t pts; if (srs_utils_parse_timestamp(timestamp, type, data, size, &pts) != 0) { - srs_human_trace("Rtmp packet id=%"PRId64"/%.1f, type=%s, dts=%d, diff=%d, ndiff=%d, size=%d, DecodeError", - nb_packets, pi, srs_human_flv_tag_type2string(type), timestamp, diff, ndiff, size + srs_human_trace("Rtmp packet id=%"PRId64"/%.1f/%.1f, type=%s, dts=%d, ndiff=%d, diff=%d, size=%d, DecodeError", + nb_packets, pi, gfps, srs_human_flv_tag_type2string(type), timestamp, ndiff, diff, size ); return ret; } if (type == SRS_RTMP_TYPE_VIDEO) { - srs_human_trace("Video packet id=%"PRId64"/%.1f, type=%s, dts=%d, pts=%d, diff=%d, ndiff=%d, size=%d, %s(%s,%s)", - nb_packets, pi, srs_human_flv_tag_type2string(type), timestamp, pts, diff, ndiff, size, + srs_human_trace("Video packet id=%"PRId64"/%.1f/%.1f, type=%s, dts=%d, pts=%d, ndiff=%d, diff=%d, size=%d, %s(%s,%s)", + nb_packets, pi, gfps, srs_human_flv_tag_type2string(type), timestamp, pts, ndiff, diff, size, srs_human_flv_video_codec_id2string(srs_utils_flv_video_codec_id(data, size)), srs_human_flv_video_avc_packet_type2string(srs_utils_flv_video_avc_packet_type(data, size)), srs_human_flv_video_frame_type2string(srs_utils_flv_video_frame_type(data, size)) ); } else if (type == SRS_RTMP_TYPE_AUDIO) { - srs_human_trace("Audio packet id=%"PRId64"/%.1f, type=%s, dts=%d, pts=%d, diff=%d, ndiff=%d, size=%d, %s(%s,%s,%s,%s)", - nb_packets, pi, srs_human_flv_tag_type2string(type), timestamp, pts, diff, ndiff, size, + srs_human_trace("Audio packet id=%"PRId64"/%.1f/%.1f, type=%s, dts=%d, pts=%d, ndiff=%d, diff=%d, size=%d, %s(%s,%s,%s,%s)", + nb_packets, pi, gfps, srs_human_flv_tag_type2string(type), timestamp, pts, ndiff, diff, size, srs_human_flv_audio_sound_format2string(srs_utils_flv_audio_sound_format(data, size)), srs_human_flv_audio_sound_rate2string(srs_utils_flv_audio_sound_rate(data, size)), srs_human_flv_audio_sound_size2string(srs_utils_flv_audio_sound_size(data, size)), @@ -2373,8 +2379,8 @@ int srs_human_print_rtmp_packet4(char type, u_int32_t timestamp, char* data, int srs_human_flv_audio_aac_packet_type2string(srs_utils_flv_audio_aac_packet_type(data, size)) ); } else if (type == SRS_RTMP_TYPE_SCRIPT) { - srs_human_verbose("Data packet id=%"PRId64"/%.1f, type=%s, time=%d, diff=%d, ndiff=%d, size=%d", - nb_packets, pi, srs_human_flv_tag_type2string(type), timestamp, diff, ndiff, size); + srs_human_verbose("Data packet id=%"PRId64"/%.1f/%.1f, type=%s, time=%d, ndiff=%d, diff=%d, size=%d", + nb_packets, pi, gfps, srs_human_flv_tag_type2string(type), timestamp, ndiff, diff, size); int nparsed = 0; while (nparsed < size) { int nb_parsed_this = 0; @@ -2390,8 +2396,8 @@ int srs_human_print_rtmp_packet4(char type, u_int32_t timestamp, char* data, int srs_freep(amf0_str); } } else { - srs_human_trace("Rtmp packet id=%"PRId64"/%.1f, type=%#x, dts=%d, pts=%d, diff=%d, ndiff=%d, size=%d", - nb_packets, pi, type, timestamp, pts, diff, ndiff, size); + srs_human_trace("Rtmp packet id=%"PRId64"/%.1f/%.1f, type=%#x, dts=%d, pts=%d, ndiff=%d, diff=%d, size=%d", + nb_packets, pi, gfps, type, timestamp, pts, ndiff, diff, size); } return ret;