1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

for #133, create rtsp framework.

This commit is contained in:
winlin 2015-02-16 14:05:01 +08:00
parent e81e090239
commit c0e50265bd
11 changed files with 348 additions and 26 deletions

View file

@ -23,7 +23,15 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_app_rtsp.hpp>
#include <algorithm>
using namespace std;
#include <srs_app_config.hpp>
#include <srs_kernel_error.hpp>
#include <srs_rtsp_stack.hpp>
#include <srs_app_st_socket.hpp>
#include <srs_kernel_log.hpp>
#include <srs_app_utility.hpp>
#ifdef SRS_AUTO_STREAM_CASTER
@ -35,13 +43,115 @@ ISrsRtspHandler::~ISrsRtspHandler()
{
}
SrsRtspConn::SrsRtspConn(SrsConfDirective* c)
SrsRtspConn::SrsRtspConn(SrsRtspCaster* c, st_netfd_t fd, std::string o)
{
output = _srs_config->get_stream_caster_output(c);
output = o;
caster = c;
stfd = fd;
skt = new SrsStSocket(fd);
rtsp = new SrsRtspStack(skt);
trd = new SrsThread("rtsp", this, 0, false);
}
SrsRtspConn::~SrsRtspConn()
{
srs_close_stfd(stfd);
trd->stop();
srs_freep(trd);
srs_freep(skt);
srs_freep(rtsp);
}
int SrsRtspConn::serve()
{
return trd->start();
}
int SrsRtspConn::do_cycle()
{
int ret = ERROR_SUCCESS;
// retrieve ip of client.
std::string ip = srs_get_peer_ip(st_netfd_fileno(stfd));
srs_trace("rtsp: serve %s", ip.c_str());
return ret;
}
int SrsRtspConn::cycle()
{
// serve the rtsp client.
int ret = do_cycle();
// if socket io error, set to closed.
if (srs_is_client_gracefully_close(ret)) {
ret = ERROR_SOCKET_CLOSED;
}
// success.
if (ret == ERROR_SUCCESS) {
srs_trace("client finished.");
}
// client close peer.
if (ret == ERROR_SOCKET_CLOSED) {
srs_warn("client disconnect peer. ret=%d", ret);
}
// terminate thread in the thread cycle itself.
trd->stop_loop();
return ERROR_SUCCESS;
}
void SrsRtspConn::on_thread_stop()
{
caster->remove(this);
}
SrsRtspCaster::SrsRtspCaster(SrsConfDirective* c)
{
// TODO: FIXME: support reload.
output = _srs_config->get_stream_caster_output(c);
}
SrsRtspCaster::~SrsRtspCaster()
{
std::vector<SrsRtspConn*>::iterator it;
for (it = clients.begin(); it != clients.end(); ++it) {
SrsRtspConn* conn = *it;
srs_freep(conn);
}
clients.clear();
}
int SrsRtspCaster::serve_client(st_netfd_t stfd)
{
int ret = ERROR_SUCCESS;
SrsRtspConn* conn = new SrsRtspConn(this, stfd, output);
if ((ret = conn->serve()) != ERROR_SUCCESS) {
srs_error("rtsp: serve client failed. ret=%d", ret);
srs_freep(conn);
return ret;
}
clients.push_back(conn);
srs_info("rtsp: start thread to serve client.");
return ret;
}
void SrsRtspCaster::remove(SrsRtspConn* conn)
{
std::vector<SrsRtspConn*>::iterator it = find(clients.begin(), clients.end(), conn);
if (it != clients.end()) {
clients.erase(it);
}
srs_info("rtsp: remove connection from caster.");
srs_freep(conn);
}
#endif

View file

@ -31,10 +31,17 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_core.hpp>
#include <string>
#include <vector>
#include <srs_app_st.hpp>
#include <srs_app_thread.hpp>
#ifdef SRS_AUTO_STREAM_CASTER
class SrsConfDirective;
class SrsStSocket;
class SrsRtspStack;
class SrsRtspCaster;
/**
* the handler for rtsp handler.
@ -44,18 +51,54 @@ class ISrsRtspHandler
public:
ISrsRtspHandler();
virtual ~ISrsRtspHandler();
public:
/**
* serve the rtsp connection.
*/
virtual int serve_client(st_netfd_t stfd) = 0;
};
/**
* the connection for rtsp.
* the rtsp connection serve the fd.
*/
class SrsRtspConn : public ISrsRtspHandler
class SrsRtspConn : public ISrsThreadHandler
{
private:
std::string output;
st_netfd_t stfd;
SrsStSocket* skt;
SrsRtspStack* rtsp;
SrsRtspCaster* caster;
SrsThread* trd;
public:
SrsRtspConn(SrsConfDirective* c);
SrsRtspConn(SrsRtspCaster* c, st_netfd_t fd, std::string o);
virtual ~SrsRtspConn();
public:
virtual int serve();
private:
virtual int do_cycle();
// interface ISrsThreadHandler
public:
virtual int cycle();
virtual void on_thread_stop();
};
/**
* the caster for rtsp.
*/
class SrsRtspCaster : public ISrsRtspHandler
{
private:
std::string output;
std::vector<SrsRtspConn*> clients;
public:
SrsRtspCaster(SrsConfDirective* c);
virtual ~SrsRtspCaster();
public:
virtual int serve_client(st_netfd_t stfd);
// internal methods.
public:
virtual void remove(SrsRtspConn* conn);
};
#endif

View file

@ -240,7 +240,7 @@ SrsRtspListener::SrsRtspListener(SrsServer* server, SrsListenerType type, SrsCon
// we just assert here for unknown stream caster.
srs_assert(_type == SrsListenerRtsp);
if (_type == SrsListenerRtsp) {
caster = new SrsRtspConn(c);
caster = new SrsRtspCaster(c);
}
}
@ -262,7 +262,7 @@ int SrsRtspListener::cycle()
}
srs_verbose("get a client. fd=%d", st_netfd_fileno(client_stfd));
if ((ret = _server->accept_client(_type, client_stfd)) != ERROR_SUCCESS) {
if ((ret = caster->serve_client(client_stfd)) != ERROR_SUCCESS) {
srs_warn("accept client error. ret=%d", ret);
return ret;
}

View file

@ -46,22 +46,78 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
* which will cause the socket to return error and
* terminate the cycle thread.
*
* when thread interrupt, the socket maybe not got EINT,
* espectially on st_usleep(), so the cycle must check the loop,
* when handler->cycle() has loop itself, for example:
* while (true):
* if (read_from_socket(skt) < 0) break;
* if thread stop when read_from_socket, it's ok, the loop will break,
* but when thread stop interrupt the s_usleep(0), then the loop is
* death loop.
* in a word, the handler->cycle() must:
* while (pthread->can_loop()):
* if (read_from_socket(skt) < 0) break;
* check the loop, then it works.
* Usage 1: stop by other thread.
* user can create thread and stop then start again and again,
* generally must provides a start and stop method, @see SrsIngester.
* the step to create a thread stop by other thread:
* 1. create SrsThread field, with joinable true.
* 2. must use stop to stop and join the thread.
* for example:
* class SrsIngester : public ISrsThreadHandler {
* public: SrsIngester() { pthread = new SrsThread("ingest", this, SRS_AUTO_INGESTER_SLEEP_US, true); }
* public: virtual int start() { return pthread->start(); }
* public: virtual void stop() { pthread->stop(); }
* public: virtual int cycle() {
* // check status, start ffmpeg when stopped.
* }
* };
*
* in the thread itself, that is the cycle method,
* if itself want to terminate the thread, should never use stop(),
* but use stop_loop() to set the loop to false and terminate normally.
* Usage 2: stop by thread itself.
* user can create thread which stop itself,
* generally only need to provides a start method,
* the object will destroy itself then terminate the thread, @see SrsConnection
* 1. create SrsThread field, with joinable false.
* 2. owner stop thread loop, destroy itself when thread stop.
* for example:
* class SrsConnection : public ISrsThreadHandler {
* public: SrsConnection() { pthread = new SrsThread("conn", this, 0, false); }
* public: virtual int start() { return pthread->start(); }
* public: virtual int cycle() {
* // serve client.
* // set loop to stop to quit, stop thread itself.
* pthread->stop_loop();
* }
* public: virtual int on_thread_stop() {
* // remove the connection in thread itself.
* server->remove(this);
* }
* };
*
* Usage 3: loop in the cycle method.
* user can use loop code in the cycle method, @see SrsForwarder
* 1. create SrsThread field, with or without joinable is ok.
* 2. loop code in cycle method, check the can_loop() for thread to quit.
* for example:
* class SrsForwarder : public ISrsThreadHandler {
* public: virtual int cycle() {
* while (pthread->can_loop()) {
* // read msgs from queue and forward to server.
* }
* }
* };
*
* @remark why should check can_loop() in cycle method?
* when thread interrupt, the socket maybe not got EINT,
* espectially on st_usleep(), so the cycle must check the loop,
* when handler->cycle() has loop itself, for example:
* while (true):
* if (read_from_socket(skt) < 0) break;
* if thread stop when read_from_socket, it's ok, the loop will break,
* but when thread stop interrupt the s_usleep(0), then the loop is
* death loop.
* in a word, the handler->cycle() must:
* while (pthread->can_loop()):
* if (read_from_socket(skt) < 0) break;
* check the loop, then it works.
*
* @remark why should use stop_loop() to terminate thread in itself?
* in the thread itself, that is the cycle method,
* if itself want to terminate the thread, should never use stop(),
* but use stop_loop() to set the loop to false and terminate normally.
*
* @remark when should set the interval_us, and when not?
* the cycle will invoke util cannot loop, eventhough the return code of cycle is error,
* so the interval_us used to sleep for each cycle.
*/
class ISrsThreadHandler
{