From 6e9f375606618cebcdfec1d40166eaeaf9bba9ee Mon Sep 17 00:00:00 2001 From: wenjie Date: Thu, 12 Dec 2013 20:56:19 +0800 Subject: [PATCH] +server support band check +every vhost can have it's own chunk size --- trunk/auto/build_ffmpeg.sh | 2 +- trunk/conf/srs.19350.conf | 2 +- trunk/conf/srs.conf | 29 ++- trunk/src/core/srs_core.cpp | 10 +- trunk/src/core/srs_core.hpp | 2 +- trunk/src/core/srs_core_client.cpp | 84 ++++++- trunk/src/core/srs_core_client.hpp | 3 +- trunk/src/core/srs_core_config.cpp | 106 +++++++-- trunk/src/core/srs_core_config.hpp | 44 ++-- trunk/src/core/srs_core_error.hpp | 3 +- trunk/src/core/srs_core_protocol.cpp | 95 ++++++-- trunk/src/core/srs_core_protocol.hpp | 31 ++- trunk/src/core/srs_core_rtmp.cpp | 334 ++++++++++++++++++++++++++- trunk/src/core/srs_core_rtmp.hpp | 23 +- trunk/src/core/srs_core_server.cpp | 6 +- trunk/src/core/srs_core_source.cpp | 2 +- 16 files changed, 668 insertions(+), 108 deletions(-) mode change 100644 => 100755 trunk/auto/build_ffmpeg.sh mode change 100644 => 100755 trunk/src/core/srs_core.cpp mode change 100644 => 100755 trunk/src/core/srs_core.hpp mode change 100644 => 100755 trunk/src/core/srs_core_client.cpp mode change 100644 => 100755 trunk/src/core/srs_core_client.hpp mode change 100644 => 100755 trunk/src/core/srs_core_config.cpp mode change 100644 => 100755 trunk/src/core/srs_core_config.hpp mode change 100644 => 100755 trunk/src/core/srs_core_error.hpp mode change 100644 => 100755 trunk/src/core/srs_core_protocol.cpp mode change 100644 => 100755 trunk/src/core/srs_core_protocol.hpp mode change 100644 => 100755 trunk/src/core/srs_core_rtmp.cpp mode change 100644 => 100755 trunk/src/core/srs_core_rtmp.hpp mode change 100644 => 100755 trunk/src/core/srs_core_server.cpp mode change 100644 => 100755 trunk/src/core/srs_core_source.cpp diff --git a/trunk/auto/build_ffmpeg.sh b/trunk/auto/build_ffmpeg.sh old mode 100644 new mode 100755 index 04de4b01a..d890ba471 --- a/trunk/auto/build_ffmpeg.sh +++ b/trunk/auto/build_ffmpeg.sh @@ -80,7 +80,7 @@ else --yasmexe=${ff_yasm_bin} \ --prefix=${ff_release_dir} --cc= \ --enable-static --disable-shared --disable-debug \ - --extra-cflags='-I${ffmpeg_exported_release_dir}/include' \ + --extra-cflags='-I${ffmpeg_exported_release_dir}/include -I /usr/include/freetype2/' \ --extra-ldflags='-L${ffmpeg_exported_release_dir}/lib -lm -ldl' \ --disable-ffplay --disable-ffprobe --disable-ffserver --disable-doc \ --enable-postproc --enable-bzlib --enable-zlib --enable-parsers \ diff --git a/trunk/conf/srs.19350.conf b/trunk/conf/srs.19350.conf index d40676478..7b7594af5 100755 --- a/trunk/conf/srs.19350.conf +++ b/trunk/conf/srs.19350.conf @@ -1,6 +1,6 @@ listen 19350; -chunk_size 65000; vhost __defaultVhost__ { + chunk_size 65000; enabled on; gop_cache on; hls on; diff --git a/trunk/conf/srs.conf b/trunk/conf/srs.conf index fdfe601f3..90dc040cc 100755 --- a/trunk/conf/srs.conf +++ b/trunk/conf/srs.conf @@ -1,11 +1,5 @@ # the listen ports, split by space. listen 1935; -# the default chunk size is 128, max is 65536, -# some client does not support chunk size change, -# however, most clients supports it and it can improve -# performance about 10%. -# if not specified, set to 4096. -chunk_size 65000; # the logs dir. # if enabled ffmpeg, each stracoding stream will create a log file. # default: ./objs/logs @@ -19,6 +13,12 @@ max_connections 2000; # for which cannot identify the required vhost. # for default demo. vhost __defaultVhost__ { + # the default chunk size is 128, max is 65536, + # some client does not support chunk size change, + # however, most clients supports it and it can improve + # performance about 10%. + # if not specified, set to 4096. + chunk_size 65000; enabled on; gop_cache on; hls on; @@ -79,6 +79,7 @@ vhost __defaultVhost__ { } # for development vhost dev { + chunk_size 65000; enabled on; gop_cache on; hls on; @@ -87,7 +88,7 @@ vhost dev { hls_window 30; forward 127.0.0.1:19350; http_hooks { - enabled on; + enabled off; on_connect http://127.0.0.1:8085/api/v1/clients; on_close http://127.0.0.1:8085/api/v1/clients; on_publish http://127.0.0.1:8085/api/v1/streams; @@ -133,6 +134,20 @@ vhost dev { } } } + +vhost bandcheck.srs.com { + chunk_size 65000; + enabled on; + # vhost for band width check + bandcheck{ + enabled on; + key test kate; + interval 30; + max_play_kbps 45000; + max_pub_kbps 25000; + } +} + # the http hook callback vhost, srs will invoke the hooks for specified events. vhost hooks.callback.vhost.com { http_hooks { diff --git a/trunk/src/core/srs_core.cpp b/trunk/src/core/srs_core.cpp old mode 100644 new mode 100755 index 0aa6c5207..9b458c7c2 --- a/trunk/src/core/srs_core.cpp +++ b/trunk/src/core/srs_core.cpp @@ -33,7 +33,7 @@ static int64_t _srs_system_time_us_cache = 0; int64_t srs_get_system_time_ms() { - return _srs_system_time_us_cache / 1000; + return _srs_system_time_us_cache / 1000; } void srs_update_system_time_ms() @@ -43,7 +43,7 @@ void srs_update_system_time_ms() gettimeofday(&now, NULL); // we must convert the tv_sec/tv_usec to int64_t. - _srs_system_time_us_cache = now.tv_sec * 1000 * 1000 + now.tv_usec; + _srs_system_time_us_cache = ((int64_t)now.tv_sec) * 1000 * 1000 + (int64_t)now.tv_usec; _srs_system_time_us_cache = srs_max(0, _srs_system_time_us_cache); } @@ -103,11 +103,11 @@ void srs_vhost_resolve(std::string& vhost, std::string& app) if ((pos = query.find("vhost?")) != std::string::npos || (pos = query.find("vhost=")) != std::string::npos || (pos = query.find("Vhost?")) != std::string::npos - || (pos = query.find("Vhost=")) != std::string::npos - ) { + || (pos = query.find("Vhost=")) != std::string::npos) + { query = query.substr(pos + 6); if (!query.empty()) { vhost = query; } - } + } } diff --git a/trunk/src/core/srs_core.hpp b/trunk/src/core/srs_core.hpp old mode 100644 new mode 100755 index 254efa40e..3be91004e --- a/trunk/src/core/srs_core.hpp +++ b/trunk/src/core/srs_core.hpp @@ -102,4 +102,4 @@ extern std::string srs_dns_resolve(std::string host); // app...vhost...request_vhost extern void srs_vhost_resolve(std::string& vhost, std::string& app); -#endif \ No newline at end of file +#endif diff --git a/trunk/src/core/srs_core_client.cpp b/trunk/src/core/srs_core_client.cpp old mode 100644 new mode 100755 index 1b84b3287..0e74eae05 --- a/trunk/src/core/srs_core_client.cpp +++ b/trunk/src/core/srs_core_client.cpp @@ -117,8 +117,8 @@ int SrsClient::do_cycle() int SrsClient::service_cycle() { int ret = ERROR_SUCCESS; - - if ((ret = rtmp->set_window_ack_size(2.5 * 1000 * 1000)) != ERROR_SUCCESS) { + + if ((ret = rtmp->set_window_ack_size(2.5 * 1000 * 1000)) != ERROR_SUCCESS) { srs_error("set window acknowledgement size failed. ret=%d", ret); return ret; } @@ -129,29 +129,59 @@ int SrsClient::service_cycle() return ret; } srs_verbose("set peer bandwidth success"); + + if(config->get_bw_check_enabled(req->vhost, req->bw_key)) + { + static int64_t last_check_time_ms = srs_get_system_time_ms(); + int64_t interval_ms = 0; + int play_kbps = 0; + int pub_kbps = 0; + config->get_bw_check_settings(req->vhost, interval_ms, play_kbps, pub_kbps); + + if((srs_get_system_time_ms() - last_check_time_ms) < interval_ms + && last_check_time_ms != srs_get_system_time_ms()) + { + srs_trace("bandcheck interval less than limted interval. last time=%lld, current time=%lld" + , last_check_time_ms, srs_get_system_time_ms()); + return rtmp->response_connect_reject(req, "your bandcheck frequency is too high!"); + } else { + last_check_time_ms = srs_get_system_time_ms(); // update last check time + char* local_ip = 0; + if((ret = get_local_ip(local_ip)) != ERROR_SUCCESS){ + srs_error("get local ip failed. ret = %d", ret); + return ret; + } + if ((ret = rtmp->response_connect_app(req, local_ip)) != ERROR_SUCCESS) { + srs_error("response connect app failed. ret=%d", ret); + return ret; + } + return rtmp->start_bandwidth_check(play_kbps, pub_kbps); + } + } if ((ret = rtmp->response_connect_app(req)) != ERROR_SUCCESS) { srs_error("response connect app failed. ret=%d", ret); return ret; } - srs_verbose("response connect app success"); + srs_verbose("response connect app success"); if ((ret = rtmp->on_bw_done()) != ERROR_SUCCESS) { srs_error("on_bw_done failed. ret=%d", ret); return ret; } srs_verbose("on_bw_done success"); - - SrsClientType type; - if ((ret = rtmp->identify_client(res->stream_id, type, req->stream)) != ERROR_SUCCESS) { - srs_error("identify client failed. ret=%d", ret); - return ret; - } + + SrsClientType type; + if ((ret = rtmp->identify_client(res->stream_id, type, req->stream)) != ERROR_SUCCESS) { + srs_error("identify client failed. ret=%d", ret); + return ret; + } + req->strip(); srs_trace("identify client success. type=%d, stream_name=%s", type, req->stream.c_str()); int chunk_size = 4096; - SrsConfDirective* conf = config->get_chunk_size(); + SrsConfDirective* conf = config->get_chunk_size(req->vhost); if (conf && !conf->arg0().empty()) { chunk_size = ::atoi(conf->arg0().c_str()); } @@ -517,6 +547,40 @@ int SrsClient::get_peer_ip() return ret; } +int SrsClient::get_local_ip(char *&local_ip) +{ + int ret = ERROR_SUCCESS; + + int fd = st_netfd_fileno(stfd); + + // discovery client information + sockaddr_in addr; + socklen_t addrlen = sizeof(addr); + if (getsockname(fd, (sockaddr*)&addr, &addrlen) == -1) { + ret = ERROR_SOCKET_GET_LOCAL_IP; + srs_error("discovery local ip information failed. ret=%d", ret); + return ret; + } + srs_verbose("get local ip success."); + + // ip v4 or v6 + char buf[INET6_ADDRSTRLEN]; + memset(buf, 0, sizeof(buf)); + + if ((inet_ntop(addr.sin_family, &addr.sin_addr, buf, sizeof(buf))) == NULL) { + ret = ERROR_SOCKET_GET_LOCAL_IP; + srs_error("convert local ip information failed. ret=%d", ret); + return ret; + } + + local_ip = new char[strlen(buf) + 1]; + strcpy(local_ip, buf); + + srs_verbose("get local ip of client ip=%s, fd=%d", buf, fd); + + return ret; +} + int SrsClient::process_play_control_msg(SrsConsumer* consumer, SrsCommonMessage* msg) { int ret = ERROR_SUCCESS; diff --git a/trunk/src/core/srs_core_client.hpp b/trunk/src/core/srs_core_client.hpp old mode 100644 new mode 100755 index 0d84b527d..c406100db --- a/trunk/src/core/srs_core_client.hpp +++ b/trunk/src/core/srs_core_client.hpp @@ -70,6 +70,7 @@ private: virtual int publish(SrsSource* source, bool is_fmle); virtual int process_publish_message(SrsSource* source, SrsCommonMessage* msg, bool is_fmle); virtual int get_peer_ip(); + virtual int get_local_ip(char *&local_ip); virtual int process_play_control_msg(SrsConsumer* consumer, SrsCommonMessage* msg); private: virtual int on_connect(); @@ -80,4 +81,4 @@ private: virtual void on_stop(); }; -#endif \ No newline at end of file +#endif diff --git a/trunk/src/core/srs_core_config.cpp b/trunk/src/core/srs_core_config.cpp old mode 100644 new mode 100755 index 96bdad2fe..d560eccf8 --- a/trunk/src/core/srs_core_config.cpp +++ b/trunk/src/core/srs_core_config.cpp @@ -539,7 +539,7 @@ int SrsConfig::parse_options(int argc, char** argv) return parse_file(config_file.c_str()); } -SrsConfDirective* SrsConfig::get_vhost(std::string vhost) +SrsConfDirective* SrsConfig::get_vhost(const std::string& vhost) { srs_assert(root); @@ -562,7 +562,7 @@ SrsConfDirective* SrsConfig::get_vhost(std::string vhost) return NULL; } -SrsConfDirective* SrsConfig::get_vhost_on_connect(std::string vhost) +SrsConfDirective* SrsConfig::get_vhost_on_connect(const std::string &vhost) { SrsConfDirective* conf = get_vhost(vhost); @@ -583,7 +583,7 @@ SrsConfDirective* SrsConfig::get_vhost_on_connect(std::string vhost) return conf->get("on_connect"); } -SrsConfDirective* SrsConfig::get_vhost_on_close(std::string vhost) +SrsConfDirective* SrsConfig::get_vhost_on_close(const std::string &vhost) { SrsConfDirective* conf = get_vhost(vhost); @@ -604,7 +604,7 @@ SrsConfDirective* SrsConfig::get_vhost_on_close(std::string vhost) return conf->get("on_close"); } -SrsConfDirective* SrsConfig::get_vhost_on_publish(std::string vhost) +SrsConfDirective* SrsConfig::get_vhost_on_publish(const std::string &vhost) { SrsConfDirective* conf = get_vhost(vhost); @@ -625,7 +625,7 @@ SrsConfDirective* SrsConfig::get_vhost_on_publish(std::string vhost) return conf->get("on_publish"); } -SrsConfDirective* SrsConfig::get_vhost_on_unpublish(std::string vhost) +SrsConfDirective* SrsConfig::get_vhost_on_unpublish(const std::string &vhost) { SrsConfDirective* conf = get_vhost(vhost); @@ -646,7 +646,7 @@ SrsConfDirective* SrsConfig::get_vhost_on_unpublish(std::string vhost) return conf->get("on_unpublish"); } -SrsConfDirective* SrsConfig::get_vhost_on_play(std::string vhost) +SrsConfDirective* SrsConfig::get_vhost_on_play(const std::string &vhost) { SrsConfDirective* conf = get_vhost(vhost); @@ -667,7 +667,7 @@ SrsConfDirective* SrsConfig::get_vhost_on_play(std::string vhost) return conf->get("on_play"); } -SrsConfDirective* SrsConfig::get_vhost_on_stop(std::string vhost) +SrsConfDirective* SrsConfig::get_vhost_on_stop(const std::string &vhost) { SrsConfDirective* conf = get_vhost(vhost); @@ -688,7 +688,7 @@ SrsConfDirective* SrsConfig::get_vhost_on_stop(std::string vhost) return conf->get("on_stop"); } -bool SrsConfig::get_vhost_enabled(std::string vhost) +bool SrsConfig::get_vhost_enabled(const std::string &vhost) { SrsConfDirective* vhost_conf = get_vhost(vhost); @@ -708,7 +708,7 @@ bool SrsConfig::get_vhost_enabled(std::string vhost) return true; } -SrsConfDirective* SrsConfig::get_transcode(std::string vhost, std::string scope) +SrsConfDirective* SrsConfig::get_transcode(const std::string &vhost, const std::string &scope) { SrsConfDirective* conf = get_vhost(vhost); @@ -1059,7 +1059,7 @@ int SrsConfig::get_max_connections() return ::atoi(conf->arg0().c_str()); } -SrsConfDirective* SrsConfig::get_gop_cache(std::string vhost) +SrsConfDirective* SrsConfig::get_gop_cache(const std::string &vhost) { SrsConfDirective* conf = get_vhost(vhost); @@ -1070,7 +1070,7 @@ SrsConfDirective* SrsConfig::get_gop_cache(std::string vhost) return conf->get("gop_cache"); } -SrsConfDirective* SrsConfig::get_forward(std::string vhost) +SrsConfDirective* SrsConfig::get_forward(const std::string &vhost) { SrsConfDirective* conf = get_vhost(vhost); @@ -1081,7 +1081,7 @@ SrsConfDirective* SrsConfig::get_forward(std::string vhost) return conf->get("forward"); } -SrsConfDirective* SrsConfig::get_hls(std::string vhost) +SrsConfDirective* SrsConfig::get_hls(const std::string &vhost) { SrsConfDirective* conf = get_vhost(vhost); @@ -1092,7 +1092,7 @@ SrsConfDirective* SrsConfig::get_hls(std::string vhost) return conf->get("hls"); } -bool SrsConfig::get_hls_enabled(std::string vhost) +bool SrsConfig::get_hls_enabled(const std::string &vhost) { SrsConfDirective* hls = get_hls(vhost); @@ -1107,7 +1107,7 @@ bool SrsConfig::get_hls_enabled(std::string vhost) return true; } -SrsConfDirective* SrsConfig::get_hls_path(std::string vhost) +SrsConfDirective* SrsConfig::get_hls_path(const std::string &vhost) { SrsConfDirective* conf = get_vhost(vhost); @@ -1118,7 +1118,7 @@ SrsConfDirective* SrsConfig::get_hls_path(std::string vhost) return conf->get("hls_path"); } -SrsConfDirective* SrsConfig::get_hls_fragment(std::string vhost) +SrsConfDirective* SrsConfig::get_hls_fragment(const std::string &vhost) { SrsConfDirective* conf = get_vhost(vhost); @@ -1129,7 +1129,7 @@ SrsConfDirective* SrsConfig::get_hls_fragment(std::string vhost) return conf->get("hls_fragment"); } -SrsConfDirective* SrsConfig::get_hls_window(std::string vhost) +SrsConfDirective* SrsConfig::get_hls_window(const std::string &vhost) { SrsConfDirective* conf = get_vhost(vhost); @@ -1140,7 +1140,7 @@ SrsConfDirective* SrsConfig::get_hls_window(std::string vhost) return conf->get("hls_window"); } -SrsConfDirective* SrsConfig::get_refer(std::string vhost) +SrsConfDirective* SrsConfig::get_refer(const std::string &vhost) { SrsConfDirective* conf = get_vhost(vhost); @@ -1151,7 +1151,7 @@ SrsConfDirective* SrsConfig::get_refer(std::string vhost) return conf->get("refer"); } -SrsConfDirective* SrsConfig::get_refer_play(std::string vhost) +SrsConfDirective* SrsConfig::get_refer_play(const std::string &vhost) { SrsConfDirective* conf = get_vhost(vhost); @@ -1162,7 +1162,7 @@ SrsConfDirective* SrsConfig::get_refer_play(std::string vhost) return conf->get("refer_play"); } -SrsConfDirective* SrsConfig::get_refer_publish(std::string vhost) +SrsConfDirective* SrsConfig::get_refer_publish(const std::string &vhost) { SrsConfDirective* conf = get_vhost(vhost); @@ -1178,9 +1178,15 @@ SrsConfDirective* SrsConfig::get_listen() return root->get("listen"); } -SrsConfDirective* SrsConfig::get_chunk_size() +SrsConfDirective* SrsConfig::get_chunk_size(const std::string &vhost) { - return root->get("chunk_size"); + SrsConfDirective* conf = get_vhost(vhost); + + if (!conf) { + return NULL; + } + + return conf->get("chunk_size"); } SrsConfDirective* SrsConfig::get_pithy_print_publish() @@ -1230,7 +1236,63 @@ SrsConfDirective* SrsConfig::get_pithy_print_play() return NULL; } - return pithy->get("play"); + return pithy->get("play"); +} + +bool SrsConfig::get_bw_check_enabled(const std::string &vhost, const std::string &key) +{ + SrsConfDirective* conf = get_vhost(vhost); + + if (!conf) { + return false; + } + + SrsConfDirective* bw_test = conf->get("bandcheck"); + if(!bw_test) + return false; + + SrsConfDirective* bw_enabled_conf = bw_test->get("enabled"); + if(bw_enabled_conf && bw_enabled_conf->arg0() == "on"){ + SrsConfDirective* bw_key = bw_test->get("key"); + if(!bw_key) return false; + + std::vector &args = bw_key->args; + for(unsigned int i = 0; i < args.size(); ++i){ + if(args.at(i) == key) + return true; + } + } + + return false; +} + +void SrsConfig::get_bw_check_settings(const std::string &vhost, int64_t &interval_ms, int &play_kbps, int &pub_kbps) +{ + // set default value; + interval_ms = 30 * 1000; + play_kbps = 45000; + pub_kbps = 25000; + + SrsConfDirective* conf = get_vhost(vhost); + if (!conf) { + return; + } + + SrsConfDirective* bw_test = conf->get("bandcheck"); + if(!bw_test) + return; + + SrsConfDirective* interval_conf = bw_test->get("interval"); + if(interval_conf) + interval_ms = ::atoll(interval_conf->arg0().c_str()) * 1000; + + SrsConfDirective* play_conf = bw_test->get("max_play_kbps"); + if(play_conf) + play_kbps = ::atoi(play_conf->arg0().c_str()); + + SrsConfDirective* pub_conf = bw_test->get("max_pub_kbps"); + if(pub_conf) + pub_kbps = ::atoi(pub_conf->arg0().c_str()); } int SrsConfig::parse_file(const char* filename) diff --git a/trunk/src/core/srs_core_config.hpp b/trunk/src/core/srs_core_config.hpp old mode 100644 new mode 100755 index 50538cb53..3c1126d4c --- a/trunk/src/core/srs_core_config.hpp +++ b/trunk/src/core/srs_core_config.hpp @@ -117,15 +117,15 @@ public: public: virtual int parse_options(int argc, char** argv); public: - virtual SrsConfDirective* get_vhost(std::string vhost); - virtual bool get_vhost_enabled(std::string vhost); - virtual SrsConfDirective* get_vhost_on_connect(std::string vhost); - virtual SrsConfDirective* get_vhost_on_close(std::string vhost); - virtual SrsConfDirective* get_vhost_on_publish(std::string vhost); - virtual SrsConfDirective* get_vhost_on_unpublish(std::string vhost); - virtual SrsConfDirective* get_vhost_on_play(std::string vhost); - virtual SrsConfDirective* get_vhost_on_stop(std::string vhost); - virtual SrsConfDirective* get_transcode(std::string vhost, std::string scope); + virtual SrsConfDirective* get_vhost(const std::string &vhost); + virtual bool get_vhost_enabled(const std::string& vhost); + virtual SrsConfDirective* get_vhost_on_connect(const std::string& vhost); + virtual SrsConfDirective* get_vhost_on_close(const std::string& vhost); + virtual SrsConfDirective* get_vhost_on_publish(const std::string& vhost); + virtual SrsConfDirective* get_vhost_on_unpublish(const std::string& vhost); + virtual SrsConfDirective* get_vhost_on_play(const std::string& vhost); + virtual SrsConfDirective* get_vhost_on_stop(const std::string& vhost); + virtual SrsConfDirective* get_transcode(const std::string& vhost, const std::string& scope); virtual bool get_transcode_enabled(SrsConfDirective* transcode); virtual std::string get_transcode_ffmpeg(SrsConfDirective* transcode); virtual void get_transcode_engines(SrsConfDirective* transcode, std::vector& engines); @@ -148,23 +148,25 @@ public: virtual std::string get_engine_output(SrsConfDirective* engine); virtual std::string get_log_dir(); virtual int get_max_connections(); - virtual SrsConfDirective* get_gop_cache(std::string vhost); - virtual SrsConfDirective* get_forward(std::string vhost); - virtual SrsConfDirective* get_hls(std::string vhost); - virtual bool get_hls_enabled(std::string vhost); - virtual SrsConfDirective* get_hls_path(std::string vhost); - virtual SrsConfDirective* get_hls_fragment(std::string vhost); - virtual SrsConfDirective* get_hls_window(std::string vhost); - virtual SrsConfDirective* get_refer(std::string vhost); - virtual SrsConfDirective* get_refer_play(std::string vhost); - virtual SrsConfDirective* get_refer_publish(std::string vhost); + virtual SrsConfDirective* get_gop_cache(const std::string& vhost); + virtual SrsConfDirective* get_forward(const std::string& vhost); + virtual SrsConfDirective* get_hls(const std::string &vhost); + virtual bool get_hls_enabled(const std::string& vhost); + virtual SrsConfDirective* get_hls_path(const std::string& vhost); + virtual SrsConfDirective* get_hls_fragment(const std::string& vhost); + virtual SrsConfDirective* get_hls_window(const std::string& vhost); + virtual SrsConfDirective* get_refer(const std::string& vhost); + virtual SrsConfDirective* get_refer_play(const std::string& vhost); + virtual SrsConfDirective* get_refer_publish(const std::string& vhost); virtual SrsConfDirective* get_listen(); - virtual SrsConfDirective* get_chunk_size(); + virtual SrsConfDirective* get_chunk_size(const std::string &vhost); virtual SrsConfDirective* get_pithy_print_publish(); virtual SrsConfDirective* get_pithy_print_forwarder(); virtual SrsConfDirective* get_pithy_print_encoder(); virtual SrsConfDirective* get_pithy_print_hls(); virtual SrsConfDirective* get_pithy_print_play(); + virtual bool get_bw_check_enabled(const std::string &vhost, const std::string &key); + virtual void get_bw_check_settings(const std::string &vhost, int64_t &interval_ms, int &play_kbps, int &pub_kbps); private: virtual int parse_file(const char* filename); virtual int parse_argv(int& i, char** argv); @@ -179,4 +181,4 @@ bool srs_directive_equals(SrsConfDirective* a, SrsConfDirective* b); // global config extern SrsConfig* config; -#endif \ No newline at end of file +#endif diff --git a/trunk/src/core/srs_core_error.hpp b/trunk/src/core/srs_core_error.hpp old mode 100644 new mode 100755 index a5d5348f0..496c05954 --- a/trunk/src/core/srs_core_error.hpp +++ b/trunk/src/core/srs_core_error.hpp @@ -52,6 +52,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #define ERROR_SOCKET_WRITE 209 #define ERROR_SOCKET_WAIT 210 #define ERROR_SOCKET_TIMEOUT 211 +#define ERROR_SOCKET_GET_LOCAL_IP 222 #define ERROR_RTMP_PLAIN_REQUIRED 300 #define ERROR_RTMP_CHUNK_START 301 @@ -146,4 +147,4 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #define ERROR_HTTP_DATA_INVLIAD 801 #define ERROR_HTTP_PARSE_HEADER 802 -#endif \ No newline at end of file +#endif diff --git a/trunk/src/core/srs_core_protocol.cpp b/trunk/src/core/srs_core_protocol.cpp old mode 100644 new mode 100755 index 940666f09..67d6e9f55 --- a/trunk/src/core/srs_core_protocol.cpp +++ b/trunk/src/core/srs_core_protocol.cpp @@ -1138,7 +1138,12 @@ bool SrsMessageHeader::is_set_chunk_size() bool SrsMessageHeader::is_user_control_message() { - return message_type == RTMP_MSG_UserControlMessage; + return message_type == RTMP_MSG_UserControlMessage; +} + +bool SrsMessageHeader::is_windows_ackledgement() +{ + return message_type == RTMP_MSG_Acknowledgement; } SrsChunkStream::SrsChunkStream(int _cid) @@ -1311,7 +1316,21 @@ int SrsCommonMessage::decode_packet(SrsProtocol* protocol) srs_info("decode the AMF0/AMF3 data(onMetaData message)."); packet = new SrsOnMetaDataPacket(); return packet->decode(stream); - } + } else if( command == SRS_BW_CHECK_FINISHED + || command == SRS_BW_CHECK_PLAYING + || command == SRS_BW_CHECK_PUBLISHING + || command == SRS_BW_CHECK_STARTING_PLAY + || command == SRS_BW_CHECK_STARTING_PUBLISH + || command == SRS_BW_CHECK_START_PLAY + || command == SRS_BW_CHECK_START_PUBLISH + || command == SRS_BW_CHECK_STOPPED_PLAY + || command == SRS_BW_CHECK_STOP_PLAY + || command == SRS_BW_CHECK_STOP_PUBLISH) + { + srs_info("decode the AMF0/AMF3 band width check message."); + packet = new SrsOnStatusCallPacket(); + return packet->decode(stream); + } // default packet to drop message. srs_trace("drop the AMF0/AMF3 command message, command_name=%s", command.c_str()); @@ -1329,7 +1348,11 @@ int SrsCommonMessage::decode_packet(SrsProtocol* protocol) srs_verbose("start to decode set chunk size message."); packet = new SrsSetChunkSizePacket(); return packet->decode(stream); - } else { + } else if(header.is_windows_ackledgement()) { + srs_verbose("start to decode AcknowledgementPacket message."); + packet = new SrsAcknowledgementPacket(); + return packet->decode(stream); + } else { // default packet to drop message. srs_trace("drop the unknown message, type=%d", header.message_type); packet = new SrsPacket(); @@ -1775,8 +1798,13 @@ int SrsConnectAppResPacket::get_message_type() int SrsConnectAppResPacket::get_size() { - return srs_amf0_get_string_size(command_name) + srs_amf0_get_number_size() - + srs_amf0_get_object_size(props)+ srs_amf0_get_object_size(info); + int size = srs_amf0_get_string_size(command_name) + srs_amf0_get_number_size(); + if(props->size() > 0) + size += srs_amf0_get_object_size(props); + if(info->size() > 0) + size += srs_amf0_get_object_size(info); + + return size; } int SrsConnectAppResPacket::encode_packet(SrsStream* stream) @@ -1795,16 +1823,22 @@ int SrsConnectAppResPacket::encode_packet(SrsStream* stream) } srs_verbose("encode transaction_id success."); - if ((ret = srs_amf0_write_object(stream, props)) != ERROR_SUCCESS) { - srs_error("encode props failed. ret=%d", ret); - return ret; - } + if(props->size() > 0){ + if ((ret = srs_amf0_write_object(stream, props)) != ERROR_SUCCESS) { + srs_error("encode props failed. ret=%d", ret); + return ret; + } + } + srs_verbose("encode props success."); - if ((ret = srs_amf0_write_object(stream, info)) != ERROR_SUCCESS) { - srs_error("encode info failed. ret=%d", ret); - return ret; - } + if(info->size() > 0){ + if ((ret = srs_amf0_write_object(stream, info)) != ERROR_SUCCESS) { + srs_error("encode info failed. ret=%d", ret); + return ret; + } + } + srs_verbose("encode info success."); srs_info("encode connect app response packet success."); @@ -2537,7 +2571,31 @@ SrsOnStatusCallPacket::SrsOnStatusCallPacket() SrsOnStatusCallPacket::~SrsOnStatusCallPacket() { srs_freep(args); - srs_freep(data); + srs_freep(data); +} + +int SrsOnStatusCallPacket::decode(SrsStream *stream) +{ + int ret = ERROR_SUCCESS; + + if ((ret = srs_amf0_read_string(stream, command_name)) != ERROR_SUCCESS) { + srs_error("amf0 decode play command_name failed. ret=%d", ret); + return ret; + } + + if ((ret = srs_amf0_read_number(stream, transaction_id)) != ERROR_SUCCESS) { + srs_error("amf0 decode play transaction_id failed. ret=%d", ret); + return ret; + } + + if ((ret = srs_amf0_read_null(stream)) != ERROR_SUCCESS) { + srs_error("amf0 decode play command_object failed. ret=%d", ret); + return ret; + } + + srs_info("decode SrsOnStatusCallPacket success."); + + return ret; } int SrsOnStatusCallPacket::get_perfer_cid() @@ -2853,6 +2911,14 @@ SrsAcknowledgementPacket::~SrsAcknowledgementPacket() { } +int SrsAcknowledgementPacket::decode(SrsStream *stream) +{ + int ret = ERROR_SUCCESS; + ret = sequence_number = stream->read_4bytes(); + + return ret; +} + int SrsAcknowledgementPacket::get_perfer_cid() { return RTMP_CID_ProtocolControl; @@ -3078,4 +3144,3 @@ int SrsUserControlPacket::encode_packet(SrsStream* stream) return ret; } - diff --git a/trunk/src/core/srs_core_protocol.hpp b/trunk/src/core/srs_core_protocol.hpp old mode 100644 new mode 100755 index fe52d5872..fd69848bd --- a/trunk/src/core/srs_core_protocol.hpp +++ b/trunk/src/core/srs_core_protocol.hpp @@ -220,6 +220,7 @@ struct SrsMessageHeader bool is_window_ackledgement_size(); bool is_set_chunk_size(); bool is_user_control_message(); + bool is_windows_ackledgement(); }; /** @@ -801,6 +802,26 @@ protected: virtual int encode_packet(SrsStream* stream); }; + +/** +* band width check method name, which will be invoked by client. +* band width check mothods use SrsOnStatusCallPacket as its internal packet type, +* so ensure you set command name when you use it. +*/ +// for play +#define SRS_BW_CHECK_START_PLAY "onSrsBandCheckStartPlayBytes" +#define SRS_BW_CHECK_STARTING_PLAY "onSrsBandCheckStartingPlayBytes" +#define SRS_BW_CHECK_STOP_PLAY "onSrsBandCheckStopPlayBytes" +#define SRS_BW_CHECK_STOPPED_PLAY "onSrsBandCheckStoppedPlayBytes" +#define SRS_BW_CHECK_PLAYING "onSrsBandCheckPlaying" + +// for publish +#define SRS_BW_CHECK_START_PUBLISH "onSrsBandCheckStartPublishBytes" +#define SRS_BW_CHECK_STARTING_PUBLISH "onSrsBandCheckStartingPublishBytes" +#define SRS_BW_CHECK_STOP_PUBLISH "onSrsBandCheckStopPublishBytes" +#define SRS_BW_CHECK_FINISHED "onSrsBandCheckFinished" +#define SRS_BW_CHECK_PUBLISHING "onSrsBandCheckPublishing" + /** * onStatus command, AMF0 Call * @remark, user must set the stream_id by SrsMessage.set_packet(). @@ -822,6 +843,10 @@ public: public: SrsOnStatusCallPacket(); virtual ~SrsOnStatusCallPacket(); + +public: + virtual int decode(SrsStream* stream); + public: virtual int get_perfer_cid(); public: @@ -968,6 +993,10 @@ public: public: SrsAcknowledgementPacket(); virtual ~SrsAcknowledgementPacket(); + +public: + virtual int decode(SrsStream *stream); + public: virtual int get_perfer_cid(); public: @@ -1141,4 +1170,4 @@ int srs_rtmp_expect_message(SrsProtocol* protocol, SrsCommonMessage** pmsg, T** return ret; } -#endif \ No newline at end of file +#endif diff --git a/trunk/src/core/srs_core_rtmp.cpp b/trunk/src/core/srs_core_rtmp.cpp old mode 100644 new mode 100755 index f79ffbe83..92203a5a0 --- a/trunk/src/core/srs_core_rtmp.cpp +++ b/trunk/src/core/srs_core_rtmp.cpp @@ -2,6 +2,7 @@ The MIT License (MIT) Copyright (c) 2013 winlin +Copyright (c) 2013 wenjiejit Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in @@ -49,8 +50,11 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #define StatusClientId "clientid" // status value #define StatusLevelStatus "status" +// status error +#define StatusLevelError "error" // code value #define StatusCodeConnectSuccess "NetConnection.Connect.Success" +#define StatusCodeConnectRejected "NetConnection.Connect.Rejected" #define StatusCodeStreamReset "NetStream.Play.Reset" #define StatusCodeStreamStart "NetStream.Play.Start" #define StatusCodeStreamPause "NetStream.Pause.Notify" @@ -67,8 +71,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #define SRS_DEFAULT_SID 1 SrsRequest::SrsRequest() + : objectEncoding(RTMP_SIG_AMF0_VER) { - objectEncoding = RTMP_SIG_AMF0_VER; } SrsRequest::~SrsRequest() @@ -99,11 +103,23 @@ int SrsRequest::discovery_app() port = vhost.substr(pos + 1); vhost = vhost.substr(0, pos); srs_verbose("discovery vhost=%s, port=%s", vhost.c_str(), port.c_str()); - } - + } + app = url; srs_vhost_resolve(vhost, app); - + + // reslove bw check key + std::string app_str = url; + if ((pos = app_str.find("key=")) != std::string::npos){ + std::string temp_key = app_str.substr(pos + strlen("key=")); + for(unsigned int i = 0; i < temp_key.size(); ++i){ + char c = temp_key[i]; + if(c != '&') + bw_key.push_back(c); + else break; + } + } + // resolve the vhost from config SrsConfDirective* parsed_vhost = config->get_vhost(vhost); if (parsed_vhost) { @@ -238,7 +254,7 @@ int SrsRtmpClient::handshake() return ret; } -int SrsRtmpClient::connect_app(std::string app, std::string tc_url) +int SrsRtmpClient::connect_app(const std::string &app, const std::string &tc_url) { int ret = ERROR_SUCCESS; @@ -324,7 +340,7 @@ int SrsRtmpClient::create_stream(int& stream_id) return ret; } -int SrsRtmpClient::play(std::string stream, int stream_id) +int SrsRtmpClient::play(const std::string &stream, int stream_id) { int ret = ERROR_SUCCESS; @@ -366,7 +382,7 @@ int SrsRtmpClient::play(std::string stream, int stream_id) return ret; } -int SrsRtmpClient::publish(std::string stream, int stream_id) +int SrsRtmpClient::publish(const std::string &stream, int stream_id) { int ret = ERROR_SUCCESS; @@ -544,7 +560,7 @@ int SrsRtmp::set_peer_bandwidth(int bandwidth, int type) return ret; } -int SrsRtmp::response_connect_app(SrsRequest* req) +int SrsRtmp::response_connect_app(SrsRequest *req, const char *ip) { int ret = ERROR_SUCCESS; @@ -556,7 +572,7 @@ int SrsRtmp::response_connect_app(SrsRequest* req) pkt->props->set("mode", new SrsAmf0Number(1)); pkt->info->set(StatusLevel, new SrsAmf0String(StatusLevelStatus)); - pkt->info->set(StatusCode, new SrsAmf0String(StatusCodeConnectSuccess)); + pkt->info->set(StatusCode, new SrsAmf0String(StatusCodeConnectSuccess)); pkt->info->set(StatusDescription, new SrsAmf0String("Connection succeeded")); pkt->info->set("objectEncoding", new SrsAmf0Number(req->objectEncoding)); SrsASrsAmf0EcmaArray* data = new SrsASrsAmf0EcmaArray(); @@ -571,6 +587,9 @@ int SrsRtmp::response_connect_app(SrsRequest* req) data->set("srs_site", new SrsAmf0String(RTMP_SIG_SRS_WEB)); data->set("srs_email", new SrsAmf0String(RTMP_SIG_SRS_EMAIL)); data->set("srs_copyright", new SrsAmf0String(RTMP_SIG_SRS_COPYRIGHT)); + + if(ip) + data->set("srs_server_ip", new SrsAmf0String(ip)); msg->set_packet(pkt, 0); @@ -580,7 +599,30 @@ int SrsRtmp::response_connect_app(SrsRequest* req) } srs_info("send connect app response message success."); - return ret; + return ret; +} + +int SrsRtmp::response_connect_reject(SrsRequest *req, const std::string &description) +{ + int ret = ERROR_SUCCESS; + + SrsCommonMessage* msg = new SrsCommonMessage(); + SrsConnectAppResPacket* pkt = new SrsConnectAppResPacket(); + pkt->command_name = "_error"; + pkt->props->set(StatusLevel, new SrsAmf0String(StatusLevelError)); + pkt->props->set(StatusCode, new SrsAmf0String(StatusCodeConnectRejected)); + pkt->props->set(StatusDescription, new SrsAmf0String(description.c_str())); + //pkt->props->set("objectEncoding", new SrsAmf0Number(req->objectEncoding)); + + msg->set_packet(pkt, 0); + + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { + srs_error("send connect app response rejected message failed. ret=%d", ret); + return ret; + } + srs_info("send connect app response rejected message success."); + + return ret; } int SrsRtmp::on_bw_done() @@ -1029,7 +1071,60 @@ int SrsRtmp::start_flash_publish(int stream_id) srs_info("flash publish success."); - return ret; + return ret; +} + +int SrsRtmp::start_bandwidth_check(int max_play_kbps, int max_pub_kbps) +{ + int ret = ERROR_SUCCESS; + + int play_duration_ms = 3000; + int play_interval_ms = 0; + int play_actual_duration_ms = 0; + int play_bytes = 0; + + int publish_duration_ms = 3000; + int publish_interval_ms = 0; + int publish_actual_duration_ms = 0; + int publish_bytes = 0; + + int64_t start_time = srs_get_system_time_ms(); + if((ret = bandwidth_check_play(play_duration_ms, play_interval_ms, + play_actual_duration_ms, play_bytes, max_play_kbps) != ERROR_SUCCESS) + || (ret = bandwidth_check_publish(publish_duration_ms, publish_interval_ms, + publish_actual_duration_ms, publish_bytes, max_pub_kbps)) != ERROR_SUCCESS) + { + srs_error("band width check failed. ret = %d", ret); + + return ret; + } + + int64_t end_time = srs_get_system_time_ms(); + int play_kbps = play_bytes * 8 / play_actual_duration_ms; + int publish_kbps = publish_bytes * 8 / publish_actual_duration_ms; + + // send finished msg + SrsCommonMessage* finish_msg = new SrsCommonMessage(); + SrsOnStatusCallPacket* finish_pkt = new SrsOnStatusCallPacket; + finish_pkt->command_name = SRS_BW_CHECK_FINISHED; + finish_pkt->data->set("code", new SrsAmf0Number(0)); + finish_pkt->data->set("start_time", new SrsAmf0Number(start_time)); + finish_pkt->data->set("end_time", new SrsAmf0Number(end_time)); + finish_pkt->data->set("play_kbps", new SrsAmf0Number(play_kbps)); + finish_pkt->data->set("publish_kbps", new SrsAmf0Number(publish_kbps)); + finish_pkt->data->set("play_bytes", new SrsAmf0Number(play_bytes)); + finish_pkt->data->set("play_time", new SrsAmf0Number(play_actual_duration_ms)); + finish_pkt->data->set("publish_bytes", new SrsAmf0Number(publish_bytes)); + finish_pkt->data->set("publish_time", new SrsAmf0Number(publish_actual_duration_ms)); + + finish_msg->set_packet(finish_pkt, 0); + if ((ret = protocol->send_message(finish_msg)) != ERROR_SUCCESS) { + srs_error("send bandwidth check finish message failed. ret=%d", ret); + return ret; + } + srs_trace("BW check finished."); + + return ret; } int SrsRtmp::identify_create_stream_client(SrsCreateStreamPacket* req, int stream_id, SrsClientType& type, std::string& stream_name) @@ -1120,6 +1215,221 @@ int SrsRtmp::identify_flash_publish_client(SrsPublishPacket* req, SrsClientType& type = SrsClientFlashPublish; stream_name = req->stream_name; - return ret; + return ret; +} + +int SrsRtmp::bandwidth_check_play(int duration_ms, int interval_ms, int &actual_duration_ms, + int &play_bytes, int max_play_kbps) +{ + int ret = ERROR_SUCCESS; + + // send start play command to client + SrsCommonMessage* start_play_msg = new SrsCommonMessage(); + SrsOnStatusCallPacket* start_play_packet = new SrsOnStatusCallPacket; + start_play_packet->command_name = SRS_BW_CHECK_START_PLAY; + start_play_packet->data->set("duration_ms", new SrsAmf0Number(duration_ms)); + start_play_packet->data->set("interval_ms", new SrsAmf0Number(interval_ms)); + + start_play_msg->set_packet(start_play_packet, 0); + if ((ret = protocol->send_message(start_play_msg)) != ERROR_SUCCESS) { + srs_error("send bandwidth check start play message failed. ret=%d", ret); + return ret; + } + srs_trace("BW check begin."); + + // recv client's starting play response + while (true) { + SrsCommonMessage* msg = 0; + if( (ret = protocol->recv_message(&msg)) != ERROR_SUCCESS) + { + srs_error("recv client's starting play response failed. ret= %d", ret); + return ret; + } + + msg->decode_packet(protocol); + SrsOnStatusCallPacket* pkt = dynamic_cast(msg->get_packet()); + if(pkt && (pkt->command_name == SRS_BW_CHECK_STARTING_PLAY)) + break; + } + srs_trace("BW check recv play begin response."); + + // send play data to client + int64_t current_Time = srs_get_system_time_ms(); + int size = 1024*4; // 32KB + char random_data[size]; + memset(random_data, 0x01, size); + + int64_t last_time = current_Time; + int interval = 0; + while ( (srs_get_system_time_ms() - current_Time) < duration_ms ){ + st_usleep(interval); + SrsCommonMessage* msg = new SrsCommonMessage; + SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket; + pkt->command_name = SRS_BW_CHECK_PLAYING; + + for(int i = 0; i < 10; ++i) + { + char buf[32]; + sprintf(buf, "%d", i); + pkt->data->set(buf, new SrsAmf0String(random_data)); + } + msg->set_packet(pkt, 0); + + play_bytes += pkt->get_payload_length(); + + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { + srs_error("send bandwidth check play messages failed. ret=%d", ret); + return ret; + } + + if((srs_get_system_time_ms() - last_time) > 5){ // check kbps every 5 ms; + int kbps = play_bytes * 8 / (srs_get_system_time_ms() - current_Time); + if(kbps > max_play_kbps){ + interval += 1000*3; // 2 ms + } else { + interval -= 1000*3; + if(interval < 0) + interval = 0; + } + } + } + actual_duration_ms = srs_get_system_time_ms() - current_Time; + srs_trace("BW check send play bytes over."); + + // notify client to stop play + SrsCommonMessage* stop_play_msg = new SrsCommonMessage; + SrsOnStatusCallPacket* stop_play_pkt = new SrsOnStatusCallPacket; + stop_play_pkt->command_name = SRS_BW_CHECK_STOP_PLAY; + stop_play_pkt->data->set("duration_ms", new SrsAmf0Number(duration_ms)); + stop_play_pkt->data->set("interval_ms", new SrsAmf0Number(interval_ms)); + stop_play_pkt->data->set("duration_delta", new SrsAmf0Number(actual_duration_ms)); + stop_play_pkt->data->set("bytes_delta", new SrsAmf0Number(play_bytes)); + + stop_play_msg->set_packet(stop_play_pkt, 0); + + if ((ret = protocol->send_message(stop_play_msg)) != ERROR_SUCCESS) { + srs_error("send bandwidth check stop play message failed. ret=%d", ret); + return ret; + } + srs_trace("BW check stop play bytes."); + + // recv client's stop play response. + while (true) { + SrsCommonMessage* msg = 0; + if((ret = protocol->recv_message(&msg)) != ERROR_SUCCESS) + { + srs_error("recv client's stop play response failed. ret = %d", ret); + return ret; + } + + msg->decode_packet(protocol); + SrsOnStatusCallPacket* pkt = dynamic_cast(msg->get_packet()); + if(pkt && (pkt->command_name == SRS_BW_CHECK_STOPPED_PLAY)) + break; + } + srs_trace("BW check recv stop play response."); + + return ret; +} + +int SrsRtmp::bandwidth_check_publish(int duration_ms, int interval_ms, int &actual_duration_ms, + int &publish_bytes, int max_pub_kbps) +{ + int ret = ERROR_SUCCESS; + + // notify client to start publish + SrsCommonMessage* start_publish_msg = new SrsCommonMessage; + SrsOnStatusCallPacket* start_publish_pkt = new SrsOnStatusCallPacket; + start_publish_pkt->command_name = SRS_BW_CHECK_START_PUBLISH; + start_publish_pkt->data->set("duration_ms", new SrsAmf0Number(duration_ms)); + start_publish_pkt->data->set("interval_ms", new SrsAmf0Number(interval_ms)); + + start_publish_msg->set_packet(start_publish_pkt, 0); + if ((ret = protocol->send_message(start_publish_msg)) != ERROR_SUCCESS) { + srs_error("send bandwidth check start publish message failed. ret=%d", ret); + return ret; + } + srs_trace("BW check publish begin."); + + // read client's notification of starting publish + while (true) { + SrsCommonMessage* msg = 0; + if((ret = protocol->recv_message(&msg)) != ERROR_SUCCESS) + { + srs_error("recv client's notification of starting publish failed. ret = %d", ret); + return ret; + } + + msg->decode_packet(protocol); + SrsOnStatusCallPacket* pkt = dynamic_cast(msg->get_packet()); + if(pkt && (pkt->command_name == SRS_BW_CHECK_STARTING_PUBLISH)) + break; + } + srs_trace("BW check recv publish begin response."); + + // recv publish msgs until @duration_ms ms + int64_t current_time = srs_get_system_time_ms(); + int64_t last_time = current_time; + int interval = 0; + + while( (srs_get_system_time_ms() - current_time) < duration_ms ) + { + st_usleep(interval); + SrsCommonMessage* msg = NULL; + if ((ret = protocol->recv_message(&msg)) != ERROR_SUCCESS) { + srs_error("recv message failed. ret=%d", ret); + return ret; + } + SrsAutoFree(SrsCommonMessage, msg, false); + + publish_bytes += msg->header.payload_length; + + if((srs_get_system_time_ms() - last_time) > 5){ // check kbps every 5 ms; + int kbps = publish_bytes * 8 / (srs_get_system_time_ms() - current_time); + if(kbps > max_pub_kbps){ + interval += 1000*3; // 2 ms + } else { + interval -= 1000*3; + if(interval < 0) + interval = 0; + } + } + } + actual_duration_ms = srs_get_system_time_ms() - current_time; + srs_trace("BW check recv publish data over."); + + // notify client to stop publish + SrsCommonMessage* stop_publish_msg = new SrsCommonMessage; + SrsOnStatusCallPacket* stop_publish_pkt = new SrsOnStatusCallPacket; + stop_publish_pkt->command_name = SRS_BW_CHECK_STOP_PUBLISH; + stop_publish_pkt->data->set("duration_ms", new SrsAmf0Number(duration_ms)); + stop_publish_pkt->data->set("interval_ms", new SrsAmf0Number(interval_ms)); + stop_publish_pkt->data->set("duration_delta", new SrsAmf0Number(actual_duration_ms)); + stop_publish_pkt->data->set("bytes_delta", new SrsAmf0Number(publish_bytes)); + + stop_publish_msg->set_packet(stop_publish_pkt, 0); + if ((ret = protocol->send_message(stop_publish_msg)) != ERROR_SUCCESS) { + srs_error("send bandwidth check stop publish message failed. ret=%d", ret); + return ret; + } + srs_trace("BW check stop pulish."); + + // recv left msg + while (true) { + if((ret = st_netfd_poll(stfd, POLLIN, 1000*500)) == ERROR_SUCCESS) + { + SrsCommonMessage* msg = 0; + if((ret = protocol->recv_message(&msg)) != ERROR_SUCCESS) + { + srs_error("recv client's left msg failed, ret = %d", ret); + return ret; + } + } else { + ret = ERROR_SUCCESS; + break; + } + } + + return ret; } diff --git a/trunk/src/core/srs_core_rtmp.hpp b/trunk/src/core/srs_core_rtmp.hpp old mode 100644 new mode 100755 index 3c18eb479..e94a3f3b7 --- a/trunk/src/core/srs_core_rtmp.hpp +++ b/trunk/src/core/srs_core_rtmp.hpp @@ -2,6 +2,7 @@ The MIT License (MIT) Copyright (c) 2013 winlin +Copyright (c) 2013 wenjiegit Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in @@ -64,6 +65,7 @@ struct SrsRequest std::string port; std::string app; std::string stream; + std::string bw_key; SrsRequest(); virtual ~SrsRequest(); @@ -97,7 +99,7 @@ enum SrsClientType SrsClientUnknown, SrsClientPlay, SrsClientFMLEPublish, - SrsClientFlashPublish, + SrsClientFlashPublish }; /** @@ -122,10 +124,10 @@ public: virtual int send_message(ISrsMessage* msg); public: virtual int handshake(); - virtual int connect_app(std::string app, std::string tc_url); + virtual int connect_app(const std::string &app, const std::string &tc_url); virtual int create_stream(int& stream_id); - virtual int play(std::string stream, int stream_id); - virtual int publish(std::string stream, int stream_id); + virtual int play(const std::string &stream, int stream_id); + virtual int publish(const std::string &stream, int stream_id); }; /** @@ -161,7 +163,8 @@ public: * using the Limit type field. */ virtual int set_peer_bandwidth(int bandwidth, int type); - virtual int response_connect_app(SrsRequest* req); + virtual int response_connect_app(SrsRequest* req, const char *ip = 0); + virtual int response_connect_reject(SrsRequest* req, const std::string& description); virtual int on_bw_done(); /** * recv some message to identify the client. @@ -212,10 +215,18 @@ public: * onStatus(NetStream.Publish.Start) */ virtual int start_flash_publish(int stream_id); + + /** + * used to process band width check from client. + */ + virtual int start_bandwidth_check(int max_play_kbps, int max_pub_kbps); + private: virtual int identify_create_stream_client(SrsCreateStreamPacket* req, int stream_id, SrsClientType& type, std::string& stream_name); virtual int identify_fmle_publish_client(SrsFMLEStartPacket* req, SrsClientType& type, std::string& stream_name); virtual int identify_flash_publish_client(SrsPublishPacket* req, SrsClientType& type, std::string& stream_name); + virtual int bandwidth_check_play(int duration_ms, int interval_ms, int& actual_duration_ms, int& play_bytes, int max_play_kbps); + virtual int bandwidth_check_publish(int duration_ms, int interval_ms, int& actual_duration_ms, int& publish_bytes, int max_pub_kbps); }; -#endif \ No newline at end of file +#endif diff --git a/trunk/src/core/srs_core_server.cpp b/trunk/src/core/srs_core_server.cpp old mode 100644 new mode 100755 index ab6e22102..f9be968cc --- a/trunk/src/core/srs_core_server.cpp +++ b/trunk/src/core/srs_core_server.cpp @@ -144,14 +144,14 @@ void SrsListener::listen_cycle() // ignore error. srs_warn("ignore accept thread stoppped for accept client error"); continue; - } + } srs_verbose("get a client. fd=%d", st_netfd_fileno(client_stfd)); if ((ret = server->accept_client(type, client_stfd)) != ERROR_SUCCESS) { srs_warn("accept client error. ret=%d", ret); continue; } - + srs_verbose("accept client finished. conns=%d, ret=%d", (int)conns.size(), ret); } } @@ -250,7 +250,7 @@ int SrsServer::cycle() // the deamon thread, update the time cache while (true) { st_usleep(SRS_TIME_RESOLUTION_MS * 1000); - srs_update_system_time_ms(); + srs_update_system_time_ms(); if (signal_reload) { signal_reload = false; diff --git a/trunk/src/core/srs_core_source.cpp b/trunk/src/core/srs_core_source.cpp old mode 100644 new mode 100755 index 3277ebb85..786cd5f7b --- a/trunk/src/core/srs_core_source.cpp +++ b/trunk/src/core/srs_core_source.cpp @@ -344,7 +344,7 @@ int SrsGopCache::dump(SrsConsumer* consumer, int tba, int tbv) std::map SrsSource::pool; -SrsSource* SrsSource::find(std::string stream_url) +SrsSource* SrsSource::find(const std::string &stream_url) { if (pool.find(stream_url) == pool.end()) { pool[stream_url] = new SrsSource(stream_url);