From 3af83fb02a61585671e2a0ca2862b6116a1c8826 Mon Sep 17 00:00:00 2001 From: wenjiegit Date: Thu, 26 Dec 2013 07:24:19 +0800 Subject: [PATCH] add linux version of band check app; add web version of band check app --- trunk/research/players/js/srs.bandwidth.js | 27 + .../players/srs_bwt/.actionScriptProperties | 40 + trunk/research/players/srs_bwt/.project | 17 + .../players/srs_bwt/release/srs_bwt.swf | Bin 0 -> 2603 bytes .../srs_bwt/src/SrsClass/SrsElapsedTimer.as | 24 + .../srs_bwt/src/SrsClass/SrsSettings.as | 27 + trunk/research/players/srs_bwt/src/srs_bwt.as | 227 +++++ trunk/src/main/srs_main_bandcheck.cpp | 826 ++++++++++++++++++ 8 files changed, 1188 insertions(+) create mode 100755 trunk/research/players/js/srs.bandwidth.js create mode 100755 trunk/research/players/srs_bwt/.actionScriptProperties create mode 100755 trunk/research/players/srs_bwt/.project create mode 100755 trunk/research/players/srs_bwt/release/srs_bwt.swf create mode 100755 trunk/research/players/srs_bwt/src/SrsClass/SrsElapsedTimer.as create mode 100755 trunk/research/players/srs_bwt/src/SrsClass/SrsSettings.as create mode 100755 trunk/research/players/srs_bwt/src/srs_bwt.as create mode 100755 trunk/src/main/srs_main_bandcheck.cpp diff --git a/trunk/research/players/js/srs.bandwidth.js b/trunk/research/players/js/srs.bandwidth.js new file mode 100755 index 000000000..79c51e2f8 --- /dev/null +++ b/trunk/research/players/js/srs.bandwidth.js @@ -0,0 +1,27 @@ +// for bw to init url +// url: scheme://host:port/path?query#fragment +function srs_init_bwt(rtmp_url, hls_url) { + update_nav(); + + if (rtmp_url) { + //var query = parse_query_string(); + var search_filed = String(window.location.search).replace(" ", "").split("?")[1]; + $(rtmp_url).val("rtmp://" + window.location.host + ":" + 1935 + "/app?" + search_filed); + } + if (hls_url) { + $(hls_url).val(build_default_hls_url()); + } +} + +function srs_bwt_check_url(url) { + if (url.indexOf("key") != -1 && url.indexOf("vhost") != -1) { + return true; + } + + return false; +} + +function srs_bwt_build_default_url() { + var url_default = "rtmp://" + window.location.host + ":" + 1935 + "/app?key=35c9b402c12a7246868752e2878f7e0e&vhost=bandcheck.srs.com"; + return url_default; +} \ No newline at end of file diff --git a/trunk/research/players/srs_bwt/.actionScriptProperties b/trunk/research/players/srs_bwt/.actionScriptProperties new file mode 100755 index 000000000..cd87f23d7 --- /dev/null +++ b/trunk/research/players/srs_bwt/.actionScriptProperties @@ -0,0 +1,40 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/trunk/research/players/srs_bwt/.project b/trunk/research/players/srs_bwt/.project new file mode 100755 index 000000000..240a737f9 --- /dev/null +++ b/trunk/research/players/srs_bwt/.project @@ -0,0 +1,17 @@ + + + srs_bwt + + + + + + com.adobe.flexbuilder.project.flexbuilder + + + + + + com.adobe.flexbuilder.project.actionscriptnature + + diff --git a/trunk/research/players/srs_bwt/release/srs_bwt.swf b/trunk/research/players/srs_bwt/release/srs_bwt.swf new file mode 100755 index 0000000000000000000000000000000000000000..3e89783117da7113a3b86e0611bfe262224ed0c3 GIT binary patch literal 2603 zcmV+`3e@#OS5poE5dZ*q+NBp;a~oIr9PO@-c4b|BQyk}}G>YAeY{^b&mO$vz8ipSM08AhXPG0EhT{tQ-&!WD0LEAcwovf6xx}>bcQE*fLXcI@Xla2 z@WSv5#CLXO`GVUQcC6#?e!ja_D~SI)LgDui>cD8Mc|Stv#nwM#gie~OHh5udBEGU% z(yc+jPIVV-yF8dmEiW(kE}!T%%!O3nz`#H%-Jk03?*WM(>xyp6D?Pe(tb52cFh(uK zESBw}p~o36=Z%Ves=K>xRaF{B%N4WanpBk(Ezw1)+g7Ttw+{@d%AjVLi?Tf=m&>K1 zA~SQTl^&~LD3_PzCEBBvWUFv8wI#@O?4n(wL&K_(r}2prU5VcxAKub;r9BZQtZq3v zv@J}TY4<9|VybK!YDEE`G%)2VZ&zU^%9VVnXceeARM9W%#~nt(D<~=Yv8VrvQ3Bj(|sp;`ucnNAM%`Fw3De_xvo>7d_!m~dWSoSMt!wf z?UKYnef{v^{WmTl_xaZ^|Jpwa_5PD&nN}{pY@<)w{t`wANJul~McRj=s2BMFy@nCL zWE9mVH{HE-iPb1PP3^2L+ZAh^;r>kid8*jbY;lpA9%Z_+n5U*cYnw%Vfz)pac{bFd zRW8X_{MoWuw5g;Rx(?b9f#~sdYKP#0(FT2ArOZN}0i_w!STLz&xmN`}`HEdES%OOu zX-O`-P#|ksqrkQz0E`3GqN*oiQFDo!xnenB*+w}BwIU;iQKGWm^1PMvuF93`_cQ}( z6VAaw?dX<}<))x{NiJ2kH1e_;S(I0{;{ikWvULyu(;AU=b+ka0%Rn@37S+fVn_BHV znT!s(3mdxYHalR94(zsIlp6+`tU5MJE&Iq`S9xve3X6>%J-mm>ZGsVNi@*%Wc8X6F zb!J6{7)PEOGjtlN7Yo9N=z|`{!lxaZB^lgZl001~>SG|nvx88NM<9L7?XcHg=A^C}Qdws6U{lkI>}rjp^_8);E{3xwQvK=lgQ-MGqz<~{$oR6YK(WI+ zn>n7o@>D$o*(E*#k=$OxTK+4%ZL@2$W8 zhmGG}zw@gfu7CLM`YRvC)4hFsxndQ9%9gxeDHXvtX?lD%mz^D+eIgr`Rn_(PY|*kQ zq=AF)2U6b;!tCU^@e6|65gm6v`RT^%?{56$d+XP3tiSb(^`HIv&Ij-Q`}Z$(G)NoQ z-}wCFw>>#v@I@#l53$N2wu~A(AhyfW#%teSzxK{i9{PzM+_8ju>tOGMTr5%b@ZQV| zl$C(0hxTS?D~bZi{{qXSB?H<%s8&q(+bRdSF0#tQf&qB1iH4qUj6VXt}pOm+rDUcaBa&2Vc0a~ zEB#qB8_$DoTw%&_R*kr38jEqS6S}Q_=ZAk_)+0dqhOK-?C*8YuKS0U)nKmNhS+wTm< z8@)6b7vo+U2IE4US!Vck7mvifG;O^3yMMmG1nyEU{i<>N21LwtvH|%>?1y&V1fAj1QUV-1dD_V z5}}Fs!$gP>(oBRF5@;hrI}u_8?;}D75%v?IlL!ZhaFF0b1Ro~&2*GiJj}qZ7f{zg) zLGW=RbP?Q5@ZChXhY0r)DM@e-!My~hh(AqmAHn?upCH`*1V2FVgM@pC;D-qwAb61A zM+iPCp2Ff77RNDZJ45he1W%H-DH1wM@Hv8~3C<8aBc8_uUm!S3@GQYki5Ic>G!~!1 z;ye~*U@4-8#RUKggcJ#0B)Cj4yis7$#$pAFOITdS;tCe8VDYP%w7-mr?-fkASK;$* z_`C+H-@#&=FNW$5#y)m%jf1a4>|N|j03Q|vhWgRYNATG0<0Ao)N4P^oNJ2q`notPd z`cZ_v4Mq`amZTO2wz8v*9qpixxtM$I!^j7}4!HgB{@00-Z~!C!L5#>Di~@%-5|3ac z#W4yV#i;2nj6%mS3MVj%ggH?}E!=U0x==Spt#@N)yBYbSnD7|2<1n}s1OEeJ&`Hlr z^Y>`?%J-cCuX#?CevF;8Gl?86Ip3r8$|(k4Ps2@1KG+JIr1a9W}TR@u2QUg3?w5%+TRYLn04K6HYpT zKswq218`>U{M31L_55cE^>!2nK&&QpXox>@pRTq4r&e1_sH@g(X4!~WGiSLS)+83` z`2QQJlZmsI39=YdZj9Q67Vv~!`V}{vu0j+t9NV5})@fO$*2D)zX_5Ib$$Xe)K0M+2 z@Fe(9)t-VIs@lbhLaN%+6(vyBE>#rh=4UF3Hx_1uRr-t2`io5pq+&}SX3rJ7K0uH8z03p8B02y2ukkJ(oB zXi*==fn!dZqIoVoKbM|=E 0){ + kbps = bytes_delta * 8.0 / duration_delta; // b/ms == kbps + } + kbps = (int(kbps * 10))/10.0; + + flash.utils.setTimeout(stopPlayTest, 0); + } + + private function stopPlayTest():void{ + connection.call("onSrsBandCheckStoppedPlayBytes", null); + } + + public function onSrsBandCheckStartPublishBytes(evt:Object):void{ + var duration_ms:Number = evt.duration_ms; + var interval_ms:Number = evt.interval_ms; + + connection.call("onSrsBandCheckStartingPublishBytes", null); + updateState("测试上行带宽(" + server_ip + ")"); + + flash.utils.setTimeout(publisher, 0); + } + + private function publisher():void{ + if (stop_pub) { + return; + } + + var data:Array = new Array(); + + var data_size:int = 100; + for(var i:int; i < data_size; i++){ + data.push("SrS band check data from client's publishing......"); + } + data_size += 100; + connection.call("onSrsBandCheckPublishing", null, data); + + flash.utils.setTimeout(publisher, 0); + } + + public function onSrsBandCheckStopPublishBytes(evt:Object):void{ + var duration_ms:Number = evt.duration_ms; + var interval_ms:Number = evt.interval_ms; + var duration_delta:Number = evt.duration_delta; + var bytes_delta:Number = evt.bytes_delta; + + var kbps:Number = 0; + if(duration_delta > 0){ + kbps = bytes_delta * 8.0 / duration_delta; // b/ms == kbps + } + kbps = (int(kbps * 10))/10.0; + + stopPublishTest(); + } + + private function stopPublishTest():void{ + if(connection.connected){ + connection.call("onSrsBandCheckStoppedPublishBytes", null); + } + stop_pub = true; + + value_progressbar = max_progressbar; + updateProgess(value_progressbar, max_progressbar); + updatePlayProgressTimer.stop(); + } + + public function onSrsBandCheckFinished(evt:Object):void{ + var code:Number = evt.code; + var start_time:Number = evt.start_time; + var end_time:Number = evt.end_time; + var play_kbps:Number = evt.play_kbps; + var publish_kbps:Number = evt.publish_kbps; + var play_bytes:Number = evt.play_bytes; + var play_time:Number = evt.play_time; + var publish_bytes:Number = evt.publish_bytes; + var publish_time:Number = evt.publish_time; + + updateState("检测结束: 服务器: " + server_ip + " 上行: " + publish_kbps + " kbps" + " 下行: " + play_kbps + " kbps" + + " 测试时间: " + (end_time-start_time)/1000 + " 秒"); + connection.call("finalClientPacket", null); + } + + public function onBWDone():void{ + // do nothing + } + + // update progressBar's value + private function updateProgess(value:Number, maxValue:Number):void{ + flash.external.ExternalInterface.call(this.js_update_progress, value * 100 / maxValue + "%"); + trace(value + "-" + maxValue + "-" + value * 100 / maxValue + "%"); + } + + // update checking status + private function updateState(text:String):void{ + flash.external.ExternalInterface.call(this.js_update_status, text); + trace(text); + } + } +} \ No newline at end of file diff --git a/trunk/src/main/srs_main_bandcheck.cpp b/trunk/src/main/srs_main_bandcheck.cpp new file mode 100755 index 000000000..7d82f2a21 --- /dev/null +++ b/trunk/src/main/srs_main_bandcheck.cpp @@ -0,0 +1,826 @@ +/* +The MIT License (MIT) + +Copyright (c) 2013 wenjiegit + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +*/ + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include + +// server play control +#define SRS_BW_CHECK_START_PLAY "onSrsBandCheckStartPlayBytes" +#define SRS_BW_CHECK_STARTING_PLAY "onSrsBandCheckStartingPlayBytes" +#define SRS_BW_CHECK_STOP_PLAY "onSrsBandCheckStopPlayBytes" +#define SRS_BW_CHECK_STOPPED_PLAY "onSrsBandCheckStoppedPlayBytes" + +// server publish control +#define SRS_BW_CHECK_START_PUBLISH "onSrsBandCheckStartPublishBytes" +#define SRS_BW_CHECK_STARTING_PUBLISH "onSrsBandCheckStartingPublishBytes" +#define SRS_BW_CHECK_STOP_PUBLISH "onSrsBandCheckStopPublishBytes" +#define SRS_BW_CHECK_STOPPED_PUBLISH "onSrsBandCheckStoppedPublishBytes" + +// EOF control. +#define SRS_BW_CHECK_FINISHED "onSrsBandCheckFinished" +#define SRS_BW_CHECK_FLASH_FINAL "finalClientPacket" + +// client only +#define SRS_BW_CHECK_PLAYING "onSrsBandCheckPlaying" +#define SRS_BW_CHECK_PUBLISHING "onSrsBandCheckPublishing" + +/** +* @brief class of Linux version band check client +* check play and publish speed. +*/ +class SrsBandCheckClient : public SrsRtmpClient +{ +public: + SrsBandCheckClient(st_netfd_t _stfd); + ~SrsBandCheckClient(); + +public: + /** + * @brief test play + * + */ + int check_play(); + /** + * @brief test publish + * + */ + int check_publish(); + +private: + /** + * @brief just return success. + */ + int create_stream(int& stream_id); + /** + * @brief just return success. + */ + int play(std::string stream, int stream_id); + /** + * @brief just return success. + */ + int publish(std::string stream, int stream_id); + +private: + int expect_start_play(); + int send_starting_play(); + int expect_stop_play(); + int send_stopped_play(); + int expect_start_pub(); + int send_starting_pub(); + int send_pub_data(); + int expect_stop_pub(); + /** + * @brief expect result. + * because the core module has no method to decode this packet + * so we must get the internal data and decode it here. + */ + int expect_finished(); + int send_stopped_pub(); + /** + * @brief notify server the check procedure is over. + */ + int send_final(); +}; + +/** +* @brief class of band check +* used to check band width with a client @param bandCheck_Client +*/ +class SrsBandCheck +{ +public: + SrsBandCheck(); + ~SrsBandCheck(); + +public: + /** + * @brief band check method + * + * connect to server------>rtmp handshake------>rtmp connect------>play------>publish + * @retval ERROR_SUCCESS when success. + */ + int check(const std::string& app, const std::string& tcUrl); + + /** + * @brief set the address and port of test server + * + * @param server server address, domain or ip + * @param server listened port ,default is 1935 + */ + void set_server(const std::string& server, int port = 1935); + +private: + int connect_server(); + +private: + SrsBandCheckClient* bandCheck_Client; + std::string server_address; + int server_port; +}; + +/** +* @brief init st lib +*/ +static int init_st(); +static void print_help(); +static void print_version(); + +/** +* @brief get user option +* @internal ip Mandatory arguments +* @internal key Mandatory arguments +* @internal port default 1935 +* @internal vhost default bandcheck.srs.com +*/ +static int get_opt(int argc ,char* argv[]); + +/** +* global var. +*/ +static struct option long_options[] = +{ + {"ip", required_argument, 0, 'i'}, + {"port", optional_argument, 0, 'p'}, + {"key", required_argument, 0, 'k'}, + {"vhost", optional_argument, 0, 'v'}, + {"help", no_argument, 0, 'h'}, + {"version", no_argument, 0, 'V'}, +}; + +static const char* short_options = "i:p::k:v::hV"; + +static std::string g_ip; +static int g_port = 1935; +static std::string g_key; +static std::string g_vhost = "bandcheck.srs.com"; + +#define BUILD_VERSION "srs band check 0.1" + +int main(int argc ,char* argv[]) +{ + int ret = ERROR_SUCCESS; + + if ((ret = get_opt(argc, argv)) != ERROR_SUCCESS) { + return -1; + } + + // check param + if (g_ip.empty()) { + printf("ip address should not be empty."); + return -1; + } + + if (g_key.empty()) { + printf("test key should not be empty."); + return -1; + } + + if ((ret = init_st()) != ERROR_SUCCESS) { + srs_error("band check init failed. ret=%d", ret); + return ret; + } + + std::string app = "app?key=" + g_key + "&vhost=" + g_vhost; + + char tcUrl_buffer[1024] = {0}; + sprintf(tcUrl_buffer, "rtmp://%s:%d/%s", g_ip.c_str(), g_port, app.c_str()); + std::string tcUrl = tcUrl_buffer; + + SrsBandCheck band_check; + band_check.set_server(g_ip, g_port); + if ((ret = band_check.check(app, tcUrl)) != ERROR_SUCCESS) { + srs_error("band check failed. address=%s ret=%d", "xx.com", ret); + return -1; + } + + return 0; +} + +SrsBandCheckClient::SrsBandCheckClient(st_netfd_t _stfd) + : SrsRtmpClient(_stfd) +{ +} + +SrsBandCheckClient::~SrsBandCheckClient() +{ +} + +int SrsBandCheckClient::check_play() +{ + int ret = ERROR_SUCCESS; + + if ((ret = expect_start_play()) != ERROR_SUCCESS) { + srs_error("expect_start_play failed. ret=%d", ret); + return ret; + } + + if ((ret = send_starting_play()) != ERROR_SUCCESS) { + srs_error("send starting play failed. ret=%d", ret); + return ret; + } + + if ((ret = expect_stop_play()) != ERROR_SUCCESS) { + srs_error("expect stop play failed. ret=%d", ret); + return ret; + } + + if ((ret = send_stopped_play()) != ERROR_SUCCESS) { + srs_error("send stopped play failed. ret=%d", ret); + return ret; + } + + return ret; +} + +int SrsBandCheckClient::check_publish() +{ + int ret = ERROR_SUCCESS; + + if ((ret = expect_start_pub()) != ERROR_SUCCESS) { + srs_error("expect start pub failed. ret=%d", ret); + return ret; + } + + if ((ret = send_starting_pub())!= ERROR_SUCCESS) { + srs_error("send starting pub failed. ret=%d", ret); + return ret; + } + + if ((ret = send_pub_data()) != ERROR_SUCCESS) { + srs_error("publish data failed. ret=%d", ret); + return ret; + } + + if ((ret = send_stopped_pub()) != ERROR_SUCCESS) { + srs_error("send stopped pub failed. ret=%d", ret); + return ret; + } + + if ((ret = expect_finished()) != ERROR_SUCCESS) { + srs_error("expect finished msg failed. ret=%d", ret); + return ret; + } + + if ((ret = send_final()) != ERROR_SUCCESS) { + srs_error("send final msg failed. ret=%d", ret); + return ret; + } + + return ret; +} + +int SrsBandCheckClient::create_stream(int &stream_id) +{ + return ERROR_SUCCESS; +} + +int SrsBandCheckClient::play(std::string stream, int stream_id) +{ + return ERROR_SUCCESS; +} + +int SrsBandCheckClient::publish(std::string stream, int stream_id) +{ + return ERROR_SUCCESS; +} + +int SrsBandCheckClient::expect_start_play() +{ + int ret = ERROR_SUCCESS; + + // expect connect _result + SrsCommonMessage* msg = NULL; + SrsBandwidthPacket* pkt = NULL; + if ((ret = srs_rtmp_expect_message(get_protocol(), &msg, &pkt)) != ERROR_SUCCESS) { + srs_error("expect bandcheck start play message failed. ret=%d", ret); + return ret; + } + SrsAutoFree(SrsCommonMessage, msg, false); + srs_info("get bandcheck start play message"); + + if (pkt->command_name != SRS_BW_CHECK_START_PLAY) { + srs_error("pkt error. expect=%s, actual=%s", SRS_BW_CHECK_START_PLAY, pkt->command_name.c_str()); + return -1; + } + + return ret; +} + +int SrsBandCheckClient::send_starting_play() +{ + int ret = ERROR_SUCCESS; + + SrsCommonMessage* msg = new SrsCommonMessage; + SrsBandwidthPacket* pkt = new SrsBandwidthPacket; + pkt->command_name = SRS_BW_CHECK_STARTING_PLAY; + msg->set_packet(pkt, 0); + + if ((ret = send_message(msg)) != ERROR_SUCCESS) { + srs_error("send starting play msg failed. ret=%d", ret); + return ret; + } + + return ret; +} + +int SrsBandCheckClient::expect_stop_play() +{ + int ret = ERROR_SUCCESS; + + while (true) { + SrsCommonMessage* msg = NULL; + SrsBandwidthPacket* pkt = NULL; + if ((ret = srs_rtmp_expect_message(get_protocol(), &msg, &pkt)) != ERROR_SUCCESS) { + srs_error("expect stop play message failed. ret=%d", ret); + return ret; + } + SrsAutoFree(SrsCommonMessage, msg, false); + srs_info("get bandcheck stop play message"); + + if (pkt->command_name == SRS_BW_CHECK_STOP_PLAY) { + break; + } + } + + return ret; +} + +int SrsBandCheckClient::send_stopped_play() +{ + int ret = ERROR_SUCCESS; + + SrsCommonMessage* msg = new SrsCommonMessage; + SrsBandwidthPacket* pkt = new SrsBandwidthPacket; + pkt->command_name = SRS_BW_CHECK_STOPPED_PLAY; + msg->set_packet(pkt, 0); + + if ((ret = send_message(msg)) != ERROR_SUCCESS) { + srs_error("send stopped play msg failed. ret=%d", ret); + return ret; + } + + return ret; +} + +int SrsBandCheckClient::expect_start_pub() +{ + int ret = ERROR_SUCCESS; + + while (true) { + SrsCommonMessage* msg = NULL; + SrsBandwidthPacket* pkt = NULL; + if ((ret = srs_rtmp_expect_message(get_protocol(), &msg, &pkt)) != ERROR_SUCCESS) { + srs_error("expect start pub message failed. ret=%d", ret); + return ret; + } + SrsAutoFree(SrsCommonMessage, msg, false); + srs_info("get bandcheck start pub message"); + + if (pkt->command_name == SRS_BW_CHECK_START_PUBLISH) { + break; + } + } + + return ret; +} + +int SrsBandCheckClient::send_starting_pub() +{ + int ret = ERROR_SUCCESS; + + SrsCommonMessage* msg = new SrsCommonMessage; + SrsBandwidthPacket* pkt = new SrsBandwidthPacket; + pkt->command_name = SRS_BW_CHECK_STARTING_PUBLISH; + msg->set_packet(pkt, 0); + + if ((ret = send_message(msg)) != ERROR_SUCCESS) { + srs_error("send starting play msg failed. ret=%d", ret); + return ret; + } + srs_info("send starting play msg success."); + + return ret; +} + +int SrsBandCheckClient::send_pub_data() +{ + int ret = ERROR_SUCCESS; + + int data_count = 100; + while (true) { + SrsCommonMessage* msg = new SrsCommonMessage; + SrsBandwidthPacket* pkt = new SrsBandwidthPacket; + pkt->command_name = SRS_BW_CHECK_PUBLISHING; + msg->set_packet(pkt, 0); + + for (int i = 0; i < data_count; ++i) { + std::stringstream seq; + seq << i; + std::string play_data = "SrS band check data from client's publishing......"; + pkt->data->set(seq.str(), new SrsAmf0String(play_data.c_str())); + } + data_count += 100; + + if ((ret = send_message(msg)) != ERROR_SUCCESS) { + srs_error("send publish message failed.ret=%d", ret); + return ret; + } + + if ((ret = expect_stop_pub()) == ERROR_SUCCESS) { + break; + } + } + + return ret; +} + +int SrsBandCheckClient::expect_stop_pub() +{ + int ret = ERROR_SUCCESS; + + while (true) { + if ((ret = st_netfd_poll(get_st_fd(), POLLIN, 1000)) == ERROR_SUCCESS) { + SrsCommonMessage* msg = 0; + if ((ret = recv_message(&msg)) != ERROR_SUCCESS) + { + srs_error("recv message failed while expect stop pub. ret=%d", ret); + return ret; + } + + if ((ret = msg->decode_packet(get_protocol())) != ERROR_SUCCESS) { + srs_error("decode packet error while expect stop pub. ret=%d", ret); + return ret; + } + + SrsBandwidthPacket* pkt = dynamic_cast(msg->get_packet()); + if (pkt && pkt->command_name == SRS_BW_CHECK_STOP_PUBLISH) { + + return ret; + } + } else { + break; + } + } + + return ret; +} + +int SrsBandCheckClient::expect_finished() +{ + int ret = ERROR_SUCCESS; + + while (true) { + SrsCommonMessage* msg = NULL; + SrsBandwidthPacket* pkt = NULL; + if ((ret = srs_rtmp_expect_message(get_protocol(), &msg, &pkt)) != ERROR_SUCCESS) { + srs_error("expect finished message failed. ret=%d", ret); + return ret; + } + SrsAutoFree(SrsCommonMessage, msg, false); + srs_info("get bandcheck finished message"); + + if (pkt->command_name == SRS_BW_CHECK_FINISHED) { + SrsStream *stream = new SrsStream; + SrsAutoFree(SrsStream, stream, false); + + if ((ret = stream->initialize((char*)msg->payload, msg->size)) != ERROR_SUCCESS) { + srs_error("initialize stream error. ret=%d", ret); + return ret; + } + + std::string command_name; + if ((ret = srs_amf0_read_string(stream, command_name)) != ERROR_SUCCESS) { + srs_error("amfo read string error. ret=%d", ret); + return ret; + } + + double action_id; + if ((ret = srs_amf0_read_number(stream, action_id)) != ERROR_SUCCESS) { + srs_error("amfo read number error. ret=%d", ret); + return ret; + } + + if ((ret = srs_amf0_read_null(stream)) != ERROR_SUCCESS) { + srs_error("amfo read number error. ret=%d", ret); + return ret; + } + + SrsAmf0Object* object; + if ((ret = srs_amf0_read_object(stream, object)) != ERROR_SUCCESS) { + srs_error("amfo read object error. ret=%d", ret); + return ret; + } + + int64_t start_time = 0; + int64_t end_time = 0; + + SrsAmf0Any* start_time_any = object->get_property("start_time"); + if (start_time_any && start_time_any->is_number()) { + SrsAmf0Number* start_time_number = dynamic_cast (start_time_any); + if (start_time_number) { + start_time = start_time_number->value; + } + } + + SrsAmf0Any* end_time_any = object->get_property("end_time"); + if (end_time_any && end_time_any->is_number()) { + SrsAmf0Number* end_time_number = dynamic_cast (end_time_any); + if (end_time_number) { + end_time = end_time_number->value; + } + } + + int play_kbps = 0; + int pub_kbps = 0; + SrsAmf0Any* play_kbp_any = object->get_property("play_kbps"); + if (play_kbp_any && play_kbp_any->is_number()) { + SrsAmf0Number* play_kbps_number = dynamic_cast (play_kbp_any); + if (play_kbps_number) { + play_kbps = play_kbps_number->value; + } + } + + SrsAmf0Any* pub_kbp_any = object->get_property("publish_kbps"); + if (pub_kbp_any && pub_kbp_any->is_number()) { + SrsAmf0Number* pub_kbps_number = dynamic_cast (pub_kbp_any); + if (pub_kbps_number) { + pub_kbps = pub_kbps_number->value; + } + } + + float time_elapsed; + if (end_time - start_time > 0) { + time_elapsed = (end_time - start_time) / 1000.00; + } + + srs_trace("result: play %d kbps, publish %d kbps, check time %.4f S\n" + , play_kbps, pub_kbps, time_elapsed); + + break; + } + } + + return ret; +} + +int SrsBandCheckClient::send_stopped_pub() +{ + int ret = ERROR_SUCCESS; + + SrsCommonMessage* msg = new SrsCommonMessage; + SrsBandwidthPacket* pkt = new SrsBandwidthPacket; + pkt->command_name = SRS_BW_CHECK_STOPPED_PUBLISH; + msg->set_packet(pkt, 0); + + if ((ret = send_message(msg)) != ERROR_SUCCESS) { + srs_error("send stopped pub msg failed. ret=%d", ret); + return ret; + } + srs_info("send stopped pub msg success."); + + return ret; +} + +int SrsBandCheckClient::send_final() +{ + int ret = ERROR_SUCCESS; + + SrsCommonMessage* msg = new SrsCommonMessage; + SrsBandwidthPacket* pkt = new SrsBandwidthPacket; + pkt->command_name = SRS_BW_CHECK_FLASH_FINAL; + msg->set_packet(pkt, 0); + + if ((ret = send_message(msg)) != ERROR_SUCCESS) { + srs_error("send final msg failed. ret=%d", ret); + return ret; + } + srs_info("send final msg success."); + + return ret; +} + +SrsBandCheck::SrsBandCheck() + : bandCheck_Client(0) +{ +} + +SrsBandCheck::~SrsBandCheck() +{ + if (bandCheck_Client) { + srs_freep(bandCheck_Client); + } +} + +int SrsBandCheck::check(const std::string &app, const std::string &tcUrl) +{ + int ret = ERROR_SUCCESS; + + if ((ret = connect_server()) != ERROR_SUCCESS) { + srs_error("connect to server failed. ret = %d", ret); + return ret; + } + + if ((ret = bandCheck_Client->handshake()) != ERROR_SUCCESS) { + srs_error("handshake failed. ret = %d", ret); + return ret; + } + + if ((ret = bandCheck_Client->connect_app(app, tcUrl)) != ERROR_SUCCESS) { + srs_error("handshake failed. ret = %d", ret); + return ret; + } + + if ((ret = bandCheck_Client->check_play()) != ERROR_SUCCESS) { + srs_error("band check play failed."); + return ret; + } + + if ((ret = bandCheck_Client->check_publish()) != ERROR_SUCCESS) { + srs_error("band check publish failed."); + return ret; + } + + return ret; +} + +void SrsBandCheck::set_server(const std::string &server, int port) +{ + server_address = server; + server_port = port; +} + +int SrsBandCheck::connect_server() +{ + int ret = ERROR_SUCCESS; + + int sock = socket(AF_INET, SOCK_STREAM, 0); + if(sock == -1){ + ret = ERROR_SOCKET_CREATE; + srs_error("create socket error. ret=%d", ret); + return ret; + } + + st_netfd_t stfd = st_netfd_open_socket(sock); + if(stfd == NULL){ + ret = ERROR_ST_OPEN_SOCKET; + srs_error("st_netfd_open_socket failed. ret=%d", ret); + return ret; + } + + bandCheck_Client = new SrsBandCheckClient(stfd); + + // connect to server. + std::string ip = srs_dns_resolve(server_address); + if (ip.empty()) { + ret = ERROR_SYSTEM_IP_INVALID; + srs_error("dns resolve server error, ip empty. ret=%d", ret); + return ret; + } + + sockaddr_in addr; + addr.sin_family = AF_INET; + addr.sin_port = htons(server_port); + addr.sin_addr.s_addr = inet_addr(ip.c_str()); + + if (st_connect(stfd, (const struct sockaddr*)&addr, sizeof(sockaddr_in), ST_UTIME_NO_TIMEOUT) == -1){ + ret = ERROR_ST_CONNECT; + srs_error("connect to server error. ip=%s, port=%d, ret=%d", ip.c_str(), server_port, ret); + return ret; + } + srs_trace("connect to server success. server=%s, ip=%s, port=%d", server_address.c_str(), ip.c_str(), server_port); + + return ret; +} + +int init_st() +{ + int ret = ERROR_SUCCESS; + + if (st_set_eventsys(ST_EVENTSYS_ALT) == -1) { + ret = ERROR_ST_SET_EPOLL; + srs_error("st_set_eventsys use linux epoll failed. ret=%d", ret); + return ret; + } + + if(st_init() != 0){ + ret = ERROR_ST_INITIALIZE; + srs_error("st_init failed. ret=%d", ret); + return ret; + } + + return ret; +} + +void print_help() +{ + const char *help = "Usage: srs-bandcheck [OPTION]...\n" + "test band width from client to rtmp server.\n" + + "Mandatory arguments to long options are mandatory for short options too.\n" + " -i, --ip the ip or domain that to test\n" + " -p, --port the port that server listen \n" + " -k, --key the key used to test \n" + " -v, --vhost the vhost used to test \n" + " -V, --version output version information and exit \n" + " -h, --help display this help and exit \n" + "\n\n\n" + "Exit status:\n" + "0 if OK,\n" + "other if error occured, and the detail should be printed.\n" + "\n\n" + "srs home page: \n" + "srs home page: \n"; + + printf("%s", help); +} + +void print_version() +{ + const char *version = "" + "srs_bandcheck "BUILD_VERSION"\n" + "Copyright (C) 2013 wenjiegit.\n" + "License MIT\n" + "This is free software: you are free to change and redistribute it.\n" + "There is NO WARRANTY, to the extent permitted by law.\n" + "\n" + "Written by wenjie.\n"; + + printf("%s", version); +} + +int get_opt(int argc, char *argv[]) +{ + int ret = ERROR_SUCCESS; + + int c; + while ((c = getopt_long (argc, argv, short_options, long_options, NULL)) != -1) { + switch (c) { + case 'i': + if (optarg) { + g_ip = optarg; + } + break; + case 'p': + if (optarg) { + g_port = atoi(optarg); + } + break; + case 'k': + if (optarg) { + g_key = optarg; + } + break; + case 'v': + if (optarg) { + g_vhost = optarg; + } + break; + case 'V': + print_version(); + exit(0); + break; + case 'h': + print_help(); + exit(0); + break; + default: + printf("see --help or -h\n"); + ret = -1; + } + } + + return ret; +}