diff --git a/trunk/src/app/srs_app_listener.cpp b/trunk/src/app/srs_app_listener.cpp index 5f536857d..2a82fa506 100644 --- a/trunk/src/app/srs_app_listener.cpp +++ b/trunk/src/app/srs_app_listener.cpp @@ -77,8 +77,9 @@ SrsUdpListener::SrsUdpListener(ISrsUdpHandler* h, int p) SrsUdpListener::~SrsUdpListener() { + // close the stfd to trigger thread to interrupted. srs_close_stfd(stfd); - + pthread->stop(); srs_freep(pthread); @@ -143,8 +144,8 @@ int SrsUdpListener::listen() int SrsUdpListener::cycle() { int ret = ERROR_SUCCESS; - - for (;;) { + + while (pthread->can_loop()) { // TODO: FIXME: support ipv6, @see man 7 ipv6 sockaddr_in from; int nb_from = sizeof(sockaddr_in); @@ -181,8 +182,9 @@ SrsTcpListener::SrsTcpListener(ISrsTcpHandler* h, int p) SrsTcpListener::~SrsTcpListener() { + // close the stfd to trigger thread to interrupted. srs_close_stfd(stfd); - + pthread->stop(); srs_freep(pthread); diff --git a/trunk/src/app/srs_app_rtsp.cpp b/trunk/src/app/srs_app_rtsp.cpp index 28ee0dee4..44448aa91 100644 --- a/trunk/src/app/srs_app_rtsp.cpp +++ b/trunk/src/app/srs_app_rtsp.cpp @@ -36,21 +36,41 @@ using namespace std; #ifdef SRS_AUTO_STREAM_CASTER -ISrsRtspHandler::ISrsRtspHandler() +SrsRtpConn::SrsRtpConn(SrsRtspConn* r, int p) { + rtsp = r; + _port = p; + listener = new SrsUdpListener(this, p); } -ISrsRtspHandler::~ISrsRtspHandler() +SrsRtpConn::~SrsRtpConn() { + srs_freep(listener); } -SrsRtspConn::SrsRtspConn(SrsRtspCaster* c, st_netfd_t fd, std::string o, int lpmin, int lpmax) +int SrsRtpConn::port() +{ + return _port; +} + +int SrsRtpConn::listen() +{ + return listener->listen(); +} + +int SrsRtpConn::on_udp_packet(sockaddr_in* from, char* buf, int nb_buf) +{ + int ret = ERROR_SUCCESS; + return ret; +} + +SrsRtspConn::SrsRtspConn(SrsRtspCaster* c, st_netfd_t fd, std::string o) { output = o; - local_port_min = lpmin; - local_port_max = lpmax; session = "O9EaZ4bf"; // TODO: FIXME: generate session id. + video_rtp = NULL; + audio_rtp = NULL; caster = c; stfd = fd; @@ -64,6 +84,9 @@ SrsRtspConn::~SrsRtspConn() srs_close_stfd(stfd); trd->stop(); + srs_freep(video_rtp); + srs_freep(audio_rtp); + srs_freep(trd); srs_freep(skt); srs_freep(rtsp); @@ -103,6 +126,8 @@ int SrsRtspConn::do_cycle() } } else if (req->is_announce()) { srs_assert(req->sdp); + video_id = req->sdp->video_stream_id; + audio_id = req->sdp->audio_stream_id; sps = req->sdp->video_sps; pps = req->sdp->video_pps; asc = req->sdp->audio_sh; @@ -119,11 +144,31 @@ int SrsRtspConn::do_cycle() } } else if (req->is_setup()) { srs_assert(req->transport); + int lpm = 0; + if ((ret = caster->alloc_port(&lpm)) != ERROR_SUCCESS) { + srs_error("rtsp: alloc port failed. ret=%d", ret); + return ret; + } + + SrsRtpConn* rtp = NULL; + if (req->stream_id == video_id) { + srs_freep(video_rtp); + rtp = video_rtp = new SrsRtpConn(this, lpm); + } else { + srs_freep(audio_rtp); + rtp = audio_rtp = new SrsRtpConn(this, lpm); + } + if ((ret = rtp->listen()) != ERROR_SUCCESS) { + srs_error("rtsp: rtp listen at port=%d failed. ret=%d", lpm, ret); + return ret; + } + srs_trace("rtsp: rtp listen at port=%d ok.", lpm); + SrsRtspSetupResponse* res = new SrsRtspSetupResponse(req->seq); res->client_port_min = req->transport->client_port_min; res->client_port_max = req->transport->client_port_max; - res->local_port_min = local_port_min; - res->local_port_max = local_port_max; + res->local_port_min = lpm; + res->local_port_max = lpm + 1; res->session = session; if ((ret = rtsp->send_message(res)) != ERROR_SUCCESS) { if (!srs_is_client_gracefully_close(ret)) { @@ -165,6 +210,14 @@ int SrsRtspConn::cycle() void SrsRtspConn::on_thread_stop() { + if (video_rtp) { + caster->free_port(video_rtp->port(), video_rtp->port() + 1); + } + + if (audio_rtp) { + caster->free_port(audio_rtp->port(), audio_rtp->port() + 1); + } + caster->remove(this); } @@ -184,16 +237,40 @@ SrsRtspCaster::~SrsRtspCaster() srs_freep(conn); } clients.clear(); + used_ports.clear(); } -int SrsRtspCaster::serve_client(st_netfd_t stfd) +int SrsRtspCaster::alloc_port(int* pport) { int ret = ERROR_SUCCESS; - SrsRtspConn* conn = new SrsRtspConn( - this, stfd, - output, local_port_min, local_port_max - ); + // use a pair of port. + for (int i = local_port_min; i < local_port_max - 1; i += 2) { + if (!used_ports[i]) { + used_ports[i] = true; + used_ports[i + 1] = true; + *pport = i; + break; + } + } + srs_info("rtsp: alloc port=%d-%d", *pport, *pport + 1); + + return ret; +} + +void SrsRtspCaster::free_port(int lpmin, int lpmax) +{ + for (int i = lpmin; i < lpmax; i++) { + used_ports[i] = false; + } + srs_trace("rtsp: free rtp port=%d-%d", lpmin, lpmax); +} + +int SrsRtspCaster::on_tcp_client(st_netfd_t stfd) +{ + int ret = ERROR_SUCCESS; + + SrsRtspConn* conn = new SrsRtspConn(this, stfd, output); if ((ret = conn->serve()) != ERROR_SUCCESS) { srs_error("rtsp: serve client failed. ret=%d", ret); diff --git a/trunk/src/app/srs_app_rtsp.hpp b/trunk/src/app/srs_app_rtsp.hpp index 6cbbbfb29..2fe6a9ba8 100644 --- a/trunk/src/app/srs_app_rtsp.hpp +++ b/trunk/src/app/srs_app_rtsp.hpp @@ -32,30 +32,38 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include +#include #include #include +#include #ifdef SRS_AUTO_STREAM_CASTER -class SrsConfDirective; class SrsStSocket; +class SrsRtspConn; class SrsRtspStack; class SrsRtspCaster; +class SrsConfDirective; /** -* the handler for rtsp handler. +* a rtp connection which transport a stream. */ -class ISrsRtspHandler +class SrsRtpConn: public ISrsUdpHandler { +private: + SrsUdpListener* listener; + SrsRtspConn* rtsp; + int _port; public: - ISrsRtspHandler(); - virtual ~ISrsRtspHandler(); + SrsRtpConn(SrsRtspConn* r, int p); + virtual ~SrsRtpConn(); public: - /** - * serve the rtsp connection. - */ - virtual int serve_client(st_netfd_t stfd) = 0; + virtual int port(); + virtual int listen(); +// interface ISrsUdpHandler +public: + virtual int on_udp_packet(sockaddr_in* from, char* buf, int nb_buf); }; /** @@ -65,10 +73,14 @@ class SrsRtspConn : public ISrsThreadHandler { private: std::string output; - int local_port_min; - int local_port_max; private: std::string session; + // video stream. + std::string video_id; + SrsRtpConn* video_rtp; + // audio stream. + std::string audio_id; + SrsRtpConn* audio_rtp; // video sequence header. std::string sps; std::string pps; @@ -81,7 +93,7 @@ private: SrsRtspCaster* caster; SrsThread* trd; public: - SrsRtspConn(SrsRtspCaster* c, st_netfd_t fd, std::string o, int lpmin, int lpmax); + SrsRtspConn(SrsRtspCaster* c, st_netfd_t fd, std::string o); virtual ~SrsRtspConn(); public: virtual int serve(); @@ -96,19 +108,32 @@ public: /** * the caster for rtsp. */ -class SrsRtspCaster : public ISrsRtspHandler +class SrsRtspCaster : public ISrsTcpHandler { private: std::string output; int local_port_min; int local_port_max; + // key: port, value: whether used. + std::map used_ports; private: std::vector clients; public: SrsRtspCaster(SrsConfDirective* c); virtual ~SrsRtspCaster(); public: - virtual int serve_client(st_netfd_t stfd); + /** + * alloc a rtp port from local ports pool. + * @param pport output the rtp port. + */ + virtual int alloc_port(int* pport); + /** + * free the alloced rtp port. + */ + virtual void free_port(int lpmin, int lpmax); +// interface ISrsTcpHandler +public: + virtual int on_tcp_client(st_netfd_t stfd); // internal methods. public: virtual void remove(SrsRtspConn* conn); diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index dbc2cefdb..8be3609f1 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -220,7 +220,7 @@ int SrsRtspListener::on_tcp_client(st_netfd_t stfd) { int ret = ERROR_SUCCESS; - if ((ret = caster->serve_client(stfd)) != ERROR_SUCCESS) { + if ((ret = caster->on_tcp_client(stfd)) != ERROR_SUCCESS) { srs_warn("accept client error. ret=%d", ret); return ret; } diff --git a/trunk/src/app/srs_app_server.hpp b/trunk/src/app/srs_app_server.hpp index c58f4de68..62af1e9fa 100644 --- a/trunk/src/app/srs_app_server.hpp +++ b/trunk/src/app/srs_app_server.hpp @@ -47,8 +47,8 @@ class SrsIngester; class SrsHttpHeartbeat; class SrsKbps; class SrsConfDirective; +class ISrsTcpHandler; class ISrsUdpHandler; -class ISrsRtspHandler; class SrsUdpListener; class SrsTcpListener; @@ -111,8 +111,7 @@ class SrsRtspListener : virtual public SrsListener, virtual public ISrsTcpHandle { private: SrsTcpListener* listener; -private: - ISrsRtspHandler* caster; + ISrsTcpHandler* caster; public: SrsRtspListener(SrsServer* server, SrsListenerType type, SrsConfDirective* c); virtual ~SrsRtspListener(); diff --git a/trunk/src/app/srs_app_thread.cpp b/trunk/src/app/srs_app_thread.cpp index ee6a6d03c..0b38c4d44 100644 --- a/trunk/src/app/srs_app_thread.cpp +++ b/trunk/src/app/srs_app_thread.cpp @@ -62,6 +62,7 @@ SrsThread::SrsThread(const char* name, ISrsThreadHandler* thread_handler, int64_ tid = NULL; loop = false; + really_terminated = true; _cid = -1; _joinable = joinable; @@ -120,10 +121,27 @@ void SrsThread::stop() // which will terminate the cycle thread. st_thread_interrupt(tid); - // wait the thread to exit. - int ret = st_thread_join(tid, NULL); - // TODO: FIXME: the join maybe failed, should use a variable to ensure thread terminated. - srs_assert(ret == 0); + // when joinable, wait util quit. + if (_joinable) { + // wait the thread to exit. + int ret = st_thread_join(tid, NULL); + if (ret) { + srs_warn("core: ignore join thread failed."); + } + + // wait the thread actually terminated. + // sometimes the thread join return -1, for example, + // when thread use st_recvfrom, the thread join return -1. + // so here, we use a variable to ensure the thread stopped. + while (!really_terminated) { + st_usleep(10 * 1000); + + if (really_terminated) { + break; + } + srs_warn("core: wait thread to actually terminated"); + } + } tid = NULL; } @@ -150,6 +168,9 @@ void SrsThread::thread_cycle() srs_assert(handler); handler->on_thread_start(); + + // thread is running now. + really_terminated = false; // wait for cid to ready, for parent thread to get the cid. while (!can_run && loop) { @@ -191,6 +212,9 @@ failed: handler->on_thread_stop(); srs_info("thread %s cycle finished", _name); + + // readly terminated now. + really_terminated = true; } void* SrsThread::thread_fun(void* arg) diff --git a/trunk/src/app/srs_app_thread.hpp b/trunk/src/app/srs_app_thread.hpp index 1af72a99e..38c44e2ea 100644 --- a/trunk/src/app/srs_app_thread.hpp +++ b/trunk/src/app/srs_app_thread.hpp @@ -143,6 +143,7 @@ private: int _cid; bool loop; bool can_run; + bool really_terminated; bool _joinable; const char* _name; private: diff --git a/trunk/src/protocol/srs_rtsp_stack.cpp b/trunk/src/protocol/srs_rtsp_stack.cpp index b097aa651..f6709a280 100644 --- a/trunk/src/protocol/srs_rtsp_stack.cpp +++ b/trunk/src/protocol/srs_rtsp_stack.cpp @@ -746,6 +746,18 @@ int SrsRtspStack::do_recv_message(SrsRtspRequest* req) } } + // for setup, parse the stream id from uri. + if (req->is_setup()) { + size_t pos = string::npos; + if ((pos = req->uri.rfind("/")) != string::npos) { + req->stream_id = req->uri.substr(pos + 1); + } + if ((pos = req->stream_id.find("=")) != string::npos) { + req->stream_id = req->stream_id.substr(pos + 1); + } + srs_info("rtsp: setup stream id=%s", req->stream_id.c_str()); + } + // parse rdp body. long consumed = 0; while (consumed < req->content_length) { diff --git a/trunk/src/protocol/srs_rtsp_stack.hpp b/trunk/src/protocol/srs_rtsp_stack.hpp index f80bde5ff..f0eedf3cc 100644 --- a/trunk/src/protocol/srs_rtsp_stack.hpp +++ b/trunk/src/protocol/srs_rtsp_stack.hpp @@ -337,6 +337,7 @@ public: * assumed. It is interpreted according to [H14.14]. */ long content_length; + /** * the sdp in announce, NULL for no sdp. */ @@ -345,6 +346,10 @@ public: * the transport in setup, NULL for no transport. */ SrsRtspTransport* transport; + /** + * for setup message, parse the stream id from uri. + */ + std::string stream_id; public: SrsRtspRequest(); virtual ~SrsRtspRequest();