diff --git a/trunk/src/app/srs_app_http_api.cpp b/trunk/src/app/srs_app_http_api.cpp index 32ccf8e12..822d9d35e 100644 --- a/trunk/src/app/srs_app_http_api.cpp +++ b/trunk/src/app/srs_app_http_api.cpp @@ -781,9 +781,8 @@ srs_error_t SrsGoApiStreams::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessa return srs_api_response(w, r, obj->dumps()); } -SrsGoApiSdp::SrsGoApiSdp(SrsServer* svr, SrsRtcServer* rtc_svr) +SrsGoApiSdp::SrsGoApiSdp(SrsRtcServer* rtc_svr) { - server = svr; rtc_server = rtc_svr; } diff --git a/trunk/src/app/srs_app_http_api.hpp b/trunk/src/app/srs_app_http_api.hpp index c4cf02a6c..249ae7c31 100644 --- a/trunk/src/app/srs_app_http_api.hpp +++ b/trunk/src/app/srs_app_http_api.hpp @@ -168,10 +168,9 @@ public: class SrsGoApiSdp : public ISrsHttpHandler { private: - SrsServer* server; SrsRtcServer* rtc_server; public: - SrsGoApiSdp(SrsServer* svr, SrsRtcServer* rtc_svr); + SrsGoApiSdp(SrsRtcServer* rtc_svr); virtual ~SrsGoApiSdp(); public: virtual srs_error_t serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r); diff --git a/trunk/src/app/srs_app_hybrid.cpp b/trunk/src/app/srs_app_hybrid.cpp index cd8147327..9094de4f2 100644 --- a/trunk/src/app/srs_app_hybrid.cpp +++ b/trunk/src/app/srs_app_hybrid.cpp @@ -102,6 +102,11 @@ void SrsServerAdapter::stop() { } +SrsServer* SrsServerAdapter::instance() +{ + return srs; +} + SrsHybridServer::SrsHybridServer() { } @@ -181,5 +186,15 @@ void SrsHybridServer::stop() } } +SrsServerAdapter* SrsHybridServer::srs() +{ + for (vector::iterator it = servers.begin(); it != servers.end(); ++it) { + if (dynamic_cast(*it)) { + return dynamic_cast(*it); + } + } + return NULL; +} + SrsHybridServer* _srs_hybrid = new SrsHybridServer(); diff --git a/trunk/src/app/srs_app_hybrid.hpp b/trunk/src/app/srs_app_hybrid.hpp index 383456660..0f995afe4 100644 --- a/trunk/src/app/srs_app_hybrid.hpp +++ b/trunk/src/app/srs_app_hybrid.hpp @@ -57,6 +57,8 @@ public: virtual srs_error_t initialize(); virtual srs_error_t run(); virtual void stop(); +public: + virtual SrsServer* instance(); }; // The hybrid server manager. @@ -73,6 +75,8 @@ public: virtual srs_error_t initialize(); virtual srs_error_t run(); virtual void stop(); +public: + virtual SrsServerAdapter* srs(); }; extern SrsHybridServer* _srs_hybrid; diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index 60628b666..a5a71440e 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -50,6 +50,8 @@ using namespace std; #include #include #include +#include +#include static bool is_stun(const uint8_t* data, const int size) { @@ -654,7 +656,9 @@ srs_error_t SrsRtcSenderThread::cycle() SrsSource* source = NULL; - if (_srs_sources->fetch_or_create(&rtc_session->request, rtc_session->server, &source) != srs_success) { + // TODO: FIXME: Should refactor it, directly use http server as handler. + ISrsSourceHandler* handler = _srs_hybrid->srs()->instance(); + if (_srs_sources->fetch_or_create(&rtc_session->request, handler, &source) != srs_success) { return srs_error_wrap(err, "rtc fetch source failed"); } @@ -716,9 +720,8 @@ void SrsRtcSenderThread::send_and_free_messages(SrsSharedPtrMessage** msgs, int } } -SrsRtcSession::SrsRtcSession(SrsServer* svr, SrsRtcServer* rtc_svr, const SrsRequest& req, const std::string& un, int context_id) +SrsRtcSession::SrsRtcSession(SrsRtcServer* rtc_svr, const SrsRequest& req, const std::string& un, int context_id) { - server = svr; rtc_server = rtc_svr; session_state = INIT; dtls_session = NULL; @@ -766,8 +769,11 @@ srs_error_t SrsRtcSession::on_stun(SrsUdpMuxSocket* udp_mux_skt, SrsStunPacket* void SrsRtcSession::check_source() { + // TODO: FIXME: Check return error. if (source == NULL) { - _srs_sources->fetch_or_create(&request, server, &source); + // TODO: FIXME: Should refactor it, directly use http server as handler. + ISrsSourceHandler* handler = _srs_hybrid->srs()->instance(); + _srs_sources->fetch_or_create(&request, handler, &source); } } @@ -1173,13 +1179,15 @@ srs_error_t SrsRtcSession::on_rtcp(SrsUdpMuxSocket* udp_mux_skt) return err; } -SrsRtcServer::SrsRtcServer(SrsServer* svr) +SrsRtcServer::SrsRtcServer() { - server = svr; + listener = NULL; } SrsRtcServer::~SrsRtcServer() { + srs_freep(listener); + rttrd->stop(); srs_freep(rttrd); } @@ -1196,6 +1204,33 @@ srs_error_t SrsRtcServer::initialize() return err; } +srs_error_t SrsRtcServer::listen_rtc() +{ + srs_error_t err = srs_success; + + if (!_srs_config->get_rtc_enabled()) { + return err; + } + + int port = _srs_config->get_rtc_listen(); + if (port <= 0) { + return srs_error_new(ERROR_RTC_PORT, "invalid port=%d", port); + } + + string ip = srs_any_address_for_listener(); + + srs_freep(listener); + listener = new SrsUdpMuxListener(this, ip, port); + + if ((err = listener->listen()) != srs_success) { + return srs_error_wrap(err, "listen %s:%d", ip.c_str(), port); + } + + srs_trace("rtc listen at udp://%s:%d, fd=%d", ip.c_str(), port, listener->fd()); + + return err; +} + srs_error_t SrsRtcServer::on_udp_packet(SrsUdpMuxSocket* udp_mux_skt) { if (is_stun(reinterpret_cast(udp_mux_skt->data()), udp_mux_skt->size())) { @@ -1223,7 +1258,7 @@ SrsRtcSession* SrsRtcServer::create_rtc_session(const SrsRequest& req, const Srs } int cid = _srs_context->get_id(); - SrsRtcSession* session = new SrsRtcSession(server, this, req, username, cid); + SrsRtcSession* session = new SrsRtcSession(this, req, username, cid); map_username_session.insert(make_pair(username, session)); local_sdp.set_ice_ufrag(local_ufrag); @@ -1406,3 +1441,46 @@ srs_error_t SrsRtcTimerThread::cycle() rtc_server->check_and_clean_timeout_session(); } } + +RtcServerAdapter::RtcServerAdapter() +{ + rtc = new SrsRtcServer(); +} + +RtcServerAdapter::~RtcServerAdapter() +{ + srs_freep(rtc); +} + +srs_error_t RtcServerAdapter::initialize() +{ + srs_error_t err = srs_success; + + if ((err = rtc->initialize()) != srs_success) { + return srs_error_wrap(err, "rtc server initialize"); + } + + return err; +} + +srs_error_t RtcServerAdapter::run() +{ + srs_error_t err = srs_success; + + if ((err = rtc->listen_rtc()) != srs_success) { + return srs_error_wrap(err, "rtc server initialize"); + } + + // TODO: FIXME: Fetch api from hybrid manager. + SrsHttpServeMux* http_api_mux = _srs_hybrid->srs()->instance()->api_server(); + if ((err = http_api_mux->handle("/api/v1/sdp/", new SrsGoApiSdp(rtc))) != srs_success) { + return srs_error_wrap(err, "handle sdp"); + } + + return err; +} + +void RtcServerAdapter::stop() +{ +} + diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index 57914dd7d..0388ba31b 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -29,6 +29,7 @@ #include #include #include +#include #include #include @@ -38,7 +39,6 @@ #include class SrsUdpMuxSocket; -class SrsServer; class SrsConsumer; class SrsStunPacket; class SrsRtcServer; @@ -184,7 +184,6 @@ class SrsRtcSession { friend class SrsRtcSenderThread; private: - SrsServer* server; SrsRtcServer* rtc_server; SrsSdp remote_sdp; SrsSdp local_sdp; @@ -201,7 +200,7 @@ public: SrsRequest request; SrsSource* source; public: - SrsRtcSession(SrsServer* svr, SrsRtcServer* rtc_svr, const SrsRequest& req, const std::string& un, int context_id); + SrsRtcSession(SrsRtcServer* rtc_svr, const SrsRequest& req, const std::string& un, int context_id); virtual ~SrsRtcSession(); public: SrsSdp* get_local_sdp() { return &local_sdp; } @@ -240,7 +239,7 @@ private: srs_error_t on_rtcp_receiver_report(char* buf, int nb_buf, SrsUdpMuxSocket* udp_mux_skt); }; -// XXX: is there any other timer thread? +// TODO: FIXME: is there any other timer thread? class SrsRtcTimerThread : public ISrsCoroutineHandler { protected: @@ -264,17 +263,21 @@ public: class SrsRtcServer : public ISrsUdpMuxHandler { private: - SrsServer* server; + SrsUdpMuxListener* listener; SrsRtcTimerThread* rttrd; private: std::map map_username_session; // key: username(local_ufrag + ":" + remote_ufrag) std::map map_id_session; // key: peerip(ip + ":" + port) public: - SrsRtcServer(SrsServer* svr); + SrsRtcServer(); virtual ~SrsRtcServer(); public: virtual srs_error_t initialize(); + // TODO: FIXME: Support gracefully quit. + // TODO: FIXME: Support reload. + virtual srs_error_t listen_rtc(); + virtual srs_error_t on_udp_packet(SrsUdpMuxSocket* udp_mux_skt); SrsRtcSession* create_rtc_session(const SrsRequest& req, const SrsSdp& remote_sdp, SrsSdp& local_sdp); @@ -289,5 +292,19 @@ private: SrsRtcSession* find_rtc_session_by_peer_id(const std::string& peer_id); }; +// The RTC server adapter. +class RtcServerAdapter : public ISrsHybridServer +{ +private: + SrsRtcServer* rtc; +public: + RtcServerAdapter(); + virtual ~RtcServerAdapter(); +public: + virtual srs_error_t initialize(); + virtual srs_error_t run(); + virtual void stop(); +}; + #endif diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index 5e3294c0b..d216ccd1d 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -110,8 +110,6 @@ std::string srs_listener_type2string(SrsListenerType type) return "RTSP"; case SrsListenerFlv: return "HTTP-FLV"; - case SrsListenerRtc: - return "RTC"; default: return "UNKONWN"; } @@ -342,45 +340,6 @@ SrsUdpCasterListener::~SrsUdpCasterListener() srs_freep(caster); } -SrsRtcListener::SrsRtcListener(SrsServer* svr, SrsRtcServer* rtc_svr, SrsListenerType t) : SrsListener(svr, t) -{ - srs_assert(type == SrsListenerRtc); - rtc = rtc_svr; -} - -SrsRtcListener::~SrsRtcListener() -{ -} - -srs_error_t SrsRtcListener::listen(std::string i, int p) -{ - srs_error_t err = srs_success; - - // the caller already ensure the type is ok, - // we just assert here for unknown stream caster. - srs_assert(type == SrsListenerRtc); - - ip = i; - port = p; - - srs_freep(listener); - listener = new SrsUdpMuxListener(rtc, ip, port); - - if ((err = listener->listen()) != srs_success) { - return srs_error_wrap(err, "listen %s:%d", ip.c_str(), port); - } - - // notify the handler the fd changed. - if ((err = rtc->on_stfd_change(listener->stfd())) != srs_success) { - return srs_error_wrap(err, "notify fd change failed"); - } - - string v = srs_listener_type2string(type); - srs_trace("%s listen at udp://%s:%d, fd=%d", v.c_str(), ip.c_str(), port, listener->fd()); - - return err; -} - SrsSignalManager* SrsSignalManager::instance = NULL; SrsSignalManager::SrsSignalManager(SrsServer* s) @@ -672,7 +631,6 @@ SrsServer::SrsServer() // new these objects in initialize instead. http_api_mux = new SrsHttpServeMux(); http_server = new SrsHttpServer(this); - rtc_server = new SrsRtcServer(this); http_heartbeat = new SrsHttpHeartbeat(); ingester = new SrsIngester(); } @@ -797,10 +755,6 @@ srs_error_t SrsServer::initialize(ISrsServerCycle* ch) if ((err = http_server->initialize()) != srs_success) { return srs_error_wrap(err, "http server initialize"); } - - if ((err = rtc_server->initialize()) != srs_success) { - return srs_error_wrap(err, "rtc server initialize"); - } return err; } @@ -923,10 +877,6 @@ srs_error_t SrsServer::listen() if ((err = listen_stream_caster()) != srs_success) { return srs_error_wrap(err, "stream caster listen"); } - - if ((err = listen_rtc()) != srs_success) { - return srs_error_wrap(err, "rtc listen"); - } if ((err = conn_manager->start()) != srs_success) { return srs_error_wrap(err, "connection manager"); @@ -989,9 +939,6 @@ srs_error_t SrsServer::http_handle() if ((err = http_api_mux->handle("/api/v1/streams/", new SrsGoApiStreams())) != srs_success) { return srs_error_wrap(err, "handle streams"); } - if ((err = http_api_mux->handle("/api/v1/sdp/", new SrsGoApiSdp(this, rtc_server))) != srs_success) { - return srs_error_wrap(err, "handle sdp"); - } if ((err = http_api_mux->handle("/api/v1/clients/", new SrsGoApiClients())) != srs_success) { return srs_error_wrap(err, "handle clients"); } @@ -1400,35 +1347,6 @@ srs_error_t SrsServer::listen_stream_caster() return err; } -srs_error_t SrsServer::listen_rtc() -{ - srs_error_t err = srs_success; - - close_listeners(SrsListenerRtc); - - if (!_srs_config->get_rtc_enabled()) { - return err; - } - - SrsListener* listener = NULL; - - listener = new SrsRtcListener(this, rtc_server, SrsListenerRtc); - srs_assert(listener != NULL); - - listeners.push_back(listener); - - int port = _srs_config->get_rtc_listen(); - if (port <= 0) { - return srs_error_new(ERROR_RTC_PORT, "invalid port=%d", port); - } - - if ((err = listener->listen(srs_any_address_for_listener(), port)) != srs_success) { - return srs_error_wrap(err, "listen at %d", port); - } - - return err; -} - void SrsServer::close_listeners(SrsListenerType type) { std::vector::iterator it; @@ -1445,23 +1363,6 @@ void SrsServer::close_listeners(SrsListenerType type) } } -SrsListener* SrsServer::find_listener(SrsListenerType type) -{ - std::vector::iterator it; - for (it = listeners.begin(); it != listeners.end();) { - SrsListener* listener = *it; - - if (listener->listen_type() != type) { - ++it; - continue; - } - - return *it; - } - - return NULL; -} - void SrsServer::resample_kbps() { SrsStatistic* stat = SrsStatistic::instance(); @@ -1510,6 +1411,11 @@ srs_error_t SrsServer::accept_client(SrsListenerType type, srs_netfd_t stfd) return err; } +SrsHttpServeMux* SrsServer::api_server() +{ + return http_api_mux; +} + srs_error_t SrsServer::fd2conn(SrsListenerType type, srs_netfd_t stfd, SrsConnection** pconn) { srs_error_t err = srs_success; diff --git a/trunk/src/app/srs_app_server.hpp b/trunk/src/app/srs_app_server.hpp index 282e058a2..b702eb49e 100644 --- a/trunk/src/app/srs_app_server.hpp +++ b/trunk/src/app/srs_app_server.hpp @@ -41,7 +41,6 @@ class SrsServer; class SrsConnection; class SrsHttpServeMux; class SrsHttpServer; -class SrsRtcServer; class SrsIngester; class SrsHttpHeartbeat; class SrsKbps; @@ -70,8 +69,6 @@ enum SrsListenerType SrsListenerRtsp = 4, // TCP stream, FLV stream over HTTP. SrsListenerFlv = 5, - // UDP remux, rtp over udp - SrsListenerRtc = 6, }; // A common tcp listener, for RTMP/HTTP server. @@ -159,19 +156,6 @@ public: virtual ~SrsUdpCasterListener(); }; -// A UDP listener, for udp remux rtc server -class SrsRtcListener : public SrsListener -{ -protected: - SrsUdpMuxListener* listener; - ISrsUdpMuxHandler* rtc; -public: - SrsRtcListener(SrsServer* svr, SrsRtcServer* rtc_svr, SrsListenerType t); - virtual ~SrsRtcListener(); -public: - virtual srs_error_t listen(std::string i, int p); -}; - // Convert signal to io, // @see: st-1.9/docs/notes.html class SrsSignalManager : public ISrsCoroutineHandler @@ -241,7 +225,6 @@ private: // TODO: FIXME: rename to http_api SrsHttpServeMux* http_api_mux; SrsHttpServer* http_server; - SrsRtcServer* rtc_server; SrsHttpHeartbeat* http_heartbeat; SrsIngester* ingester; SrsCoroutineManager* conn_manager; @@ -320,7 +303,6 @@ private: virtual srs_error_t listen_http_api(); virtual srs_error_t listen_http_stream(); virtual srs_error_t listen_stream_caster(); - virtual srs_error_t listen_rtc(); // Close the listeners for specified type, // Remove the listen object from manager. virtual void close_listeners(SrsListenerType type); @@ -333,6 +315,8 @@ public: // for instance RTMP connection to serve client. // @param stfd, the client fd in st boxed, the underlayer fd. virtual srs_error_t accept_client(SrsListenerType type, srs_netfd_t stfd); + // TODO: FIXME: Fetch from hybrid server manager. + virtual SrsHttpServeMux* api_server(); private: virtual srs_error_t fd2conn(SrsListenerType type, srs_netfd_t stfd, SrsConnection** pconn); // Interface IConnectionManager @@ -356,8 +340,6 @@ public: public: virtual srs_error_t on_publish(SrsSource* s, SrsRequest* r); virtual void on_unpublish(SrsSource* s, SrsRequest* r); -// listeners commuction - virtual SrsListener* find_listener(SrsListenerType type); }; #endif diff --git a/trunk/src/main/srs_main_server.cpp b/trunk/src/main/srs_main_server.cpp index 0778c72f1..1bec4c0ad 100644 --- a/trunk/src/main/srs_main_server.cpp +++ b/trunk/src/main/srs_main_server.cpp @@ -50,6 +50,7 @@ using namespace std; #include #include #include +#include #ifdef SRS_AUTO_SRT #include @@ -441,11 +442,15 @@ srs_error_t run_hybrid_server() { srs_error_t err = srs_success; + // Create servers and register them. _srs_hybrid->register_server(new SrsServerAdapter()); + #ifdef SRS_AUTO_SRT _srs_hybrid->register_server(new SrtServerAdapter()); #endif + _srs_hybrid->register_server(new RtcServerAdapter()); + // Do some system initialize. if ((err = _srs_hybrid->initialize()) != srs_success) { return srs_error_wrap(err, "hybrid initialize");