From 7f02bfa3a4f73684d2a8f97cfb891461542e2e67 Mon Sep 17 00:00:00 2001 From: winlin Date: Sat, 24 Jan 2015 15:36:11 +0800 Subject: [PATCH] for #250, the mpegts over udp stream caster framework. --- trunk/src/app/srs_app_server.cpp | 42 ++++++++++++++++++++++++++++++++ trunk/src/app/srs_app_server.hpp | 14 +++++++++++ 2 files changed, 56 insertions(+) diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index 515ee9bc7..a45b78a04 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -51,6 +51,12 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. // nginx also set to 512 #define SERVER_LISTEN_BACKLOG 512 +// sleep in ms for udp recv packet. +#define SRS_UDP_PACKET_RECV_CYCLE_INTERVAL_MS 0 + +// set the max packet size. +#define SRS_UDP_MAX_PACKET_SIZE 65535 + // system interval in ms, // all resolution times should be times togother, // for example, system-interval is x=1s(1000ms), @@ -222,6 +228,8 @@ int SrsListener::cycle() SrsUdpListener::SrsUdpListener(SrsServer* server, SrsListenerType type) : SrsListener(server, type) { + nb_buf = SRS_UDP_MAX_PACKET_SIZE; + buf = new char[nb_buf]; } SrsUdpListener::~SrsUdpListener() @@ -294,6 +302,27 @@ int SrsUdpListener::cycle() // we just assert here for unknown stream caster. srs_assert(_type == SrsListenerMpegTsOverUdp); + for (;;) { + // TODO: FIXME: support ipv6, @see man 7 ipv6 + sockaddr_in from; + int nb_from = sizeof(sockaddr_in); + int nread = 0; + + if ((nread = st_recvfrom(stfd, buf, nb_buf, (sockaddr*)&from, &nb_from, ST_UTIME_NO_TIMEOUT)) <= 0) { + srs_warn("ignore recv udp packet failed, nread=%d", nread); + continue; + } + + if ((ret = _server->on_udp_packet(_type, &from, buf, nread)) != ERROR_SUCCESS) { + srs_warn("handle udp packet failed. ret=%d", ret); + continue; + } + + if (SRS_UDP_PACKET_RECV_CYCLE_INTERVAL_MS > 0) { + st_usleep(SRS_UDP_PACKET_RECV_CYCLE_INTERVAL_MS * 1000); + } + } + // TODO: FIXME: recv udp packet. st_sleep(1); @@ -1112,6 +1141,19 @@ int SrsServer::accept_client(SrsListenerType type, st_netfd_t client_stfd) return ret; } +int SrsServer::on_udp_packet(SrsListenerType type, sockaddr_in* from, char* buf, int nb_buf) +{ + int ret = ERROR_SUCCESS; + + std::string peer_ip = inet_ntoa(from->sin_addr); + int peer_port = ntohs(from->sin_port); + + // TODO: FIXME: implements it. + srs_warn("udp: drop %s:%d packet %d bytes", peer_ip.c_str(), peer_port, nb_buf); + + return ret; +} + int SrsServer::on_reload_listen() { return listen(); diff --git a/trunk/src/app/srs_app_server.hpp b/trunk/src/app/srs_app_server.hpp index 5333ad461..732d5093b 100644 --- a/trunk/src/app/srs_app_server.hpp +++ b/trunk/src/app/srs_app_server.hpp @@ -44,6 +44,7 @@ class SrsHttpServer; class SrsIngester; class SrsHttpHeartbeat; class SrsKbps; +class sockaddr_in; // listener type for server to identify the connection, // that is, use different type to process the connection. @@ -88,6 +89,9 @@ public: */ class SrsUdpListener : public SrsListener { +private: + char* buf; + int nb_buf; public: SrsUdpListener(SrsServer* server, SrsListenerType type); virtual ~SrsUdpListener(); @@ -252,6 +256,16 @@ public: * @param client_stfd, the client fd in st boxed, the underlayer fd. */ virtual int accept_client(SrsListenerType type, st_netfd_t client_stfd); + /** + * when udp listener got a udp packet, notice server to process it. + * @param type, the client type, used to create concrete connection, + * for instance RTMP connection to serve client. + * @param from, the udp packet from address. + * @param buf, the udp packet bytes, user should copy if need to use. + * @param nb_buf, the size of udp packet bytes. + * @remark user should never use the buf, for it's a shared memory bytes. + */ + virtual int on_udp_packet(SrsListenerType type, sockaddr_in* from, char* buf, int nb_buf); // interface ISrsThreadHandler. public: virtual int on_reload_listen();