1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

For #1638, #307, extract rtc server to hybrid manager.

This commit is contained in:
winlin 2020-03-17 17:56:37 +08:00
parent 4318d989a6
commit 2c4dc0fb3d
9 changed files with 141 additions and 136 deletions

View file

@ -781,9 +781,8 @@ srs_error_t SrsGoApiStreams::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessa
return srs_api_response(w, r, obj->dumps());
}
SrsGoApiSdp::SrsGoApiSdp(SrsServer* svr, SrsRtcServer* rtc_svr)
SrsGoApiSdp::SrsGoApiSdp(SrsRtcServer* rtc_svr)
{
server = svr;
rtc_server = rtc_svr;
}

View file

@ -168,10 +168,9 @@ public:
class SrsGoApiSdp : public ISrsHttpHandler
{
private:
SrsServer* server;
SrsRtcServer* rtc_server;
public:
SrsGoApiSdp(SrsServer* svr, SrsRtcServer* rtc_svr);
SrsGoApiSdp(SrsRtcServer* rtc_svr);
virtual ~SrsGoApiSdp();
public:
virtual srs_error_t serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r);

View file

@ -102,6 +102,11 @@ void SrsServerAdapter::stop()
{
}
SrsServer* SrsServerAdapter::instance()
{
return srs;
}
SrsHybridServer::SrsHybridServer()
{
}
@ -181,5 +186,15 @@ void SrsHybridServer::stop()
}
}
SrsServerAdapter* SrsHybridServer::srs()
{
for (vector<ISrsHybridServer*>::iterator it = servers.begin(); it != servers.end(); ++it) {
if (dynamic_cast<SrsServerAdapter*>(*it)) {
return dynamic_cast<SrsServerAdapter*>(*it);
}
}
return NULL;
}
SrsHybridServer* _srs_hybrid = new SrsHybridServer();

View file

@ -57,6 +57,8 @@ public:
virtual srs_error_t initialize();
virtual srs_error_t run();
virtual void stop();
public:
virtual SrsServer* instance();
};
// The hybrid server manager.
@ -73,6 +75,8 @@ public:
virtual srs_error_t initialize();
virtual srs_error_t run();
virtual void stop();
public:
virtual SrsServerAdapter* srs();
};
extern SrsHybridServer* _srs_hybrid;

View file

@ -50,6 +50,8 @@ using namespace std;
#include <srs_app_source.hpp>
#include <srs_app_server.hpp>
#include <srs_service_utility.hpp>
#include <srs_http_stack.hpp>
#include <srs_app_http_api.hpp>
static bool is_stun(const uint8_t* data, const int size)
{
@ -654,7 +656,9 @@ srs_error_t SrsRtcSenderThread::cycle()
SrsSource* source = NULL;
if (_srs_sources->fetch_or_create(&rtc_session->request, rtc_session->server, &source) != srs_success) {
// TODO: FIXME: Should refactor it, directly use http server as handler.
ISrsSourceHandler* handler = _srs_hybrid->srs()->instance();
if (_srs_sources->fetch_or_create(&rtc_session->request, handler, &source) != srs_success) {
return srs_error_wrap(err, "rtc fetch source failed");
}
@ -716,9 +720,8 @@ void SrsRtcSenderThread::send_and_free_messages(SrsSharedPtrMessage** msgs, int
}
}
SrsRtcSession::SrsRtcSession(SrsServer* svr, SrsRtcServer* rtc_svr, const SrsRequest& req, const std::string& un, int context_id)
SrsRtcSession::SrsRtcSession(SrsRtcServer* rtc_svr, const SrsRequest& req, const std::string& un, int context_id)
{
server = svr;
rtc_server = rtc_svr;
session_state = INIT;
dtls_session = NULL;
@ -766,8 +769,11 @@ srs_error_t SrsRtcSession::on_stun(SrsUdpMuxSocket* udp_mux_skt, SrsStunPacket*
void SrsRtcSession::check_source()
{
// TODO: FIXME: Check return error.
if (source == NULL) {
_srs_sources->fetch_or_create(&request, server, &source);
// TODO: FIXME: Should refactor it, directly use http server as handler.
ISrsSourceHandler* handler = _srs_hybrid->srs()->instance();
_srs_sources->fetch_or_create(&request, handler, &source);
}
}
@ -1173,13 +1179,15 @@ srs_error_t SrsRtcSession::on_rtcp(SrsUdpMuxSocket* udp_mux_skt)
return err;
}
SrsRtcServer::SrsRtcServer(SrsServer* svr)
SrsRtcServer::SrsRtcServer()
{
server = svr;
listener = NULL;
}
SrsRtcServer::~SrsRtcServer()
{
srs_freep(listener);
rttrd->stop();
srs_freep(rttrd);
}
@ -1196,6 +1204,33 @@ srs_error_t SrsRtcServer::initialize()
return err;
}
srs_error_t SrsRtcServer::listen_rtc()
{
srs_error_t err = srs_success;
if (!_srs_config->get_rtc_enabled()) {
return err;
}
int port = _srs_config->get_rtc_listen();
if (port <= 0) {
return srs_error_new(ERROR_RTC_PORT, "invalid port=%d", port);
}
string ip = srs_any_address_for_listener();
srs_freep(listener);
listener = new SrsUdpMuxListener(this, ip, port);
if ((err = listener->listen()) != srs_success) {
return srs_error_wrap(err, "listen %s:%d", ip.c_str(), port);
}
srs_trace("rtc listen at udp://%s:%d, fd=%d", ip.c_str(), port, listener->fd());
return err;
}
srs_error_t SrsRtcServer::on_udp_packet(SrsUdpMuxSocket* udp_mux_skt)
{
if (is_stun(reinterpret_cast<const uint8_t*>(udp_mux_skt->data()), udp_mux_skt->size())) {
@ -1223,7 +1258,7 @@ SrsRtcSession* SrsRtcServer::create_rtc_session(const SrsRequest& req, const Srs
}
int cid = _srs_context->get_id();
SrsRtcSession* session = new SrsRtcSession(server, this, req, username, cid);
SrsRtcSession* session = new SrsRtcSession(this, req, username, cid);
map_username_session.insert(make_pair(username, session));
local_sdp.set_ice_ufrag(local_ufrag);
@ -1406,3 +1441,46 @@ srs_error_t SrsRtcTimerThread::cycle()
rtc_server->check_and_clean_timeout_session();
}
}
RtcServerAdapter::RtcServerAdapter()
{
rtc = new SrsRtcServer();
}
RtcServerAdapter::~RtcServerAdapter()
{
srs_freep(rtc);
}
srs_error_t RtcServerAdapter::initialize()
{
srs_error_t err = srs_success;
if ((err = rtc->initialize()) != srs_success) {
return srs_error_wrap(err, "rtc server initialize");
}
return err;
}
srs_error_t RtcServerAdapter::run()
{
srs_error_t err = srs_success;
if ((err = rtc->listen_rtc()) != srs_success) {
return srs_error_wrap(err, "rtc server initialize");
}
// TODO: FIXME: Fetch api from hybrid manager.
SrsHttpServeMux* http_api_mux = _srs_hybrid->srs()->instance()->api_server();
if ((err = http_api_mux->handle("/api/v1/sdp/", new SrsGoApiSdp(rtc))) != srs_success) {
return srs_error_wrap(err, "handle sdp");
}
return err;
}
void RtcServerAdapter::stop()
{
}

View file

@ -29,6 +29,7 @@
#include <srs_service_st.hpp>
#include <srs_kernel_utility.hpp>
#include <srs_rtmp_stack.hpp>
#include <srs_app_hybrid.hpp>
#include <string>
#include <map>
@ -38,7 +39,6 @@
#include <srtp2/srtp.h>
class SrsUdpMuxSocket;
class SrsServer;
class SrsConsumer;
class SrsStunPacket;
class SrsRtcServer;
@ -184,7 +184,6 @@ class SrsRtcSession
{
friend class SrsRtcSenderThread;
private:
SrsServer* server;
SrsRtcServer* rtc_server;
SrsSdp remote_sdp;
SrsSdp local_sdp;
@ -201,7 +200,7 @@ public:
SrsRequest request;
SrsSource* source;
public:
SrsRtcSession(SrsServer* svr, SrsRtcServer* rtc_svr, const SrsRequest& req, const std::string& un, int context_id);
SrsRtcSession(SrsRtcServer* rtc_svr, const SrsRequest& req, const std::string& un, int context_id);
virtual ~SrsRtcSession();
public:
SrsSdp* get_local_sdp() { return &local_sdp; }
@ -240,7 +239,7 @@ private:
srs_error_t on_rtcp_receiver_report(char* buf, int nb_buf, SrsUdpMuxSocket* udp_mux_skt);
};
// XXX: is there any other timer thread?
// TODO: FIXME: is there any other timer thread?
class SrsRtcTimerThread : public ISrsCoroutineHandler
{
protected:
@ -264,17 +263,21 @@ public:
class SrsRtcServer : public ISrsUdpMuxHandler
{
private:
SrsServer* server;
SrsUdpMuxListener* listener;
SrsRtcTimerThread* rttrd;
private:
std::map<std::string, SrsRtcSession*> map_username_session; // key: username(local_ufrag + ":" + remote_ufrag)
std::map<std::string, SrsRtcSession*> map_id_session; // key: peerip(ip + ":" + port)
public:
SrsRtcServer(SrsServer* svr);
SrsRtcServer();
virtual ~SrsRtcServer();
public:
virtual srs_error_t initialize();
// TODO: FIXME: Support gracefully quit.
// TODO: FIXME: Support reload.
virtual srs_error_t listen_rtc();
virtual srs_error_t on_udp_packet(SrsUdpMuxSocket* udp_mux_skt);
SrsRtcSession* create_rtc_session(const SrsRequest& req, const SrsSdp& remote_sdp, SrsSdp& local_sdp);
@ -289,5 +292,19 @@ private:
SrsRtcSession* find_rtc_session_by_peer_id(const std::string& peer_id);
};
// The RTC server adapter.
class RtcServerAdapter : public ISrsHybridServer
{
private:
SrsRtcServer* rtc;
public:
RtcServerAdapter();
virtual ~RtcServerAdapter();
public:
virtual srs_error_t initialize();
virtual srs_error_t run();
virtual void stop();
};
#endif

View file

@ -110,8 +110,6 @@ std::string srs_listener_type2string(SrsListenerType type)
return "RTSP";
case SrsListenerFlv:
return "HTTP-FLV";
case SrsListenerRtc:
return "RTC";
default:
return "UNKONWN";
}
@ -342,45 +340,6 @@ SrsUdpCasterListener::~SrsUdpCasterListener()
srs_freep(caster);
}
SrsRtcListener::SrsRtcListener(SrsServer* svr, SrsRtcServer* rtc_svr, SrsListenerType t) : SrsListener(svr, t)
{
srs_assert(type == SrsListenerRtc);
rtc = rtc_svr;
}
SrsRtcListener::~SrsRtcListener()
{
}
srs_error_t SrsRtcListener::listen(std::string i, int p)
{
srs_error_t err = srs_success;
// the caller already ensure the type is ok,
// we just assert here for unknown stream caster.
srs_assert(type == SrsListenerRtc);
ip = i;
port = p;
srs_freep(listener);
listener = new SrsUdpMuxListener(rtc, ip, port);
if ((err = listener->listen()) != srs_success) {
return srs_error_wrap(err, "listen %s:%d", ip.c_str(), port);
}
// notify the handler the fd changed.
if ((err = rtc->on_stfd_change(listener->stfd())) != srs_success) {
return srs_error_wrap(err, "notify fd change failed");
}
string v = srs_listener_type2string(type);
srs_trace("%s listen at udp://%s:%d, fd=%d", v.c_str(), ip.c_str(), port, listener->fd());
return err;
}
SrsSignalManager* SrsSignalManager::instance = NULL;
SrsSignalManager::SrsSignalManager(SrsServer* s)
@ -672,7 +631,6 @@ SrsServer::SrsServer()
// new these objects in initialize instead.
http_api_mux = new SrsHttpServeMux();
http_server = new SrsHttpServer(this);
rtc_server = new SrsRtcServer(this);
http_heartbeat = new SrsHttpHeartbeat();
ingester = new SrsIngester();
}
@ -797,10 +755,6 @@ srs_error_t SrsServer::initialize(ISrsServerCycle* ch)
if ((err = http_server->initialize()) != srs_success) {
return srs_error_wrap(err, "http server initialize");
}
if ((err = rtc_server->initialize()) != srs_success) {
return srs_error_wrap(err, "rtc server initialize");
}
return err;
}
@ -923,10 +877,6 @@ srs_error_t SrsServer::listen()
if ((err = listen_stream_caster()) != srs_success) {
return srs_error_wrap(err, "stream caster listen");
}
if ((err = listen_rtc()) != srs_success) {
return srs_error_wrap(err, "rtc listen");
}
if ((err = conn_manager->start()) != srs_success) {
return srs_error_wrap(err, "connection manager");
@ -989,9 +939,6 @@ srs_error_t SrsServer::http_handle()
if ((err = http_api_mux->handle("/api/v1/streams/", new SrsGoApiStreams())) != srs_success) {
return srs_error_wrap(err, "handle streams");
}
if ((err = http_api_mux->handle("/api/v1/sdp/", new SrsGoApiSdp(this, rtc_server))) != srs_success) {
return srs_error_wrap(err, "handle sdp");
}
if ((err = http_api_mux->handle("/api/v1/clients/", new SrsGoApiClients())) != srs_success) {
return srs_error_wrap(err, "handle clients");
}
@ -1400,35 +1347,6 @@ srs_error_t SrsServer::listen_stream_caster()
return err;
}
srs_error_t SrsServer::listen_rtc()
{
srs_error_t err = srs_success;
close_listeners(SrsListenerRtc);
if (!_srs_config->get_rtc_enabled()) {
return err;
}
SrsListener* listener = NULL;
listener = new SrsRtcListener(this, rtc_server, SrsListenerRtc);
srs_assert(listener != NULL);
listeners.push_back(listener);
int port = _srs_config->get_rtc_listen();
if (port <= 0) {
return srs_error_new(ERROR_RTC_PORT, "invalid port=%d", port);
}
if ((err = listener->listen(srs_any_address_for_listener(), port)) != srs_success) {
return srs_error_wrap(err, "listen at %d", port);
}
return err;
}
void SrsServer::close_listeners(SrsListenerType type)
{
std::vector<SrsListener*>::iterator it;
@ -1445,23 +1363,6 @@ void SrsServer::close_listeners(SrsListenerType type)
}
}
SrsListener* SrsServer::find_listener(SrsListenerType type)
{
std::vector<SrsListener*>::iterator it;
for (it = listeners.begin(); it != listeners.end();) {
SrsListener* listener = *it;
if (listener->listen_type() != type) {
++it;
continue;
}
return *it;
}
return NULL;
}
void SrsServer::resample_kbps()
{
SrsStatistic* stat = SrsStatistic::instance();
@ -1510,6 +1411,11 @@ srs_error_t SrsServer::accept_client(SrsListenerType type, srs_netfd_t stfd)
return err;
}
SrsHttpServeMux* SrsServer::api_server()
{
return http_api_mux;
}
srs_error_t SrsServer::fd2conn(SrsListenerType type, srs_netfd_t stfd, SrsConnection** pconn)
{
srs_error_t err = srs_success;

View file

@ -41,7 +41,6 @@ class SrsServer;
class SrsConnection;
class SrsHttpServeMux;
class SrsHttpServer;
class SrsRtcServer;
class SrsIngester;
class SrsHttpHeartbeat;
class SrsKbps;
@ -70,8 +69,6 @@ enum SrsListenerType
SrsListenerRtsp = 4,
// TCP stream, FLV stream over HTTP.
SrsListenerFlv = 5,
// UDP remux, rtp over udp
SrsListenerRtc = 6,
};
// A common tcp listener, for RTMP/HTTP server.
@ -159,19 +156,6 @@ public:
virtual ~SrsUdpCasterListener();
};
// A UDP listener, for udp remux rtc server
class SrsRtcListener : public SrsListener
{
protected:
SrsUdpMuxListener* listener;
ISrsUdpMuxHandler* rtc;
public:
SrsRtcListener(SrsServer* svr, SrsRtcServer* rtc_svr, SrsListenerType t);
virtual ~SrsRtcListener();
public:
virtual srs_error_t listen(std::string i, int p);
};
// Convert signal to io,
// @see: st-1.9/docs/notes.html
class SrsSignalManager : public ISrsCoroutineHandler
@ -241,7 +225,6 @@ private:
// TODO: FIXME: rename to http_api
SrsHttpServeMux* http_api_mux;
SrsHttpServer* http_server;
SrsRtcServer* rtc_server;
SrsHttpHeartbeat* http_heartbeat;
SrsIngester* ingester;
SrsCoroutineManager* conn_manager;
@ -320,7 +303,6 @@ private:
virtual srs_error_t listen_http_api();
virtual srs_error_t listen_http_stream();
virtual srs_error_t listen_stream_caster();
virtual srs_error_t listen_rtc();
// Close the listeners for specified type,
// Remove the listen object from manager.
virtual void close_listeners(SrsListenerType type);
@ -333,6 +315,8 @@ public:
// for instance RTMP connection to serve client.
// @param stfd, the client fd in st boxed, the underlayer fd.
virtual srs_error_t accept_client(SrsListenerType type, srs_netfd_t stfd);
// TODO: FIXME: Fetch from hybrid server manager.
virtual SrsHttpServeMux* api_server();
private:
virtual srs_error_t fd2conn(SrsListenerType type, srs_netfd_t stfd, SrsConnection** pconn);
// Interface IConnectionManager
@ -356,8 +340,6 @@ public:
public:
virtual srs_error_t on_publish(SrsSource* s, SrsRequest* r);
virtual void on_unpublish(SrsSource* s, SrsRequest* r);
// listeners commuction
virtual SrsListener* find_listener(SrsListenerType type);
};
#endif

View file

@ -50,6 +50,7 @@ using namespace std;
#include <srs_core_autofree.hpp>
#include <srs_kernel_file.hpp>
#include <srs_app_hybrid.hpp>
#include <srs_app_rtc_conn.hpp>
#ifdef SRS_AUTO_SRT
#include <srt_server.hpp>
@ -441,11 +442,15 @@ srs_error_t run_hybrid_server()
{
srs_error_t err = srs_success;
// Create servers and register them.
_srs_hybrid->register_server(new SrsServerAdapter());
#ifdef SRS_AUTO_SRT
_srs_hybrid->register_server(new SrtServerAdapter());
#endif
_srs_hybrid->register_server(new RtcServerAdapter());
// Do some system initialize.
if ((err = _srs_hybrid->initialize()) != srs_success) {
return srs_error_wrap(err, "hybrid initialize");