1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

for #133, alloc and free rtp port.

This commit is contained in:
winlin 2015-02-17 21:10:06 +08:00
parent d4ceff649f
commit f14af45413
9 changed files with 183 additions and 38 deletions

View file

@ -77,8 +77,9 @@ SrsUdpListener::SrsUdpListener(ISrsUdpHandler* h, int p)
SrsUdpListener::~SrsUdpListener() SrsUdpListener::~SrsUdpListener()
{ {
// close the stfd to trigger thread to interrupted.
srs_close_stfd(stfd); srs_close_stfd(stfd);
pthread->stop(); pthread->stop();
srs_freep(pthread); srs_freep(pthread);
@ -143,8 +144,8 @@ int SrsUdpListener::listen()
int SrsUdpListener::cycle() int SrsUdpListener::cycle()
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
for (;;) { while (pthread->can_loop()) {
// TODO: FIXME: support ipv6, @see man 7 ipv6 // TODO: FIXME: support ipv6, @see man 7 ipv6
sockaddr_in from; sockaddr_in from;
int nb_from = sizeof(sockaddr_in); int nb_from = sizeof(sockaddr_in);
@ -181,8 +182,9 @@ SrsTcpListener::SrsTcpListener(ISrsTcpHandler* h, int p)
SrsTcpListener::~SrsTcpListener() SrsTcpListener::~SrsTcpListener()
{ {
// close the stfd to trigger thread to interrupted.
srs_close_stfd(stfd); srs_close_stfd(stfd);
pthread->stop(); pthread->stop();
srs_freep(pthread); srs_freep(pthread);

View file

@ -36,21 +36,41 @@ using namespace std;
#ifdef SRS_AUTO_STREAM_CASTER #ifdef SRS_AUTO_STREAM_CASTER
ISrsRtspHandler::ISrsRtspHandler() SrsRtpConn::SrsRtpConn(SrsRtspConn* r, int p)
{ {
rtsp = r;
_port = p;
listener = new SrsUdpListener(this, p);
} }
ISrsRtspHandler::~ISrsRtspHandler() SrsRtpConn::~SrsRtpConn()
{ {
srs_freep(listener);
} }
SrsRtspConn::SrsRtspConn(SrsRtspCaster* c, st_netfd_t fd, std::string o, int lpmin, int lpmax) int SrsRtpConn::port()
{
return _port;
}
int SrsRtpConn::listen()
{
return listener->listen();
}
int SrsRtpConn::on_udp_packet(sockaddr_in* from, char* buf, int nb_buf)
{
int ret = ERROR_SUCCESS;
return ret;
}
SrsRtspConn::SrsRtspConn(SrsRtspCaster* c, st_netfd_t fd, std::string o)
{ {
output = o; output = o;
local_port_min = lpmin;
local_port_max = lpmax;
session = "O9EaZ4bf"; // TODO: FIXME: generate session id. session = "O9EaZ4bf"; // TODO: FIXME: generate session id.
video_rtp = NULL;
audio_rtp = NULL;
caster = c; caster = c;
stfd = fd; stfd = fd;
@ -64,6 +84,9 @@ SrsRtspConn::~SrsRtspConn()
srs_close_stfd(stfd); srs_close_stfd(stfd);
trd->stop(); trd->stop();
srs_freep(video_rtp);
srs_freep(audio_rtp);
srs_freep(trd); srs_freep(trd);
srs_freep(skt); srs_freep(skt);
srs_freep(rtsp); srs_freep(rtsp);
@ -103,6 +126,8 @@ int SrsRtspConn::do_cycle()
} }
} else if (req->is_announce()) { } else if (req->is_announce()) {
srs_assert(req->sdp); srs_assert(req->sdp);
video_id = req->sdp->video_stream_id;
audio_id = req->sdp->audio_stream_id;
sps = req->sdp->video_sps; sps = req->sdp->video_sps;
pps = req->sdp->video_pps; pps = req->sdp->video_pps;
asc = req->sdp->audio_sh; asc = req->sdp->audio_sh;
@ -119,11 +144,31 @@ int SrsRtspConn::do_cycle()
} }
} else if (req->is_setup()) { } else if (req->is_setup()) {
srs_assert(req->transport); srs_assert(req->transport);
int lpm = 0;
if ((ret = caster->alloc_port(&lpm)) != ERROR_SUCCESS) {
srs_error("rtsp: alloc port failed. ret=%d", ret);
return ret;
}
SrsRtpConn* rtp = NULL;
if (req->stream_id == video_id) {
srs_freep(video_rtp);
rtp = video_rtp = new SrsRtpConn(this, lpm);
} else {
srs_freep(audio_rtp);
rtp = audio_rtp = new SrsRtpConn(this, lpm);
}
if ((ret = rtp->listen()) != ERROR_SUCCESS) {
srs_error("rtsp: rtp listen at port=%d failed. ret=%d", lpm, ret);
return ret;
}
srs_trace("rtsp: rtp listen at port=%d ok.", lpm);
SrsRtspSetupResponse* res = new SrsRtspSetupResponse(req->seq); SrsRtspSetupResponse* res = new SrsRtspSetupResponse(req->seq);
res->client_port_min = req->transport->client_port_min; res->client_port_min = req->transport->client_port_min;
res->client_port_max = req->transport->client_port_max; res->client_port_max = req->transport->client_port_max;
res->local_port_min = local_port_min; res->local_port_min = lpm;
res->local_port_max = local_port_max; res->local_port_max = lpm + 1;
res->session = session; res->session = session;
if ((ret = rtsp->send_message(res)) != ERROR_SUCCESS) { if ((ret = rtsp->send_message(res)) != ERROR_SUCCESS) {
if (!srs_is_client_gracefully_close(ret)) { if (!srs_is_client_gracefully_close(ret)) {
@ -165,6 +210,14 @@ int SrsRtspConn::cycle()
void SrsRtspConn::on_thread_stop() void SrsRtspConn::on_thread_stop()
{ {
if (video_rtp) {
caster->free_port(video_rtp->port(), video_rtp->port() + 1);
}
if (audio_rtp) {
caster->free_port(audio_rtp->port(), audio_rtp->port() + 1);
}
caster->remove(this); caster->remove(this);
} }
@ -184,16 +237,40 @@ SrsRtspCaster::~SrsRtspCaster()
srs_freep(conn); srs_freep(conn);
} }
clients.clear(); clients.clear();
used_ports.clear();
} }
int SrsRtspCaster::serve_client(st_netfd_t stfd) int SrsRtspCaster::alloc_port(int* pport)
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
SrsRtspConn* conn = new SrsRtspConn( // use a pair of port.
this, stfd, for (int i = local_port_min; i < local_port_max - 1; i += 2) {
output, local_port_min, local_port_max if (!used_ports[i]) {
); used_ports[i] = true;
used_ports[i + 1] = true;
*pport = i;
break;
}
}
srs_info("rtsp: alloc port=%d-%d", *pport, *pport + 1);
return ret;
}
void SrsRtspCaster::free_port(int lpmin, int lpmax)
{
for (int i = lpmin; i < lpmax; i++) {
used_ports[i] = false;
}
srs_trace("rtsp: free rtp port=%d-%d", lpmin, lpmax);
}
int SrsRtspCaster::on_tcp_client(st_netfd_t stfd)
{
int ret = ERROR_SUCCESS;
SrsRtspConn* conn = new SrsRtspConn(this, stfd, output);
if ((ret = conn->serve()) != ERROR_SUCCESS) { if ((ret = conn->serve()) != ERROR_SUCCESS) {
srs_error("rtsp: serve client failed. ret=%d", ret); srs_error("rtsp: serve client failed. ret=%d", ret);

View file

@ -32,30 +32,38 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <string> #include <string>
#include <vector> #include <vector>
#include <map>
#include <srs_app_st.hpp> #include <srs_app_st.hpp>
#include <srs_app_thread.hpp> #include <srs_app_thread.hpp>
#include <srs_app_listener.hpp>
#ifdef SRS_AUTO_STREAM_CASTER #ifdef SRS_AUTO_STREAM_CASTER
class SrsConfDirective;
class SrsStSocket; class SrsStSocket;
class SrsRtspConn;
class SrsRtspStack; class SrsRtspStack;
class SrsRtspCaster; class SrsRtspCaster;
class SrsConfDirective;
/** /**
* the handler for rtsp handler. * a rtp connection which transport a stream.
*/ */
class ISrsRtspHandler class SrsRtpConn: public ISrsUdpHandler
{ {
private:
SrsUdpListener* listener;
SrsRtspConn* rtsp;
int _port;
public: public:
ISrsRtspHandler(); SrsRtpConn(SrsRtspConn* r, int p);
virtual ~ISrsRtspHandler(); virtual ~SrsRtpConn();
public: public:
/** virtual int port();
* serve the rtsp connection. virtual int listen();
*/ // interface ISrsUdpHandler
virtual int serve_client(st_netfd_t stfd) = 0; public:
virtual int on_udp_packet(sockaddr_in* from, char* buf, int nb_buf);
}; };
/** /**
@ -65,10 +73,14 @@ class SrsRtspConn : public ISrsThreadHandler
{ {
private: private:
std::string output; std::string output;
int local_port_min;
int local_port_max;
private: private:
std::string session; std::string session;
// video stream.
std::string video_id;
SrsRtpConn* video_rtp;
// audio stream.
std::string audio_id;
SrsRtpConn* audio_rtp;
// video sequence header. // video sequence header.
std::string sps; std::string sps;
std::string pps; std::string pps;
@ -81,7 +93,7 @@ private:
SrsRtspCaster* caster; SrsRtspCaster* caster;
SrsThread* trd; SrsThread* trd;
public: public:
SrsRtspConn(SrsRtspCaster* c, st_netfd_t fd, std::string o, int lpmin, int lpmax); SrsRtspConn(SrsRtspCaster* c, st_netfd_t fd, std::string o);
virtual ~SrsRtspConn(); virtual ~SrsRtspConn();
public: public:
virtual int serve(); virtual int serve();
@ -96,19 +108,32 @@ public:
/** /**
* the caster for rtsp. * the caster for rtsp.
*/ */
class SrsRtspCaster : public ISrsRtspHandler class SrsRtspCaster : public ISrsTcpHandler
{ {
private: private:
std::string output; std::string output;
int local_port_min; int local_port_min;
int local_port_max; int local_port_max;
// key: port, value: whether used.
std::map<int, bool> used_ports;
private: private:
std::vector<SrsRtspConn*> clients; std::vector<SrsRtspConn*> clients;
public: public:
SrsRtspCaster(SrsConfDirective* c); SrsRtspCaster(SrsConfDirective* c);
virtual ~SrsRtspCaster(); virtual ~SrsRtspCaster();
public: public:
virtual int serve_client(st_netfd_t stfd); /**
* alloc a rtp port from local ports pool.
* @param pport output the rtp port.
*/
virtual int alloc_port(int* pport);
/**
* free the alloced rtp port.
*/
virtual void free_port(int lpmin, int lpmax);
// interface ISrsTcpHandler
public:
virtual int on_tcp_client(st_netfd_t stfd);
// internal methods. // internal methods.
public: public:
virtual void remove(SrsRtspConn* conn); virtual void remove(SrsRtspConn* conn);

View file

@ -220,7 +220,7 @@ int SrsRtspListener::on_tcp_client(st_netfd_t stfd)
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
if ((ret = caster->serve_client(stfd)) != ERROR_SUCCESS) { if ((ret = caster->on_tcp_client(stfd)) != ERROR_SUCCESS) {
srs_warn("accept client error. ret=%d", ret); srs_warn("accept client error. ret=%d", ret);
return ret; return ret;
} }

View file

@ -47,8 +47,8 @@ class SrsIngester;
class SrsHttpHeartbeat; class SrsHttpHeartbeat;
class SrsKbps; class SrsKbps;
class SrsConfDirective; class SrsConfDirective;
class ISrsTcpHandler;
class ISrsUdpHandler; class ISrsUdpHandler;
class ISrsRtspHandler;
class SrsUdpListener; class SrsUdpListener;
class SrsTcpListener; class SrsTcpListener;
@ -111,8 +111,7 @@ class SrsRtspListener : virtual public SrsListener, virtual public ISrsTcpHandle
{ {
private: private:
SrsTcpListener* listener; SrsTcpListener* listener;
private: ISrsTcpHandler* caster;
ISrsRtspHandler* caster;
public: public:
SrsRtspListener(SrsServer* server, SrsListenerType type, SrsConfDirective* c); SrsRtspListener(SrsServer* server, SrsListenerType type, SrsConfDirective* c);
virtual ~SrsRtspListener(); virtual ~SrsRtspListener();

View file

@ -62,6 +62,7 @@ SrsThread::SrsThread(const char* name, ISrsThreadHandler* thread_handler, int64_
tid = NULL; tid = NULL;
loop = false; loop = false;
really_terminated = true;
_cid = -1; _cid = -1;
_joinable = joinable; _joinable = joinable;
@ -120,10 +121,27 @@ void SrsThread::stop()
// which will terminate the cycle thread. // which will terminate the cycle thread.
st_thread_interrupt(tid); st_thread_interrupt(tid);
// wait the thread to exit. // when joinable, wait util quit.
int ret = st_thread_join(tid, NULL); if (_joinable) {
// TODO: FIXME: the join maybe failed, should use a variable to ensure thread terminated. // wait the thread to exit.
srs_assert(ret == 0); int ret = st_thread_join(tid, NULL);
if (ret) {
srs_warn("core: ignore join thread failed.");
}
// wait the thread actually terminated.
// sometimes the thread join return -1, for example,
// when thread use st_recvfrom, the thread join return -1.
// so here, we use a variable to ensure the thread stopped.
while (!really_terminated) {
st_usleep(10 * 1000);
if (really_terminated) {
break;
}
srs_warn("core: wait thread to actually terminated");
}
}
tid = NULL; tid = NULL;
} }
@ -150,6 +168,9 @@ void SrsThread::thread_cycle()
srs_assert(handler); srs_assert(handler);
handler->on_thread_start(); handler->on_thread_start();
// thread is running now.
really_terminated = false;
// wait for cid to ready, for parent thread to get the cid. // wait for cid to ready, for parent thread to get the cid.
while (!can_run && loop) { while (!can_run && loop) {
@ -191,6 +212,9 @@ failed:
handler->on_thread_stop(); handler->on_thread_stop();
srs_info("thread %s cycle finished", _name); srs_info("thread %s cycle finished", _name);
// readly terminated now.
really_terminated = true;
} }
void* SrsThread::thread_fun(void* arg) void* SrsThread::thread_fun(void* arg)

View file

@ -143,6 +143,7 @@ private:
int _cid; int _cid;
bool loop; bool loop;
bool can_run; bool can_run;
bool really_terminated;
bool _joinable; bool _joinable;
const char* _name; const char* _name;
private: private:

View file

@ -746,6 +746,18 @@ int SrsRtspStack::do_recv_message(SrsRtspRequest* req)
} }
} }
// for setup, parse the stream id from uri.
if (req->is_setup()) {
size_t pos = string::npos;
if ((pos = req->uri.rfind("/")) != string::npos) {
req->stream_id = req->uri.substr(pos + 1);
}
if ((pos = req->stream_id.find("=")) != string::npos) {
req->stream_id = req->stream_id.substr(pos + 1);
}
srs_info("rtsp: setup stream id=%s", req->stream_id.c_str());
}
// parse rdp body. // parse rdp body.
long consumed = 0; long consumed = 0;
while (consumed < req->content_length) { while (consumed < req->content_length) {

View file

@ -337,6 +337,7 @@ public:
* assumed. It is interpreted according to [H14.14]. * assumed. It is interpreted according to [H14.14].
*/ */
long content_length; long content_length;
/** /**
* the sdp in announce, NULL for no sdp. * the sdp in announce, NULL for no sdp.
*/ */
@ -345,6 +346,10 @@ public:
* the transport in setup, NULL for no transport. * the transport in setup, NULL for no transport.
*/ */
SrsRtspTransport* transport; SrsRtspTransport* transport;
/**
* for setup message, parse the stream id from uri.
*/
std::string stream_id;
public: public:
SrsRtspRequest(); SrsRtspRequest();
virtual ~SrsRtspRequest(); virtual ~SrsRtspRequest();