1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

merge from wenjie, the bandwidth test feature.

This commit is contained in:
winlin 2013-12-27 17:03:12 +08:00
parent 6bba081485
commit 20d1732ced
16 changed files with 1577 additions and 246 deletions

16
trunk/src/core/srs_core_bandwidth.cpp Normal file → Executable file
View file

@ -24,6 +24,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_core_bandwidth.hpp>
#include <arpa/inet.h>
#include <sstream>
using namespace std;
@ -260,21 +261,24 @@ int SrsBandwidth::check_play(
int64_t current_time = srs_get_system_time_ms();
int size = 1024; // TODO: FIXME: magic number
char random_data[size];
memset(random_data, 0x01, size);
memset(random_data, 'A', size);
int interval = 0;
int data_count = 1;
while ( (srs_get_system_time_ms() - current_time) < duration_ms ) {
st_usleep(interval);
// TODO: FIXME: use shared ptr message.
SrsBandwidthPacket* pkt = SrsBandwidthPacket::create_playing();
// TODO: FIXME: magic number
for (int i = 0; i < 100; ++i) {
char buf[32]; // TODO: FIXME: magic number
sprintf(buf, "%d", i);
pkt->data->set(buf, new SrsAmf0String(random_data));
// TODO: FIXME: magic number
for (int i = 0; i < data_count; ++i) {
std::stringstream seq;
seq << i;
std::string play_data = "SrS band check data from server's playing......";
pkt->data->set(seq.str(), new SrsAmf0String(play_data.c_str()));
}
data_count += 2;
// TODO: FIXME: get length from the rtmp protocol stack.
play_bytes += pkt->get_payload_length();

2
trunk/src/core/srs_core_protocol.cpp Normal file → Executable file
View file

@ -2789,7 +2789,7 @@ SrsBandwidthPacket* SrsBandwidthPacket::create_start_play()
SrsBandwidthPacket* SrsBandwidthPacket::create_playing()
{
SrsBandwidthPacket* pkt = new SrsBandwidthPacket();
return pkt->set_command(SRS_BW_CHECK_STARTING_PLAY);
return pkt->set_command(SRS_BW_CHECK_PLAYING);
}
SrsBandwidthPacket* SrsBandwidthPacket::create_stop_play()

460
trunk/src/core/srs_core_rtmp.hpp Normal file → Executable file
View file

@ -1,231 +1,231 @@
/*
The MIT License (MIT)
Copyright (c) 2013 winlin
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#ifndef SRS_CORE_RTMP_HPP
#define SRS_CORE_RTMP_HPP
/*
#include <srs_core_rtmp.hpp>
*/
#include <srs_core.hpp>
#include <string>
class SrsProtocol;
class ISrsMessage;
class SrsCommonMessage;
class SrsCreateStreamPacket;
class SrsFMLEStartPacket;
class SrsPublishPacket;
class SrsSharedPtrMessage;
class SrsOnMetaDataPacket;
/**
* the original request from client.
*/
struct SrsRequest
{
/**
* tcUrl: rtmp://request_vhost:port/app/stream
* support pass vhost in query string, such as:
* rtmp://ip:port/app?vhost=request_vhost/stream
* rtmp://ip:port/app...vhost...request_vhost/stream
*/
std::string tcUrl;
std::string pageUrl;
std::string swfUrl;
double objectEncoding;
std::string schema;
std::string vhost;
std::string port;
std::string app;
std::string stream;
SrsRequest();
virtual ~SrsRequest();
/**
* deep copy the request, for source to use it to support reload,
* for when initialize the source, the request is valid,
* when reload it, the request maybe invalid, so need to copy it.
*/
virtual SrsRequest* copy();
/**
* disconvery vhost/app from tcUrl.
*/
virtual int discovery_app();
virtual std::string get_stream_url();
virtual void strip();
private:
std::string& trim(std::string& str, std::string chs);
};
/**
* the response to client.
*/
struct SrsResponse
{
int stream_id;
SrsResponse();
virtual ~SrsResponse();
};
/**
* the rtmp client type.
*/
enum SrsClientType
{
SrsClientUnknown,
SrsClientPlay,
SrsClientFMLEPublish,
SrsClientFlashPublish,
};
/**
* implements the client role protocol.
*/
class SrsRtmpClient
{
private:
SrsProtocol* protocol;
st_netfd_t stfd;
public:
SrsRtmpClient(st_netfd_t _stfd);
virtual ~SrsRtmpClient();
public:
virtual void set_recv_timeout(int64_t timeout_us);
virtual void set_send_timeout(int64_t timeout_us);
virtual int64_t get_recv_bytes();
virtual int64_t get_send_bytes();
virtual int get_recv_kbps();
virtual int get_send_kbps();
virtual int recv_message(SrsCommonMessage** pmsg);
virtual int send_message(ISrsMessage* msg);
public:
virtual int handshake();
virtual int connect_app(std::string app, std::string tc_url);
virtual int create_stream(int& stream_id);
virtual int play(std::string stream, int stream_id);
virtual int publish(std::string stream, int stream_id);
};
/**
* the rtmp provices rtmp-command-protocol services,
* a high level protocol, media stream oriented services,
* such as connect to vhost/app, play stream, get audio/video data.
*/
class SrsRtmp
{
private:
SrsProtocol* protocol;
st_netfd_t stfd;
public:
SrsRtmp(st_netfd_t client_stfd);
virtual ~SrsRtmp();
public:
virtual SrsProtocol* get_protocol();
virtual void set_recv_timeout(int64_t timeout_us);
virtual int64_t get_recv_timeout();
virtual void set_send_timeout(int64_t timeout_us);
virtual int64_t get_send_timeout();
virtual int64_t get_recv_bytes();
virtual int64_t get_send_bytes();
virtual int get_recv_kbps();
virtual int get_send_kbps();
virtual int recv_message(SrsCommonMessage** pmsg);
virtual int send_message(ISrsMessage* msg);
public:
virtual int handshake();
virtual int connect_app(SrsRequest* req);
virtual int set_window_ack_size(int ack_size);
/**
* @type: The sender can mark this message hard (0), soft (1), or dynamic (2)
* using the Limit type field.
*/
virtual int set_peer_bandwidth(int bandwidth, int type);
/**
* @param server_ip the ip of server.
*/
virtual int response_connect_app(SrsRequest* req, const char* server_ip = NULL);
virtual void response_connect_reject(SrsRequest* req, const char* desc);
virtual int on_bw_done();
/**
* recv some message to identify the client.
* @stream_id, client will createStream to play or publish by flash,
* the stream_id used to response the createStream request.
* @type, output the client type.
*/
virtual int identify_client(int stream_id, SrsClientType& type, std::string& stream_name);
/**
* set the chunk size when client type identified.
*/
virtual int set_chunk_size(int chunk_size);
/**
* when client type is play, response with packets:
* StreamBegin,
* onStatus(NetStream.Play.Reset), onStatus(NetStream.Play.Start).,
* |RtmpSampleAccess(false, false),
* onStatus(NetStream.Data.Start).
*/
virtual int start_play(int stream_id);
/**
* when client(type is play) send pause message,
* if is_pause, response the following packets:
* onStatus(NetStream.Pause.Notify)
* StreamEOF
* if not is_pause, response the following packets:
* onStatus(NetStream.Unpause.Notify)
* StreamBegin
*/
virtual int on_play_client_pause(int stream_id, bool is_pause);
/**
* when client type is publish, response with packets:
* releaseStream response
* FCPublish
* FCPublish response
* createStream response
* onFCPublish(NetStream.Publish.Start)
* onStatus(NetStream.Publish.Start)
*/
virtual int start_fmle_publish(int stream_id);
/**
* process the FMLE unpublish event.
* @unpublish_tid the unpublish request transaction id.
*/
virtual int fmle_unpublish(int stream_id, double unpublish_tid);
/**
* when client type is publish, response with packets:
* onStatus(NetStream.Publish.Start)
*/
virtual int start_flash_publish(int stream_id);
private:
virtual int identify_create_stream_client(SrsCreateStreamPacket* req, int stream_id, SrsClientType& type, std::string& stream_name);
virtual int identify_fmle_publish_client(SrsFMLEStartPacket* req, SrsClientType& type, std::string& stream_name);
virtual int identify_flash_publish_client(SrsPublishPacket* req, SrsClientType& type, std::string& stream_name);
};
/*
The MIT License (MIT)
Copyright (c) 2013 winlin
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#ifndef SRS_CORE_RTMP_HPP
#define SRS_CORE_RTMP_HPP
/*
#include <srs_core_rtmp.hpp>
*/
#include <srs_core.hpp>
#include <string>
class SrsProtocol;
class ISrsMessage;
class SrsCommonMessage;
class SrsCreateStreamPacket;
class SrsFMLEStartPacket;
class SrsPublishPacket;
class SrsSharedPtrMessage;
class SrsOnMetaDataPacket;
/**
* the original request from client.
*/
struct SrsRequest
{
/**
* tcUrl: rtmp://request_vhost:port/app/stream
* support pass vhost in query string, such as:
* rtmp://ip:port/app?vhost=request_vhost/stream
* rtmp://ip:port/app...vhost...request_vhost/stream
*/
std::string tcUrl;
std::string pageUrl;
std::string swfUrl;
double objectEncoding;
std::string schema;
std::string vhost;
std::string port;
std::string app;
std::string stream;
SrsRequest();
virtual ~SrsRequest();
/**
* deep copy the request, for source to use it to support reload,
* for when initialize the source, the request is valid,
* when reload it, the request maybe invalid, so need to copy it.
*/
virtual SrsRequest* copy();
/**
* disconvery vhost/app from tcUrl.
*/
virtual int discovery_app();
virtual std::string get_stream_url();
virtual void strip();
private:
std::string& trim(std::string& str, std::string chs);
};
/**
* the response to client.
*/
struct SrsResponse
{
int stream_id;
SrsResponse();
virtual ~SrsResponse();
};
/**
* the rtmp client type.
*/
enum SrsClientType
{
SrsClientUnknown,
SrsClientPlay,
SrsClientFMLEPublish,
SrsClientFlashPublish,
};
/**
* implements the client role protocol.
*/
class SrsRtmpClient
{
protected:
SrsProtocol* protocol;
st_netfd_t stfd;
public:
SrsRtmpClient(st_netfd_t _stfd);
virtual ~SrsRtmpClient();
public:
virtual void set_recv_timeout(int64_t timeout_us);
virtual void set_send_timeout(int64_t timeout_us);
virtual int64_t get_recv_bytes();
virtual int64_t get_send_bytes();
virtual int get_recv_kbps();
virtual int get_send_kbps();
virtual int recv_message(SrsCommonMessage** pmsg);
virtual int send_message(ISrsMessage* msg);
public:
virtual int handshake();
virtual int connect_app(std::string app, std::string tc_url);
virtual int create_stream(int& stream_id);
virtual int play(std::string stream, int stream_id);
virtual int publish(std::string stream, int stream_id);
};
/**
* the rtmp provices rtmp-command-protocol services,
* a high level protocol, media stream oriented services,
* such as connect to vhost/app, play stream, get audio/video data.
*/
class SrsRtmp
{
private:
SrsProtocol* protocol;
st_netfd_t stfd;
public:
SrsRtmp(st_netfd_t client_stfd);
virtual ~SrsRtmp();
public:
virtual SrsProtocol* get_protocol();
virtual void set_recv_timeout(int64_t timeout_us);
virtual int64_t get_recv_timeout();
virtual void set_send_timeout(int64_t timeout_us);
virtual int64_t get_send_timeout();
virtual int64_t get_recv_bytes();
virtual int64_t get_send_bytes();
virtual int get_recv_kbps();
virtual int get_send_kbps();
virtual int recv_message(SrsCommonMessage** pmsg);
virtual int send_message(ISrsMessage* msg);
public:
virtual int handshake();
virtual int connect_app(SrsRequest* req);
virtual int set_window_ack_size(int ack_size);
/**
* @type: The sender can mark this message hard (0), soft (1), or dynamic (2)
* using the Limit type field.
*/
virtual int set_peer_bandwidth(int bandwidth, int type);
/**
* @param server_ip the ip of server.
*/
virtual int response_connect_app(SrsRequest* req, const char* server_ip = NULL);
virtual void response_connect_reject(SrsRequest* req, const char* desc);
virtual int on_bw_done();
/**
* recv some message to identify the client.
* @stream_id, client will createStream to play or publish by flash,
* the stream_id used to response the createStream request.
* @type, output the client type.
*/
virtual int identify_client(int stream_id, SrsClientType& type, std::string& stream_name);
/**
* set the chunk size when client type identified.
*/
virtual int set_chunk_size(int chunk_size);
/**
* when client type is play, response with packets:
* StreamBegin,
* onStatus(NetStream.Play.Reset), onStatus(NetStream.Play.Start).,
* |RtmpSampleAccess(false, false),
* onStatus(NetStream.Data.Start).
*/
virtual int start_play(int stream_id);
/**
* when client(type is play) send pause message,
* if is_pause, response the following packets:
* onStatus(NetStream.Pause.Notify)
* StreamEOF
* if not is_pause, response the following packets:
* onStatus(NetStream.Unpause.Notify)
* StreamBegin
*/
virtual int on_play_client_pause(int stream_id, bool is_pause);
/**
* when client type is publish, response with packets:
* releaseStream response
* FCPublish
* FCPublish response
* createStream response
* onFCPublish(NetStream.Publish.Start)
* onStatus(NetStream.Publish.Start)
*/
virtual int start_fmle_publish(int stream_id);
/**
* process the FMLE unpublish event.
* @unpublish_tid the unpublish request transaction id.
*/
virtual int fmle_unpublish(int stream_id, double unpublish_tid);
/**
* when client type is publish, response with packets:
* onStatus(NetStream.Publish.Start)
*/
virtual int start_flash_publish(int stream_id);
private:
virtual int identify_create_stream_client(SrsCreateStreamPacket* req, int stream_id, SrsClientType& type, std::string& stream_name);
virtual int identify_fmle_publish_client(SrsFMLEStartPacket* req, SrsClientType& type, std::string& stream_name);
virtual int identify_flash_publish_client(SrsPublishPacket* req, SrsClientType& type, std::string& stream_name);
};
#endif

View file

@ -0,0 +1,832 @@
/*
The MIT License (MIT)
Copyright (c) 2013 wenjiegit
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sstream>
#include <getopt.h>
#include <stdlib.h>
#include <srs_core_rtmp.hpp>
#include <srs_core_protocol.hpp>
#include <srs_core_error.hpp>
#include <srs_core_amf0.hpp>
#include <srs_core_autofree.hpp>
#include <srs_core_stream.hpp>
#include <st.h>
// server play control
#define SRS_BW_CHECK_START_PLAY "onSrsBandCheckStartPlayBytes"
#define SRS_BW_CHECK_STARTING_PLAY "onSrsBandCheckStartingPlayBytes"
#define SRS_BW_CHECK_STOP_PLAY "onSrsBandCheckStopPlayBytes"
#define SRS_BW_CHECK_STOPPED_PLAY "onSrsBandCheckStoppedPlayBytes"
// server publish control
#define SRS_BW_CHECK_START_PUBLISH "onSrsBandCheckStartPublishBytes"
#define SRS_BW_CHECK_STARTING_PUBLISH "onSrsBandCheckStartingPublishBytes"
#define SRS_BW_CHECK_STOP_PUBLISH "onSrsBandCheckStopPublishBytes"
#define SRS_BW_CHECK_STOPPED_PUBLISH "onSrsBandCheckStoppedPublishBytes"
// EOF control.
#define SRS_BW_CHECK_FINISHED "onSrsBandCheckFinished"
#define SRS_BW_CHECK_FLASH_FINAL "finalClientPacket"
// client only
#define SRS_BW_CHECK_PLAYING "onSrsBandCheckPlaying"
#define SRS_BW_CHECK_PUBLISHING "onSrsBandCheckPublishing"
/**
* @brief class of Linux version band check client
* check play and publish speed.
*/
class SrsBandCheckClient : public SrsRtmpClient
{
public:
SrsBandCheckClient(st_netfd_t _stfd);
~SrsBandCheckClient();
public:
/**
* @brief test play
*
*/
int check_play();
/**
* @brief test publish
*
*/
int check_publish();
private:
/**
* @brief just return success.
*/
int create_stream(int& stream_id);
/**
* @brief just return success.
*/
int play(std::string stream, int stream_id);
/**
* @brief just return success.
*/
int publish(std::string stream, int stream_id);
private:
int expect_start_play();
int send_starting_play();
int expect_stop_play();
int send_stopped_play();
int expect_start_pub();
int send_starting_pub();
int send_pub_data();
int expect_stop_pub();
/**
* @brief expect result.
* because the core module has no method to decode this packet
* so we must get the internal data and decode it here.
*/
int expect_finished();
int send_stopped_pub();
/**
* @brief notify server the check procedure is over.
*/
int send_final();
};
/**
* @brief class of band check
* used to check band width with a client @param bandCheck_Client
*/
class SrsBandCheck
{
public:
SrsBandCheck();
~SrsBandCheck();
public:
/**
* @brief band check method
*
* connect to server------>rtmp handshake------>rtmp connect------>play------>publish
* @retval ERROR_SUCCESS when success.
*/
int check(const std::string& app, const std::string& tcUrl);
/**
* @brief set the address and port of test server
*
* @param server server address, domain or ip
* @param server listened port ,default is 1935
*/
void set_server(const std::string& server, int port = 1935);
private:
int connect_server();
private:
SrsBandCheckClient* bandCheck_Client;
std::string server_address;
int server_port;
};
/**
* @brief init st lib
*/
static int init_st();
static void print_help(char** argv);
static void print_version();
/**
* @brief get user option
* @internal ip Mandatory arguments
* @internal key Mandatory arguments
* @internal port default 1935
* @internal vhost default bandcheck.srs.com
*/
static int get_opt(int argc ,char* argv[]);
/**
* global var.
*/
static struct option long_options[] =
{
{"ip", required_argument, 0, 'i'},
{"port", optional_argument, 0, 'p'},
{"key", required_argument, 0, 'k'},
{"vhost", optional_argument, 0, 'v'},
{"help", no_argument, 0, 'h'},
{"version", no_argument, 0, 'V'},
};
static const char* short_options = "i:p::k:v::hV";
static std::string g_ip;
static int g_port = 1935;
static std::string g_key;
static std::string g_vhost = "bandcheck.srs.com";
#define BUILD_VERSION "srs band check 0.1"
int main(int argc ,char* argv[])
{
int ret = ERROR_SUCCESS;
if (argc <= 1) {
print_help(argv);
exit(1);
}
if ((ret = get_opt(argc, argv)) != ERROR_SUCCESS) {
return -1;
}
// check param
if (g_ip.empty()) {
printf("ip address should not be empty.\n");
return -1;
}
if (g_key.empty()) {
printf("test key should not be empty.\n");
return -1;
}
if ((ret = init_st()) != ERROR_SUCCESS) {
srs_error("band check init failed. ret=%d", ret);
return ret;
}
std::string app = "app?key=" + g_key + "&vhost=" + g_vhost;
char tcUrl_buffer[1024] = {0};
sprintf(tcUrl_buffer, "rtmp://%s:%d/%s", g_ip.c_str(), g_port, app.c_str());
std::string tcUrl = tcUrl_buffer;
SrsBandCheck band_check;
band_check.set_server(g_ip, g_port);
if ((ret = band_check.check(app, tcUrl)) != ERROR_SUCCESS) {
srs_error("band check failed. address=%s ret=%d", "xx.com", ret);
return -1;
}
return 0;
}
SrsBandCheckClient::SrsBandCheckClient(st_netfd_t _stfd)
: SrsRtmpClient(_stfd)
{
}
SrsBandCheckClient::~SrsBandCheckClient()
{
}
int SrsBandCheckClient::check_play()
{
int ret = ERROR_SUCCESS;
if ((ret = expect_start_play()) != ERROR_SUCCESS) {
srs_error("expect_start_play failed. ret=%d", ret);
return ret;
}
if ((ret = send_starting_play()) != ERROR_SUCCESS) {
srs_error("send starting play failed. ret=%d", ret);
return ret;
}
if ((ret = expect_stop_play()) != ERROR_SUCCESS) {
srs_error("expect stop play failed. ret=%d", ret);
return ret;
}
if ((ret = send_stopped_play()) != ERROR_SUCCESS) {
srs_error("send stopped play failed. ret=%d", ret);
return ret;
}
return ret;
}
int SrsBandCheckClient::check_publish()
{
int ret = ERROR_SUCCESS;
if ((ret = expect_start_pub()) != ERROR_SUCCESS) {
srs_error("expect start pub failed. ret=%d", ret);
return ret;
}
if ((ret = send_starting_pub())!= ERROR_SUCCESS) {
srs_error("send starting pub failed. ret=%d", ret);
return ret;
}
if ((ret = send_pub_data()) != ERROR_SUCCESS) {
srs_error("publish data failed. ret=%d", ret);
return ret;
}
if ((ret = send_stopped_pub()) != ERROR_SUCCESS) {
srs_error("send stopped pub failed. ret=%d", ret);
return ret;
}
if ((ret = expect_finished()) != ERROR_SUCCESS) {
srs_error("expect finished msg failed. ret=%d", ret);
return ret;
}
if ((ret = send_final()) != ERROR_SUCCESS) {
srs_error("send final msg failed. ret=%d", ret);
return ret;
}
return ret;
}
int SrsBandCheckClient::create_stream(int &stream_id)
{
return ERROR_SUCCESS;
}
int SrsBandCheckClient::play(std::string stream, int stream_id)
{
return ERROR_SUCCESS;
}
int SrsBandCheckClient::publish(std::string stream, int stream_id)
{
return ERROR_SUCCESS;
}
int SrsBandCheckClient::expect_start_play()
{
int ret = ERROR_SUCCESS;
// expect connect _result
SrsCommonMessage* msg = NULL;
SrsBandwidthPacket* pkt = NULL;
if ((ret = srs_rtmp_expect_message<SrsBandwidthPacket>(protocol, &msg, &pkt)) != ERROR_SUCCESS) {
srs_error("expect bandcheck start play message failed. ret=%d", ret);
return ret;
}
SrsAutoFree(SrsCommonMessage, msg, false);
srs_info("get bandcheck start play message");
if (pkt->command_name != SRS_BW_CHECK_START_PLAY) {
srs_error("pkt error. expect=%s, actual=%s", SRS_BW_CHECK_START_PLAY, pkt->command_name.c_str());
return -1;
}
return ret;
}
int SrsBandCheckClient::send_starting_play()
{
int ret = ERROR_SUCCESS;
SrsCommonMessage* msg = new SrsCommonMessage;
SrsBandwidthPacket* pkt = new SrsBandwidthPacket;
pkt->command_name = SRS_BW_CHECK_STARTING_PLAY;
msg->set_packet(pkt, 0);
if ((ret = send_message(msg)) != ERROR_SUCCESS) {
srs_error("send starting play msg failed. ret=%d", ret);
return ret;
}
return ret;
}
int SrsBandCheckClient::expect_stop_play()
{
int ret = ERROR_SUCCESS;
while (true) {
SrsCommonMessage* msg = NULL;
SrsBandwidthPacket* pkt = NULL;
if ((ret = srs_rtmp_expect_message<SrsBandwidthPacket>(protocol, &msg, &pkt)) != ERROR_SUCCESS) {
srs_error("expect stop play message failed. ret=%d", ret);
return ret;
}
SrsAutoFree(SrsCommonMessage, msg, false);
srs_info("get bandcheck stop play message");
if (pkt->command_name == SRS_BW_CHECK_STOP_PLAY) {
break;
}
}
return ret;
}
int SrsBandCheckClient::send_stopped_play()
{
int ret = ERROR_SUCCESS;
SrsCommonMessage* msg = new SrsCommonMessage;
SrsBandwidthPacket* pkt = new SrsBandwidthPacket;
pkt->command_name = SRS_BW_CHECK_STOPPED_PLAY;
msg->set_packet(pkt, 0);
if ((ret = send_message(msg)) != ERROR_SUCCESS) {
srs_error("send stopped play msg failed. ret=%d", ret);
return ret;
}
return ret;
}
int SrsBandCheckClient::expect_start_pub()
{
int ret = ERROR_SUCCESS;
while (true) {
SrsCommonMessage* msg = NULL;
SrsBandwidthPacket* pkt = NULL;
if ((ret = srs_rtmp_expect_message<SrsBandwidthPacket>(protocol, &msg, &pkt)) != ERROR_SUCCESS) {
srs_error("expect start pub message failed. ret=%d", ret);
return ret;
}
SrsAutoFree(SrsCommonMessage, msg, false);
srs_info("get bandcheck start pub message");
if (pkt->command_name == SRS_BW_CHECK_START_PUBLISH) {
break;
}
}
return ret;
}
int SrsBandCheckClient::send_starting_pub()
{
int ret = ERROR_SUCCESS;
SrsCommonMessage* msg = new SrsCommonMessage;
SrsBandwidthPacket* pkt = new SrsBandwidthPacket;
pkt->command_name = SRS_BW_CHECK_STARTING_PUBLISH;
msg->set_packet(pkt, 0);
if ((ret = send_message(msg)) != ERROR_SUCCESS) {
srs_error("send starting play msg failed. ret=%d", ret);
return ret;
}
srs_info("send starting play msg success.");
return ret;
}
int SrsBandCheckClient::send_pub_data()
{
int ret = ERROR_SUCCESS;
int data_count = 100;
while (true) {
SrsCommonMessage* msg = new SrsCommonMessage;
SrsBandwidthPacket* pkt = new SrsBandwidthPacket;
pkt->command_name = SRS_BW_CHECK_PUBLISHING;
msg->set_packet(pkt, 0);
for (int i = 0; i < data_count; ++i) {
std::stringstream seq;
seq << i;
std::string play_data = "SrS band check data from client's publishing......";
pkt->data->set(seq.str(), new SrsAmf0String(play_data.c_str()));
}
data_count += 100;
if ((ret = send_message(msg)) != ERROR_SUCCESS) {
srs_error("send publish message failed.ret=%d", ret);
return ret;
}
if ((ret = expect_stop_pub()) == ERROR_SUCCESS) {
break;
}
}
return ret;
}
int SrsBandCheckClient::expect_stop_pub()
{
int ret = ERROR_SUCCESS;
while (true) {
if ((ret = st_netfd_poll(stfd, POLLIN, 1000)) == ERROR_SUCCESS) {
SrsCommonMessage* msg = 0;
if ((ret = recv_message(&msg)) != ERROR_SUCCESS)
{
srs_error("recv message failed while expect stop pub. ret=%d", ret);
return ret;
}
if ((ret = msg->decode_packet(protocol)) != ERROR_SUCCESS) {
srs_error("decode packet error while expect stop pub. ret=%d", ret);
return ret;
}
SrsBandwidthPacket* pkt = dynamic_cast<SrsBandwidthPacket*>(msg->get_packet());
if (pkt && pkt->command_name == SRS_BW_CHECK_STOP_PUBLISH) {
return ret;
}
} else {
break;
}
}
return ret;
}
int SrsBandCheckClient::expect_finished()
{
int ret = ERROR_SUCCESS;
while (true) {
SrsCommonMessage* msg = NULL;
SrsBandwidthPacket* pkt = NULL;
if ((ret = srs_rtmp_expect_message<SrsBandwidthPacket>(protocol, &msg, &pkt)) != ERROR_SUCCESS) {
srs_error("expect finished message failed. ret=%d", ret);
return ret;
}
SrsAutoFree(SrsCommonMessage, msg, false);
srs_info("get bandcheck finished message");
if (pkt->command_name == SRS_BW_CHECK_FINISHED) {
SrsStream *stream = new SrsStream;
SrsAutoFree(SrsStream, stream, false);
if ((ret = stream->initialize((char*)msg->payload, msg->size)) != ERROR_SUCCESS) {
srs_error("initialize stream error. ret=%d", ret);
return ret;
}
std::string command_name;
if ((ret = srs_amf0_read_string(stream, command_name)) != ERROR_SUCCESS) {
srs_error("amfo read string error. ret=%d", ret);
return ret;
}
double action_id;
if ((ret = srs_amf0_read_number(stream, action_id)) != ERROR_SUCCESS) {
srs_error("amfo read number error. ret=%d", ret);
return ret;
}
if ((ret = srs_amf0_read_null(stream)) != ERROR_SUCCESS) {
srs_error("amfo read number error. ret=%d", ret);
return ret;
}
SrsAmf0Object* object;
if ((ret = srs_amf0_read_object(stream, object)) != ERROR_SUCCESS) {
srs_error("amfo read object error. ret=%d", ret);
return ret;
}
int64_t start_time = 0;
int64_t end_time = 0;
SrsAmf0Any* start_time_any = object->get_property("start_time");
if (start_time_any && start_time_any->is_number()) {
SrsAmf0Number* start_time_number = dynamic_cast<SrsAmf0Number*> (start_time_any);
if (start_time_number) {
start_time = start_time_number->value;
}
}
SrsAmf0Any* end_time_any = object->get_property("end_time");
if (end_time_any && end_time_any->is_number()) {
SrsAmf0Number* end_time_number = dynamic_cast<SrsAmf0Number*> (end_time_any);
if (end_time_number) {
end_time = end_time_number->value;
}
}
int play_kbps = 0;
int pub_kbps = 0;
SrsAmf0Any* play_kbp_any = object->get_property("play_kbps");
if (play_kbp_any && play_kbp_any->is_number()) {
SrsAmf0Number* play_kbps_number = dynamic_cast<SrsAmf0Number*> (play_kbp_any);
if (play_kbps_number) {
play_kbps = play_kbps_number->value;
}
}
SrsAmf0Any* pub_kbp_any = object->get_property("publish_kbps");
if (pub_kbp_any && pub_kbp_any->is_number()) {
SrsAmf0Number* pub_kbps_number = dynamic_cast<SrsAmf0Number*> (pub_kbp_any);
if (pub_kbps_number) {
pub_kbps = pub_kbps_number->value;
}
}
float time_elapsed;
if (end_time - start_time > 0) {
time_elapsed = (end_time - start_time) / 1000.00;
}
srs_trace("result: play %d kbps, publish %d kbps, check time %.4f S\n"
, play_kbps, pub_kbps, time_elapsed);
break;
}
}
return ret;
}
int SrsBandCheckClient::send_stopped_pub()
{
int ret = ERROR_SUCCESS;
SrsCommonMessage* msg = new SrsCommonMessage;
SrsBandwidthPacket* pkt = new SrsBandwidthPacket;
pkt->command_name = SRS_BW_CHECK_STOPPED_PUBLISH;
msg->set_packet(pkt, 0);
if ((ret = send_message(msg)) != ERROR_SUCCESS) {
srs_error("send stopped pub msg failed. ret=%d", ret);
return ret;
}
srs_info("send stopped pub msg success.");
return ret;
}
int SrsBandCheckClient::send_final()
{
int ret = ERROR_SUCCESS;
SrsCommonMessage* msg = new SrsCommonMessage;
SrsBandwidthPacket* pkt = new SrsBandwidthPacket;
pkt->command_name = SRS_BW_CHECK_FLASH_FINAL;
msg->set_packet(pkt, 0);
if ((ret = send_message(msg)) != ERROR_SUCCESS) {
srs_error("send final msg failed. ret=%d", ret);
return ret;
}
srs_info("send final msg success.");
return ret;
}
SrsBandCheck::SrsBandCheck()
: bandCheck_Client(0)
{
}
SrsBandCheck::~SrsBandCheck()
{
if (bandCheck_Client) {
srs_freep(bandCheck_Client);
}
}
int SrsBandCheck::check(const std::string &app, const std::string &tcUrl)
{
int ret = ERROR_SUCCESS;
if ((ret = connect_server()) != ERROR_SUCCESS) {
srs_error("connect to server failed. ret = %d", ret);
return ret;
}
if ((ret = bandCheck_Client->handshake()) != ERROR_SUCCESS) {
srs_error("handshake failed. ret = %d", ret);
return ret;
}
if ((ret = bandCheck_Client->connect_app(app, tcUrl)) != ERROR_SUCCESS) {
srs_error("handshake failed. ret = %d", ret);
return ret;
}
if ((ret = bandCheck_Client->check_play()) != ERROR_SUCCESS) {
srs_error("band check play failed.");
return ret;
}
if ((ret = bandCheck_Client->check_publish()) != ERROR_SUCCESS) {
srs_error("band check publish failed.");
return ret;
}
return ret;
}
void SrsBandCheck::set_server(const std::string &server, int port)
{
server_address = server;
server_port = port;
}
int SrsBandCheck::connect_server()
{
int ret = ERROR_SUCCESS;
int sock = socket(AF_INET, SOCK_STREAM, 0);
if(sock == -1){
ret = ERROR_SOCKET_CREATE;
srs_error("create socket error. ret=%d", ret);
return ret;
}
st_netfd_t stfd = st_netfd_open_socket(sock);
if(stfd == NULL){
ret = ERROR_ST_OPEN_SOCKET;
srs_error("st_netfd_open_socket failed. ret=%d", ret);
return ret;
}
bandCheck_Client = new SrsBandCheckClient(stfd);
// connect to server.
std::string ip = srs_dns_resolve(server_address);
if (ip.empty()) {
ret = ERROR_SYSTEM_IP_INVALID;
srs_error("dns resolve server error, ip empty. ret=%d", ret);
return ret;
}
sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(server_port);
addr.sin_addr.s_addr = inet_addr(ip.c_str());
if (st_connect(stfd, (const struct sockaddr*)&addr, sizeof(sockaddr_in), ST_UTIME_NO_TIMEOUT) == -1){
ret = ERROR_ST_CONNECT;
srs_error("connect to server error. ip=%s, port=%d, ret=%d", ip.c_str(), server_port, ret);
return ret;
}
srs_trace("connect to server success. server=%s, ip=%s, port=%d", server_address.c_str(), ip.c_str(), server_port);
return ret;
}
int init_st()
{
int ret = ERROR_SUCCESS;
if (st_set_eventsys(ST_EVENTSYS_ALT) == -1) {
ret = ERROR_ST_SET_EPOLL;
srs_error("st_set_eventsys use linux epoll failed. ret=%d", ret);
return ret;
}
if(st_init() != 0){
ret = ERROR_ST_INITIALIZE;
srs_error("st_init failed. ret=%d", ret);
return ret;
}
return ret;
}
void print_help(char** argv)
{
printf(
"Usage: %s [OPTION]...\n"
"test band width from client to rtmp server.\n"
"Mandatory arguments to long options are mandatory for short options too.\n"
" -i, --ip the ip or domain that to test\n"
" -p, --port the port that server listen \n"
" -k, --key the key used to test \n"
" -v, --vhost the vhost used to test \n"
" -V, --version output version information and exit \n"
" -h, --help display this help and exit \n"
"\n"
"For example:\n"
" %s -i 192.168.1.248 -p 1935 -v bandcheck.srs.com -k 35c9b402c12a7246868752e2878f7e0e"
"\n\n"
"Exit status:\n"
"0 if OK,\n"
"other if error occured, and the detail should be printed.\n"
"\n\n"
"srs home page: <http://blog.chinaunix.net/uid/25006789.html>\n",
argv[0], argv[0]);
}
void print_version()
{
const char *version = ""
"srs_bandcheck "BUILD_VERSION"\n"
"Copyright (C) 2013 wenjiegit.\n"
"License MIT\n"
"This is free software: you are free to change and redistribute it.\n"
"There is NO WARRANTY, to the extent permitted by law.\n"
"\n"
"Written by wenjie.\n";
printf("%s", version);
}
int get_opt(int argc, char *argv[])
{
int ret = ERROR_SUCCESS;
int c;
while ((c = getopt_long (argc, argv, short_options, long_options, NULL)) != -1) {
switch (c) {
case 'i':
if (optarg) {
g_ip = optarg;
}
break;
case 'p':
if (optarg) {
g_port = atoi(optarg);
}
break;
case 'k':
if (optarg) {
g_key = optarg;
}
break;
case 'v':
if (optarg) {
g_vhost = optarg;
}
break;
case 'V':
print_version();
exit(0);
break;
case 'h':
print_help(argv);
exit(0);
break;
default:
printf("see --help or -h\n");
ret = -1;
}
}
return ret;
}

View file

@ -1,6 +1,7 @@
file
main readonly separator,
..\main\srs_main_server.cpp,
..\main\srs_main_bandcheck.cpp,
auto readonly separator,
..\..\objs\srs_auto_headers.hpp,
core readonly separator,