1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-02-13 11:51:57 +00:00

merge upstream

This commit is contained in:
wenjiegit 2013-12-24 10:59:38 +08:00
parent 00fb37a831
commit c7cd726f1b
50 changed files with 40 additions and 496 deletions

View file

@ -2,7 +2,6 @@
The MIT License (MIT)
Copyright (c) 2013 winlin
Copyright (c) 2013 wenjiegit
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
@ -34,7 +33,7 @@ static int64_t _srs_system_time_us_cache = 0;
int64_t srs_get_system_time_ms()
{
return _srs_system_time_us_cache / 1000;
return _srs_system_time_us_cache / 1000;
}
void srs_update_system_time_ms()
@ -104,13 +103,13 @@ void srs_vhost_resolve(std::string& vhost, std::string& app)
if ((pos = query.find("vhost?")) != std::string::npos
|| (pos = query.find("vhost=")) != std::string::npos
|| (pos = query.find("Vhost?")) != std::string::npos
|| (pos = query.find("Vhost=")) != std::string::npos)
{
|| (pos = query.find("Vhost=")) != std::string::npos
) {
query = query.substr(pos + 6);
if (!query.empty()) {
vhost = query;
}
}
}
}
void srs_close_stfd(st_netfd_t& stfd)

View file

@ -2,7 +2,6 @@
The MIT License (MIT)
Copyright (c) 2013 winlin
Copyright (c) 2013 wenjiegit
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in

0
trunk/src/core/srs_core_amf0.cpp Normal file → Executable file
View file

0
trunk/src/core/srs_core_amf0.hpp Normal file → Executable file
View file

0
trunk/src/core/srs_core_autofree.cpp Normal file → Executable file
View file

0
trunk/src/core/srs_core_autofree.hpp Normal file → Executable file
View file

0
trunk/src/core/srs_core_bandwidth.cpp Normal file → Executable file
View file

0
trunk/src/core/srs_core_bandwidth.hpp Normal file → Executable file
View file

0
trunk/src/core/srs_core_buffer.cpp Normal file → Executable file
View file

0
trunk/src/core/srs_core_buffer.hpp Normal file → Executable file
View file

View file

@ -2,7 +2,6 @@
The MIT License (MIT)
Copyright (c) 2013 winlin
Copyright (c) 2013 wenjiegit
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
@ -145,7 +144,7 @@ int SrsClient::service_cycle()
{
int ret = ERROR_SUCCESS;
if ((ret = rtmp->set_window_ack_size(2.5 * 1000 * 1000)) != ERROR_SUCCESS) {
if ((ret = rtmp->set_window_ack_size(2.5 * 1000 * 1000)) != ERROR_SUCCESS) {
srs_error("set window acknowledgement size failed. ret=%d", ret);
return ret;
}
@ -166,7 +165,7 @@ int SrsClient::service_cycle()
srs_error("response connect app failed. ret=%d", ret);
return ret;
}
srs_verbose("response connect app success");
srs_verbose("response connect app success");
if ((ret = rtmp->on_bw_done()) != ERROR_SUCCESS) {
srs_error("on_bw_done failed. ret=%d", ret);
@ -174,12 +173,11 @@ int SrsClient::service_cycle()
}
srs_verbose("on_bw_done success");
SrsClientType type;
if ((ret = rtmp->identify_client(res->stream_id, type, req->stream)) != ERROR_SUCCESS) {
srs_error("identify client failed. ret=%d", ret);
return ret;
}
SrsClientType type;
if ((ret = rtmp->identify_client(res->stream_id, type, req->stream)) != ERROR_SUCCESS) {
srs_error("identify client failed. ret=%d", ret);
return ret;
}
req->strip();
srs_trace("identify client success. type=%d, stream_name=%s", type, req->stream.c_str());
@ -541,40 +539,6 @@ int SrsClient::get_peer_ip()
return ret;
}
int SrsClient::get_local_ip(char *&local_ip)
{
int ret = ERROR_SUCCESS;
int fd = st_netfd_fileno(stfd);
// discovery client information
sockaddr_in addr;
socklen_t addrlen = sizeof(addr);
if (getsockname(fd, (sockaddr*)&addr, &addrlen) == -1) {
ret = ERROR_SOCKET_GET_LOCAL_IP;
srs_error("discovery local ip information failed. ret=%d", ret);
return ret;
}
srs_verbose("get local ip success.");
// ip v4 or v6
char buf[INET6_ADDRSTRLEN];
memset(buf, 0, sizeof(buf));
if ((inet_ntop(addr.sin_family, &addr.sin_addr, buf, sizeof(buf))) == NULL) {
ret = ERROR_SOCKET_GET_LOCAL_IP;
srs_error("convert local ip information failed. ret=%d", ret);
return ret;
}
local_ip = new char[strlen(buf) + 1];
strcpy(local_ip, buf);
srs_verbose("get local ip of client ip=%s, fd=%d", buf, fd);
return ret;
}
int SrsClient::process_play_control_msg(SrsConsumer* consumer, SrsCommonMessage* msg)
{
int ret = ERROR_SUCCESS;

View file

@ -2,7 +2,6 @@
The MIT License (MIT)
Copyright (c) 2013 winlin
Copyright (c) 2013 wenjiegit
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
@ -77,7 +76,6 @@ private:
virtual int publish(SrsSource* source, bool is_fmle);
virtual int process_publish_message(SrsSource* source, SrsCommonMessage* msg, bool is_fmle);
virtual int get_peer_ip();
virtual int get_local_ip(char *&local_ip);
virtual int process_play_control_msg(SrsConsumer* consumer, SrsCommonMessage* msg);
private:
virtual int on_connect();

0
trunk/src/core/srs_core_codec.cpp Normal file → Executable file
View file

0
trunk/src/core/srs_core_codec.hpp Normal file → Executable file
View file

View file

@ -2,7 +2,6 @@
The MIT License (MIT)
Copyright (c) 2013 winlin
Copyright (c) 2013 wenjiegit
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
@ -1508,61 +1507,7 @@ int SrsConfig::get_pithy_print_hls()
return SRS_STAGE_HLS_INTERVAL_MS;
}
return ::atoi(pithy->arg0().c_str());
}
bool SrsConfig::get_bw_check_enabled(const std::string &vhost, const std::string &key)
{
SrsConfDirective* conf = get_vhost(vhost);
if (!conf) {
return false;
}
SrsConfDirective* bw_test = conf->get("bandcheck");
if(!bw_test)
return false;
SrsConfDirective* bw_enabled_conf = bw_test->get("enabled");
if(bw_enabled_conf && bw_enabled_conf->arg0() == "on"){
SrsConfDirective* bw_key = bw_test->get("key");
if(!bw_key) return false;
std::vector<std::string> &args = bw_key->args;
for(unsigned int i = 0; i < args.size(); ++i){
if(args.at(i) == key)
return true;
}
}
return false;
}
void SrsConfig::get_bw_check_settings(const std::string &vhost, int64_t &interval_ms, int &limit_kbps)
{
// set default value;
interval_ms = 30 * 1000;
limit_kbps = 32000;
SrsConfDirective* conf = get_vhost(vhost);
if (!conf) {
return;
}
SrsConfDirective* bw_test = conf->get("bandcheck");
if (!bw_test) {
return;
}
SrsConfDirective* interval_conf = bw_test->get("interval");
if (interval_conf) {
interval_ms = ::atoll(interval_conf->arg0().c_str()) * 1000;
}
SrsConfDirective* limit_kbps_conf = bw_test->get("limit_kbps");
if (limit_kbps_conf) {
limit_kbps = ::atoi(limit_kbps_conf->arg0().c_str());
}
return ::atoi(pithy->arg0().c_str());
}
bool SrsConfig::get_bw_check_enabled(const string &vhost)

View file

@ -2,7 +2,6 @@
The MIT License (MIT)
Copyright (c) 2013 winlin
Copyright (c) 2013 wenjiegit
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
@ -116,7 +115,6 @@ public:
virtual void unsubscribe(ISrsReloadHandler* handler);
public:
virtual int parse_options(int argc, char** argv);
private:
virtual int parse_file(const char* filename);
virtual int parse_argv(int& i, char** argv);

0
trunk/src/core/srs_core_conn.cpp Normal file → Executable file
View file

0
trunk/src/core/srs_core_conn.hpp Normal file → Executable file
View file

0
trunk/src/core/srs_core_encoder.cpp Normal file → Executable file
View file

0
trunk/src/core/srs_core_encoder.hpp Normal file → Executable file
View file

View file

@ -2,7 +2,6 @@
The MIT License (MIT)
Copyright (c) 2013 winlin
Copyright (c) 2013 wenjiegit
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in

0
trunk/src/core/srs_core_forward.cpp Normal file → Executable file
View file

0
trunk/src/core/srs_core_forward.hpp Normal file → Executable file
View file

0
trunk/src/core/srs_core_handshake.cpp Normal file → Executable file
View file

0
trunk/src/core/srs_core_handshake.hpp Normal file → Executable file
View file

0
trunk/src/core/srs_core_hls.cpp Normal file → Executable file
View file

0
trunk/src/core/srs_core_hls.hpp Normal file → Executable file
View file

0
trunk/src/core/srs_core_http.cpp Normal file → Executable file
View file

0
trunk/src/core/srs_core_http.hpp Normal file → Executable file
View file

0
trunk/src/core/srs_core_log.cpp Normal file → Executable file
View file

0
trunk/src/core/srs_core_log.hpp Normal file → Executable file
View file

0
trunk/src/core/srs_core_pithy_print.cpp Normal file → Executable file
View file

0
trunk/src/core/srs_core_pithy_print.hpp Normal file → Executable file
View file

View file

@ -2,7 +2,6 @@
The MIT License (MIT)
Copyright (c) 2013 winlin
Copyright (c) 2013 wenjiegit
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
@ -1177,12 +1176,7 @@ bool SrsMessageHeader::is_set_chunk_size()
bool SrsMessageHeader::is_user_control_message()
{
return message_type == RTMP_MSG_UserControlMessage;
}
bool SrsMessageHeader::is_windows_ackledgement()
{
return message_type == RTMP_MSG_Acknowledgement;
return message_type == RTMP_MSG_UserControlMessage;
}
SrsChunkStream::SrsChunkStream(int _cid)
@ -1389,7 +1383,7 @@ int SrsCommonMessage::decode_packet(SrsProtocol* protocol)
srs_verbose("start to decode set chunk size message.");
packet = new SrsSetChunkSizePacket();
return packet->decode(stream);
} else {
} else {
// default packet to drop message.
srs_trace("drop the unknown message, type=%d", header.message_type);
packet = new SrsPacket();
@ -2614,31 +2608,7 @@ SrsOnStatusCallPacket::SrsOnStatusCallPacket()
SrsOnStatusCallPacket::~SrsOnStatusCallPacket()
{
srs_freep(args);
srs_freep(data);
}
int SrsOnStatusCallPacket::decode(SrsStream *stream)
{
int ret = ERROR_SUCCESS;
if ((ret = srs_amf0_read_string(stream, command_name)) != ERROR_SUCCESS) {
srs_error("amf0 decode play command_name failed. ret=%d", ret);
return ret;
}
if ((ret = srs_amf0_read_number(stream, transaction_id)) != ERROR_SUCCESS) {
srs_error("amf0 decode play transaction_id failed. ret=%d", ret);
return ret;
}
if ((ret = srs_amf0_read_null(stream)) != ERROR_SUCCESS) {
srs_error("amf0 decode play command_object failed. ret=%d", ret);
return ret;
}
srs_info("decode SrsOnStatusCallPacket success.");
return ret;
srs_freep(data);
}
int SrsOnStatusCallPacket::get_perfer_cid()
@ -3111,14 +3081,6 @@ SrsAcknowledgementPacket::~SrsAcknowledgementPacket()
{
}
int SrsAcknowledgementPacket::decode(SrsStream *stream)
{
int ret = ERROR_SUCCESS;
ret = sequence_number = stream->read_4bytes();
return ret;
}
int SrsAcknowledgementPacket::get_perfer_cid()
{
return RTMP_CID_ProtocolControl;
@ -3344,3 +3306,4 @@ int SrsUserControlPacket::encode_packet(SrsStream* stream)
return ret;
}

View file

@ -2,7 +2,6 @@
The MIT License (MIT)
Copyright (c) 2013 winlin
Copyright (c) 2013 wenjiegit
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
@ -67,25 +66,6 @@ class ISrsMessage;
*/
#define RTMP_MAX_FMT3_HEADER_SIZE 5
/**
* band width check method name, which will be invoked by client.
* band width check mothods use SrsOnStatusCallPacket as its internal packet type,
* so ensure you set command name when you use it.
*/
// for play
#define SRS_BW_CHECK_START_PLAY "onSrsBandCheckStartPlayBytes"
#define SRS_BW_CHECK_STARTING_PLAY "onSrsBandCheckStartingPlayBytes"
#define SRS_BW_CHECK_STOP_PLAY "onSrsBandCheckStopPlayBytes"
#define SRS_BW_CHECK_STOPPED_PLAY "onSrsBandCheckStoppedPlayBytes"
#define SRS_BW_CHECK_PLAYING "onSrsBandCheckPlaying"
// for publish
#define SRS_BW_CHECK_START_PUBLISH "onSrsBandCheckStartPublishBytes"
#define SRS_BW_CHECK_STARTING_PUBLISH "onSrsBandCheckStartingPublishBytes"
#define SRS_BW_CHECK_STOP_PUBLISH "onSrsBandCheckStopPublishBytes"
#define SRS_BW_CHECK_FINISHED "onSrsBandCheckFinished"
#define SRS_BW_CHECK_PUBLISHING "onSrsBandCheckPublishing"
/**
* the protocol provides the rtmp-message-protocol services,
* to recv RTMP message from RTMP chunk stream,
@ -240,7 +220,6 @@ struct SrsMessageHeader
bool is_window_ackledgement_size();
bool is_set_chunk_size();
bool is_user_control_message();
bool is_windows_ackledgement();
};
/**
@ -845,10 +824,6 @@ public:
public:
SrsOnStatusCallPacket();
virtual ~SrsOnStatusCallPacket();
public:
virtual int decode(SrsStream* stream);
public:
virtual int get_perfer_cid();
public:
@ -1044,10 +1019,6 @@ public:
public:
SrsAcknowledgementPacket();
virtual ~SrsAcknowledgementPacket();
public:
virtual int decode(SrsStream *stream);
public:
virtual int get_perfer_cid();
public:

0
trunk/src/core/srs_core_refer.cpp Normal file → Executable file
View file

0
trunk/src/core/srs_core_refer.hpp Normal file → Executable file
View file

0
trunk/src/core/srs_core_reload.cpp Normal file → Executable file
View file

0
trunk/src/core/srs_core_reload.hpp Normal file → Executable file
View file

View file

@ -2,7 +2,6 @@
The MIT License (MIT)
Copyright (c) 2013 winlin
Copyright (c) 2013 wenjiejit
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
@ -73,8 +72,8 @@ using namespace std;
#define SRS_DEFAULT_SID 1
SrsRequest::SrsRequest()
: objectEncoding(RTMP_SIG_AMF0_VER)
{
objectEncoding = RTMP_SIG_AMF0_VER;
}
SrsRequest::~SrsRequest()
@ -122,23 +121,10 @@ int SrsRequest::discovery_app()
port = vhost.substr(pos + 1);
vhost = vhost.substr(0, pos);
srs_verbose("discovery vhost=%s, port=%s", vhost.c_str(), port.c_str());
}
}
app = url;
srs_vhost_resolve(vhost, app);
// reslove bw check key
std::string app_str = url;
if ((pos = app_str.find("key=")) != std::string::npos){
std::string temp_key = app_str.substr(pos + strlen("key="));
for(unsigned int i = 0; i < temp_key.size(); ++i){
char c = temp_key[i];
if(c != '&')
bw_key.push_back(c);
else break;
}
}
strip();
// resolve the vhost from config
@ -280,7 +266,7 @@ int SrsRtmpClient::handshake()
return ret;
}
int SrsRtmpClient::connect_app(const std::string &app, const std::string &tc_url)
int SrsRtmpClient::connect_app(string app, string tc_url)
{
int ret = ERROR_SUCCESS;
@ -366,7 +352,7 @@ int SrsRtmpClient::create_stream(int& stream_id)
return ret;
}
int SrsRtmpClient::play(const std::string &stream, int stream_id)
int SrsRtmpClient::play(string stream, int stream_id)
{
int ret = ERROR_SUCCESS;
@ -408,7 +394,7 @@ int SrsRtmpClient::play(const std::string &stream, int stream_id)
return ret;
}
int SrsRtmpClient::publish(const std::string &stream, int stream_id)
int SrsRtmpClient::publish(string stream, int stream_id)
{
int ret = ERROR_SUCCESS;
@ -606,7 +592,7 @@ int SrsRtmp::response_connect_app(SrsRequest *req, const char* server_ip)
pkt->props->set("mode", new SrsAmf0Number(1));
pkt->info->set(StatusLevel, new SrsAmf0String(StatusLevelStatus));
pkt->info->set(StatusCode, new SrsAmf0String(StatusCodeConnectSuccess));
pkt->info->set(StatusCode, new SrsAmf0String(StatusCodeConnectSuccess));
pkt->info->set(StatusDescription, new SrsAmf0String("Connection succeeded"));
pkt->info->set("objectEncoding", new SrsAmf0Number(req->objectEncoding));
SrsASrsAmf0EcmaArray* data = new SrsASrsAmf0EcmaArray();
@ -1105,60 +1091,7 @@ int SrsRtmp::start_flash_publish(int stream_id)
srs_info("flash publish success.");
return ret;
}
int SrsRtmp::start_bandwidth_check(int limit_kbps)
{
int ret = ERROR_SUCCESS;
int play_duration_ms = 3000;
int play_interval_ms = 0;
int play_actual_duration_ms = 0;
int play_bytes = 0;
int publish_duration_ms = 3000;
int publish_interval_ms = 0;
int publish_actual_duration_ms = 0;
int publish_bytes = 0;
int64_t start_time = srs_get_system_time_ms();
if ((ret = bandwidth_check_play(play_duration_ms, play_interval_ms,
play_actual_duration_ms, play_bytes, limit_kbps) != ERROR_SUCCESS)
|| (ret = bandwidth_check_publish(publish_duration_ms, publish_interval_ms,
publish_actual_duration_ms, publish_bytes, limit_kbps)) != ERROR_SUCCESS) {
srs_error("band width check failed. ret = %d", ret);
return ret;
}
int64_t end_time = srs_get_system_time_ms();
int play_kbps = play_bytes * 8 / play_actual_duration_ms;
int publish_kbps = publish_bytes * 8 / publish_actual_duration_ms;
// send finished msg
SrsCommonMessage* finish_msg = new SrsCommonMessage();
SrsOnStatusCallPacket* finish_pkt = new SrsOnStatusCallPacket;
finish_pkt->command_name = SRS_BW_CHECK_FINISHED;
finish_pkt->data->set("code", new SrsAmf0Number(0));
finish_pkt->data->set("start_time", new SrsAmf0Number(start_time));
finish_pkt->data->set("end_time", new SrsAmf0Number(end_time));
finish_pkt->data->set("play_kbps", new SrsAmf0Number(play_kbps));
finish_pkt->data->set("publish_kbps", new SrsAmf0Number(publish_kbps));
finish_pkt->data->set("play_bytes", new SrsAmf0Number(play_bytes));
finish_pkt->data->set("play_time", new SrsAmf0Number(play_actual_duration_ms));
finish_pkt->data->set("publish_bytes", new SrsAmf0Number(publish_bytes));
finish_pkt->data->set("publish_time", new SrsAmf0Number(publish_actual_duration_ms));
finish_msg->set_packet(finish_pkt, 0);
if ((ret = protocol->send_message(finish_msg)) != ERROR_SUCCESS) {
srs_error("send bandwidth check finish message failed. ret=%d", ret);
return ret;
}
srs_trace("BW check finished.");
return ret;
return ret;
}
int SrsRtmp::identify_create_stream_client(SrsCreateStreamPacket* req, int stream_id, SrsClientType& type, string& stream_name)
@ -1249,216 +1182,6 @@ int SrsRtmp::identify_flash_publish_client(SrsPublishPacket* req, SrsClientType&
type = SrsClientFlashPublish;
stream_name = req->stream_name;
return ret;
}
int SrsRtmp::bandwidth_check_play(int duration_ms, int interval_ms, int &actual_duration_ms,
int &play_bytes, int max_play_kbps)
{
int ret = ERROR_SUCCESS;
// send start play command to client
SrsCommonMessage* start_play_msg = new SrsCommonMessage();
SrsOnStatusCallPacket* start_play_packet = new SrsOnStatusCallPacket;
start_play_packet->command_name = SRS_BW_CHECK_START_PLAY;
start_play_packet->data->set("duration_ms", new SrsAmf0Number(duration_ms));
start_play_packet->data->set("interval_ms", new SrsAmf0Number(interval_ms));
start_play_msg->set_packet(start_play_packet, 0);
if ((ret = protocol->send_message(start_play_msg)) != ERROR_SUCCESS) {
srs_error("send bandwidth check start play message failed. ret=%d", ret);
return ret;
}
srs_trace("BW check begin.");
// recv client's starting play response
while (true) {
SrsCommonMessage* msg = 0;
if ( (ret = protocol->recv_message(&msg)) != ERROR_SUCCESS) {
srs_error("recv client's starting play response failed. ret= %d", ret);
return ret;
}
msg->decode_packet(protocol);
SrsOnStatusCallPacket* pkt = dynamic_cast<SrsOnStatusCallPacket*>(msg->get_packet());
if(pkt && (pkt->command_name == SRS_BW_CHECK_STARTING_PLAY))
break;
}
srs_trace("BW check recv play begin response.");
// send play data to client
int64_t current_time = srs_get_system_time_ms();
int size = 1024;
char random_data[size];
memset(random_data, 0x01, size);
int interval = 0;
while ( (srs_get_system_time_ms() - current_time) < duration_ms ) {
st_usleep(interval);
SrsCommonMessage* msg = new SrsCommonMessage;
SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket;
pkt->command_name = SRS_BW_CHECK_PLAYING;
int object_num = 1;
for (int i = 0; i < object_num; ++i) {
char buf[32];
sprintf(buf, "%d", i);
pkt->data->set(buf, new SrsAmf0String(random_data));
}
object_num += 1;
msg->set_packet(pkt, 0);
play_bytes += pkt->get_payload_length();
if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
srs_error("send bandwidth check play messages failed. ret=%d", ret);
return ret;
}
// sleep while current kbps <= max_play_kbps
int kbps = 0;
while (true) {
if(srs_get_system_time_ms() - current_time != 0)
kbps = play_bytes * 8 / (srs_get_system_time_ms() - current_time);
if (kbps > max_play_kbps) {
st_usleep(500);
} else {
break;
}
}
}
actual_duration_ms = srs_get_system_time_ms() - current_time;
srs_trace("BW check send play bytes over.");
// notify client to stop play
SrsCommonMessage* stop_play_msg = new SrsCommonMessage;
SrsOnStatusCallPacket* stop_play_pkt = new SrsOnStatusCallPacket;
stop_play_pkt->command_name = SRS_BW_CHECK_STOP_PLAY;
stop_play_pkt->data->set("duration_ms", new SrsAmf0Number(duration_ms));
stop_play_pkt->data->set("interval_ms", new SrsAmf0Number(interval_ms));
stop_play_pkt->data->set("duration_delta", new SrsAmf0Number(actual_duration_ms));
stop_play_pkt->data->set("bytes_delta", new SrsAmf0Number(play_bytes));
stop_play_msg->set_packet(stop_play_pkt, 0);
if ((ret = protocol->send_message(stop_play_msg)) != ERROR_SUCCESS) {
srs_error("send bandwidth check stop play message failed. ret=%d", ret);
return ret;
}
srs_trace("BW check stop play bytes.");
// recv client's stop play response.
while (true) {
SrsCommonMessage* msg = 0;
if ((ret = protocol->recv_message(&msg)) != ERROR_SUCCESS) {
srs_error("recv client's stop play response failed. ret = %d", ret);
return ret;
}
msg->decode_packet(protocol);
SrsOnStatusCallPacket* pkt = dynamic_cast<SrsOnStatusCallPacket*>(msg->get_packet());
if(pkt && (pkt->command_name == SRS_BW_CHECK_STOPPED_PLAY))
break;
}
srs_trace("BW check recv stop play response.");
return ret;
}
int SrsRtmp::bandwidth_check_publish(int duration_ms, int interval_ms, int &actual_duration_ms,
int &publish_bytes, int max_pub_kbps)
{
int ret = ERROR_SUCCESS;
// notify client to start publish
SrsCommonMessage* start_publish_msg = new SrsCommonMessage;
SrsOnStatusCallPacket* start_publish_pkt = new SrsOnStatusCallPacket;
start_publish_pkt->command_name = SRS_BW_CHECK_START_PUBLISH;
start_publish_pkt->data->set("duration_ms", new SrsAmf0Number(duration_ms));
start_publish_pkt->data->set("interval_ms", new SrsAmf0Number(interval_ms));
start_publish_msg->set_packet(start_publish_pkt, 0);
if ((ret = protocol->send_message(start_publish_msg)) != ERROR_SUCCESS) {
srs_error("send bandwidth check start publish message failed. ret=%d", ret);
return ret;
}
srs_trace("BW check publish begin.");
// read client's notification of starting publish
while (true) {
SrsCommonMessage* msg = 0;
if ((ret = protocol->recv_message(&msg)) != ERROR_SUCCESS) {
srs_error("recv client's notification of starting publish failed. ret = %d", ret);
return ret;
}
msg->decode_packet(protocol);
SrsOnStatusCallPacket* pkt = dynamic_cast<SrsOnStatusCallPacket*>(msg->get_packet());
if(pkt && (pkt->command_name == SRS_BW_CHECK_STARTING_PUBLISH))
break;
}
srs_trace("BW check recv publish begin response.");
// recv publish msgs until @duration_ms ms
int64_t current_time = srs_get_system_time_ms();
while ( (srs_get_system_time_ms() - current_time) < duration_ms ) {
st_usleep(0);
SrsCommonMessage* msg = NULL;
if ((ret = protocol->recv_message(&msg)) != ERROR_SUCCESS) {
srs_error("recv message failed. ret=%d", ret);
return ret;
}
SrsAutoFree(SrsCommonMessage, msg, false);
publish_bytes += msg->header.payload_length;
int kbps = 0;
while (true) {
if(srs_get_system_time_ms() - current_time != 0)
kbps = publish_bytes * 8 / (srs_get_system_time_ms() - current_time);
if (kbps > max_pub_kbps) {
st_usleep(500);
} else {
break;
}
}
}
actual_duration_ms = srs_get_system_time_ms() - current_time;
srs_trace("BW check recv publish data over.");
// notify client to stop publish
SrsCommonMessage* stop_publish_msg = new SrsCommonMessage;
SrsOnStatusCallPacket* stop_publish_pkt = new SrsOnStatusCallPacket;
stop_publish_pkt->command_name = SRS_BW_CHECK_STOP_PUBLISH;
stop_publish_pkt->data->set("duration_ms", new SrsAmf0Number(duration_ms));
stop_publish_pkt->data->set("interval_ms", new SrsAmf0Number(interval_ms));
stop_publish_pkt->data->set("duration_delta", new SrsAmf0Number(actual_duration_ms));
stop_publish_pkt->data->set("bytes_delta", new SrsAmf0Number(publish_bytes));
stop_publish_msg->set_packet(stop_publish_pkt, 0);
if ((ret = protocol->send_message(stop_publish_msg)) != ERROR_SUCCESS) {
srs_error("send bandwidth check stop publish message failed. ret=%d", ret);
return ret;
}
srs_trace("BW check stop pulish.");
// recv left msg
while (true) {
if((ret = st_netfd_poll(stfd, POLLIN, 1000*500)) == ERROR_SUCCESS) {
SrsCommonMessage* msg = 0;
if ((ret = protocol->recv_message(&msg)) != ERROR_SUCCESS) {
srs_error("recv client's left msg failed, ret = %d", ret);
return ret;
}
} else {
ret = ERROR_SUCCESS;
break;
}
}
return ret;
return ret;
}

View file

@ -2,7 +2,6 @@
The MIT License (MIT)
Copyright (c) 2013 winlin
Copyright (c) 2013 wenjiegit
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
@ -63,7 +62,6 @@ struct SrsRequest
std::string port;
std::string app;
std::string stream;
std::string bw_key;
SrsRequest();
virtual ~SrsRequest();
@ -104,7 +102,7 @@ enum SrsClientType
SrsClientUnknown,
SrsClientPlay,
SrsClientFMLEPublish,
SrsClientFlashPublish
SrsClientFlashPublish,
};
/**
@ -129,10 +127,10 @@ public:
virtual int send_message(ISrsMessage* msg);
public:
virtual int handshake();
virtual int connect_app(const std::string &app, const std::string &tc_url);
virtual int connect_app(std::string app, std::string tc_url);
virtual int create_stream(int& stream_id);
virtual int play(const std::string &stream, int stream_id);
virtual int publish(const std::string &stream, int stream_id);
virtual int play(std::string stream, int stream_id);
virtual int publish(std::string stream, int stream_id);
};
/**
@ -169,16 +167,11 @@ public:
* using the Limit type field.
*/
virtual int set_peer_bandwidth(int bandwidth, int type);
<<<<<<< HEAD
virtual int response_connect_app(SrsRequest* req, const char *ip = 0);
virtual int response_connect_reject(SrsRequest* req, const std::string& description);
=======
/**
* @param server_ip the ip of server.
*/
virtual int response_connect_app(SrsRequest* req, const char* server_ip = NULL);
virtual void response_connect_reject(SrsRequest* req, const char* desc);
>>>>>>> upstream/master
virtual int on_bw_done();
/**
* recv some message to identify the client.
@ -229,18 +222,10 @@ public:
* onStatus(NetStream.Publish.Start)
*/
virtual int start_flash_publish(int stream_id);
/**
* used to process band width check from client.
*/
virtual int start_bandwidth_check(int limit_kbps);
private:
virtual int identify_create_stream_client(SrsCreateStreamPacket* req, int stream_id, SrsClientType& type, std::string& stream_name);
virtual int identify_fmle_publish_client(SrsFMLEStartPacket* req, SrsClientType& type, std::string& stream_name);
virtual int identify_flash_publish_client(SrsPublishPacket* req, SrsClientType& type, std::string& stream_name);
virtual int bandwidth_check_play(int duration_ms, int interval_ms, int& actual_duration_ms, int& play_bytes, int max_play_kbps);
virtual int bandwidth_check_publish(int duration_ms, int interval_ms, int& actual_duration_ms, int& publish_bytes, int max_pub_kbps);
};
#endif

View file

@ -230,7 +230,7 @@ int SrsServer::cycle()
// the deamon thread, update the time cache
while (true) {
st_usleep(SRS_TIME_RESOLUTION_MS * 1000);
srs_update_system_time_ms();
srs_update_system_time_ms();
if (signal_reload) {
signal_reload = false;

0
trunk/src/core/srs_core_server.hpp Normal file → Executable file
View file

0
trunk/src/core/srs_core_socket.cpp Normal file → Executable file
View file

0
trunk/src/core/srs_core_socket.hpp Normal file → Executable file
View file

0
trunk/src/core/srs_core_stream.cpp Normal file → Executable file
View file

0
trunk/src/core/srs_core_stream.hpp Normal file → Executable file
View file

0
trunk/src/core/srs_core_thread.cpp Normal file → Executable file
View file

0
trunk/src/core/srs_core_thread.hpp Normal file → Executable file
View file