diff --git a/trunk/src/app/srs_app_kbps.cpp b/trunk/src/app/srs_app_kbps.cpp index c7fd53e0b..39e3fd149 100644 --- a/trunk/src/app/srs_app_kbps.cpp +++ b/trunk/src/app/srs_app_kbps.cpp @@ -77,22 +77,22 @@ void SrsKbpsSlice::sample() } if (now - sample_30s.time > 30 * 1000) { - sample_30s.kbps = (total_bytes - sample_30s.bytes) * 8 / (now - sample_30s.time); + sample_30s.kbps = (int)((total_bytes - sample_30s.bytes) * 8 / (now - sample_30s.time)); sample_30s.time = now; sample_30s.bytes = total_bytes; } if (now - sample_1m.time > 60 * 1000) { - sample_1m.kbps = (total_bytes - sample_1m.bytes) * 8 / (now - sample_1m.time); + sample_1m.kbps = (int)((total_bytes - sample_1m.bytes) * 8 / (now - sample_1m.time)); sample_1m.time = now; sample_1m.bytes = total_bytes; } if (now - sample_5m.time > 300 * 1000) { - sample_5m.kbps = (total_bytes - sample_5m.bytes) * 8 / (now - sample_5m.time); + sample_5m.kbps = (int)((total_bytes - sample_5m.bytes) * 8 / (now - sample_5m.time)); sample_5m.time = now; sample_5m.bytes = total_bytes; } if (now - sample_60m.time > 3600 * 1000) { - sample_60m.kbps = (total_bytes - sample_60m.bytes) * 8 / (now - sample_60m.time); + sample_60m.kbps = (int)((total_bytes - sample_60m.bytes) * 8 / (now - sample_60m.time)); sample_60m.time = now; sample_60m.bytes = total_bytes; } @@ -160,7 +160,7 @@ int SrsKbps::get_send_kbps() return 0; } int64_t bytes = get_send_bytes(); - return bytes * 8 / duration; + return (int)(bytes * 8 / duration); } int SrsKbps::get_recv_kbps() @@ -170,7 +170,7 @@ int SrsKbps::get_recv_kbps() return 0; } int64_t bytes = get_recv_bytes(); - return bytes * 8 / duration; + return (int)(bytes * 8 / duration); } int SrsKbps::get_send_kbps_30s() diff --git a/trunk/src/app/srs_app_listener.cpp b/trunk/src/app/srs_app_listener.cpp index fb370d708..915c95c23 100644 --- a/trunk/src/app/srs_app_listener.cpp +++ b/trunk/src/app/srs_app_listener.cpp @@ -54,6 +54,11 @@ ISrsUdpHandler::~ISrsUdpHandler() { } +int ISrsUdpHandler::on_stfd_change(st_netfd_t /*fd*/) +{ + return ERROR_SUCCESS; +} + ISrsTcpHandler::ISrsTcpHandler() { } @@ -69,7 +74,7 @@ SrsUdpListener::SrsUdpListener(ISrsUdpHandler* h, string i, int p) port = p; _fd = -1; - stfd = NULL; + _stfd = NULL; nb_buf = SRS_UDP_MAX_PACKET_SIZE; buf = new char[nb_buf]; @@ -80,7 +85,7 @@ SrsUdpListener::SrsUdpListener(ISrsUdpHandler* h, string i, int p) SrsUdpListener::~SrsUdpListener() { // close the stfd to trigger thread to interrupted. - srs_close_stfd(stfd); + srs_close_stfd(_stfd); pthread->stop(); srs_freep(pthread); @@ -97,6 +102,11 @@ int SrsUdpListener::fd() return _fd; } +st_netfd_t SrsUdpListener::stfd() +{ + return _stfd; +} + int SrsUdpListener::listen() { int ret = ERROR_SUCCESS; @@ -127,7 +137,7 @@ int SrsUdpListener::listen() } srs_verbose("bind socket success. ep=%s:%d, fd=%d", ip.c_str(), port, _fd); - if ((stfd = st_netfd_open_socket(_fd)) == NULL){ + if ((_stfd = st_netfd_open_socket(_fd)) == NULL){ ret = ERROR_ST_OPEN_SOCKET; srs_error("st_netfd_open_socket open socket failed. ep=%s:%d, ret=%d", ip.c_str(), port, ret); return ret; @@ -153,7 +163,7 @@ int SrsUdpListener::cycle() int nb_from = sizeof(sockaddr_in); int nread = 0; - if ((nread = st_recvfrom(stfd, buf, nb_buf, (sockaddr*)&from, &nb_from, ST_UTIME_NO_TIMEOUT)) <= 0) { + if ((nread = st_recvfrom(_stfd, buf, nb_buf, (sockaddr*)&from, &nb_from, ST_UTIME_NO_TIMEOUT)) <= 0) { srs_warn("ignore recv udp packet failed, nread=%d", nread); continue; } @@ -178,7 +188,7 @@ SrsTcpListener::SrsTcpListener(ISrsTcpHandler* h, string i, int p) port = p; _fd = -1; - stfd = NULL; + _stfd = NULL; pthread = new SrsThread("tcp", this, 0, true); } @@ -186,7 +196,7 @@ SrsTcpListener::SrsTcpListener(ISrsTcpHandler* h, string i, int p) SrsTcpListener::~SrsTcpListener() { // close the stfd to trigger thread to interrupted. - srs_close_stfd(stfd); + srs_close_stfd(_stfd); pthread->stop(); srs_freep(pthread); @@ -238,7 +248,7 @@ int SrsTcpListener::listen() } srs_verbose("listen socket success. ep=%s:%d, fd=%d", ip.c_str(), port, _fd); - if ((stfd = st_netfd_open_socket(_fd)) == NULL){ + if ((_stfd = st_netfd_open_socket(_fd)) == NULL){ ret = ERROR_ST_OPEN_SOCKET; srs_error("st_netfd_open_socket open socket failed. ep=%s:%d, ret=%d", ip.c_str(), port, ret); return ret; @@ -258,7 +268,7 @@ int SrsTcpListener::cycle() { int ret = ERROR_SUCCESS; - st_netfd_t client_stfd = st_accept(stfd, NULL, NULL, ST_UTIME_NO_TIMEOUT); + st_netfd_t client_stfd = st_accept(_stfd, NULL, NULL, ST_UTIME_NO_TIMEOUT); if(client_stfd == NULL){ // ignore error. diff --git a/trunk/src/app/srs_app_listener.hpp b/trunk/src/app/srs_app_listener.hpp index 679bf1bda..371b5acba 100644 --- a/trunk/src/app/srs_app_listener.hpp +++ b/trunk/src/app/srs_app_listener.hpp @@ -45,6 +45,12 @@ class ISrsUdpHandler public: ISrsUdpHandler(); virtual ~ISrsUdpHandler(); +public: + /** + * when fd changed, for instance, reload the listen port, + * notify the handler and user can do something. + */ + virtual int on_stfd_change(st_netfd_t fd); public: /** * when udp listener got a udp packet, notice server to process it. @@ -80,7 +86,7 @@ class SrsUdpListener : public ISrsThreadHandler { private: int _fd; - st_netfd_t stfd; + st_netfd_t _stfd; SrsThread* pthread; private: char* buf; @@ -94,6 +100,7 @@ public: virtual ~SrsUdpListener(); public: virtual int fd(); + virtual st_netfd_t stfd(); public: virtual int listen(); // interface ISrsThreadHandler. @@ -108,7 +115,7 @@ class SrsTcpListener : public ISrsThreadHandler { private: int _fd; - st_netfd_t stfd; + st_netfd_t _stfd; SrsThread* pthread; private: ISrsTcpHandler* handler; diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index d91b5101d..7e2c0809a 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -330,6 +330,12 @@ int SrsUdpStreamListener::listen(string i, int p) srs_info("listen thread cid=%d, current_cid=%d, " "listen at port=%d, type=%d, fd=%d started success, ep=%s:%d", pthread->cid(), _srs_context->get_id(), port, type, fd, ip.c_str(), port); + + // notify the handler the fd changed. + if ((ret = caster->on_stfd_change(listener->stfd())) != ERROR_SUCCESS) { + srs_error("notify handler fd changed. ret=%d", ret); + return ret; + } srs_trace("%s listen at udp://%s:%d, fd=%d", srs_listener_type2string(type).c_str(), ip.c_str(), port, listener->fd()); diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index d4c4d7823..7e6dfafe3 100644 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -45,6 +45,7 @@ using namespace std; #include #include #include +#include #define CONST_MAX_JITTER_MS 500 #define DEFAULT_FRAME_TIME_MS 40 @@ -759,6 +760,20 @@ SrsSource* SrsSource::fetch(SrsRequest* r) return source; } +SrsSource* SrsSource::fetch(std::string vhost, std::string app, std::string stream) +{ + SrsSource* source = NULL; + string stream_url = srs_generate_stream_url(vhost, app, stream); + + if (pool.find(stream_url) == pool.end()) { + return NULL; + } + + source = pool[stream_url]; + + return source; +} + void SrsSource::destroy() { std::map::iterator it; diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index 4d4799f70..3c83e1163 100644 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -407,6 +407,10 @@ public: */ static SrsSource* fetch(SrsRequest* r); /** + * get the exists source by stream info(vhost, app, stream), NULL when not exists. + */ + static SrsSource* fetch(std::string vhost, std::string app, std::string stream); + /** * when system exit, destroy the sources, * for gmc to analysis mem leaks. */ diff --git a/trunk/src/app/srs_app_statistic.cpp b/trunk/src/app/srs_app_statistic.cpp index 59076db0f..275815917 100644 --- a/trunk/src/app/srs_app_statistic.cpp +++ b/trunk/src/app/srs_app_statistic.cpp @@ -195,6 +195,7 @@ int SrsStatistic::on_client(int id, SrsRequest* req) SrsStatisticClient* client = NULL; if (clients.find(id) == clients.end()) { client = new SrsStatisticClient(); + client->id = id; client->stream = stream; clients[id] = client; } else { diff --git a/trunk/src/kernel/srs_kernel_error.hpp b/trunk/src/kernel/srs_kernel_error.hpp index bfcdad6b6..11f3e35b6 100644 --- a/trunk/src/kernel/srs_kernel_error.hpp +++ b/trunk/src/kernel/srs_kernel_error.hpp @@ -255,7 +255,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #define ERROR_HTTP_INVALID_CHUNK_HEADER 4026 #define ERROR_AVC_NALU_UEV 4027 #define ERROR_AAC_BYTES_INVALID 4028 -#define ERROR_HTTP_REQUEST_EOF 4029 +#define ERROR_HTTP_REQUEST_EOF 4029 /////////////////////////////////////////////////////// // user-define error. diff --git a/trunk/src/protocol/srs_rtmp_sdk.cpp b/trunk/src/protocol/srs_rtmp_sdk.cpp index 3ee7ee9bd..ba5757340 100644 --- a/trunk/src/protocol/srs_rtmp_sdk.cpp +++ b/trunk/src/protocol/srs_rtmp_sdk.cpp @@ -99,15 +99,7 @@ void SrsRequest::update_auth(SrsRequest* req) string SrsRequest::get_stream_url() { - std::string url = ""; - - url += vhost; - url += "/"; - url += app; - url += "/"; - url += stream; - - return url; + return srs_generate_stream_url(vhost, app, stream); } void SrsRequest::strip() diff --git a/trunk/src/protocol/srs_rtmp_utility.cpp b/trunk/src/protocol/srs_rtmp_utility.cpp index bb1ed35ef..7c892ad36 100644 --- a/trunk/src/protocol/srs_rtmp_utility.cpp +++ b/trunk/src/protocol/srs_rtmp_utility.cpp @@ -31,6 +31,7 @@ using namespace std; #include #include #include +#include void srs_discovery_tc_url( string tcUrl, @@ -346,3 +347,18 @@ int srs_rtmp_create_msg(char type, u_int32_t timestamp, char* data, int size, in return ret; } +std::string srs_generate_stream_url(std::string vhost, std::string app, std::string stream) +{ + std::string url = ""; + + if (SRS_CONSTS_RTMP_DEFAULT_VHOST != vhost){ + url += vhost; + } + url += "/"; + url += app; + url += "/"; + url += stream; + + return url; +} + diff --git a/trunk/src/protocol/srs_rtmp_utility.hpp b/trunk/src/protocol/srs_rtmp_utility.hpp index afa02eaa8..09e834d09 100644 --- a/trunk/src/protocol/srs_rtmp_utility.hpp +++ b/trunk/src/protocol/srs_rtmp_utility.hpp @@ -120,5 +120,8 @@ extern int srs_chunk_header_c3( */ extern int srs_rtmp_create_msg(char type, u_int32_t timestamp, char* data, int size, int stream_id, SrsSharedPtrMessage** ppmsg); +// get the stream identify, vhost/app/stream. +extern std::string srs_generate_stream_url(std::string vhost, std::string app, std::string stream); + #endif