1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

refactor bandwidth test, refactor the interface of bandwidth server object.

This commit is contained in:
winlin 2014-07-12 15:55:39 +08:00
parent 7f21520e9f
commit b1dd0218be
4 changed files with 45 additions and 37 deletions

View file

@ -90,10 +90,10 @@ public:
*/ */
int size; int size;
char* bytes; char* bytes;
public:
SrsCodecBuffer(); SrsCodecBuffer();
void append(void* data, int len); void append(void* data, int len);
public:
/** /**
* free the bytes, * free the bytes,
* user can invoke it to free the bytes, * user can invoke it to free the bytes,

View file

@ -39,29 +39,31 @@ using namespace std;
SrsBandwidth::SrsBandwidth() SrsBandwidth::SrsBandwidth()
{ {
_req = NULL;
_rtmp = NULL;
} }
SrsBandwidth::~SrsBandwidth() SrsBandwidth::~SrsBandwidth()
{ {
} }
int SrsBandwidth::bandwidth_test(SrsRequest* _req, st_netfd_t stfd, SrsRtmpServer* _rtmp) int SrsBandwidth::bandwidth_check(SrsRtmpServer* rtmp, SrsRequest* req, string local_ip)
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
rtmp = _rtmp; _rtmp = rtmp;
req = _req; _req = req;
if (!_srs_config->get_bw_check_enabled(req->vhost)) { if (!_srs_config->get_bw_check_enabled(_req->vhost)) {
return ret; return ret;
} }
// validate the bandwidth check key // validate the bandwidth check key
std::string key = "key=" + _srs_config->get_bw_check_key(req->vhost); std::string key = "key=" + _srs_config->get_bw_check_key(_req->vhost);
if (req->tcUrl.find(key) == std::string::npos) { if (_req->tcUrl.find(key) == std::string::npos) {
ret = ERROR_SYSTEM_BANDWIDTH_KEY; ret = ERROR_SYSTEM_BANDWIDTH_KEY;
srs_error("check the vhost=%s %s failed, tcUrl=%s, ret=%d", srs_error("check the vhost=%s %s failed, tcUrl=%s, ret=%d",
req->vhost.c_str(), key.c_str(), req->tcUrl.c_str(), ret); _req->vhost.c_str(), key.c_str(), _req->tcUrl.c_str(), ret);
return ret; return ret;
} }
@ -70,7 +72,7 @@ int SrsBandwidth::bandwidth_test(SrsRequest* _req, st_netfd_t stfd, SrsRtmpServe
// if client request check in the window(specifeid by interval), // if client request check in the window(specifeid by interval),
// directly reject the request. // directly reject the request.
static int64_t last_check_time = 0; static int64_t last_check_time = 0;
int interval_ms = _srs_config->get_bw_check_interval_ms(req->vhost); int interval_ms = _srs_config->get_bw_check_interval_ms(_req->vhost);
int64_t time_now = srs_get_system_time_ms(); int64_t time_now = srs_get_system_time_ms();
// reject the connection in the interval window. // reject the connection in the interval window.
@ -80,15 +82,14 @@ int SrsBandwidth::bandwidth_test(SrsRequest* _req, st_netfd_t stfd, SrsRtmpServe
"last_check=%"PRId64", now=%"PRId64", interval=%d", "last_check=%"PRId64", now=%"PRId64", interval=%d",
last_check_time, time_now, interval_ms); last_check_time, time_now, interval_ms);
rtmp->response_connect_reject(req, "bandcheck rejected"); _rtmp->response_connect_reject(_req, "bandcheck rejected");
return ret; return ret;
} }
// accept and do bandwidth check. // accept and do bandwidth check.
last_check_time = time_now; last_check_time = time_now;
std::string local_ip = srs_get_local_ip(st_netfd_fileno(stfd)); if ((ret = _rtmp->response_connect_app(_req, local_ip.c_str())) != ERROR_SUCCESS) {
if ((ret = rtmp->response_connect_app(req, local_ip.c_str())) != ERROR_SUCCESS) {
srs_error("response connect app failed. ret=%d", ret); srs_error("response connect app failed. ret=%d", ret);
return ret; return ret;
} }
@ -110,7 +111,7 @@ int SrsBandwidth::do_bandwidth_check()
int publish_actual_duration_ms = 0; int publish_actual_duration_ms = 0;
int publish_bytes = 0; int publish_bytes = 0;
int limit_kbps = _srs_config->get_bw_check_limit_kbps(req->vhost); int limit_kbps = _srs_config->get_bw_check_limit_kbps(_req->vhost);
int64_t start_time = srs_get_system_time_ms(); int64_t start_time = srs_get_system_time_ms();
@ -135,7 +136,7 @@ int SrsBandwidth::do_bandwidth_check()
srs_trace("bandwidth check finished. start=%"PRId64"ms, end=%"PRId64"ms, " srs_trace("bandwidth check finished. start=%"PRId64"ms, end=%"PRId64"ms, "
"duartion=%dms, play=%dkbps, publish=%dkbps, tcUrl=%s, ret=%#x", "duartion=%dms, play=%dkbps, publish=%dkbps, tcUrl=%s, ret=%#x",
start_time, end_time, (int)(end_time - start_time), play_kbps, publish_kbps, start_time, end_time, (int)(end_time - start_time), play_kbps, publish_kbps,
req->tcUrl.c_str(), ret); _req->tcUrl.c_str(), ret);
// send finished msg // send finished msg
SrsBandwidthPacket* pkt = SrsBandwidthPacket::create_finish(); SrsBandwidthPacket* pkt = SrsBandwidthPacket::create_finish();
@ -149,7 +150,7 @@ int SrsBandwidth::do_bandwidth_check()
pkt->data->set("publish_bytes", SrsAmf0Any::number(publish_bytes)); pkt->data->set("publish_bytes", SrsAmf0Any::number(publish_bytes));
pkt->data->set("publish_time", SrsAmf0Any::number(publish_actual_duration_ms)); pkt->data->set("publish_time", SrsAmf0Any::number(publish_actual_duration_ms));
if ((ret = rtmp->send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) { if ((ret = _rtmp->send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) {
srs_error("send bandwidth check finish message failed. ret=%d", ret); srs_error("send bandwidth check finish message failed. ret=%d", ret);
return ret; return ret;
} }
@ -158,7 +159,7 @@ int SrsBandwidth::do_bandwidth_check()
while (true) { while (true) {
SrsMessage* msg = NULL; SrsMessage* msg = NULL;
SrsBandwidthPacket* pkt = NULL; SrsBandwidthPacket* pkt = NULL;
if ((ret = rtmp->expect_message<SrsBandwidthPacket>(&msg, &pkt)) != ERROR_SUCCESS) { if ((ret = _rtmp->expect_message<SrsBandwidthPacket>(&msg, &pkt)) != ERROR_SUCCESS) {
// info level to ignore and return success. // info level to ignore and return success.
srs_info("expect final message failed. ret=%d", ret); srs_info("expect final message failed. ret=%d", ret);
return ERROR_SUCCESS; return ERROR_SUCCESS;
@ -191,7 +192,7 @@ int SrsBandwidth::check_play(
pkt->data->set("duration_ms", SrsAmf0Any::number(duration_ms)); pkt->data->set("duration_ms", SrsAmf0Any::number(duration_ms));
pkt->data->set("interval_ms", SrsAmf0Any::number(interval_ms)); pkt->data->set("interval_ms", SrsAmf0Any::number(interval_ms));
if ((ret = rtmp->send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) { if ((ret = _rtmp->send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) {
srs_error("send bandwidth check start play message failed. ret=%d", ret); srs_error("send bandwidth check start play message failed. ret=%d", ret);
return ret; return ret;
} }
@ -202,7 +203,7 @@ int SrsBandwidth::check_play(
// recv client's starting play response // recv client's starting play response
SrsMessage* msg = NULL; SrsMessage* msg = NULL;
SrsBandwidthPacket* pkt = NULL; SrsBandwidthPacket* pkt = NULL;
if ((ret = rtmp->expect_message<SrsBandwidthPacket>(&msg, &pkt)) != ERROR_SUCCESS) { if ((ret = _rtmp->expect_message<SrsBandwidthPacket>(&msg, &pkt)) != ERROR_SUCCESS) {
srs_error("expect bandwidth message failed. ret=%d", ret); srs_error("expect bandwidth message failed. ret=%d", ret);
return ret; return ret;
} }
@ -224,7 +225,7 @@ int SrsBandwidth::check_play(
int interval = 0; int interval = 0;
int data_count = 1; int data_count = 1;
while ( (srs_get_system_time_ms() - current_time) < duration_ms ) { while ((srs_get_system_time_ms() - current_time) < duration_ms) {
st_usleep(interval); st_usleep(interval);
// TODO: FIXME: use shared ptr message. // TODO: FIXME: use shared ptr message.
@ -234,15 +235,15 @@ int SrsBandwidth::check_play(
for (int i = 0; i < data_count; ++i) { for (int i = 0; i < data_count; ++i) {
std::stringstream seq; std::stringstream seq;
seq << i; seq << i;
std::string play_data = "SrS band check data from server's playing......"; std::string play_data = "SRS band check data from server's playing......";
pkt->data->set(seq.str(), SrsAmf0Any::str(play_data.c_str())); pkt->data->set(seq.str(), SrsAmf0Any::str(play_data.c_str()));
} }
data_count += 2; data_count += 2;
// get length from the rtmp protocol stack. // get length from the rtmp protocol stack.
play_bytes = rtmp->get_send_bytes(); play_bytes = _rtmp->get_send_bytes();
if ((ret = rtmp->send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) { if ((ret = _rtmp->send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) {
srs_error("send bandwidth check play messages failed. ret=%d", ret); srs_error("send bandwidth check play messages failed. ret=%d", ret);
return ret; return ret;
} }
@ -272,7 +273,7 @@ int SrsBandwidth::check_play(
pkt->data->set("duration_delta", SrsAmf0Any::number(actual_duration_ms)); pkt->data->set("duration_delta", SrsAmf0Any::number(actual_duration_ms));
pkt->data->set("bytes_delta", SrsAmf0Any::number(play_bytes)); pkt->data->set("bytes_delta", SrsAmf0Any::number(play_bytes));
if ((ret = rtmp->send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) { if ((ret = _rtmp->send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) {
srs_error("send bandwidth check stop play message failed. ret=%d", ret); srs_error("send bandwidth check stop play message failed. ret=%d", ret);
return ret; return ret;
} }
@ -283,7 +284,7 @@ int SrsBandwidth::check_play(
// recv client's stop play response. // recv client's stop play response.
SrsMessage* msg = NULL; SrsMessage* msg = NULL;
SrsBandwidthPacket* pkt = NULL; SrsBandwidthPacket* pkt = NULL;
if ((ret = rtmp->expect_message<SrsBandwidthPacket>(&msg, &pkt)) != ERROR_SUCCESS) { if ((ret = _rtmp->expect_message<SrsBandwidthPacket>(&msg, &pkt)) != ERROR_SUCCESS) {
srs_error("expect bandwidth message failed. ret=%d", ret); srs_error("expect bandwidth message failed. ret=%d", ret);
return ret; return ret;
} }
@ -313,7 +314,7 @@ int SrsBandwidth::check_publish(
pkt->data->set("duration_ms", SrsAmf0Any::number(duration_ms)); pkt->data->set("duration_ms", SrsAmf0Any::number(duration_ms));
pkt->data->set("interval_ms", SrsAmf0Any::number(interval_ms)); pkt->data->set("interval_ms", SrsAmf0Any::number(interval_ms));
if ((ret = rtmp->send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) { if ((ret = _rtmp->send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) {
srs_error("send bandwidth check start publish message failed. ret=%d", ret); srs_error("send bandwidth check start publish message failed. ret=%d", ret);
return ret; return ret;
} }
@ -324,7 +325,7 @@ int SrsBandwidth::check_publish(
// read client's notification of starting publish // read client's notification of starting publish
SrsMessage* msg = NULL; SrsMessage* msg = NULL;
SrsBandwidthPacket* pkt = NULL; SrsBandwidthPacket* pkt = NULL;
if ((ret = rtmp->expect_message<SrsBandwidthPacket>(&msg, &pkt)) != ERROR_SUCCESS) { if ((ret = _rtmp->expect_message<SrsBandwidthPacket>(&msg, &pkt)) != ERROR_SUCCESS) {
srs_error("expect bandwidth message failed. ret=%d", ret); srs_error("expect bandwidth message failed. ret=%d", ret);
return ret; return ret;
} }
@ -344,13 +345,13 @@ int SrsBandwidth::check_publish(
st_usleep(0); st_usleep(0);
SrsMessage* msg = NULL; SrsMessage* msg = NULL;
if ((ret = rtmp->recv_message(&msg)) != ERROR_SUCCESS) { if ((ret = _rtmp->recv_message(&msg)) != ERROR_SUCCESS) {
srs_error("recv message failed. ret=%d", ret); srs_error("recv message failed. ret=%d", ret);
return ret; return ret;
} }
SrsAutoFree(SrsMessage, msg); SrsAutoFree(SrsMessage, msg);
publish_bytes = rtmp->get_recv_bytes(); publish_bytes = _rtmp->get_recv_bytes();
int kbps = 0; int kbps = 0;
while (true) { while (true) {
@ -375,7 +376,7 @@ int SrsBandwidth::check_publish(
pkt->data->set("duration_delta", SrsAmf0Any::number(actual_duration_ms)); pkt->data->set("duration_delta", SrsAmf0Any::number(actual_duration_ms));
pkt->data->set("bytes_delta", SrsAmf0Any::number(publish_bytes)); pkt->data->set("bytes_delta", SrsAmf0Any::number(publish_bytes));
if ((ret = rtmp->send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) { if ((ret = _rtmp->send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) {
srs_error("send bandwidth check stop publish message failed. ret=%d", ret); srs_error("send bandwidth check stop publish message failed. ret=%d", ret);
return ret; return ret;
} }
@ -392,7 +393,7 @@ int SrsBandwidth::check_publish(
// recv client's stop publish response. // recv client's stop publish response.
SrsMessage* msg = NULL; SrsMessage* msg = NULL;
SrsBandwidthPacket* pkt = NULL; SrsBandwidthPacket* pkt = NULL;
if ((ret = rtmp->expect_message<SrsBandwidthPacket>(&msg, &pkt)) != ERROR_SUCCESS) { if ((ret = _rtmp->expect_message<SrsBandwidthPacket>(&msg, &pkt)) != ERROR_SUCCESS) {
srs_error("expect bandwidth message failed. ret=%d", ret); srs_error("expect bandwidth message failed. ret=%d", ret);
return ret; return ret;
} }

View file

@ -29,6 +29,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/ */
#include <srs_core.hpp> #include <srs_core.hpp>
#include <string>
#include <srs_app_st.hpp> #include <srs_app_st.hpp>
class SrsRequest; class SrsRequest;
@ -73,16 +75,19 @@ class SrsRtmpServer;
class SrsBandwidth class SrsBandwidth
{ {
private: private:
SrsRequest* req; SrsRequest* _req;
SrsRtmpServer* rtmp; SrsRtmpServer* _rtmp;
public: public:
SrsBandwidth(); SrsBandwidth();
virtual ~SrsBandwidth(); virtual ~SrsBandwidth();
public: public:
/** /**
* do the bandwidth test. * do the bandwidth check.
* @param rtmp, server RTMP protocol object, send/recv RTMP packet to/from client.
* @param req, client request object, specifies the request info from client.
* @param local_ip, the ip of server which client connected at
*/ */
virtual int bandwidth_test(SrsRequest* _req, st_netfd_t stfd, SrsRtmpServer* _rtmp); virtual int bandwidth_check(SrsRtmpServer* rtmp, SrsRequest* req, std::string local_ip);
private: private:
/** /**
* used to process band width check from client. * used to process band width check from client.

View file

@ -211,12 +211,14 @@ int SrsRtmpConn::service_cycle()
} }
srs_verbose("set peer bandwidth success"); srs_verbose("set peer bandwidth success");
// get the ip which client connected.
std::string local_ip = srs_get_local_ip(st_netfd_fileno(stfd));
// do bandwidth test if connect to the vhost which is for bandwidth check. // do bandwidth test if connect to the vhost which is for bandwidth check.
if (_srs_config->get_bw_check_enabled(req->vhost)) { if (_srs_config->get_bw_check_enabled(req->vhost)) {
return bandwidth->bandwidth_test(req, stfd, rtmp); return bandwidth->bandwidth_check(rtmp, req, local_ip);
} }
std::string local_ip = srs_get_local_ip(st_netfd_fileno(stfd));
if ((ret = rtmp->response_connect_app(req, local_ip.c_str())) != ERROR_SUCCESS) { if ((ret = rtmp->response_connect_app(req, local_ip.c_str())) != ERROR_SUCCESS) {
srs_error("response connect app failed. ret=%d", ret); srs_error("response connect app failed. ret=%d", ret);
return ret; return ret;