diff --git a/trunk/conf/full.conf b/trunk/conf/full.conf index 50a124db1..340ae4626 100644 --- a/trunk/conf/full.conf +++ b/trunk/conf/full.conf @@ -883,6 +883,12 @@ vhost stream.control.com { # while the sequence header is not changed yet. # default: off reduce_sequence_header on; + # the 1st packet timeout in ms for encoder. + # default: 20000 + publish_1stpkt_timeout 20000; + # the normal packet timeout in ms for encoder. + # default: 5000 + publish_normal_timeout 7000; } # the vhost for antisuck. diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index b4371d2aa..e023165f4 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -768,6 +768,28 @@ int SrsConfig::reload_vhost(SrsConfDirective* old_root) } srs_trace("vhost %s reload smi success.", vhost.c_str()); } + // publish_1stpkt_timeout, only one per vhost + if (!srs_directive_equals(new_vhost->get("publish_1stpkt_timeout"), old_vhost->get("publish_1stpkt_timeout"))) { + for (it = subscribes.begin(); it != subscribes.end(); ++it) { + ISrsReloadHandler* subscribe = *it; + if ((ret = subscribe->on_reload_vhost_p1stpt(vhost)) != ERROR_SUCCESS) { + srs_error("vhost %s notify subscribes p1stpt failed. ret=%d", vhost.c_str(), ret); + return ret; + } + } + srs_trace("vhost %s reload p1stpt success.", vhost.c_str()); + } + // publish_normal_timeout, only one per vhost + if (!srs_directive_equals(new_vhost->get("publish_normal_timeout"), old_vhost->get("publish_normal_timeout"))) { + for (it = subscribes.begin(); it != subscribes.end(); ++it) { + ISrsReloadHandler* subscribe = *it; + if ((ret = subscribe->on_reload_vhost_pnt(vhost)) != ERROR_SUCCESS) { + srs_error("vhost %s notify subscribes pnt failed. ret=%d", vhost.c_str(), ret); + return ret; + } + } + srs_trace("vhost %s reload pnt success.", vhost.c_str()); + } // min_latency, only one per vhost if (!srs_directive_equals(new_vhost->get("min_latency"), old_vhost->get("min_latency"))) { for (it = subscribes.begin(); it != subscribes.end(); ++it) { @@ -1763,6 +1785,7 @@ int SrsConfig::check_config() && n != "debug_srs_upnode" && n != "mr" && n != "mw_latency" && n != "min_latency" && n != "tcp_nodelay" && n != "send_min_interval" && n != "reduce_sequence_header" + && n != "publish_1stpkt_timeout" && n != "publish_normal_timeout" && n != "security" && n != "http_remux" && n != "http" && n != "http_static" && n != "hds" @@ -2552,6 +2575,44 @@ bool SrsConfig::get_reduce_sequence_header(string vhost) return SRS_CONF_PERFER_FALSE(conf->arg0()); } +int SrsConfig::get_publish_1stpkt_timeout(string vhost) +{ + // when no msg recevied for publisher, use larger timeout. + static int DEFAULT = 20000; + + SrsConfDirective* conf = get_vhost(vhost); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("publish_1stpkt_timeout"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + + return ::atoi(conf->arg0().c_str()); +} + +int SrsConfig::get_publish_normal_timeout(string vhost) +{ + // the timeout for publish recv. + // we must use more smaller timeout, for the recv never know the status + // of underlayer socket. + static int DEFAULT = 7000; + + SrsConfDirective* conf = get_vhost(vhost); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("get_publish_normal_timeout"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + + return ::atoi(conf->arg0().c_str()); +} + int SrsConfig::get_global_chunk_size() { SrsConfDirective* conf = root->get("chunk_size"); diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index a8c6f76de..4dd83c044 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -534,6 +534,14 @@ public: * whether reduce the sequence header. */ virtual bool get_reduce_sequence_header(std::string vhost); + /** + * the 1st packet timeout in ms for encoder. + */ + virtual int get_publish_1stpkt_timeout(std::string vhost); + /** + * the normal packet timeout in ms for encoder. + */ + virtual int get_publish_normal_timeout(std::string vhost); private: /** * get the global chunk size. diff --git a/trunk/src/app/srs_app_reload.cpp b/trunk/src/app/srs_app_reload.cpp index ca9732988..d660c7630 100644 --- a/trunk/src/app/srs_app_reload.cpp +++ b/trunk/src/app/srs_app_reload.cpp @@ -180,6 +180,16 @@ int ISrsReloadHandler::on_reload_vhost_realtime(string /*vhost*/) return ERROR_SUCCESS; } +int ISrsReloadHandler::on_reload_vhost_p1stpt(string /*vhost*/) +{ + return ERROR_SUCCESS; +} + +int ISrsReloadHandler::on_reload_vhost_pnt(string /*vhost*/) +{ + return ERROR_SUCCESS; +} + int ISrsReloadHandler::on_reload_vhost_chunk_size(string /*vhost*/) { return ERROR_SUCCESS; diff --git a/trunk/src/app/srs_app_reload.hpp b/trunk/src/app/srs_app_reload.hpp index 2e2f8c765..989b73433 100644 --- a/trunk/src/app/srs_app_reload.hpp +++ b/trunk/src/app/srs_app_reload.hpp @@ -75,6 +75,8 @@ public: virtual int on_reload_vhost_mw(std::string vhost); virtual int on_reload_vhost_smi(std::string vhost); virtual int on_reload_vhost_realtime(std::string vhost); + virtual int on_reload_vhost_p1stpt(std::string vhost); + virtual int on_reload_vhost_pnt(std::string vhost); virtual int on_reload_vhost_chunk_size(std::string vhost); virtual int on_reload_vhost_transcode(std::string vhost); virtual int on_reload_ingest_removed(std::string vhost, std::string ingest_id); diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index a24e98030..94883f142 100755 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -274,12 +274,48 @@ int SrsRtmpConn::on_reload_vhost_realtime(string vhost) } bool realtime_enabled = _srs_config->get_realtime_enabled(req->vhost); - srs_trace("realtime changed %d=>%d", realtime, realtime_enabled); - realtime = realtime_enabled; + if (realtime_enabled != realtime) { + srs_trace("realtime changed %d=>%d", realtime, realtime_enabled); + realtime = realtime_enabled; + } return ret; } +int SrsRtmpConn::on_reload_vhost_p1stpt(string vhost) +{ + int ret = ERROR_SUCCESS; + + if (req->vhost != vhost) { + return ret; + } + + int p1stpt = _srs_config->get_publish_1stpkt_timeout(req->vhost); + if (p1stpt != publish_1stpkt_timeout) { + srs_trace("p1stpt changed %d=>%d", publish_1stpkt_timeout, p1stpt); + publish_1stpkt_timeout = p1stpt; + } + + return ret; +} + +int SrsRtmpConn::on_reload_vhost_pnt(string vhost) +{ + int ret = ERROR_SUCCESS; + + if (req->vhost != vhost) { + return ret; + } + + int pnt = _srs_config->get_publish_normal_timeout(req->vhost); + if (pnt != publish_normal_timeout) { + srs_trace("p1stpt changed %d=>%d", publish_normal_timeout, pnt); + publish_normal_timeout = pnt; + } + + return ret; +} + void SrsRtmpConn::resample() { kbps->resample(); @@ -803,6 +839,14 @@ int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd) // set the sock options. set_sock_options(); + + if (true) { + bool mr = _srs_config->get_mr_enabled(req->vhost); + int mr_sleep = _srs_config->get_mr_sleep_ms(req->vhost); + publish_1stpkt_timeout = _srs_config->get_publish_1stpkt_timeout(req->vhost); + publish_normal_timeout = _srs_config->get_publish_1stpkt_timeout(req->vhost); + srs_trace("start publish mr=%d/%d, p1stpt=%d, pnt=%d", mr, mr_sleep, publish_1stpkt_timeout, publish_normal_timeout); + } int64_t nb_msgs = 0; while (!disposed) { @@ -819,9 +863,9 @@ int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd) if (nb_msgs == 0) { // when not got msgs, wait for a larger timeout. // @see https://github.com/simple-rtmp-server/srs/issues/441 - trd->wait(SRS_CONSTS_RTMP_PUBLISHER_NO_MSG_RECV_TIMEOUT_US / 1000); + trd->wait(publish_1stpkt_timeout); } else { - trd->wait(SRS_CONSTS_RTMP_PUBLISHER_RECV_TIMEOUT_US / 1000); + trd->wait(publish_normal_timeout); } // check the thread error code. @@ -847,10 +891,10 @@ int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd) bool mr = _srs_config->get_mr_enabled(req->vhost); int mr_sleep = _srs_config->get_mr_sleep_ms(req->vhost); srs_trace("<- "SRS_CONSTS_LOG_CLIENT_PUBLISH - " time=%"PRId64", okbps=%d,%d,%d, ikbps=%d,%d,%d, mr=%d/%d", pprint->age(), + " time=%"PRId64", okbps=%d,%d,%d, ikbps=%d,%d,%d, mr=%d/%d, p1stpt=%d, pnt=%d", pprint->age(), kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(), kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m(), - mr, mr_sleep + mr, mr_sleep, publish_1stpkt_timeout, publish_normal_timeout ); } } diff --git a/trunk/src/app/srs_app_rtmp_conn.hpp b/trunk/src/app/srs_app_rtmp_conn.hpp index 869efde6b..983d1fae0 100755 --- a/trunk/src/app/srs_app_rtmp_conn.hpp +++ b/trunk/src/app/srs_app_rtmp_conn.hpp @@ -87,6 +87,10 @@ private: bool realtime; // the minimal interval in ms for delivery stream. double send_min_interval; + // publish 1st packet timeout in ms + int publish_1stpkt_timeout; + // publish normal packet timeout in ms + int publish_normal_timeout; public: SrsRtmpConn(SrsServer* svr, st_netfd_t c); virtual ~SrsRtmpConn(); @@ -100,6 +104,8 @@ public: virtual int on_reload_vhost_mw(std::string vhost); virtual int on_reload_vhost_smi(std::string vhost); virtual int on_reload_vhost_realtime(std::string vhost); + virtual int on_reload_vhost_p1stpt(std::string vhost); + virtual int on_reload_vhost_pnt(std::string vhost); // interface IKbpsDelta public: virtual void resample(); diff --git a/trunk/src/kernel/srs_kernel_consts.hpp b/trunk/src/kernel/srs_kernel_consts.hpp index 1cf9b5b99..ee1142dc6 100644 --- a/trunk/src/kernel/srs_kernel_consts.hpp +++ b/trunk/src/kernel/srs_kernel_consts.hpp @@ -74,12 +74,6 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. // the timeout to wait client data, // if timeout, close the connection. #define SRS_CONSTS_RTMP_RECV_TIMEOUT_US (int64_t)(30*1000*1000LL) -// the timeout for publish recv. -// we must use more smaller timeout, for the recv never know the status -// of underlayer socket. -#define SRS_CONSTS_RTMP_PUBLISHER_RECV_TIMEOUT_US (int64_t)(3*1000*1000LL) -// when no msg recevied for publisher, use larger timeout. -#define SRS_CONSTS_RTMP_PUBLISHER_NO_MSG_RECV_TIMEOUT_US 10*SRS_CONSTS_RTMP_PUBLISHER_RECV_TIMEOUT_US // the timeout to wait for client control message, // if timeout, we generally ignore and send the data to client,