diff --git a/trunk/research/players/js/srs.bandwidth.js b/trunk/research/players/js/srs.bandwidth.js new file mode 100755 index 000000000..79c51e2f8 --- /dev/null +++ b/trunk/research/players/js/srs.bandwidth.js @@ -0,0 +1,27 @@ +// for bw to init url +// url: scheme://host:port/path?query#fragment +function srs_init_bwt(rtmp_url, hls_url) { + update_nav(); + + if (rtmp_url) { + //var query = parse_query_string(); + var search_filed = String(window.location.search).replace(" ", "").split("?")[1]; + $(rtmp_url).val("rtmp://" + window.location.host + ":" + 1935 + "/app?" + search_filed); + } + if (hls_url) { + $(hls_url).val(build_default_hls_url()); + } +} + +function srs_bwt_check_url(url) { + if (url.indexOf("key") != -1 && url.indexOf("vhost") != -1) { + return true; + } + + return false; +} + +function srs_bwt_build_default_url() { + var url_default = "rtmp://" + window.location.host + ":" + 1935 + "/app?key=35c9b402c12a7246868752e2878f7e0e&vhost=bandcheck.srs.com"; + return url_default; +} \ No newline at end of file diff --git a/trunk/research/players/srs_bwt/.actionScriptProperties b/trunk/research/players/srs_bwt/.actionScriptProperties new file mode 100755 index 000000000..cd87f23d7 --- /dev/null +++ b/trunk/research/players/srs_bwt/.actionScriptProperties @@ -0,0 +1,40 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/trunk/research/players/srs_bwt/.project b/trunk/research/players/srs_bwt/.project new file mode 100755 index 000000000..240a737f9 --- /dev/null +++ b/trunk/research/players/srs_bwt/.project @@ -0,0 +1,17 @@ + + + srs_bwt + + + + + + com.adobe.flexbuilder.project.flexbuilder + + + + + + com.adobe.flexbuilder.project.actionscriptnature + + diff --git a/trunk/research/players/srs_bwt/release/srs_bwt.swf b/trunk/research/players/srs_bwt/release/srs_bwt.swf new file mode 100755 index 000000000..3e8978311 Binary files /dev/null and b/trunk/research/players/srs_bwt/release/srs_bwt.swf differ diff --git a/trunk/research/players/srs_bwt/src/SrsClass/SrsElapsedTimer.as b/trunk/research/players/srs_bwt/src/SrsClass/SrsElapsedTimer.as new file mode 100755 index 000000000..37884c2ee --- /dev/null +++ b/trunk/research/players/srs_bwt/src/SrsClass/SrsElapsedTimer.as @@ -0,0 +1,24 @@ +package SrsClass +{ + import flash.system.System; + + public class SrsElapsedTimer + { + private var beginDate:Date; + public function SrsElapsedTimer() + { + beginDate = new Date; + } + + public function elapsed():Number{ + var endDate:Date = new Date; + + // get deiff by ms + return (endDate.time - beginDate.time); + } + + public function restart():void{ + beginDate = new Date; + } + } +} \ No newline at end of file diff --git a/trunk/research/players/srs_bwt/src/SrsClass/SrsSettings.as b/trunk/research/players/srs_bwt/src/SrsClass/SrsSettings.as new file mode 100755 index 000000000..9aa10fe08 --- /dev/null +++ b/trunk/research/players/srs_bwt/src/SrsClass/SrsSettings.as @@ -0,0 +1,27 @@ +package SrsClass +{ + import flash.net.SharedObject; + + public class SrsSettings + { + private var settings:SharedObject; + private var key:String = "SrsBandCheck"; + + public function SrsSettings() + { + settings = SharedObject.getLocal(key); + } + + public function addAddressText(val:String):void{ + settings.data.address_text = val; + } + + public function addressText():String{ + return settings.data.address_text; + } + + static public function instance():SrsSettings{ + return new SrsSettings; + } + } +} \ No newline at end of file diff --git a/trunk/research/players/srs_bwt/src/srs_bwt.as b/trunk/research/players/srs_bwt/src/srs_bwt.as new file mode 100755 index 000000000..ef69cc8c8 --- /dev/null +++ b/trunk/research/players/srs_bwt/src/srs_bwt.as @@ -0,0 +1,227 @@ +package +{ + import SrsClass.SrsElapsedTimer; + + import flash.display.LoaderInfo; + import flash.display.Sprite; + import flash.display.StageAlign; + import flash.display.StageScaleMode; + import flash.events.Event; + import flash.events.NetStatusEvent; + import flash.events.TimerEvent; + import flash.external.ExternalInterface; + import flash.net.NetConnection; + import flash.net.ObjectEncoding; + import flash.system.System; + import flash.ui.ContextMenu; + import flash.ui.ContextMenuItem; + import flash.utils.Timer; + import flash.utils.setTimeout; + + public class srs_bwt extends Sprite + { + private var connection:NetConnection; + + private var updatePlayProgressTimer:Timer; + private var elapTimer:SrsElapsedTimer; + + // server ip get from server + private var server_ip:String; + + // test wheth publish should to stop + private var stop_pub:Boolean = false; + + // js interface + private var js_update_progress:String; + private var js_progress_reset:String; + private var js_update_status:String; + + private var value_progressbar:Number = 0; + private var max_progressbar:Number = 0; + + // set NetConnection ObjectEncoding to AMF0 + NetConnection.defaultObjectEncoding = ObjectEncoding.AMF0; + + public function srs_bwt() + { + this.stage.scaleMode = StageScaleMode.NO_SCALE; + this.stage.align = StageAlign.TOP_LEFT; + + var flashvars:Object = this.root.loaderInfo.parameters; + this.js_update_progress = flashvars.update_progress; + this.js_progress_reset = flashvars.progress_reset; + this.js_update_status = flashvars.update_status; + + // init context menu, add action "Srs 带宽测试工具 0.1" + var myMenu:ContextMenu = new ContextMenu(); + myMenu.hideBuiltInItems(); + myMenu.customItems.push(new ContextMenuItem("Srs 带宽测试工具 0.1", true)); + this.contextMenu = myMenu; + + // init connection + connection = new NetConnection; + connection.client = this; + connection.addEventListener(NetStatusEvent.NET_STATUS, onStatus); + connection.connect(flashvars.url); + //connection.connect("rtmp://192.168.8.234:1935/app?key=35c9b402c12a7246868752e2878f7e0e&vhost=bandcheck.srs.com"); + + // for play to update progress bar + elapTimer = new SrsElapsedTimer; + + // we suppose the check time = 7 S + updatePlayProgressTimer = new Timer(100); + updatePlayProgressTimer.addEventListener(TimerEvent.TIMER, onTimerTimeout); + updatePlayProgressTimer.start(); + } + + // get NetConnection NetStatusEvent + public function onStatus(evt:NetStatusEvent) : void{ + trace(evt.info.code); + switch(evt.info.code){ + case "NetConnection.Connect.Failed": + updateState("连接服务器失败!"); + break; + case "NetConnection.Connect.Rejected": + updateState("服务器拒绝连接!"); + break; + case "NetConnection.Connect.Success": + server_ip = evt.info.data.srs_server_ip; + updateState("连接服务器成功!"); + break; + case "NetConnection.Connect.Closed": + //updateState("连接已断开!"); + break; + } + + } + + /** + * NetConnection callback this function, when recv server call "onSrsBandCheckStartPlayBytes" + * then start @updatePlayProgressTimer for updating the progressbar + * */ + public function onSrsBandCheckStartPlayBytes(evt:Object):void{ + var duration_ms:Number = evt.duration_ms; + var interval_ms:Number = evt.interval_ms; + + connection.call("onSrsBandCheckStartingPlayBytes", null); + updateState("测试下行带宽(" + server_ip + ")"); + + // we suppose play duration_ms = pub duration_ms + max_progressbar = duration_ms * 2; + } + + public function onSrsBandCheckPlaying(evt:Object):void{ + + } + + public function onTimerTimeout(evt:TimerEvent):void + { + value_progressbar = elapTimer.elapsed(); + updateProgess(value_progressbar, max_progressbar); + } + + public function onSrsBandCheckStopPlayBytes(evt:Object):void{ + var duration_ms:Number = evt.duration_ms; + var interval_ms:Number = evt.interval_ms; + var duration_delta:Number = evt.duration_delta; + var bytes_delta:Number = evt.bytes_delta; + + var kbps:Number = 0; + if(duration_delta > 0){ + kbps = bytes_delta * 8.0 / duration_delta; // b/ms == kbps + } + kbps = (int(kbps * 10))/10.0; + + flash.utils.setTimeout(stopPlayTest, 0); + } + + private function stopPlayTest():void{ + connection.call("onSrsBandCheckStoppedPlayBytes", null); + } + + public function onSrsBandCheckStartPublishBytes(evt:Object):void{ + var duration_ms:Number = evt.duration_ms; + var interval_ms:Number = evt.interval_ms; + + connection.call("onSrsBandCheckStartingPublishBytes", null); + updateState("测试上行带宽(" + server_ip + ")"); + + flash.utils.setTimeout(publisher, 0); + } + + private function publisher():void{ + if (stop_pub) { + return; + } + + var data:Array = new Array(); + + var data_size:int = 100; + for(var i:int; i < data_size; i++){ + data.push("SrS band check data from client's publishing......"); + } + data_size += 100; + connection.call("onSrsBandCheckPublishing", null, data); + + flash.utils.setTimeout(publisher, 0); + } + + public function onSrsBandCheckStopPublishBytes(evt:Object):void{ + var duration_ms:Number = evt.duration_ms; + var interval_ms:Number = evt.interval_ms; + var duration_delta:Number = evt.duration_delta; + var bytes_delta:Number = evt.bytes_delta; + + var kbps:Number = 0; + if(duration_delta > 0){ + kbps = bytes_delta * 8.0 / duration_delta; // b/ms == kbps + } + kbps = (int(kbps * 10))/10.0; + + stopPublishTest(); + } + + private function stopPublishTest():void{ + if(connection.connected){ + connection.call("onSrsBandCheckStoppedPublishBytes", null); + } + stop_pub = true; + + value_progressbar = max_progressbar; + updateProgess(value_progressbar, max_progressbar); + updatePlayProgressTimer.stop(); + } + + public function onSrsBandCheckFinished(evt:Object):void{ + var code:Number = evt.code; + var start_time:Number = evt.start_time; + var end_time:Number = evt.end_time; + var play_kbps:Number = evt.play_kbps; + var publish_kbps:Number = evt.publish_kbps; + var play_bytes:Number = evt.play_bytes; + var play_time:Number = evt.play_time; + var publish_bytes:Number = evt.publish_bytes; + var publish_time:Number = evt.publish_time; + + updateState("检测结束: 服务器: " + server_ip + " 上行: " + publish_kbps + " kbps" + " 下行: " + play_kbps + " kbps" + + " 测试时间: " + (end_time-start_time)/1000 + " 秒"); + connection.call("finalClientPacket", null); + } + + public function onBWDone():void{ + // do nothing + } + + // update progressBar's value + private function updateProgess(value:Number, maxValue:Number):void{ + flash.external.ExternalInterface.call(this.js_update_progress, value * 100 / maxValue + "%"); + trace(value + "-" + maxValue + "-" + value * 100 / maxValue + "%"); + } + + // update checking status + private function updateState(text:String):void{ + flash.external.ExternalInterface.call(this.js_update_status, text); + trace(text); + } + } +} \ No newline at end of file diff --git a/trunk/src/main/srs_main_bandcheck.cpp b/trunk/src/main/srs_main_bandcheck.cpp new file mode 100755 index 000000000..7d82f2a21 --- /dev/null +++ b/trunk/src/main/srs_main_bandcheck.cpp @@ -0,0 +1,826 @@ +/* +The MIT License (MIT) + +Copyright (c) 2013 wenjiegit + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +*/ + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include + +// server play control +#define SRS_BW_CHECK_START_PLAY "onSrsBandCheckStartPlayBytes" +#define SRS_BW_CHECK_STARTING_PLAY "onSrsBandCheckStartingPlayBytes" +#define SRS_BW_CHECK_STOP_PLAY "onSrsBandCheckStopPlayBytes" +#define SRS_BW_CHECK_STOPPED_PLAY "onSrsBandCheckStoppedPlayBytes" + +// server publish control +#define SRS_BW_CHECK_START_PUBLISH "onSrsBandCheckStartPublishBytes" +#define SRS_BW_CHECK_STARTING_PUBLISH "onSrsBandCheckStartingPublishBytes" +#define SRS_BW_CHECK_STOP_PUBLISH "onSrsBandCheckStopPublishBytes" +#define SRS_BW_CHECK_STOPPED_PUBLISH "onSrsBandCheckStoppedPublishBytes" + +// EOF control. +#define SRS_BW_CHECK_FINISHED "onSrsBandCheckFinished" +#define SRS_BW_CHECK_FLASH_FINAL "finalClientPacket" + +// client only +#define SRS_BW_CHECK_PLAYING "onSrsBandCheckPlaying" +#define SRS_BW_CHECK_PUBLISHING "onSrsBandCheckPublishing" + +/** +* @brief class of Linux version band check client +* check play and publish speed. +*/ +class SrsBandCheckClient : public SrsRtmpClient +{ +public: + SrsBandCheckClient(st_netfd_t _stfd); + ~SrsBandCheckClient(); + +public: + /** + * @brief test play + * + */ + int check_play(); + /** + * @brief test publish + * + */ + int check_publish(); + +private: + /** + * @brief just return success. + */ + int create_stream(int& stream_id); + /** + * @brief just return success. + */ + int play(std::string stream, int stream_id); + /** + * @brief just return success. + */ + int publish(std::string stream, int stream_id); + +private: + int expect_start_play(); + int send_starting_play(); + int expect_stop_play(); + int send_stopped_play(); + int expect_start_pub(); + int send_starting_pub(); + int send_pub_data(); + int expect_stop_pub(); + /** + * @brief expect result. + * because the core module has no method to decode this packet + * so we must get the internal data and decode it here. + */ + int expect_finished(); + int send_stopped_pub(); + /** + * @brief notify server the check procedure is over. + */ + int send_final(); +}; + +/** +* @brief class of band check +* used to check band width with a client @param bandCheck_Client +*/ +class SrsBandCheck +{ +public: + SrsBandCheck(); + ~SrsBandCheck(); + +public: + /** + * @brief band check method + * + * connect to server------>rtmp handshake------>rtmp connect------>play------>publish + * @retval ERROR_SUCCESS when success. + */ + int check(const std::string& app, const std::string& tcUrl); + + /** + * @brief set the address and port of test server + * + * @param server server address, domain or ip + * @param server listened port ,default is 1935 + */ + void set_server(const std::string& server, int port = 1935); + +private: + int connect_server(); + +private: + SrsBandCheckClient* bandCheck_Client; + std::string server_address; + int server_port; +}; + +/** +* @brief init st lib +*/ +static int init_st(); +static void print_help(); +static void print_version(); + +/** +* @brief get user option +* @internal ip Mandatory arguments +* @internal key Mandatory arguments +* @internal port default 1935 +* @internal vhost default bandcheck.srs.com +*/ +static int get_opt(int argc ,char* argv[]); + +/** +* global var. +*/ +static struct option long_options[] = +{ + {"ip", required_argument, 0, 'i'}, + {"port", optional_argument, 0, 'p'}, + {"key", required_argument, 0, 'k'}, + {"vhost", optional_argument, 0, 'v'}, + {"help", no_argument, 0, 'h'}, + {"version", no_argument, 0, 'V'}, +}; + +static const char* short_options = "i:p::k:v::hV"; + +static std::string g_ip; +static int g_port = 1935; +static std::string g_key; +static std::string g_vhost = "bandcheck.srs.com"; + +#define BUILD_VERSION "srs band check 0.1" + +int main(int argc ,char* argv[]) +{ + int ret = ERROR_SUCCESS; + + if ((ret = get_opt(argc, argv)) != ERROR_SUCCESS) { + return -1; + } + + // check param + if (g_ip.empty()) { + printf("ip address should not be empty."); + return -1; + } + + if (g_key.empty()) { + printf("test key should not be empty."); + return -1; + } + + if ((ret = init_st()) != ERROR_SUCCESS) { + srs_error("band check init failed. ret=%d", ret); + return ret; + } + + std::string app = "app?key=" + g_key + "&vhost=" + g_vhost; + + char tcUrl_buffer[1024] = {0}; + sprintf(tcUrl_buffer, "rtmp://%s:%d/%s", g_ip.c_str(), g_port, app.c_str()); + std::string tcUrl = tcUrl_buffer; + + SrsBandCheck band_check; + band_check.set_server(g_ip, g_port); + if ((ret = band_check.check(app, tcUrl)) != ERROR_SUCCESS) { + srs_error("band check failed. address=%s ret=%d", "xx.com", ret); + return -1; + } + + return 0; +} + +SrsBandCheckClient::SrsBandCheckClient(st_netfd_t _stfd) + : SrsRtmpClient(_stfd) +{ +} + +SrsBandCheckClient::~SrsBandCheckClient() +{ +} + +int SrsBandCheckClient::check_play() +{ + int ret = ERROR_SUCCESS; + + if ((ret = expect_start_play()) != ERROR_SUCCESS) { + srs_error("expect_start_play failed. ret=%d", ret); + return ret; + } + + if ((ret = send_starting_play()) != ERROR_SUCCESS) { + srs_error("send starting play failed. ret=%d", ret); + return ret; + } + + if ((ret = expect_stop_play()) != ERROR_SUCCESS) { + srs_error("expect stop play failed. ret=%d", ret); + return ret; + } + + if ((ret = send_stopped_play()) != ERROR_SUCCESS) { + srs_error("send stopped play failed. ret=%d", ret); + return ret; + } + + return ret; +} + +int SrsBandCheckClient::check_publish() +{ + int ret = ERROR_SUCCESS; + + if ((ret = expect_start_pub()) != ERROR_SUCCESS) { + srs_error("expect start pub failed. ret=%d", ret); + return ret; + } + + if ((ret = send_starting_pub())!= ERROR_SUCCESS) { + srs_error("send starting pub failed. ret=%d", ret); + return ret; + } + + if ((ret = send_pub_data()) != ERROR_SUCCESS) { + srs_error("publish data failed. ret=%d", ret); + return ret; + } + + if ((ret = send_stopped_pub()) != ERROR_SUCCESS) { + srs_error("send stopped pub failed. ret=%d", ret); + return ret; + } + + if ((ret = expect_finished()) != ERROR_SUCCESS) { + srs_error("expect finished msg failed. ret=%d", ret); + return ret; + } + + if ((ret = send_final()) != ERROR_SUCCESS) { + srs_error("send final msg failed. ret=%d", ret); + return ret; + } + + return ret; +} + +int SrsBandCheckClient::create_stream(int &stream_id) +{ + return ERROR_SUCCESS; +} + +int SrsBandCheckClient::play(std::string stream, int stream_id) +{ + return ERROR_SUCCESS; +} + +int SrsBandCheckClient::publish(std::string stream, int stream_id) +{ + return ERROR_SUCCESS; +} + +int SrsBandCheckClient::expect_start_play() +{ + int ret = ERROR_SUCCESS; + + // expect connect _result + SrsCommonMessage* msg = NULL; + SrsBandwidthPacket* pkt = NULL; + if ((ret = srs_rtmp_expect_message(get_protocol(), &msg, &pkt)) != ERROR_SUCCESS) { + srs_error("expect bandcheck start play message failed. ret=%d", ret); + return ret; + } + SrsAutoFree(SrsCommonMessage, msg, false); + srs_info("get bandcheck start play message"); + + if (pkt->command_name != SRS_BW_CHECK_START_PLAY) { + srs_error("pkt error. expect=%s, actual=%s", SRS_BW_CHECK_START_PLAY, pkt->command_name.c_str()); + return -1; + } + + return ret; +} + +int SrsBandCheckClient::send_starting_play() +{ + int ret = ERROR_SUCCESS; + + SrsCommonMessage* msg = new SrsCommonMessage; + SrsBandwidthPacket* pkt = new SrsBandwidthPacket; + pkt->command_name = SRS_BW_CHECK_STARTING_PLAY; + msg->set_packet(pkt, 0); + + if ((ret = send_message(msg)) != ERROR_SUCCESS) { + srs_error("send starting play msg failed. ret=%d", ret); + return ret; + } + + return ret; +} + +int SrsBandCheckClient::expect_stop_play() +{ + int ret = ERROR_SUCCESS; + + while (true) { + SrsCommonMessage* msg = NULL; + SrsBandwidthPacket* pkt = NULL; + if ((ret = srs_rtmp_expect_message(get_protocol(), &msg, &pkt)) != ERROR_SUCCESS) { + srs_error("expect stop play message failed. ret=%d", ret); + return ret; + } + SrsAutoFree(SrsCommonMessage, msg, false); + srs_info("get bandcheck stop play message"); + + if (pkt->command_name == SRS_BW_CHECK_STOP_PLAY) { + break; + } + } + + return ret; +} + +int SrsBandCheckClient::send_stopped_play() +{ + int ret = ERROR_SUCCESS; + + SrsCommonMessage* msg = new SrsCommonMessage; + SrsBandwidthPacket* pkt = new SrsBandwidthPacket; + pkt->command_name = SRS_BW_CHECK_STOPPED_PLAY; + msg->set_packet(pkt, 0); + + if ((ret = send_message(msg)) != ERROR_SUCCESS) { + srs_error("send stopped play msg failed. ret=%d", ret); + return ret; + } + + return ret; +} + +int SrsBandCheckClient::expect_start_pub() +{ + int ret = ERROR_SUCCESS; + + while (true) { + SrsCommonMessage* msg = NULL; + SrsBandwidthPacket* pkt = NULL; + if ((ret = srs_rtmp_expect_message(get_protocol(), &msg, &pkt)) != ERROR_SUCCESS) { + srs_error("expect start pub message failed. ret=%d", ret); + return ret; + } + SrsAutoFree(SrsCommonMessage, msg, false); + srs_info("get bandcheck start pub message"); + + if (pkt->command_name == SRS_BW_CHECK_START_PUBLISH) { + break; + } + } + + return ret; +} + +int SrsBandCheckClient::send_starting_pub() +{ + int ret = ERROR_SUCCESS; + + SrsCommonMessage* msg = new SrsCommonMessage; + SrsBandwidthPacket* pkt = new SrsBandwidthPacket; + pkt->command_name = SRS_BW_CHECK_STARTING_PUBLISH; + msg->set_packet(pkt, 0); + + if ((ret = send_message(msg)) != ERROR_SUCCESS) { + srs_error("send starting play msg failed. ret=%d", ret); + return ret; + } + srs_info("send starting play msg success."); + + return ret; +} + +int SrsBandCheckClient::send_pub_data() +{ + int ret = ERROR_SUCCESS; + + int data_count = 100; + while (true) { + SrsCommonMessage* msg = new SrsCommonMessage; + SrsBandwidthPacket* pkt = new SrsBandwidthPacket; + pkt->command_name = SRS_BW_CHECK_PUBLISHING; + msg->set_packet(pkt, 0); + + for (int i = 0; i < data_count; ++i) { + std::stringstream seq; + seq << i; + std::string play_data = "SrS band check data from client's publishing......"; + pkt->data->set(seq.str(), new SrsAmf0String(play_data.c_str())); + } + data_count += 100; + + if ((ret = send_message(msg)) != ERROR_SUCCESS) { + srs_error("send publish message failed.ret=%d", ret); + return ret; + } + + if ((ret = expect_stop_pub()) == ERROR_SUCCESS) { + break; + } + } + + return ret; +} + +int SrsBandCheckClient::expect_stop_pub() +{ + int ret = ERROR_SUCCESS; + + while (true) { + if ((ret = st_netfd_poll(get_st_fd(), POLLIN, 1000)) == ERROR_SUCCESS) { + SrsCommonMessage* msg = 0; + if ((ret = recv_message(&msg)) != ERROR_SUCCESS) + { + srs_error("recv message failed while expect stop pub. ret=%d", ret); + return ret; + } + + if ((ret = msg->decode_packet(get_protocol())) != ERROR_SUCCESS) { + srs_error("decode packet error while expect stop pub. ret=%d", ret); + return ret; + } + + SrsBandwidthPacket* pkt = dynamic_cast(msg->get_packet()); + if (pkt && pkt->command_name == SRS_BW_CHECK_STOP_PUBLISH) { + + return ret; + } + } else { + break; + } + } + + return ret; +} + +int SrsBandCheckClient::expect_finished() +{ + int ret = ERROR_SUCCESS; + + while (true) { + SrsCommonMessage* msg = NULL; + SrsBandwidthPacket* pkt = NULL; + if ((ret = srs_rtmp_expect_message(get_protocol(), &msg, &pkt)) != ERROR_SUCCESS) { + srs_error("expect finished message failed. ret=%d", ret); + return ret; + } + SrsAutoFree(SrsCommonMessage, msg, false); + srs_info("get bandcheck finished message"); + + if (pkt->command_name == SRS_BW_CHECK_FINISHED) { + SrsStream *stream = new SrsStream; + SrsAutoFree(SrsStream, stream, false); + + if ((ret = stream->initialize((char*)msg->payload, msg->size)) != ERROR_SUCCESS) { + srs_error("initialize stream error. ret=%d", ret); + return ret; + } + + std::string command_name; + if ((ret = srs_amf0_read_string(stream, command_name)) != ERROR_SUCCESS) { + srs_error("amfo read string error. ret=%d", ret); + return ret; + } + + double action_id; + if ((ret = srs_amf0_read_number(stream, action_id)) != ERROR_SUCCESS) { + srs_error("amfo read number error. ret=%d", ret); + return ret; + } + + if ((ret = srs_amf0_read_null(stream)) != ERROR_SUCCESS) { + srs_error("amfo read number error. ret=%d", ret); + return ret; + } + + SrsAmf0Object* object; + if ((ret = srs_amf0_read_object(stream, object)) != ERROR_SUCCESS) { + srs_error("amfo read object error. ret=%d", ret); + return ret; + } + + int64_t start_time = 0; + int64_t end_time = 0; + + SrsAmf0Any* start_time_any = object->get_property("start_time"); + if (start_time_any && start_time_any->is_number()) { + SrsAmf0Number* start_time_number = dynamic_cast (start_time_any); + if (start_time_number) { + start_time = start_time_number->value; + } + } + + SrsAmf0Any* end_time_any = object->get_property("end_time"); + if (end_time_any && end_time_any->is_number()) { + SrsAmf0Number* end_time_number = dynamic_cast (end_time_any); + if (end_time_number) { + end_time = end_time_number->value; + } + } + + int play_kbps = 0; + int pub_kbps = 0; + SrsAmf0Any* play_kbp_any = object->get_property("play_kbps"); + if (play_kbp_any && play_kbp_any->is_number()) { + SrsAmf0Number* play_kbps_number = dynamic_cast (play_kbp_any); + if (play_kbps_number) { + play_kbps = play_kbps_number->value; + } + } + + SrsAmf0Any* pub_kbp_any = object->get_property("publish_kbps"); + if (pub_kbp_any && pub_kbp_any->is_number()) { + SrsAmf0Number* pub_kbps_number = dynamic_cast (pub_kbp_any); + if (pub_kbps_number) { + pub_kbps = pub_kbps_number->value; + } + } + + float time_elapsed; + if (end_time - start_time > 0) { + time_elapsed = (end_time - start_time) / 1000.00; + } + + srs_trace("result: play %d kbps, publish %d kbps, check time %.4f S\n" + , play_kbps, pub_kbps, time_elapsed); + + break; + } + } + + return ret; +} + +int SrsBandCheckClient::send_stopped_pub() +{ + int ret = ERROR_SUCCESS; + + SrsCommonMessage* msg = new SrsCommonMessage; + SrsBandwidthPacket* pkt = new SrsBandwidthPacket; + pkt->command_name = SRS_BW_CHECK_STOPPED_PUBLISH; + msg->set_packet(pkt, 0); + + if ((ret = send_message(msg)) != ERROR_SUCCESS) { + srs_error("send stopped pub msg failed. ret=%d", ret); + return ret; + } + srs_info("send stopped pub msg success."); + + return ret; +} + +int SrsBandCheckClient::send_final() +{ + int ret = ERROR_SUCCESS; + + SrsCommonMessage* msg = new SrsCommonMessage; + SrsBandwidthPacket* pkt = new SrsBandwidthPacket; + pkt->command_name = SRS_BW_CHECK_FLASH_FINAL; + msg->set_packet(pkt, 0); + + if ((ret = send_message(msg)) != ERROR_SUCCESS) { + srs_error("send final msg failed. ret=%d", ret); + return ret; + } + srs_info("send final msg success."); + + return ret; +} + +SrsBandCheck::SrsBandCheck() + : bandCheck_Client(0) +{ +} + +SrsBandCheck::~SrsBandCheck() +{ + if (bandCheck_Client) { + srs_freep(bandCheck_Client); + } +} + +int SrsBandCheck::check(const std::string &app, const std::string &tcUrl) +{ + int ret = ERROR_SUCCESS; + + if ((ret = connect_server()) != ERROR_SUCCESS) { + srs_error("connect to server failed. ret = %d", ret); + return ret; + } + + if ((ret = bandCheck_Client->handshake()) != ERROR_SUCCESS) { + srs_error("handshake failed. ret = %d", ret); + return ret; + } + + if ((ret = bandCheck_Client->connect_app(app, tcUrl)) != ERROR_SUCCESS) { + srs_error("handshake failed. ret = %d", ret); + return ret; + } + + if ((ret = bandCheck_Client->check_play()) != ERROR_SUCCESS) { + srs_error("band check play failed."); + return ret; + } + + if ((ret = bandCheck_Client->check_publish()) != ERROR_SUCCESS) { + srs_error("band check publish failed."); + return ret; + } + + return ret; +} + +void SrsBandCheck::set_server(const std::string &server, int port) +{ + server_address = server; + server_port = port; +} + +int SrsBandCheck::connect_server() +{ + int ret = ERROR_SUCCESS; + + int sock = socket(AF_INET, SOCK_STREAM, 0); + if(sock == -1){ + ret = ERROR_SOCKET_CREATE; + srs_error("create socket error. ret=%d", ret); + return ret; + } + + st_netfd_t stfd = st_netfd_open_socket(sock); + if(stfd == NULL){ + ret = ERROR_ST_OPEN_SOCKET; + srs_error("st_netfd_open_socket failed. ret=%d", ret); + return ret; + } + + bandCheck_Client = new SrsBandCheckClient(stfd); + + // connect to server. + std::string ip = srs_dns_resolve(server_address); + if (ip.empty()) { + ret = ERROR_SYSTEM_IP_INVALID; + srs_error("dns resolve server error, ip empty. ret=%d", ret); + return ret; + } + + sockaddr_in addr; + addr.sin_family = AF_INET; + addr.sin_port = htons(server_port); + addr.sin_addr.s_addr = inet_addr(ip.c_str()); + + if (st_connect(stfd, (const struct sockaddr*)&addr, sizeof(sockaddr_in), ST_UTIME_NO_TIMEOUT) == -1){ + ret = ERROR_ST_CONNECT; + srs_error("connect to server error. ip=%s, port=%d, ret=%d", ip.c_str(), server_port, ret); + return ret; + } + srs_trace("connect to server success. server=%s, ip=%s, port=%d", server_address.c_str(), ip.c_str(), server_port); + + return ret; +} + +int init_st() +{ + int ret = ERROR_SUCCESS; + + if (st_set_eventsys(ST_EVENTSYS_ALT) == -1) { + ret = ERROR_ST_SET_EPOLL; + srs_error("st_set_eventsys use linux epoll failed. ret=%d", ret); + return ret; + } + + if(st_init() != 0){ + ret = ERROR_ST_INITIALIZE; + srs_error("st_init failed. ret=%d", ret); + return ret; + } + + return ret; +} + +void print_help() +{ + const char *help = "Usage: srs-bandcheck [OPTION]...\n" + "test band width from client to rtmp server.\n" + + "Mandatory arguments to long options are mandatory for short options too.\n" + " -i, --ip the ip or domain that to test\n" + " -p, --port the port that server listen \n" + " -k, --key the key used to test \n" + " -v, --vhost the vhost used to test \n" + " -V, --version output version information and exit \n" + " -h, --help display this help and exit \n" + "\n\n\n" + "Exit status:\n" + "0 if OK,\n" + "other if error occured, and the detail should be printed.\n" + "\n\n" + "srs home page: \n" + "srs home page: \n"; + + printf("%s", help); +} + +void print_version() +{ + const char *version = "" + "srs_bandcheck "BUILD_VERSION"\n" + "Copyright (C) 2013 wenjiegit.\n" + "License MIT\n" + "This is free software: you are free to change and redistribute it.\n" + "There is NO WARRANTY, to the extent permitted by law.\n" + "\n" + "Written by wenjie.\n"; + + printf("%s", version); +} + +int get_opt(int argc, char *argv[]) +{ + int ret = ERROR_SUCCESS; + + int c; + while ((c = getopt_long (argc, argv, short_options, long_options, NULL)) != -1) { + switch (c) { + case 'i': + if (optarg) { + g_ip = optarg; + } + break; + case 'p': + if (optarg) { + g_port = atoi(optarg); + } + break; + case 'k': + if (optarg) { + g_key = optarg; + } + break; + case 'v': + if (optarg) { + g_vhost = optarg; + } + break; + case 'V': + print_version(); + exit(0); + break; + case 'h': + print_help(); + exit(0); + break; + default: + printf("see --help or -h\n"); + ret = -1; + } + } + + return ret; +}