1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

+server support band check +every vhost can have it's own chunk size

This commit is contained in:
wenjie 2013-12-12 20:56:19 +08:00
parent 8887754b17
commit 6e9f375606
16 changed files with 668 additions and 108 deletions

2
trunk/auto/build_ffmpeg.sh Normal file → Executable file
View file

@ -80,7 +80,7 @@ else
--yasmexe=${ff_yasm_bin} \ --yasmexe=${ff_yasm_bin} \
--prefix=${ff_release_dir} --cc= \ --prefix=${ff_release_dir} --cc= \
--enable-static --disable-shared --disable-debug \ --enable-static --disable-shared --disable-debug \
--extra-cflags='-I${ffmpeg_exported_release_dir}/include' \ --extra-cflags='-I${ffmpeg_exported_release_dir}/include -I /usr/include/freetype2/' \
--extra-ldflags='-L${ffmpeg_exported_release_dir}/lib -lm -ldl' \ --extra-ldflags='-L${ffmpeg_exported_release_dir}/lib -lm -ldl' \
--disable-ffplay --disable-ffprobe --disable-ffserver --disable-doc \ --disable-ffplay --disable-ffprobe --disable-ffserver --disable-doc \
--enable-postproc --enable-bzlib --enable-zlib --enable-parsers \ --enable-postproc --enable-bzlib --enable-zlib --enable-parsers \

View file

@ -1,6 +1,6 @@
listen 19350; listen 19350;
chunk_size 65000;
vhost __defaultVhost__ { vhost __defaultVhost__ {
chunk_size 65000;
enabled on; enabled on;
gop_cache on; gop_cache on;
hls on; hls on;

View file

@ -1,11 +1,5 @@
# the listen ports, split by space. # the listen ports, split by space.
listen 1935; listen 1935;
# the default chunk size is 128, max is 65536,
# some client does not support chunk size change,
# however, most clients supports it and it can improve
# performance about 10%.
# if not specified, set to 4096.
chunk_size 65000;
# the logs dir. # the logs dir.
# if enabled ffmpeg, each stracoding stream will create a log file. # if enabled ffmpeg, each stracoding stream will create a log file.
# default: ./objs/logs # default: ./objs/logs
@ -19,6 +13,12 @@ max_connections 2000;
# for which cannot identify the required vhost. # for which cannot identify the required vhost.
# for default demo. # for default demo.
vhost __defaultVhost__ { vhost __defaultVhost__ {
# the default chunk size is 128, max is 65536,
# some client does not support chunk size change,
# however, most clients supports it and it can improve
# performance about 10%.
# if not specified, set to 4096.
chunk_size 65000;
enabled on; enabled on;
gop_cache on; gop_cache on;
hls on; hls on;
@ -79,6 +79,7 @@ vhost __defaultVhost__ {
} }
# for development # for development
vhost dev { vhost dev {
chunk_size 65000;
enabled on; enabled on;
gop_cache on; gop_cache on;
hls on; hls on;
@ -87,7 +88,7 @@ vhost dev {
hls_window 30; hls_window 30;
forward 127.0.0.1:19350; forward 127.0.0.1:19350;
http_hooks { http_hooks {
enabled on; enabled off;
on_connect http://127.0.0.1:8085/api/v1/clients; on_connect http://127.0.0.1:8085/api/v1/clients;
on_close http://127.0.0.1:8085/api/v1/clients; on_close http://127.0.0.1:8085/api/v1/clients;
on_publish http://127.0.0.1:8085/api/v1/streams; on_publish http://127.0.0.1:8085/api/v1/streams;
@ -133,6 +134,20 @@ vhost dev {
} }
} }
} }
vhost bandcheck.srs.com {
chunk_size 65000;
enabled on;
# vhost for band width check
bandcheck{
enabled on;
key test kate;
interval 30;
max_play_kbps 45000;
max_pub_kbps 25000;
}
}
# the http hook callback vhost, srs will invoke the hooks for specified events. # the http hook callback vhost, srs will invoke the hooks for specified events.
vhost hooks.callback.vhost.com { vhost hooks.callback.vhost.com {
http_hooks { http_hooks {

10
trunk/src/core/srs_core.cpp Normal file → Executable file
View file

@ -33,7 +33,7 @@ static int64_t _srs_system_time_us_cache = 0;
int64_t srs_get_system_time_ms() int64_t srs_get_system_time_ms()
{ {
return _srs_system_time_us_cache / 1000; return _srs_system_time_us_cache / 1000;
} }
void srs_update_system_time_ms() void srs_update_system_time_ms()
@ -43,7 +43,7 @@ void srs_update_system_time_ms()
gettimeofday(&now, NULL); gettimeofday(&now, NULL);
// we must convert the tv_sec/tv_usec to int64_t. // we must convert the tv_sec/tv_usec to int64_t.
_srs_system_time_us_cache = now.tv_sec * 1000 * 1000 + now.tv_usec; _srs_system_time_us_cache = ((int64_t)now.tv_sec) * 1000 * 1000 + (int64_t)now.tv_usec;
_srs_system_time_us_cache = srs_max(0, _srs_system_time_us_cache); _srs_system_time_us_cache = srs_max(0, _srs_system_time_us_cache);
} }
@ -103,11 +103,11 @@ void srs_vhost_resolve(std::string& vhost, std::string& app)
if ((pos = query.find("vhost?")) != std::string::npos if ((pos = query.find("vhost?")) != std::string::npos
|| (pos = query.find("vhost=")) != std::string::npos || (pos = query.find("vhost=")) != std::string::npos
|| (pos = query.find("Vhost?")) != std::string::npos || (pos = query.find("Vhost?")) != std::string::npos
|| (pos = query.find("Vhost=")) != std::string::npos || (pos = query.find("Vhost=")) != std::string::npos)
) { {
query = query.substr(pos + 6); query = query.substr(pos + 6);
if (!query.empty()) { if (!query.empty()) {
vhost = query; vhost = query;
} }
} }
} }

2
trunk/src/core/srs_core.hpp Normal file → Executable file
View file

@ -102,4 +102,4 @@ extern std::string srs_dns_resolve(std::string host);
// app...vhost...request_vhost // app...vhost...request_vhost
extern void srs_vhost_resolve(std::string& vhost, std::string& app); extern void srs_vhost_resolve(std::string& vhost, std::string& app);
#endif #endif

84
trunk/src/core/srs_core_client.cpp Normal file → Executable file
View file

@ -117,8 +117,8 @@ int SrsClient::do_cycle()
int SrsClient::service_cycle() int SrsClient::service_cycle()
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
if ((ret = rtmp->set_window_ack_size(2.5 * 1000 * 1000)) != ERROR_SUCCESS) { if ((ret = rtmp->set_window_ack_size(2.5 * 1000 * 1000)) != ERROR_SUCCESS) {
srs_error("set window acknowledgement size failed. ret=%d", ret); srs_error("set window acknowledgement size failed. ret=%d", ret);
return ret; return ret;
} }
@ -129,29 +129,59 @@ int SrsClient::service_cycle()
return ret; return ret;
} }
srs_verbose("set peer bandwidth success"); srs_verbose("set peer bandwidth success");
if(config->get_bw_check_enabled(req->vhost, req->bw_key))
{
static int64_t last_check_time_ms = srs_get_system_time_ms();
int64_t interval_ms = 0;
int play_kbps = 0;
int pub_kbps = 0;
config->get_bw_check_settings(req->vhost, interval_ms, play_kbps, pub_kbps);
if((srs_get_system_time_ms() - last_check_time_ms) < interval_ms
&& last_check_time_ms != srs_get_system_time_ms())
{
srs_trace("bandcheck interval less than limted interval. last time=%lld, current time=%lld"
, last_check_time_ms, srs_get_system_time_ms());
return rtmp->response_connect_reject(req, "your bandcheck frequency is too high!");
} else {
last_check_time_ms = srs_get_system_time_ms(); // update last check time
char* local_ip = 0;
if((ret = get_local_ip(local_ip)) != ERROR_SUCCESS){
srs_error("get local ip failed. ret = %d", ret);
return ret;
}
if ((ret = rtmp->response_connect_app(req, local_ip)) != ERROR_SUCCESS) {
srs_error("response connect app failed. ret=%d", ret);
return ret;
}
return rtmp->start_bandwidth_check(play_kbps, pub_kbps);
}
}
if ((ret = rtmp->response_connect_app(req)) != ERROR_SUCCESS) { if ((ret = rtmp->response_connect_app(req)) != ERROR_SUCCESS) {
srs_error("response connect app failed. ret=%d", ret); srs_error("response connect app failed. ret=%d", ret);
return ret; return ret;
} }
srs_verbose("response connect app success"); srs_verbose("response connect app success");
if ((ret = rtmp->on_bw_done()) != ERROR_SUCCESS) { if ((ret = rtmp->on_bw_done()) != ERROR_SUCCESS) {
srs_error("on_bw_done failed. ret=%d", ret); srs_error("on_bw_done failed. ret=%d", ret);
return ret; return ret;
} }
srs_verbose("on_bw_done success"); srs_verbose("on_bw_done success");
SrsClientType type; SrsClientType type;
if ((ret = rtmp->identify_client(res->stream_id, type, req->stream)) != ERROR_SUCCESS) { if ((ret = rtmp->identify_client(res->stream_id, type, req->stream)) != ERROR_SUCCESS) {
srs_error("identify client failed. ret=%d", ret); srs_error("identify client failed. ret=%d", ret);
return ret; return ret;
} }
req->strip(); req->strip();
srs_trace("identify client success. type=%d, stream_name=%s", type, req->stream.c_str()); srs_trace("identify client success. type=%d, stream_name=%s", type, req->stream.c_str());
int chunk_size = 4096; int chunk_size = 4096;
SrsConfDirective* conf = config->get_chunk_size(); SrsConfDirective* conf = config->get_chunk_size(req->vhost);
if (conf && !conf->arg0().empty()) { if (conf && !conf->arg0().empty()) {
chunk_size = ::atoi(conf->arg0().c_str()); chunk_size = ::atoi(conf->arg0().c_str());
} }
@ -517,6 +547,40 @@ int SrsClient::get_peer_ip()
return ret; return ret;
} }
int SrsClient::get_local_ip(char *&local_ip)
{
int ret = ERROR_SUCCESS;
int fd = st_netfd_fileno(stfd);
// discovery client information
sockaddr_in addr;
socklen_t addrlen = sizeof(addr);
if (getsockname(fd, (sockaddr*)&addr, &addrlen) == -1) {
ret = ERROR_SOCKET_GET_LOCAL_IP;
srs_error("discovery local ip information failed. ret=%d", ret);
return ret;
}
srs_verbose("get local ip success.");
// ip v4 or v6
char buf[INET6_ADDRSTRLEN];
memset(buf, 0, sizeof(buf));
if ((inet_ntop(addr.sin_family, &addr.sin_addr, buf, sizeof(buf))) == NULL) {
ret = ERROR_SOCKET_GET_LOCAL_IP;
srs_error("convert local ip information failed. ret=%d", ret);
return ret;
}
local_ip = new char[strlen(buf) + 1];
strcpy(local_ip, buf);
srs_verbose("get local ip of client ip=%s, fd=%d", buf, fd);
return ret;
}
int SrsClient::process_play_control_msg(SrsConsumer* consumer, SrsCommonMessage* msg) int SrsClient::process_play_control_msg(SrsConsumer* consumer, SrsCommonMessage* msg)
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;

3
trunk/src/core/srs_core_client.hpp Normal file → Executable file
View file

@ -70,6 +70,7 @@ private:
virtual int publish(SrsSource* source, bool is_fmle); virtual int publish(SrsSource* source, bool is_fmle);
virtual int process_publish_message(SrsSource* source, SrsCommonMessage* msg, bool is_fmle); virtual int process_publish_message(SrsSource* source, SrsCommonMessage* msg, bool is_fmle);
virtual int get_peer_ip(); virtual int get_peer_ip();
virtual int get_local_ip(char *&local_ip);
virtual int process_play_control_msg(SrsConsumer* consumer, SrsCommonMessage* msg); virtual int process_play_control_msg(SrsConsumer* consumer, SrsCommonMessage* msg);
private: private:
virtual int on_connect(); virtual int on_connect();
@ -80,4 +81,4 @@ private:
virtual void on_stop(); virtual void on_stop();
}; };
#endif #endif

106
trunk/src/core/srs_core_config.cpp Normal file → Executable file
View file

@ -539,7 +539,7 @@ int SrsConfig::parse_options(int argc, char** argv)
return parse_file(config_file.c_str()); return parse_file(config_file.c_str());
} }
SrsConfDirective* SrsConfig::get_vhost(std::string vhost) SrsConfDirective* SrsConfig::get_vhost(const std::string& vhost)
{ {
srs_assert(root); srs_assert(root);
@ -562,7 +562,7 @@ SrsConfDirective* SrsConfig::get_vhost(std::string vhost)
return NULL; return NULL;
} }
SrsConfDirective* SrsConfig::get_vhost_on_connect(std::string vhost) SrsConfDirective* SrsConfig::get_vhost_on_connect(const std::string &vhost)
{ {
SrsConfDirective* conf = get_vhost(vhost); SrsConfDirective* conf = get_vhost(vhost);
@ -583,7 +583,7 @@ SrsConfDirective* SrsConfig::get_vhost_on_connect(std::string vhost)
return conf->get("on_connect"); return conf->get("on_connect");
} }
SrsConfDirective* SrsConfig::get_vhost_on_close(std::string vhost) SrsConfDirective* SrsConfig::get_vhost_on_close(const std::string &vhost)
{ {
SrsConfDirective* conf = get_vhost(vhost); SrsConfDirective* conf = get_vhost(vhost);
@ -604,7 +604,7 @@ SrsConfDirective* SrsConfig::get_vhost_on_close(std::string vhost)
return conf->get("on_close"); return conf->get("on_close");
} }
SrsConfDirective* SrsConfig::get_vhost_on_publish(std::string vhost) SrsConfDirective* SrsConfig::get_vhost_on_publish(const std::string &vhost)
{ {
SrsConfDirective* conf = get_vhost(vhost); SrsConfDirective* conf = get_vhost(vhost);
@ -625,7 +625,7 @@ SrsConfDirective* SrsConfig::get_vhost_on_publish(std::string vhost)
return conf->get("on_publish"); return conf->get("on_publish");
} }
SrsConfDirective* SrsConfig::get_vhost_on_unpublish(std::string vhost) SrsConfDirective* SrsConfig::get_vhost_on_unpublish(const std::string &vhost)
{ {
SrsConfDirective* conf = get_vhost(vhost); SrsConfDirective* conf = get_vhost(vhost);
@ -646,7 +646,7 @@ SrsConfDirective* SrsConfig::get_vhost_on_unpublish(std::string vhost)
return conf->get("on_unpublish"); return conf->get("on_unpublish");
} }
SrsConfDirective* SrsConfig::get_vhost_on_play(std::string vhost) SrsConfDirective* SrsConfig::get_vhost_on_play(const std::string &vhost)
{ {
SrsConfDirective* conf = get_vhost(vhost); SrsConfDirective* conf = get_vhost(vhost);
@ -667,7 +667,7 @@ SrsConfDirective* SrsConfig::get_vhost_on_play(std::string vhost)
return conf->get("on_play"); return conf->get("on_play");
} }
SrsConfDirective* SrsConfig::get_vhost_on_stop(std::string vhost) SrsConfDirective* SrsConfig::get_vhost_on_stop(const std::string &vhost)
{ {
SrsConfDirective* conf = get_vhost(vhost); SrsConfDirective* conf = get_vhost(vhost);
@ -688,7 +688,7 @@ SrsConfDirective* SrsConfig::get_vhost_on_stop(std::string vhost)
return conf->get("on_stop"); return conf->get("on_stop");
} }
bool SrsConfig::get_vhost_enabled(std::string vhost) bool SrsConfig::get_vhost_enabled(const std::string &vhost)
{ {
SrsConfDirective* vhost_conf = get_vhost(vhost); SrsConfDirective* vhost_conf = get_vhost(vhost);
@ -708,7 +708,7 @@ bool SrsConfig::get_vhost_enabled(std::string vhost)
return true; return true;
} }
SrsConfDirective* SrsConfig::get_transcode(std::string vhost, std::string scope) SrsConfDirective* SrsConfig::get_transcode(const std::string &vhost, const std::string &scope)
{ {
SrsConfDirective* conf = get_vhost(vhost); SrsConfDirective* conf = get_vhost(vhost);
@ -1059,7 +1059,7 @@ int SrsConfig::get_max_connections()
return ::atoi(conf->arg0().c_str()); return ::atoi(conf->arg0().c_str());
} }
SrsConfDirective* SrsConfig::get_gop_cache(std::string vhost) SrsConfDirective* SrsConfig::get_gop_cache(const std::string &vhost)
{ {
SrsConfDirective* conf = get_vhost(vhost); SrsConfDirective* conf = get_vhost(vhost);
@ -1070,7 +1070,7 @@ SrsConfDirective* SrsConfig::get_gop_cache(std::string vhost)
return conf->get("gop_cache"); return conf->get("gop_cache");
} }
SrsConfDirective* SrsConfig::get_forward(std::string vhost) SrsConfDirective* SrsConfig::get_forward(const std::string &vhost)
{ {
SrsConfDirective* conf = get_vhost(vhost); SrsConfDirective* conf = get_vhost(vhost);
@ -1081,7 +1081,7 @@ SrsConfDirective* SrsConfig::get_forward(std::string vhost)
return conf->get("forward"); return conf->get("forward");
} }
SrsConfDirective* SrsConfig::get_hls(std::string vhost) SrsConfDirective* SrsConfig::get_hls(const std::string &vhost)
{ {
SrsConfDirective* conf = get_vhost(vhost); SrsConfDirective* conf = get_vhost(vhost);
@ -1092,7 +1092,7 @@ SrsConfDirective* SrsConfig::get_hls(std::string vhost)
return conf->get("hls"); return conf->get("hls");
} }
bool SrsConfig::get_hls_enabled(std::string vhost) bool SrsConfig::get_hls_enabled(const std::string &vhost)
{ {
SrsConfDirective* hls = get_hls(vhost); SrsConfDirective* hls = get_hls(vhost);
@ -1107,7 +1107,7 @@ bool SrsConfig::get_hls_enabled(std::string vhost)
return true; return true;
} }
SrsConfDirective* SrsConfig::get_hls_path(std::string vhost) SrsConfDirective* SrsConfig::get_hls_path(const std::string &vhost)
{ {
SrsConfDirective* conf = get_vhost(vhost); SrsConfDirective* conf = get_vhost(vhost);
@ -1118,7 +1118,7 @@ SrsConfDirective* SrsConfig::get_hls_path(std::string vhost)
return conf->get("hls_path"); return conf->get("hls_path");
} }
SrsConfDirective* SrsConfig::get_hls_fragment(std::string vhost) SrsConfDirective* SrsConfig::get_hls_fragment(const std::string &vhost)
{ {
SrsConfDirective* conf = get_vhost(vhost); SrsConfDirective* conf = get_vhost(vhost);
@ -1129,7 +1129,7 @@ SrsConfDirective* SrsConfig::get_hls_fragment(std::string vhost)
return conf->get("hls_fragment"); return conf->get("hls_fragment");
} }
SrsConfDirective* SrsConfig::get_hls_window(std::string vhost) SrsConfDirective* SrsConfig::get_hls_window(const std::string &vhost)
{ {
SrsConfDirective* conf = get_vhost(vhost); SrsConfDirective* conf = get_vhost(vhost);
@ -1140,7 +1140,7 @@ SrsConfDirective* SrsConfig::get_hls_window(std::string vhost)
return conf->get("hls_window"); return conf->get("hls_window");
} }
SrsConfDirective* SrsConfig::get_refer(std::string vhost) SrsConfDirective* SrsConfig::get_refer(const std::string &vhost)
{ {
SrsConfDirective* conf = get_vhost(vhost); SrsConfDirective* conf = get_vhost(vhost);
@ -1151,7 +1151,7 @@ SrsConfDirective* SrsConfig::get_refer(std::string vhost)
return conf->get("refer"); return conf->get("refer");
} }
SrsConfDirective* SrsConfig::get_refer_play(std::string vhost) SrsConfDirective* SrsConfig::get_refer_play(const std::string &vhost)
{ {
SrsConfDirective* conf = get_vhost(vhost); SrsConfDirective* conf = get_vhost(vhost);
@ -1162,7 +1162,7 @@ SrsConfDirective* SrsConfig::get_refer_play(std::string vhost)
return conf->get("refer_play"); return conf->get("refer_play");
} }
SrsConfDirective* SrsConfig::get_refer_publish(std::string vhost) SrsConfDirective* SrsConfig::get_refer_publish(const std::string &vhost)
{ {
SrsConfDirective* conf = get_vhost(vhost); SrsConfDirective* conf = get_vhost(vhost);
@ -1178,9 +1178,15 @@ SrsConfDirective* SrsConfig::get_listen()
return root->get("listen"); return root->get("listen");
} }
SrsConfDirective* SrsConfig::get_chunk_size() SrsConfDirective* SrsConfig::get_chunk_size(const std::string &vhost)
{ {
return root->get("chunk_size"); SrsConfDirective* conf = get_vhost(vhost);
if (!conf) {
return NULL;
}
return conf->get("chunk_size");
} }
SrsConfDirective* SrsConfig::get_pithy_print_publish() SrsConfDirective* SrsConfig::get_pithy_print_publish()
@ -1230,7 +1236,63 @@ SrsConfDirective* SrsConfig::get_pithy_print_play()
return NULL; return NULL;
} }
return pithy->get("play"); return pithy->get("play");
}
bool SrsConfig::get_bw_check_enabled(const std::string &vhost, const std::string &key)
{
SrsConfDirective* conf = get_vhost(vhost);
if (!conf) {
return false;
}
SrsConfDirective* bw_test = conf->get("bandcheck");
if(!bw_test)
return false;
SrsConfDirective* bw_enabled_conf = bw_test->get("enabled");
if(bw_enabled_conf && bw_enabled_conf->arg0() == "on"){
SrsConfDirective* bw_key = bw_test->get("key");
if(!bw_key) return false;
std::vector<std::string> &args = bw_key->args;
for(unsigned int i = 0; i < args.size(); ++i){
if(args.at(i) == key)
return true;
}
}
return false;
}
void SrsConfig::get_bw_check_settings(const std::string &vhost, int64_t &interval_ms, int &play_kbps, int &pub_kbps)
{
// set default value;
interval_ms = 30 * 1000;
play_kbps = 45000;
pub_kbps = 25000;
SrsConfDirective* conf = get_vhost(vhost);
if (!conf) {
return;
}
SrsConfDirective* bw_test = conf->get("bandcheck");
if(!bw_test)
return;
SrsConfDirective* interval_conf = bw_test->get("interval");
if(interval_conf)
interval_ms = ::atoll(interval_conf->arg0().c_str()) * 1000;
SrsConfDirective* play_conf = bw_test->get("max_play_kbps");
if(play_conf)
play_kbps = ::atoi(play_conf->arg0().c_str());
SrsConfDirective* pub_conf = bw_test->get("max_pub_kbps");
if(pub_conf)
pub_kbps = ::atoi(pub_conf->arg0().c_str());
} }
int SrsConfig::parse_file(const char* filename) int SrsConfig::parse_file(const char* filename)

44
trunk/src/core/srs_core_config.hpp Normal file → Executable file
View file

@ -117,15 +117,15 @@ public:
public: public:
virtual int parse_options(int argc, char** argv); virtual int parse_options(int argc, char** argv);
public: public:
virtual SrsConfDirective* get_vhost(std::string vhost); virtual SrsConfDirective* get_vhost(const std::string &vhost);
virtual bool get_vhost_enabled(std::string vhost); virtual bool get_vhost_enabled(const std::string& vhost);
virtual SrsConfDirective* get_vhost_on_connect(std::string vhost); virtual SrsConfDirective* get_vhost_on_connect(const std::string& vhost);
virtual SrsConfDirective* get_vhost_on_close(std::string vhost); virtual SrsConfDirective* get_vhost_on_close(const std::string& vhost);
virtual SrsConfDirective* get_vhost_on_publish(std::string vhost); virtual SrsConfDirective* get_vhost_on_publish(const std::string& vhost);
virtual SrsConfDirective* get_vhost_on_unpublish(std::string vhost); virtual SrsConfDirective* get_vhost_on_unpublish(const std::string& vhost);
virtual SrsConfDirective* get_vhost_on_play(std::string vhost); virtual SrsConfDirective* get_vhost_on_play(const std::string& vhost);
virtual SrsConfDirective* get_vhost_on_stop(std::string vhost); virtual SrsConfDirective* get_vhost_on_stop(const std::string& vhost);
virtual SrsConfDirective* get_transcode(std::string vhost, std::string scope); virtual SrsConfDirective* get_transcode(const std::string& vhost, const std::string& scope);
virtual bool get_transcode_enabled(SrsConfDirective* transcode); virtual bool get_transcode_enabled(SrsConfDirective* transcode);
virtual std::string get_transcode_ffmpeg(SrsConfDirective* transcode); virtual std::string get_transcode_ffmpeg(SrsConfDirective* transcode);
virtual void get_transcode_engines(SrsConfDirective* transcode, std::vector<SrsConfDirective*>& engines); virtual void get_transcode_engines(SrsConfDirective* transcode, std::vector<SrsConfDirective*>& engines);
@ -148,23 +148,25 @@ public:
virtual std::string get_engine_output(SrsConfDirective* engine); virtual std::string get_engine_output(SrsConfDirective* engine);
virtual std::string get_log_dir(); virtual std::string get_log_dir();
virtual int get_max_connections(); virtual int get_max_connections();
virtual SrsConfDirective* get_gop_cache(std::string vhost); virtual SrsConfDirective* get_gop_cache(const std::string& vhost);
virtual SrsConfDirective* get_forward(std::string vhost); virtual SrsConfDirective* get_forward(const std::string& vhost);
virtual SrsConfDirective* get_hls(std::string vhost); virtual SrsConfDirective* get_hls(const std::string &vhost);
virtual bool get_hls_enabled(std::string vhost); virtual bool get_hls_enabled(const std::string& vhost);
virtual SrsConfDirective* get_hls_path(std::string vhost); virtual SrsConfDirective* get_hls_path(const std::string& vhost);
virtual SrsConfDirective* get_hls_fragment(std::string vhost); virtual SrsConfDirective* get_hls_fragment(const std::string& vhost);
virtual SrsConfDirective* get_hls_window(std::string vhost); virtual SrsConfDirective* get_hls_window(const std::string& vhost);
virtual SrsConfDirective* get_refer(std::string vhost); virtual SrsConfDirective* get_refer(const std::string& vhost);
virtual SrsConfDirective* get_refer_play(std::string vhost); virtual SrsConfDirective* get_refer_play(const std::string& vhost);
virtual SrsConfDirective* get_refer_publish(std::string vhost); virtual SrsConfDirective* get_refer_publish(const std::string& vhost);
virtual SrsConfDirective* get_listen(); virtual SrsConfDirective* get_listen();
virtual SrsConfDirective* get_chunk_size(); virtual SrsConfDirective* get_chunk_size(const std::string &vhost);
virtual SrsConfDirective* get_pithy_print_publish(); virtual SrsConfDirective* get_pithy_print_publish();
virtual SrsConfDirective* get_pithy_print_forwarder(); virtual SrsConfDirective* get_pithy_print_forwarder();
virtual SrsConfDirective* get_pithy_print_encoder(); virtual SrsConfDirective* get_pithy_print_encoder();
virtual SrsConfDirective* get_pithy_print_hls(); virtual SrsConfDirective* get_pithy_print_hls();
virtual SrsConfDirective* get_pithy_print_play(); virtual SrsConfDirective* get_pithy_print_play();
virtual bool get_bw_check_enabled(const std::string &vhost, const std::string &key);
virtual void get_bw_check_settings(const std::string &vhost, int64_t &interval_ms, int &play_kbps, int &pub_kbps);
private: private:
virtual int parse_file(const char* filename); virtual int parse_file(const char* filename);
virtual int parse_argv(int& i, char** argv); virtual int parse_argv(int& i, char** argv);
@ -179,4 +181,4 @@ bool srs_directive_equals(SrsConfDirective* a, SrsConfDirective* b);
// global config // global config
extern SrsConfig* config; extern SrsConfig* config;
#endif #endif

3
trunk/src/core/srs_core_error.hpp Normal file → Executable file
View file

@ -52,6 +52,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#define ERROR_SOCKET_WRITE 209 #define ERROR_SOCKET_WRITE 209
#define ERROR_SOCKET_WAIT 210 #define ERROR_SOCKET_WAIT 210
#define ERROR_SOCKET_TIMEOUT 211 #define ERROR_SOCKET_TIMEOUT 211
#define ERROR_SOCKET_GET_LOCAL_IP 222
#define ERROR_RTMP_PLAIN_REQUIRED 300 #define ERROR_RTMP_PLAIN_REQUIRED 300
#define ERROR_RTMP_CHUNK_START 301 #define ERROR_RTMP_CHUNK_START 301
@ -146,4 +147,4 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#define ERROR_HTTP_DATA_INVLIAD 801 #define ERROR_HTTP_DATA_INVLIAD 801
#define ERROR_HTTP_PARSE_HEADER 802 #define ERROR_HTTP_PARSE_HEADER 802
#endif #endif

95
trunk/src/core/srs_core_protocol.cpp Normal file → Executable file
View file

@ -1138,7 +1138,12 @@ bool SrsMessageHeader::is_set_chunk_size()
bool SrsMessageHeader::is_user_control_message() bool SrsMessageHeader::is_user_control_message()
{ {
return message_type == RTMP_MSG_UserControlMessage; return message_type == RTMP_MSG_UserControlMessage;
}
bool SrsMessageHeader::is_windows_ackledgement()
{
return message_type == RTMP_MSG_Acknowledgement;
} }
SrsChunkStream::SrsChunkStream(int _cid) SrsChunkStream::SrsChunkStream(int _cid)
@ -1311,7 +1316,21 @@ int SrsCommonMessage::decode_packet(SrsProtocol* protocol)
srs_info("decode the AMF0/AMF3 data(onMetaData message)."); srs_info("decode the AMF0/AMF3 data(onMetaData message).");
packet = new SrsOnMetaDataPacket(); packet = new SrsOnMetaDataPacket();
return packet->decode(stream); return packet->decode(stream);
} } else if( command == SRS_BW_CHECK_FINISHED
|| command == SRS_BW_CHECK_PLAYING
|| command == SRS_BW_CHECK_PUBLISHING
|| command == SRS_BW_CHECK_STARTING_PLAY
|| command == SRS_BW_CHECK_STARTING_PUBLISH
|| command == SRS_BW_CHECK_START_PLAY
|| command == SRS_BW_CHECK_START_PUBLISH
|| command == SRS_BW_CHECK_STOPPED_PLAY
|| command == SRS_BW_CHECK_STOP_PLAY
|| command == SRS_BW_CHECK_STOP_PUBLISH)
{
srs_info("decode the AMF0/AMF3 band width check message.");
packet = new SrsOnStatusCallPacket();
return packet->decode(stream);
}
// default packet to drop message. // default packet to drop message.
srs_trace("drop the AMF0/AMF3 command message, command_name=%s", command.c_str()); srs_trace("drop the AMF0/AMF3 command message, command_name=%s", command.c_str());
@ -1329,7 +1348,11 @@ int SrsCommonMessage::decode_packet(SrsProtocol* protocol)
srs_verbose("start to decode set chunk size message."); srs_verbose("start to decode set chunk size message.");
packet = new SrsSetChunkSizePacket(); packet = new SrsSetChunkSizePacket();
return packet->decode(stream); return packet->decode(stream);
} else { } else if(header.is_windows_ackledgement()) {
srs_verbose("start to decode AcknowledgementPacket message.");
packet = new SrsAcknowledgementPacket();
return packet->decode(stream);
} else {
// default packet to drop message. // default packet to drop message.
srs_trace("drop the unknown message, type=%d", header.message_type); srs_trace("drop the unknown message, type=%d", header.message_type);
packet = new SrsPacket(); packet = new SrsPacket();
@ -1775,8 +1798,13 @@ int SrsConnectAppResPacket::get_message_type()
int SrsConnectAppResPacket::get_size() int SrsConnectAppResPacket::get_size()
{ {
return srs_amf0_get_string_size(command_name) + srs_amf0_get_number_size() int size = srs_amf0_get_string_size(command_name) + srs_amf0_get_number_size();
+ srs_amf0_get_object_size(props)+ srs_amf0_get_object_size(info); if(props->size() > 0)
size += srs_amf0_get_object_size(props);
if(info->size() > 0)
size += srs_amf0_get_object_size(info);
return size;
} }
int SrsConnectAppResPacket::encode_packet(SrsStream* stream) int SrsConnectAppResPacket::encode_packet(SrsStream* stream)
@ -1795,16 +1823,22 @@ int SrsConnectAppResPacket::encode_packet(SrsStream* stream)
} }
srs_verbose("encode transaction_id success."); srs_verbose("encode transaction_id success.");
if ((ret = srs_amf0_write_object(stream, props)) != ERROR_SUCCESS) { if(props->size() > 0){
srs_error("encode props failed. ret=%d", ret); if ((ret = srs_amf0_write_object(stream, props)) != ERROR_SUCCESS) {
return ret; srs_error("encode props failed. ret=%d", ret);
} return ret;
}
}
srs_verbose("encode props success."); srs_verbose("encode props success.");
if ((ret = srs_amf0_write_object(stream, info)) != ERROR_SUCCESS) { if(info->size() > 0){
srs_error("encode info failed. ret=%d", ret); if ((ret = srs_amf0_write_object(stream, info)) != ERROR_SUCCESS) {
return ret; srs_error("encode info failed. ret=%d", ret);
} return ret;
}
}
srs_verbose("encode info success."); srs_verbose("encode info success.");
srs_info("encode connect app response packet success."); srs_info("encode connect app response packet success.");
@ -2537,7 +2571,31 @@ SrsOnStatusCallPacket::SrsOnStatusCallPacket()
SrsOnStatusCallPacket::~SrsOnStatusCallPacket() SrsOnStatusCallPacket::~SrsOnStatusCallPacket()
{ {
srs_freep(args); srs_freep(args);
srs_freep(data); srs_freep(data);
}
int SrsOnStatusCallPacket::decode(SrsStream *stream)
{
int ret = ERROR_SUCCESS;
if ((ret = srs_amf0_read_string(stream, command_name)) != ERROR_SUCCESS) {
srs_error("amf0 decode play command_name failed. ret=%d", ret);
return ret;
}
if ((ret = srs_amf0_read_number(stream, transaction_id)) != ERROR_SUCCESS) {
srs_error("amf0 decode play transaction_id failed. ret=%d", ret);
return ret;
}
if ((ret = srs_amf0_read_null(stream)) != ERROR_SUCCESS) {
srs_error("amf0 decode play command_object failed. ret=%d", ret);
return ret;
}
srs_info("decode SrsOnStatusCallPacket success.");
return ret;
} }
int SrsOnStatusCallPacket::get_perfer_cid() int SrsOnStatusCallPacket::get_perfer_cid()
@ -2853,6 +2911,14 @@ SrsAcknowledgementPacket::~SrsAcknowledgementPacket()
{ {
} }
int SrsAcknowledgementPacket::decode(SrsStream *stream)
{
int ret = ERROR_SUCCESS;
ret = sequence_number = stream->read_4bytes();
return ret;
}
int SrsAcknowledgementPacket::get_perfer_cid() int SrsAcknowledgementPacket::get_perfer_cid()
{ {
return RTMP_CID_ProtocolControl; return RTMP_CID_ProtocolControl;
@ -3078,4 +3144,3 @@ int SrsUserControlPacket::encode_packet(SrsStream* stream)
return ret; return ret;
} }

31
trunk/src/core/srs_core_protocol.hpp Normal file → Executable file
View file

@ -220,6 +220,7 @@ struct SrsMessageHeader
bool is_window_ackledgement_size(); bool is_window_ackledgement_size();
bool is_set_chunk_size(); bool is_set_chunk_size();
bool is_user_control_message(); bool is_user_control_message();
bool is_windows_ackledgement();
}; };
/** /**
@ -801,6 +802,26 @@ protected:
virtual int encode_packet(SrsStream* stream); virtual int encode_packet(SrsStream* stream);
}; };
/**
* band width check method name, which will be invoked by client.
* band width check mothods use SrsOnStatusCallPacket as its internal packet type,
* so ensure you set command name when you use it.
*/
// for play
#define SRS_BW_CHECK_START_PLAY "onSrsBandCheckStartPlayBytes"
#define SRS_BW_CHECK_STARTING_PLAY "onSrsBandCheckStartingPlayBytes"
#define SRS_BW_CHECK_STOP_PLAY "onSrsBandCheckStopPlayBytes"
#define SRS_BW_CHECK_STOPPED_PLAY "onSrsBandCheckStoppedPlayBytes"
#define SRS_BW_CHECK_PLAYING "onSrsBandCheckPlaying"
// for publish
#define SRS_BW_CHECK_START_PUBLISH "onSrsBandCheckStartPublishBytes"
#define SRS_BW_CHECK_STARTING_PUBLISH "onSrsBandCheckStartingPublishBytes"
#define SRS_BW_CHECK_STOP_PUBLISH "onSrsBandCheckStopPublishBytes"
#define SRS_BW_CHECK_FINISHED "onSrsBandCheckFinished"
#define SRS_BW_CHECK_PUBLISHING "onSrsBandCheckPublishing"
/** /**
* onStatus command, AMF0 Call * onStatus command, AMF0 Call
* @remark, user must set the stream_id by SrsMessage.set_packet(). * @remark, user must set the stream_id by SrsMessage.set_packet().
@ -822,6 +843,10 @@ public:
public: public:
SrsOnStatusCallPacket(); SrsOnStatusCallPacket();
virtual ~SrsOnStatusCallPacket(); virtual ~SrsOnStatusCallPacket();
public:
virtual int decode(SrsStream* stream);
public: public:
virtual int get_perfer_cid(); virtual int get_perfer_cid();
public: public:
@ -968,6 +993,10 @@ public:
public: public:
SrsAcknowledgementPacket(); SrsAcknowledgementPacket();
virtual ~SrsAcknowledgementPacket(); virtual ~SrsAcknowledgementPacket();
public:
virtual int decode(SrsStream *stream);
public: public:
virtual int get_perfer_cid(); virtual int get_perfer_cid();
public: public:
@ -1141,4 +1170,4 @@ int srs_rtmp_expect_message(SrsProtocol* protocol, SrsCommonMessage** pmsg, T**
return ret; return ret;
} }
#endif #endif

334
trunk/src/core/srs_core_rtmp.cpp Normal file → Executable file
View file

@ -2,6 +2,7 @@
The MIT License (MIT) The MIT License (MIT)
Copyright (c) 2013 winlin Copyright (c) 2013 winlin
Copyright (c) 2013 wenjiejit
Permission is hereby granted, free of charge, to any person obtaining a copy of Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in this software and associated documentation files (the "Software"), to deal in
@ -49,8 +50,11 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#define StatusClientId "clientid" #define StatusClientId "clientid"
// status value // status value
#define StatusLevelStatus "status" #define StatusLevelStatus "status"
// status error
#define StatusLevelError "error"
// code value // code value
#define StatusCodeConnectSuccess "NetConnection.Connect.Success" #define StatusCodeConnectSuccess "NetConnection.Connect.Success"
#define StatusCodeConnectRejected "NetConnection.Connect.Rejected"
#define StatusCodeStreamReset "NetStream.Play.Reset" #define StatusCodeStreamReset "NetStream.Play.Reset"
#define StatusCodeStreamStart "NetStream.Play.Start" #define StatusCodeStreamStart "NetStream.Play.Start"
#define StatusCodeStreamPause "NetStream.Pause.Notify" #define StatusCodeStreamPause "NetStream.Pause.Notify"
@ -67,8 +71,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#define SRS_DEFAULT_SID 1 #define SRS_DEFAULT_SID 1
SrsRequest::SrsRequest() SrsRequest::SrsRequest()
: objectEncoding(RTMP_SIG_AMF0_VER)
{ {
objectEncoding = RTMP_SIG_AMF0_VER;
} }
SrsRequest::~SrsRequest() SrsRequest::~SrsRequest()
@ -99,11 +103,23 @@ int SrsRequest::discovery_app()
port = vhost.substr(pos + 1); port = vhost.substr(pos + 1);
vhost = vhost.substr(0, pos); vhost = vhost.substr(0, pos);
srs_verbose("discovery vhost=%s, port=%s", vhost.c_str(), port.c_str()); srs_verbose("discovery vhost=%s, port=%s", vhost.c_str(), port.c_str());
} }
app = url; app = url;
srs_vhost_resolve(vhost, app); srs_vhost_resolve(vhost, app);
// reslove bw check key
std::string app_str = url;
if ((pos = app_str.find("key=")) != std::string::npos){
std::string temp_key = app_str.substr(pos + strlen("key="));
for(unsigned int i = 0; i < temp_key.size(); ++i){
char c = temp_key[i];
if(c != '&')
bw_key.push_back(c);
else break;
}
}
// resolve the vhost from config // resolve the vhost from config
SrsConfDirective* parsed_vhost = config->get_vhost(vhost); SrsConfDirective* parsed_vhost = config->get_vhost(vhost);
if (parsed_vhost) { if (parsed_vhost) {
@ -238,7 +254,7 @@ int SrsRtmpClient::handshake()
return ret; return ret;
} }
int SrsRtmpClient::connect_app(std::string app, std::string tc_url) int SrsRtmpClient::connect_app(const std::string &app, const std::string &tc_url)
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
@ -324,7 +340,7 @@ int SrsRtmpClient::create_stream(int& stream_id)
return ret; return ret;
} }
int SrsRtmpClient::play(std::string stream, int stream_id) int SrsRtmpClient::play(const std::string &stream, int stream_id)
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
@ -366,7 +382,7 @@ int SrsRtmpClient::play(std::string stream, int stream_id)
return ret; return ret;
} }
int SrsRtmpClient::publish(std::string stream, int stream_id) int SrsRtmpClient::publish(const std::string &stream, int stream_id)
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
@ -544,7 +560,7 @@ int SrsRtmp::set_peer_bandwidth(int bandwidth, int type)
return ret; return ret;
} }
int SrsRtmp::response_connect_app(SrsRequest* req) int SrsRtmp::response_connect_app(SrsRequest *req, const char *ip)
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
@ -556,7 +572,7 @@ int SrsRtmp::response_connect_app(SrsRequest* req)
pkt->props->set("mode", new SrsAmf0Number(1)); pkt->props->set("mode", new SrsAmf0Number(1));
pkt->info->set(StatusLevel, new SrsAmf0String(StatusLevelStatus)); pkt->info->set(StatusLevel, new SrsAmf0String(StatusLevelStatus));
pkt->info->set(StatusCode, new SrsAmf0String(StatusCodeConnectSuccess)); pkt->info->set(StatusCode, new SrsAmf0String(StatusCodeConnectSuccess));
pkt->info->set(StatusDescription, new SrsAmf0String("Connection succeeded")); pkt->info->set(StatusDescription, new SrsAmf0String("Connection succeeded"));
pkt->info->set("objectEncoding", new SrsAmf0Number(req->objectEncoding)); pkt->info->set("objectEncoding", new SrsAmf0Number(req->objectEncoding));
SrsASrsAmf0EcmaArray* data = new SrsASrsAmf0EcmaArray(); SrsASrsAmf0EcmaArray* data = new SrsASrsAmf0EcmaArray();
@ -571,6 +587,9 @@ int SrsRtmp::response_connect_app(SrsRequest* req)
data->set("srs_site", new SrsAmf0String(RTMP_SIG_SRS_WEB)); data->set("srs_site", new SrsAmf0String(RTMP_SIG_SRS_WEB));
data->set("srs_email", new SrsAmf0String(RTMP_SIG_SRS_EMAIL)); data->set("srs_email", new SrsAmf0String(RTMP_SIG_SRS_EMAIL));
data->set("srs_copyright", new SrsAmf0String(RTMP_SIG_SRS_COPYRIGHT)); data->set("srs_copyright", new SrsAmf0String(RTMP_SIG_SRS_COPYRIGHT));
if(ip)
data->set("srs_server_ip", new SrsAmf0String(ip));
msg->set_packet(pkt, 0); msg->set_packet(pkt, 0);
@ -580,7 +599,30 @@ int SrsRtmp::response_connect_app(SrsRequest* req)
} }
srs_info("send connect app response message success."); srs_info("send connect app response message success.");
return ret; return ret;
}
int SrsRtmp::response_connect_reject(SrsRequest *req, const std::string &description)
{
int ret = ERROR_SUCCESS;
SrsCommonMessage* msg = new SrsCommonMessage();
SrsConnectAppResPacket* pkt = new SrsConnectAppResPacket();
pkt->command_name = "_error";
pkt->props->set(StatusLevel, new SrsAmf0String(StatusLevelError));
pkt->props->set(StatusCode, new SrsAmf0String(StatusCodeConnectRejected));
pkt->props->set(StatusDescription, new SrsAmf0String(description.c_str()));
//pkt->props->set("objectEncoding", new SrsAmf0Number(req->objectEncoding));
msg->set_packet(pkt, 0);
if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
srs_error("send connect app response rejected message failed. ret=%d", ret);
return ret;
}
srs_info("send connect app response rejected message success.");
return ret;
} }
int SrsRtmp::on_bw_done() int SrsRtmp::on_bw_done()
@ -1029,7 +1071,60 @@ int SrsRtmp::start_flash_publish(int stream_id)
srs_info("flash publish success."); srs_info("flash publish success.");
return ret; return ret;
}
int SrsRtmp::start_bandwidth_check(int max_play_kbps, int max_pub_kbps)
{
int ret = ERROR_SUCCESS;
int play_duration_ms = 3000;
int play_interval_ms = 0;
int play_actual_duration_ms = 0;
int play_bytes = 0;
int publish_duration_ms = 3000;
int publish_interval_ms = 0;
int publish_actual_duration_ms = 0;
int publish_bytes = 0;
int64_t start_time = srs_get_system_time_ms();
if((ret = bandwidth_check_play(play_duration_ms, play_interval_ms,
play_actual_duration_ms, play_bytes, max_play_kbps) != ERROR_SUCCESS)
|| (ret = bandwidth_check_publish(publish_duration_ms, publish_interval_ms,
publish_actual_duration_ms, publish_bytes, max_pub_kbps)) != ERROR_SUCCESS)
{
srs_error("band width check failed. ret = %d", ret);
return ret;
}
int64_t end_time = srs_get_system_time_ms();
int play_kbps = play_bytes * 8 / play_actual_duration_ms;
int publish_kbps = publish_bytes * 8 / publish_actual_duration_ms;
// send finished msg
SrsCommonMessage* finish_msg = new SrsCommonMessage();
SrsOnStatusCallPacket* finish_pkt = new SrsOnStatusCallPacket;
finish_pkt->command_name = SRS_BW_CHECK_FINISHED;
finish_pkt->data->set("code", new SrsAmf0Number(0));
finish_pkt->data->set("start_time", new SrsAmf0Number(start_time));
finish_pkt->data->set("end_time", new SrsAmf0Number(end_time));
finish_pkt->data->set("play_kbps", new SrsAmf0Number(play_kbps));
finish_pkt->data->set("publish_kbps", new SrsAmf0Number(publish_kbps));
finish_pkt->data->set("play_bytes", new SrsAmf0Number(play_bytes));
finish_pkt->data->set("play_time", new SrsAmf0Number(play_actual_duration_ms));
finish_pkt->data->set("publish_bytes", new SrsAmf0Number(publish_bytes));
finish_pkt->data->set("publish_time", new SrsAmf0Number(publish_actual_duration_ms));
finish_msg->set_packet(finish_pkt, 0);
if ((ret = protocol->send_message(finish_msg)) != ERROR_SUCCESS) {
srs_error("send bandwidth check finish message failed. ret=%d", ret);
return ret;
}
srs_trace("BW check finished.");
return ret;
} }
int SrsRtmp::identify_create_stream_client(SrsCreateStreamPacket* req, int stream_id, SrsClientType& type, std::string& stream_name) int SrsRtmp::identify_create_stream_client(SrsCreateStreamPacket* req, int stream_id, SrsClientType& type, std::string& stream_name)
@ -1120,6 +1215,221 @@ int SrsRtmp::identify_flash_publish_client(SrsPublishPacket* req, SrsClientType&
type = SrsClientFlashPublish; type = SrsClientFlashPublish;
stream_name = req->stream_name; stream_name = req->stream_name;
return ret; return ret;
}
int SrsRtmp::bandwidth_check_play(int duration_ms, int interval_ms, int &actual_duration_ms,
int &play_bytes, int max_play_kbps)
{
int ret = ERROR_SUCCESS;
// send start play command to client
SrsCommonMessage* start_play_msg = new SrsCommonMessage();
SrsOnStatusCallPacket* start_play_packet = new SrsOnStatusCallPacket;
start_play_packet->command_name = SRS_BW_CHECK_START_PLAY;
start_play_packet->data->set("duration_ms", new SrsAmf0Number(duration_ms));
start_play_packet->data->set("interval_ms", new SrsAmf0Number(interval_ms));
start_play_msg->set_packet(start_play_packet, 0);
if ((ret = protocol->send_message(start_play_msg)) != ERROR_SUCCESS) {
srs_error("send bandwidth check start play message failed. ret=%d", ret);
return ret;
}
srs_trace("BW check begin.");
// recv client's starting play response
while (true) {
SrsCommonMessage* msg = 0;
if( (ret = protocol->recv_message(&msg)) != ERROR_SUCCESS)
{
srs_error("recv client's starting play response failed. ret= %d", ret);
return ret;
}
msg->decode_packet(protocol);
SrsOnStatusCallPacket* pkt = dynamic_cast<SrsOnStatusCallPacket*>(msg->get_packet());
if(pkt && (pkt->command_name == SRS_BW_CHECK_STARTING_PLAY))
break;
}
srs_trace("BW check recv play begin response.");
// send play data to client
int64_t current_Time = srs_get_system_time_ms();
int size = 1024*4; // 32KB
char random_data[size];
memset(random_data, 0x01, size);
int64_t last_time = current_Time;
int interval = 0;
while ( (srs_get_system_time_ms() - current_Time) < duration_ms ){
st_usleep(interval);
SrsCommonMessage* msg = new SrsCommonMessage;
SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket;
pkt->command_name = SRS_BW_CHECK_PLAYING;
for(int i = 0; i < 10; ++i)
{
char buf[32];
sprintf(buf, "%d", i);
pkt->data->set(buf, new SrsAmf0String(random_data));
}
msg->set_packet(pkt, 0);
play_bytes += pkt->get_payload_length();
if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
srs_error("send bandwidth check play messages failed. ret=%d", ret);
return ret;
}
if((srs_get_system_time_ms() - last_time) > 5){ // check kbps every 5 ms;
int kbps = play_bytes * 8 / (srs_get_system_time_ms() - current_Time);
if(kbps > max_play_kbps){
interval += 1000*3; // 2 ms
} else {
interval -= 1000*3;
if(interval < 0)
interval = 0;
}
}
}
actual_duration_ms = srs_get_system_time_ms() - current_Time;
srs_trace("BW check send play bytes over.");
// notify client to stop play
SrsCommonMessage* stop_play_msg = new SrsCommonMessage;
SrsOnStatusCallPacket* stop_play_pkt = new SrsOnStatusCallPacket;
stop_play_pkt->command_name = SRS_BW_CHECK_STOP_PLAY;
stop_play_pkt->data->set("duration_ms", new SrsAmf0Number(duration_ms));
stop_play_pkt->data->set("interval_ms", new SrsAmf0Number(interval_ms));
stop_play_pkt->data->set("duration_delta", new SrsAmf0Number(actual_duration_ms));
stop_play_pkt->data->set("bytes_delta", new SrsAmf0Number(play_bytes));
stop_play_msg->set_packet(stop_play_pkt, 0);
if ((ret = protocol->send_message(stop_play_msg)) != ERROR_SUCCESS) {
srs_error("send bandwidth check stop play message failed. ret=%d", ret);
return ret;
}
srs_trace("BW check stop play bytes.");
// recv client's stop play response.
while (true) {
SrsCommonMessage* msg = 0;
if((ret = protocol->recv_message(&msg)) != ERROR_SUCCESS)
{
srs_error("recv client's stop play response failed. ret = %d", ret);
return ret;
}
msg->decode_packet(protocol);
SrsOnStatusCallPacket* pkt = dynamic_cast<SrsOnStatusCallPacket*>(msg->get_packet());
if(pkt && (pkt->command_name == SRS_BW_CHECK_STOPPED_PLAY))
break;
}
srs_trace("BW check recv stop play response.");
return ret;
}
int SrsRtmp::bandwidth_check_publish(int duration_ms, int interval_ms, int &actual_duration_ms,
int &publish_bytes, int max_pub_kbps)
{
int ret = ERROR_SUCCESS;
// notify client to start publish
SrsCommonMessage* start_publish_msg = new SrsCommonMessage;
SrsOnStatusCallPacket* start_publish_pkt = new SrsOnStatusCallPacket;
start_publish_pkt->command_name = SRS_BW_CHECK_START_PUBLISH;
start_publish_pkt->data->set("duration_ms", new SrsAmf0Number(duration_ms));
start_publish_pkt->data->set("interval_ms", new SrsAmf0Number(interval_ms));
start_publish_msg->set_packet(start_publish_pkt, 0);
if ((ret = protocol->send_message(start_publish_msg)) != ERROR_SUCCESS) {
srs_error("send bandwidth check start publish message failed. ret=%d", ret);
return ret;
}
srs_trace("BW check publish begin.");
// read client's notification of starting publish
while (true) {
SrsCommonMessage* msg = 0;
if((ret = protocol->recv_message(&msg)) != ERROR_SUCCESS)
{
srs_error("recv client's notification of starting publish failed. ret = %d", ret);
return ret;
}
msg->decode_packet(protocol);
SrsOnStatusCallPacket* pkt = dynamic_cast<SrsOnStatusCallPacket*>(msg->get_packet());
if(pkt && (pkt->command_name == SRS_BW_CHECK_STARTING_PUBLISH))
break;
}
srs_trace("BW check recv publish begin response.");
// recv publish msgs until @duration_ms ms
int64_t current_time = srs_get_system_time_ms();
int64_t last_time = current_time;
int interval = 0;
while( (srs_get_system_time_ms() - current_time) < duration_ms )
{
st_usleep(interval);
SrsCommonMessage* msg = NULL;
if ((ret = protocol->recv_message(&msg)) != ERROR_SUCCESS) {
srs_error("recv message failed. ret=%d", ret);
return ret;
}
SrsAutoFree(SrsCommonMessage, msg, false);
publish_bytes += msg->header.payload_length;
if((srs_get_system_time_ms() - last_time) > 5){ // check kbps every 5 ms;
int kbps = publish_bytes * 8 / (srs_get_system_time_ms() - current_time);
if(kbps > max_pub_kbps){
interval += 1000*3; // 2 ms
} else {
interval -= 1000*3;
if(interval < 0)
interval = 0;
}
}
}
actual_duration_ms = srs_get_system_time_ms() - current_time;
srs_trace("BW check recv publish data over.");
// notify client to stop publish
SrsCommonMessage* stop_publish_msg = new SrsCommonMessage;
SrsOnStatusCallPacket* stop_publish_pkt = new SrsOnStatusCallPacket;
stop_publish_pkt->command_name = SRS_BW_CHECK_STOP_PUBLISH;
stop_publish_pkt->data->set("duration_ms", new SrsAmf0Number(duration_ms));
stop_publish_pkt->data->set("interval_ms", new SrsAmf0Number(interval_ms));
stop_publish_pkt->data->set("duration_delta", new SrsAmf0Number(actual_duration_ms));
stop_publish_pkt->data->set("bytes_delta", new SrsAmf0Number(publish_bytes));
stop_publish_msg->set_packet(stop_publish_pkt, 0);
if ((ret = protocol->send_message(stop_publish_msg)) != ERROR_SUCCESS) {
srs_error("send bandwidth check stop publish message failed. ret=%d", ret);
return ret;
}
srs_trace("BW check stop pulish.");
// recv left msg
while (true) {
if((ret = st_netfd_poll(stfd, POLLIN, 1000*500)) == ERROR_SUCCESS)
{
SrsCommonMessage* msg = 0;
if((ret = protocol->recv_message(&msg)) != ERROR_SUCCESS)
{
srs_error("recv client's left msg failed, ret = %d", ret);
return ret;
}
} else {
ret = ERROR_SUCCESS;
break;
}
}
return ret;
} }

23
trunk/src/core/srs_core_rtmp.hpp Normal file → Executable file
View file

@ -2,6 +2,7 @@
The MIT License (MIT) The MIT License (MIT)
Copyright (c) 2013 winlin Copyright (c) 2013 winlin
Copyright (c) 2013 wenjiegit
Permission is hereby granted, free of charge, to any person obtaining a copy of Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in this software and associated documentation files (the "Software"), to deal in
@ -64,6 +65,7 @@ struct SrsRequest
std::string port; std::string port;
std::string app; std::string app;
std::string stream; std::string stream;
std::string bw_key;
SrsRequest(); SrsRequest();
virtual ~SrsRequest(); virtual ~SrsRequest();
@ -97,7 +99,7 @@ enum SrsClientType
SrsClientUnknown, SrsClientUnknown,
SrsClientPlay, SrsClientPlay,
SrsClientFMLEPublish, SrsClientFMLEPublish,
SrsClientFlashPublish, SrsClientFlashPublish
}; };
/** /**
@ -122,10 +124,10 @@ public:
virtual int send_message(ISrsMessage* msg); virtual int send_message(ISrsMessage* msg);
public: public:
virtual int handshake(); virtual int handshake();
virtual int connect_app(std::string app, std::string tc_url); virtual int connect_app(const std::string &app, const std::string &tc_url);
virtual int create_stream(int& stream_id); virtual int create_stream(int& stream_id);
virtual int play(std::string stream, int stream_id); virtual int play(const std::string &stream, int stream_id);
virtual int publish(std::string stream, int stream_id); virtual int publish(const std::string &stream, int stream_id);
}; };
/** /**
@ -161,7 +163,8 @@ public:
* using the Limit type field. * using the Limit type field.
*/ */
virtual int set_peer_bandwidth(int bandwidth, int type); virtual int set_peer_bandwidth(int bandwidth, int type);
virtual int response_connect_app(SrsRequest* req); virtual int response_connect_app(SrsRequest* req, const char *ip = 0);
virtual int response_connect_reject(SrsRequest* req, const std::string& description);
virtual int on_bw_done(); virtual int on_bw_done();
/** /**
* recv some message to identify the client. * recv some message to identify the client.
@ -212,10 +215,18 @@ public:
* onStatus(NetStream.Publish.Start) * onStatus(NetStream.Publish.Start)
*/ */
virtual int start_flash_publish(int stream_id); virtual int start_flash_publish(int stream_id);
/**
* used to process band width check from client.
*/
virtual int start_bandwidth_check(int max_play_kbps, int max_pub_kbps);
private: private:
virtual int identify_create_stream_client(SrsCreateStreamPacket* req, int stream_id, SrsClientType& type, std::string& stream_name); virtual int identify_create_stream_client(SrsCreateStreamPacket* req, int stream_id, SrsClientType& type, std::string& stream_name);
virtual int identify_fmle_publish_client(SrsFMLEStartPacket* req, SrsClientType& type, std::string& stream_name); virtual int identify_fmle_publish_client(SrsFMLEStartPacket* req, SrsClientType& type, std::string& stream_name);
virtual int identify_flash_publish_client(SrsPublishPacket* req, SrsClientType& type, std::string& stream_name); virtual int identify_flash_publish_client(SrsPublishPacket* req, SrsClientType& type, std::string& stream_name);
virtual int bandwidth_check_play(int duration_ms, int interval_ms, int& actual_duration_ms, int& play_bytes, int max_play_kbps);
virtual int bandwidth_check_publish(int duration_ms, int interval_ms, int& actual_duration_ms, int& publish_bytes, int max_pub_kbps);
}; };
#endif #endif

6
trunk/src/core/srs_core_server.cpp Normal file → Executable file
View file

@ -144,14 +144,14 @@ void SrsListener::listen_cycle()
// ignore error. // ignore error.
srs_warn("ignore accept thread stoppped for accept client error"); srs_warn("ignore accept thread stoppped for accept client error");
continue; continue;
} }
srs_verbose("get a client. fd=%d", st_netfd_fileno(client_stfd)); srs_verbose("get a client. fd=%d", st_netfd_fileno(client_stfd));
if ((ret = server->accept_client(type, client_stfd)) != ERROR_SUCCESS) { if ((ret = server->accept_client(type, client_stfd)) != ERROR_SUCCESS) {
srs_warn("accept client error. ret=%d", ret); srs_warn("accept client error. ret=%d", ret);
continue; continue;
} }
srs_verbose("accept client finished. conns=%d, ret=%d", (int)conns.size(), ret); srs_verbose("accept client finished. conns=%d, ret=%d", (int)conns.size(), ret);
} }
} }
@ -250,7 +250,7 @@ int SrsServer::cycle()
// the deamon thread, update the time cache // the deamon thread, update the time cache
while (true) { while (true) {
st_usleep(SRS_TIME_RESOLUTION_MS * 1000); st_usleep(SRS_TIME_RESOLUTION_MS * 1000);
srs_update_system_time_ms(); srs_update_system_time_ms();
if (signal_reload) { if (signal_reload) {
signal_reload = false; signal_reload = false;

2
trunk/src/core/srs_core_source.cpp Normal file → Executable file
View file

@ -344,7 +344,7 @@ int SrsGopCache::dump(SrsConsumer* consumer, int tba, int tbv)
std::map<std::string, SrsSource*> SrsSource::pool; std::map<std::string, SrsSource*> SrsSource::pool;
SrsSource* SrsSource::find(std::string stream_url) SrsSource* SrsSource::find(const std::string &stream_url)
{ {
if (pool.find(stream_url) == pool.end()) { if (pool.find(stream_url) == pool.end()) {
pool[stream_url] = new SrsSource(stream_url); pool[stream_url] = new SrsSource(stream_url);