1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-02-13 03:41:55 +00:00

merge from 2.0release.

This commit is contained in:
winlin 2015-05-22 17:20:30 +08:00
commit 7243cdbff6
31 changed files with 570 additions and 404 deletions

View file

@ -566,6 +566,9 @@ Supported operating systems and hardware:
### SRS 2.0 history
* v2.0, 2015-05-22, fix [#397](https://github.com/simple-rtmp-server/srs/issues/397) the USER_HZ maybe not 100. 2.0.165
* v2.0, 2015-05-22, for [#400](https://github.com/simple-rtmp-server/srs/issues/400), parse when got entire http header, by feilong. 2.0.164.
* v2.0, 2015-05-19, merge from bravo system, add the rtmfp to bms(commercial srs). 2.0.163.
* v2.0, 2015-05-10, support push flv stream over HTTP POST to SRS.
* v2.0, 2015-04-20, support ingest hls live stream to RTMP.
* v2.0, 2015-04-15, for [#383](https://github.com/simple-rtmp-server/srs/issues/383), support mix_correct algorithm. 2.0.161.
@ -674,6 +677,10 @@ Supported operating systems and hardware:
### SRS 1.0 history
* v1.0, 2015-05-22, fix [#397](https://github.com/simple-rtmp-server/srs/issues/397) the USER_HZ maybe not 100. 1.0.32
* v1.0, 2015-03-26, fix hls aac adts bug, in aac mux. 1.0.31.
* <strong>v1.0, 2015-03-19, [1.0r3 release(1.0.30)](https://github.com/simple-rtmp-server/srs/releases/tag/1.0r3) released. 59511 lines.</strong>
* v1.0, 2015-03-17, remove the osx for 1.0.30.
* v1.0, 2015-02-17, the join maybe failed, should use a variable to ensure thread terminated. 1.0.28.
* <strong>v1.0, 2015-02-12, [1.0r2 release(1.0.27)](https://github.com/simple-rtmp-server/srs/releases/tag/1.0r2) released. 59507 lines.</strong>
* v1.0, 2015-02-11, dev code HuKaiqun for 1.0.27.

View file

@ -408,17 +408,17 @@
isa = PBXGroup;
children = (
3C1232B81AAE824500CE8F6C /* configure */,
3C36DB541ABD1CA70066CCAF /* libs */,
3C1EE6AF1AB107EE00576EE9 /* conf */,
3C1232EF1AAEAC5800CE8F6C /* etc */,
3C1232BA1AAE826F00CE8F6C /* auto */,
3C1232B91AAE825100CE8F6C /* scripts */,
3C12324B1AAE81CE00CE8F6C /* app */,
3C12322C1AAE819900CE8F6C /* protocol */,
3C1231EF1AAE651100CE8F6C /* core */,
3C1232071AAE814200CE8F6C /* kernel */,
3C12322C1AAE819900CE8F6C /* protocol */,
3C12324B1AAE81CE00CE8F6C /* app */,
3C1232041AAE80CB00CE8F6C /* main */,
3C1231F91AAE670E00CE8F6C /* objs */,
3C1231EF1AAE651100CE8F6C /* core */,
3C1232BA1AAE826F00CE8F6C /* auto */,
3C1232B91AAE825100CE8F6C /* scripts */,
3C1EE6AF1AB107EE00576EE9 /* conf */,
3C36DB541ABD1CA70066CCAF /* libs */,
3C1232EF1AAEAC5800CE8F6C /* etc */,
);
path = srs_xcode;
sourceTree = "<group>";
@ -514,12 +514,12 @@
3C12324B1AAE81CE00CE8F6C /* app */ = {
isa = PBXGroup;
children = (
3C28EDDD1AF5C43F00A3AEAC /* srs_app_caster_flv.cpp */,
3C28EDDE1AF5C43F00A3AEAC /* srs_app_caster_flv.hpp */,
3CD88B3D1ACA9C58000359E0 /* srs_app_async_call.cpp */,
3CD88B3E1ACA9C58000359E0 /* srs_app_async_call.hpp */,
3C12324C1AAE81D900CE8F6C /* srs_app_bandwidth.cpp */,
3C12324D1AAE81D900CE8F6C /* srs_app_bandwidth.hpp */,
3C28EDDD1AF5C43F00A3AEAC /* srs_app_caster_flv.cpp */,
3C28EDDE1AF5C43F00A3AEAC /* srs_app_caster_flv.hpp */,
3C12324E1AAE81D900CE8F6C /* srs_app_config.cpp */,
3C12324F1AAE81D900CE8F6C /* srs_app_config.hpp */,
3C1232501AAE81D900CE8F6C /* srs_app_conn.cpp */,
@ -536,10 +536,10 @@
3C12325B1AAE81D900CE8F6C /* srs_app_ffmpeg.hpp */,
3C12325C1AAE81D900CE8F6C /* srs_app_forward.cpp */,
3C12325D1AAE81D900CE8F6C /* srs_app_forward.hpp */,
3C12325E1AAE81D900CE8F6C /* srs_app_heartbeat.cpp */,
3C12325F1AAE81D900CE8F6C /* srs_app_heartbeat.hpp */,
3C1EE6AC1AB1055800576EE9 /* srs_app_hds.cpp */,
3C1EE6AD1AB1055800576EE9 /* srs_app_hds.hpp */,
3C12325E1AAE81D900CE8F6C /* srs_app_heartbeat.cpp */,
3C12325F1AAE81D900CE8F6C /* srs_app_heartbeat.hpp */,
3C1232601AAE81D900CE8F6C /* srs_app_hls.cpp */,
3C1232611AAE81D900CE8F6C /* srs_app_hls.hpp */,
3C1232621AAE81D900CE8F6C /* srs_app_http_api.cpp */,

View file

@ -441,11 +441,23 @@ int SrsConfig::reload_conf(SrsConfig* conf)
// daemon
//
// always support reload without additional code:
// chunk_size, ff_log_dir, max_connections,
// chunk_size, ff_log_dir,
// bandcheck, http_hooks, heartbeat,
// token_traverse, debug_srs_upnode,
// security
// merge config: max_connections
if (!srs_directive_equals(root->get("max_connections"), old_root->get("max_connections"))) {
for (it = subscribes.begin(); it != subscribes.end(); ++it) {
ISrsReloadHandler* subscribe = *it;
if ((ret = subscribe->on_reload_max_conns()) != ERROR_SUCCESS) {
srs_error("notify subscribes reload max_connections failed. ret=%d", ret);
return ret;
}
}
srs_trace("reload max_connections success.");
}
// merge config: listen
if (!srs_directive_equals(root->get("listen"), old_root->get("listen"))) {
for (it = subscribes.begin(); it != subscribes.end(); ++it) {

View file

@ -1082,6 +1082,7 @@ SrsHttpMessage::SrsHttpMessage(SrsStSocket* io, SrsConnection* c)
{
conn = c;
chunked = false;
keep_alive = true;
_uri = new SrsHttpUri();
_body = new SrsHttpResponseReader(this, io);
_http_ts_send_buffer = new char[SRS_HTTP_TS_SEND_BUFFER_SIZE];
@ -1106,6 +1107,9 @@ int SrsHttpMessage::update(string url, http_parser* header, SrsFastBuffer* body,
std::string transfer_encoding = get_request_header("Transfer-Encoding");
chunked = (transfer_encoding == "chunked");
// whether keep alive.
keep_alive = http_should_keep_alive(header);
// set the buffer.
if ((ret = _body->initialize(body)) != ERROR_SUCCESS) {
return ret;
@ -1232,6 +1236,11 @@ bool SrsHttpMessage::is_chunked()
return chunked;
}
bool SrsHttpMessage::is_keep_alive()
{
return keep_alive;
}
string SrsHttpMessage::uri()
{
std::string uri = _uri->get_schema();
@ -1447,10 +1456,17 @@ int SrsHttpParser::parse_message_imp(SrsStSocket* skt)
while (true) {
ssize_t nparsed = 0;
// when buffer not empty, parse it.
if (buffer->size() > 0) {
nparsed = http_parser_execute(&parser, &settings, buffer->bytes(), buffer->size());
srs_info("buffer=%d, nparsed=%d, header=%d", buffer->size(), (int)nparsed, header_parsed);
// when got entire http header, parse it.
// @see https://github.com/simple-rtmp-server/srs/issues/400
char* start = buffer->bytes();
char* end = start + buffer->size();
for (char* p = start; p <= end - 4; p++) {
// SRS_HTTP_CRLFCRLF "\r\n\r\n" // 0x0D0A0D0A
if (p[0] == SRS_CONSTS_CR && p[1] == SRS_CONSTS_LF && p[2] == SRS_CONSTS_CR && p[3] == SRS_CONSTS_LF) {
nparsed = http_parser_execute(&parser, &settings, buffer->bytes(), buffer->size());
srs_info("buffer=%d, nparsed=%d, header=%d", buffer->size(), (int)nparsed, header_parsed);
break;
}
}
// consume the parsed bytes.

View file

@ -494,6 +494,11 @@ private:
* whether the body is chunked.
*/
bool chunked;
/**
* whether the request indicates should keep alive
* for the http connection.
*/
bool keep_alive;
/**
* uri parser
*/
@ -538,6 +543,10 @@ public:
* whether body is chunked encoding, for reader only.
*/
virtual bool is_chunked();
/**
* whether should keep the connection alive.
*/
virtual bool is_keep_alive();
/**
* the uri contains the host and path.
*/

View file

@ -523,6 +523,10 @@ int SrsHttpApi::do_cycle()
// underlayer socket
SrsStSocket skt(stfd);
// set the recv timeout, for some clients never disconnect the connection.
// @see https://github.com/simple-rtmp-server/srs/issues/398
skt.set_recv_timeout(SRS_HTTP_RECV_TIMEOUT_US);
// process http messages.
for (;;) {
SrsHttpMessage* req = NULL;
@ -551,6 +555,12 @@ int SrsHttpApi::do_cycle()
if ((ret = process_request(&writer, req)) != ERROR_SUCCESS) {
return ret;
}
// donot keep alive, disconnect it.
// @see https://github.com/simple-rtmp-server/srs/issues/399
if (!req->is_keep_alive()) {
break;
}
}
return ret;

View file

@ -1383,6 +1383,10 @@ int SrsHttpConn::do_cycle()
// underlayer socket
SrsStSocket skt(stfd);
// set the recv timeout, for some clients never disconnect the connection.
// @see https://github.com/simple-rtmp-server/srs/issues/398
skt.set_recv_timeout(SRS_HTTP_RECV_TIMEOUT_US);
// process http messages.
for (;;) {
SrsHttpMessage* req = NULL;
@ -1408,6 +1412,12 @@ int SrsHttpConn::do_cycle()
if ((ret = process_request(&writer, req)) != ERROR_SUCCESS) {
return ret;
}
// donot keep alive, disconnect it.
// @see https://github.com/simple-rtmp-server/srs/issues/399
if (!req->is_keep_alive()) {
break;
}
}
return ret;

View file

@ -77,22 +77,22 @@ void SrsKbpsSlice::sample()
}
if (now - sample_30s.time > 30 * 1000) {
sample_30s.kbps = (total_bytes - sample_30s.bytes) * 8 / (now - sample_30s.time);
sample_30s.kbps = (int)((total_bytes - sample_30s.bytes) * 8 / (now - sample_30s.time));
sample_30s.time = now;
sample_30s.bytes = total_bytes;
}
if (now - sample_1m.time > 60 * 1000) {
sample_1m.kbps = (total_bytes - sample_1m.bytes) * 8 / (now - sample_1m.time);
sample_1m.kbps = (int)((total_bytes - sample_1m.bytes) * 8 / (now - sample_1m.time));
sample_1m.time = now;
sample_1m.bytes = total_bytes;
}
if (now - sample_5m.time > 300 * 1000) {
sample_5m.kbps = (total_bytes - sample_5m.bytes) * 8 / (now - sample_5m.time);
sample_5m.kbps = (int)((total_bytes - sample_5m.bytes) * 8 / (now - sample_5m.time));
sample_5m.time = now;
sample_5m.bytes = total_bytes;
}
if (now - sample_60m.time > 3600 * 1000) {
sample_60m.kbps = (total_bytes - sample_60m.bytes) * 8 / (now - sample_60m.time);
sample_60m.kbps = (int)((total_bytes - sample_60m.bytes) * 8 / (now - sample_60m.time));
sample_60m.time = now;
sample_60m.bytes = total_bytes;
}
@ -160,7 +160,7 @@ int SrsKbps::get_send_kbps()
return 0;
}
int64_t bytes = get_send_bytes();
return bytes * 8 / duration;
return (int)(bytes * 8 / duration);
}
int SrsKbps::get_recv_kbps()
@ -170,7 +170,7 @@ int SrsKbps::get_recv_kbps()
return 0;
}
int64_t bytes = get_recv_bytes();
return bytes * 8 / duration;
return (int)(bytes * 8 / duration);
}
int SrsKbps::get_send_kbps_30s()

View file

@ -54,6 +54,11 @@ ISrsUdpHandler::~ISrsUdpHandler()
{
}
int ISrsUdpHandler::on_stfd_change(st_netfd_t /*fd*/)
{
return ERROR_SUCCESS;
}
ISrsTcpHandler::ISrsTcpHandler()
{
}
@ -69,7 +74,7 @@ SrsUdpListener::SrsUdpListener(ISrsUdpHandler* h, string i, int p)
port = p;
_fd = -1;
stfd = NULL;
_stfd = NULL;
nb_buf = SRS_UDP_MAX_PACKET_SIZE;
buf = new char[nb_buf];
@ -80,7 +85,7 @@ SrsUdpListener::SrsUdpListener(ISrsUdpHandler* h, string i, int p)
SrsUdpListener::~SrsUdpListener()
{
// close the stfd to trigger thread to interrupted.
srs_close_stfd(stfd);
srs_close_stfd(_stfd);
pthread->stop();
srs_freep(pthread);
@ -97,6 +102,11 @@ int SrsUdpListener::fd()
return _fd;
}
st_netfd_t SrsUdpListener::stfd()
{
return _stfd;
}
int SrsUdpListener::listen()
{
int ret = ERROR_SUCCESS;
@ -127,7 +137,7 @@ int SrsUdpListener::listen()
}
srs_verbose("bind socket success. ep=%s:%d, fd=%d", ip.c_str(), port, _fd);
if ((stfd = st_netfd_open_socket(_fd)) == NULL){
if ((_stfd = st_netfd_open_socket(_fd)) == NULL){
ret = ERROR_ST_OPEN_SOCKET;
srs_error("st_netfd_open_socket open socket failed. ep=%s:%d, ret=%d", ip.c_str(), port, ret);
return ret;
@ -153,7 +163,7 @@ int SrsUdpListener::cycle()
int nb_from = sizeof(sockaddr_in);
int nread = 0;
if ((nread = st_recvfrom(stfd, buf, nb_buf, (sockaddr*)&from, &nb_from, ST_UTIME_NO_TIMEOUT)) <= 0) {
if ((nread = st_recvfrom(_stfd, buf, nb_buf, (sockaddr*)&from, &nb_from, ST_UTIME_NO_TIMEOUT)) <= 0) {
srs_warn("ignore recv udp packet failed, nread=%d", nread);
continue;
}
@ -178,7 +188,7 @@ SrsTcpListener::SrsTcpListener(ISrsTcpHandler* h, string i, int p)
port = p;
_fd = -1;
stfd = NULL;
_stfd = NULL;
pthread = new SrsThread("tcp", this, 0, true);
}
@ -186,7 +196,7 @@ SrsTcpListener::SrsTcpListener(ISrsTcpHandler* h, string i, int p)
SrsTcpListener::~SrsTcpListener()
{
// close the stfd to trigger thread to interrupted.
srs_close_stfd(stfd);
srs_close_stfd(_stfd);
pthread->stop();
srs_freep(pthread);
@ -238,7 +248,7 @@ int SrsTcpListener::listen()
}
srs_verbose("listen socket success. ep=%s:%d, fd=%d", ip.c_str(), port, _fd);
if ((stfd = st_netfd_open_socket(_fd)) == NULL){
if ((_stfd = st_netfd_open_socket(_fd)) == NULL){
ret = ERROR_ST_OPEN_SOCKET;
srs_error("st_netfd_open_socket open socket failed. ep=%s:%d, ret=%d", ip.c_str(), port, ret);
return ret;
@ -258,7 +268,7 @@ int SrsTcpListener::cycle()
{
int ret = ERROR_SUCCESS;
st_netfd_t client_stfd = st_accept(stfd, NULL, NULL, ST_UTIME_NO_TIMEOUT);
st_netfd_t client_stfd = st_accept(_stfd, NULL, NULL, ST_UTIME_NO_TIMEOUT);
if(client_stfd == NULL){
// ignore error.

View file

@ -45,6 +45,12 @@ class ISrsUdpHandler
public:
ISrsUdpHandler();
virtual ~ISrsUdpHandler();
public:
/**
* when fd changed, for instance, reload the listen port,
* notify the handler and user can do something.
*/
virtual int on_stfd_change(st_netfd_t fd);
public:
/**
* when udp listener got a udp packet, notice server to process it.
@ -80,7 +86,7 @@ class SrsUdpListener : public ISrsThreadHandler
{
private:
int _fd;
st_netfd_t stfd;
st_netfd_t _stfd;
SrsThread* pthread;
private:
char* buf;
@ -94,6 +100,7 @@ public:
virtual ~SrsUdpListener();
public:
virtual int fd();
virtual st_netfd_t stfd();
public:
virtual int listen();
// interface ISrsThreadHandler.
@ -108,7 +115,7 @@ class SrsTcpListener : public ISrsThreadHandler
{
private:
int _fd;
st_netfd_t stfd;
st_netfd_t _stfd;
SrsThread* pthread;
private:
ISrsTcpHandler* handler;

View file

@ -40,6 +40,11 @@ int ISrsReloadHandler::on_reload_listen()
return ERROR_SUCCESS;
}
int ISrsReloadHandler::on_reload_max_conns()
{
return ERROR_SUCCESS;
}
int ISrsReloadHandler::on_reload_pid()
{
return ERROR_SUCCESS;

View file

@ -44,6 +44,7 @@ public:
ISrsReloadHandler();
virtual ~ISrsReloadHandler();
public:
virtual int on_reload_max_conns();
virtual int on_reload_listen();
virtual int on_reload_pid();
virtual int on_reload_log_tank();
@ -55,6 +56,7 @@ public:
virtual int on_reload_http_stream_enabled();
virtual int on_reload_http_stream_disabled();
virtual int on_reload_http_stream_updated();
public:
virtual int on_reload_vhost_http_updated();
virtual int on_reload_vhost_http_remux_updated();
virtual int on_reload_vhost_added(std::string vhost);

View file

@ -1068,24 +1068,24 @@ int SrsRtmpConn::process_play_control_msg(SrsConsumer* consumer, SrsCommonMessag
return ret;
}
// pause or other msg.
// pause
SrsPausePacket* pause = dynamic_cast<SrsPausePacket*>(pkt);
if (!pause) {
srs_info("ignore all amf0/amf3 command except pause.");
if (pause) {
if ((ret = rtmp->on_play_client_pause(res->stream_id, pause->is_pause)) != ERROR_SUCCESS) {
srs_error("rtmp process play client pause failed. ret=%d", ret);
return ret;
}
if ((ret = consumer->on_play_client_pause(pause->is_pause)) != ERROR_SUCCESS) {
srs_error("consumer process play client pause failed. ret=%d", ret);
return ret;
}
srs_info("process pause success, is_pause=%d, time=%d.", pause->is_pause, pause->time_ms);
return ret;
}
if ((ret = rtmp->on_play_client_pause(res->stream_id, pause->is_pause)) != ERROR_SUCCESS) {
srs_error("rtmp process play client pause failed. ret=%d", ret);
return ret;
}
if ((ret = consumer->on_play_client_pause(pause->is_pause)) != ERROR_SUCCESS) {
srs_error("consumer process play client pause failed. ret=%d", ret);
return ret;
}
srs_info("process pause success, is_pause=%d, time=%d.", pause->is_pause, pause->time_ms);
// other msg.
srs_info("ignore all amf0/amf3 command except pause and video control.");
return ret;
}

View file

@ -113,23 +113,23 @@ std::string srs_listener_type2string(SrsListenerType type)
}
}
SrsListener::SrsListener(SrsServer* server, SrsListenerType type)
SrsListener::SrsListener(SrsServer* svr, SrsListenerType t)
{
_port = 0;
_server = server;
_type = type;
port = 0;
server = svr;
type = t;
}
SrsListener::~SrsListener()
{
}
SrsListenerType SrsListener::type()
SrsListenerType SrsListener::listen_type()
{
return _type;
return type;
}
SrsStreamListener::SrsStreamListener(SrsServer* server, SrsListenerType type) : SrsListener(server, type)
SrsStreamListener::SrsStreamListener(SrsServer* svr, SrsListenerType t) : SrsListener(svr, t)
{
listener = NULL;
}
@ -139,12 +139,12 @@ SrsStreamListener::~SrsStreamListener()
srs_freep(listener);
}
int SrsStreamListener::listen(string ip, int port)
int SrsStreamListener::listen(string i, int p)
{
int ret = ERROR_SUCCESS;
_ip = ip;
_port = port;
ip = i;
port = p;
srs_freep(listener);
listener = new SrsTcpListener(this, ip, port);
@ -158,7 +158,7 @@ int SrsStreamListener::listen(string ip, int port)
"listen at port=%d, type=%d, fd=%d started success, ep=%s:%d",
pthread->cid(), _srs_context->get_id(), _port, _type, fd, ip.c_str(), port);
srs_trace("%s listen at tcp://%s:%d, fd=%d", srs_listener_type2string(_type).c_str(), ip.c_str(), _port, listener->fd());
srs_trace("%s listen at tcp://%s:%d, fd=%d", srs_listener_type2string(type).c_str(), ip.c_str(), port, listener->fd());
return ret;
}
@ -167,7 +167,7 @@ int SrsStreamListener::on_tcp_client(st_netfd_t stfd)
{
int ret = ERROR_SUCCESS;
if ((ret = _server->accept_client(_type, stfd)) != ERROR_SUCCESS) {
if ((ret = server->accept_client(type, stfd)) != ERROR_SUCCESS) {
srs_warn("accept client error. ret=%d", ret);
return ret;
}
@ -176,14 +176,14 @@ int SrsStreamListener::on_tcp_client(st_netfd_t stfd)
}
#ifdef SRS_AUTO_STREAM_CASTER
SrsRtspListener::SrsRtspListener(SrsServer* server, SrsListenerType type, SrsConfDirective* c) : SrsListener(server, type)
SrsRtspListener::SrsRtspListener(SrsServer* svr, SrsListenerType t, SrsConfDirective* c) : SrsListener(svr, t)
{
listener = NULL;
// the caller already ensure the type is ok,
// we just assert here for unknown stream caster.
srs_assert(_type == SrsListenerRtsp);
if (_type == SrsListenerRtsp) {
srs_assert(type == SrsListenerRtsp);
if (type == SrsListenerRtsp) {
caster = new SrsRtspCaster(c);
}
}
@ -194,16 +194,16 @@ SrsRtspListener::~SrsRtspListener()
srs_freep(listener);
}
int SrsRtspListener::listen(string ip, int port)
int SrsRtspListener::listen(string i, int p)
{
int ret = ERROR_SUCCESS;
// the caller already ensure the type is ok,
// we just assert here for unknown stream caster.
srs_assert(_type == SrsListenerRtsp);
srs_assert(type == SrsListenerRtsp);
_ip = ip;
_port = port;
ip = i;
port = p;
srs_freep(listener);
listener = new SrsTcpListener(this, ip, port);
@ -215,9 +215,9 @@ int SrsRtspListener::listen(string ip, int port)
srs_info("listen thread cid=%d, current_cid=%d, "
"listen at port=%d, type=%d, fd=%d started success, ep=%s:%d",
pthread->cid(), _srs_context->get_id(), _port, _type, fd, ip.c_str(), port);
pthread->cid(), _srs_context->get_id(), port, type, fd, ip.c_str(), port);
srs_trace("%s listen at tcp://%s:%d, fd=%d", srs_listener_type2string(_type).c_str(), ip.c_str(), _port, listener->fd());
srs_trace("%s listen at tcp://%s:%d, fd=%d", srs_listener_type2string(type).c_str(), ip.c_str(), port, listener->fd());
return ret;
}
@ -234,14 +234,14 @@ int SrsRtspListener::on_tcp_client(st_netfd_t stfd)
return ret;
}
SrsHttpFlvListener::SrsHttpFlvListener(SrsServer* server, SrsListenerType type, SrsConfDirective* c) : SrsListener(server, type)
SrsHttpFlvListener::SrsHttpFlvListener(SrsServer* svr, SrsListenerType t, SrsConfDirective* c) : SrsListener(svr, t)
{
listener = NULL;
// the caller already ensure the type is ok,
// we just assert here for unknown stream caster.
srs_assert(_type == SrsListenerFlv);
if (_type == SrsListenerFlv) {
srs_assert(type == SrsListenerFlv);
if (type == SrsListenerFlv) {
caster = new SrsAppCasterFlv(c);
}
}
@ -252,16 +252,16 @@ SrsHttpFlvListener::~SrsHttpFlvListener()
srs_freep(listener);
}
int SrsHttpFlvListener::listen(string ip, int port)
int SrsHttpFlvListener::listen(string i, int p)
{
int ret = ERROR_SUCCESS;
// the caller already ensure the type is ok,
// we just assert here for unknown stream caster.
srs_assert(_type == SrsListenerFlv);
srs_assert(type == SrsListenerFlv);
_ip = ip;
_port = port;
ip = i;
port = p;
if ((ret = caster->initialize()) != ERROR_SUCCESS) {
return ret;
@ -277,9 +277,9 @@ int SrsHttpFlvListener::listen(string ip, int port)
srs_info("listen thread cid=%d, current_cid=%d, "
"listen at port=%d, type=%d, fd=%d started success, ep=%s:%d",
pthread->cid(), _srs_context->get_id(), _port, _type, fd, ip.c_str(), port);
pthread->cid(), _srs_context->get_id(), port, type, fd, ip.c_str(), port);
srs_trace("%s listen at tcp://%s:%d, fd=%d", srs_listener_type2string(_type).c_str(), ip.c_str(), _port, listener->fd());
srs_trace("%s listen at tcp://%s:%d, fd=%d", srs_listener_type2string(type).c_str(), ip.c_str(), port, listener->fd());
return ret;
}
@ -295,36 +295,29 @@ int SrsHttpFlvListener::on_tcp_client(st_netfd_t stfd)
return ret;
}
#endif
SrsUdpCasterListener::SrsUdpCasterListener(SrsServer* server, SrsListenerType type, SrsConfDirective* c) : SrsListener(server, type)
SrsUdpStreamListener::SrsUdpStreamListener(SrsServer* svr, SrsListenerType t, ISrsUdpHandler* c) : SrsListener(svr, t)
{
_type = type;
listener = NULL;
// the caller already ensure the type is ok,
// we just assert here for unknown stream caster.
srs_assert(_type == SrsListenerMpegTsOverUdp);
if (_type == SrsListenerMpegTsOverUdp) {
caster = new SrsMpegtsOverUdp(c);
}
caster = c;
}
SrsUdpCasterListener::~SrsUdpCasterListener()
SrsUdpStreamListener::~SrsUdpStreamListener()
{
srs_freep(caster);
srs_freep(listener);
}
int SrsUdpCasterListener::listen(string ip, int port)
int SrsUdpStreamListener::listen(string i, int p)
{
int ret = ERROR_SUCCESS;
// the caller already ensure the type is ok,
// we just assert here for unknown stream caster.
srs_assert(_type == SrsListenerMpegTsOverUdp);
srs_assert(type == SrsListenerMpegTsOverUdp);
_ip = ip;
_port = port;
ip = i;
port = p;
srs_freep(listener);
listener = new SrsUdpListener(caster, ip, port);
@ -336,12 +329,34 @@ int SrsUdpCasterListener::listen(string ip, int port)
srs_info("listen thread cid=%d, current_cid=%d, "
"listen at port=%d, type=%d, fd=%d started success, ep=%s:%d",
pthread->cid(), _srs_context->get_id(), _port, _type, fd, ip.c_str(), port);
pthread->cid(), _srs_context->get_id(), port, type, fd, ip.c_str(), port);
srs_trace("%s listen at udp://%s:%d, fd=%d", srs_listener_type2string(_type).c_str(), ip.c_str(), _port, listener->fd());
// notify the handler the fd changed.
if ((ret = caster->on_stfd_change(listener->stfd())) != ERROR_SUCCESS) {
srs_error("notify handler fd changed. ret=%d", ret);
return ret;
}
srs_trace("%s listen at udp://%s:%d, fd=%d", srs_listener_type2string(type).c_str(), ip.c_str(), port, listener->fd());
return ret;
}
#ifdef SRS_AUTO_STREAM_CASTER
SrsUdpCasterListener::SrsUdpCasterListener(SrsServer* svr, SrsListenerType t, SrsConfDirective* c) : SrsUdpStreamListener(svr, t, NULL)
{
// the caller already ensure the type is ok,
// we just assert here for unknown stream caster.
srs_assert(type == SrsListenerMpegTsOverUdp);
if (type == SrsListenerMpegTsOverUdp) {
caster = new SrsMpegtsOverUdp(c);
}
}
SrsUdpCasterListener::~SrsUdpCasterListener()
{
srs_freep(caster);
}
#endif
SrsSignalManager* SrsSignalManager::instance = NULL;
@ -588,6 +603,34 @@ int SrsServer::initialize(ISrsServerCycle* cycle_handler)
return ret;
}
int SrsServer::initialize_st()
{
int ret = ERROR_SUCCESS;
// init st
if ((ret = srs_init_st()) != ERROR_SUCCESS) {
srs_error("init st failed. ret=%d", ret);
return ret;
}
// @remark, st alloc segment use mmap, which only support 32757 threads,
// if need to support more, for instance, 100k threads, define the macro MALLOC_STACK.
// TODO: FIXME: maybe can use "sysctl vm.max_map_count" to refine.
if (_srs_config->get_max_connections() > 32756) {
ret = ERROR_ST_EXCEED_THREADS;
srs_error("st mmap for stack allocation must <= %d threads, "
"@see Makefile of st for MALLOC_STACK, please build st manually by "
"\"make EXTRA_CFLAGS=-DMALLOC_STACK linux-debug\", ret=%d", ret);
return ret;
}
// set current log id.
_srs_context->generate_id();
srs_trace("server main cid=%d", _srs_context->get_id());
return ret;
}
int SrsServer::initialize_signal()
{
return signal_manager->initialize();
@ -669,34 +712,6 @@ int SrsServer::acquire_pid_file()
return ret;
}
int SrsServer::initialize_st()
{
int ret = ERROR_SUCCESS;
// init st
if ((ret = srs_init_st()) != ERROR_SUCCESS) {
srs_error("init st failed. ret=%d", ret);
return ret;
}
// @remark, st alloc segment use mmap, which only support 32757 threads,
// if need to support more, for instance, 100k threads, define the macro MALLOC_STACK.
// TODO: FIXME: maybe can use "sysctl vm.max_map_count" to refine.
if (_srs_config->get_max_connections() > 32756) {
ret = ERROR_ST_EXCEED_THREADS;
srs_error("st mmap for stack allocation must <= %d threads, "
"@see Makefile of st for MALLOC_STACK, please build st manually by "
"\"make EXTRA_CFLAGS=-DMALLOC_STACK linux-debug\", ret=%d", ret);
return ret;
}
// set current log id.
_srs_context->generate_id();
srs_trace("server main cid=%d", _srs_context->get_id());
return ret;
}
int SrsServer::listen()
{
int ret = ERROR_SUCCESS;
@ -959,6 +974,7 @@ int SrsServer::do_cycle()
}
#endif
#endif
srs_info("server main thread loop");
}
}
@ -1103,7 +1119,7 @@ void SrsServer::close_listeners(SrsListenerType type)
for (it = listeners.begin(); it != listeners.end();) {
SrsListener* listener = *it;
if (listener->type() != type) {
if (listener->listen_type() != type) {
++it;
continue;
}

View file

@ -80,17 +80,17 @@ enum SrsListenerType
class SrsListener
{
protected:
SrsListenerType _type;
SrsListenerType type;
protected:
std::string _ip;
int _port;
SrsServer* _server;
std::string ip;
int port;
SrsServer* server;
public:
SrsListener(SrsServer* server, SrsListenerType type);
SrsListener(SrsServer* svr, SrsListenerType t);
virtual ~SrsListener();
public:
virtual SrsListenerType type();
virtual int listen(std::string ip, int port) = 0;
virtual SrsListenerType listen_type();
virtual int listen(std::string i, int p) = 0;
};
/**
@ -120,10 +120,10 @@ private:
SrsTcpListener* listener;
ISrsTcpHandler* caster;
public:
SrsRtspListener(SrsServer* server, SrsListenerType type, SrsConfDirective* c);
SrsRtspListener(SrsServer* svr, SrsListenerType t, SrsConfDirective* c);
virtual ~SrsRtspListener();
public:
virtual int listen(std::string ip, int port);
virtual int listen(std::string i, int p);
// ISrsTcpHandler
public:
virtual int on_tcp_client(st_netfd_t stfd);
@ -138,28 +138,40 @@ private:
SrsTcpListener* listener;
SrsAppCasterFlv* caster;
public:
SrsHttpFlvListener(SrsServer* server, SrsListenerType type, SrsConfDirective* c);
SrsHttpFlvListener(SrsServer* svr, SrsListenerType t, SrsConfDirective* c);
virtual ~SrsHttpFlvListener();
public:
virtual int listen(std::string ip, int port);
virtual int listen(std::string i, int p);
// ISrsTcpHandler
public:
virtual int on_tcp_client(st_netfd_t stfd);
};
#endif
/**
* the udp listener, for udp server.
*/
class SrsUdpCasterListener : public SrsListener
* the udp listener, for udp server.
*/
class SrsUdpStreamListener : public SrsListener
{
private:
protected:
SrsUdpListener* listener;
ISrsUdpHandler* caster;
public:
SrsUdpCasterListener(SrsServer* server, SrsListenerType type, SrsConfDirective* c);
virtual ~SrsUdpCasterListener();
SrsUdpStreamListener(SrsServer* svr, SrsListenerType t, ISrsUdpHandler* c);
virtual ~SrsUdpStreamListener();
public:
virtual int listen(std::string ip, int port);
virtual int listen(std::string i, int p);
};
/**
* the udp listener, for udp stream caster server.
*/
#ifdef SRS_AUTO_STREAM_CASTER
class SrsUdpCasterListener : public SrsUdpStreamListener
{
public:
SrsUdpCasterListener(SrsServer* svr, SrsListenerType t, SrsConfDirective* c);
virtual ~SrsUdpCasterListener();
};
#endif
@ -337,7 +349,7 @@ public:
* @param client_stfd, the client fd in st boxed, the underlayer fd.
*/
virtual int accept_client(SrsListenerType type, st_netfd_t client_stfd);
// interface ISrsThreadHandler.
// interface ISrsReloadHandler.
public:
virtual int on_reload_listen();
virtual int on_reload_pid();

View file

@ -45,6 +45,7 @@ using namespace std;
#include <srs_app_hds.hpp>
#include <srs_app_statistic.hpp>
#include <srs_core_autofree.hpp>
#include <srs_rtmp_utility.hpp>
#define CONST_MAX_JITTER_MS 500
#define DEFAULT_FRAME_TIME_MS 40
@ -759,6 +760,20 @@ SrsSource* SrsSource::fetch(SrsRequest* r)
return source;
}
SrsSource* SrsSource::fetch(std::string vhost, std::string app, std::string stream)
{
SrsSource* source = NULL;
string stream_url = srs_generate_stream_url(vhost, app, stream);
if (pool.find(stream_url) == pool.end()) {
return NULL;
}
source = pool[stream_url];
return source;
}
void SrsSource::destroy()
{
std::map<std::string, SrsSource*>::iterator it;

View file

@ -407,6 +407,10 @@ public:
*/
static SrsSource* fetch(SrsRequest* r);
/**
* get the exists source by stream info(vhost, app, stream), NULL when not exists.
*/
static SrsSource* fetch(std::string vhost, std::string app, std::string stream);
/**
* when system exit, destroy the sources,
* for gmc to analysis mem leaks.
*/

View file

@ -195,6 +195,7 @@ int SrsStatistic::on_client(int id, SrsRequest* req)
SrsStatisticClient* client = NULL;
if (clients.find(id) == clients.end()) {
client = new SrsStatisticClient();
client->id = id;
client->stream = stream;
clients[id] = client;
} else {

View file

@ -32,93 +32,105 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_app_st.hpp>
/**
* the handler for the thread, callback interface.
* the thread model defines as:
* handler->on_thread_start()
* while loop:
* handler->on_before_cycle()
* handler->cycle()
* handler->on_end_cycle()
* if !loop then break for user stop thread.
* sleep(CycleIntervalMilliseconds)
* handler->on_thread_stop()
* when stop, the thread will interrupt the st_thread,
* which will cause the socket to return error and
* terminate the cycle thread.
*
* Usage 1: stop by other thread.
* user can create thread and stop then start again and again,
* generally must provides a start and stop method, @see SrsIngester.
* the step to create a thread stop by other thread:
* 1. create SrsThread field, with joinable true.
* 2. must use stop to stop and join the thread.
* for example:
* class SrsIngester : public ISrsThreadHandler {
* public: SrsIngester() { pthread = new SrsThread("ingest", this, SRS_AUTO_INGESTER_SLEEP_US, true); }
* public: virtual int start() { return pthread->start(); }
* public: virtual void stop() { pthread->stop(); }
* public: virtual int cycle() {
* // check status, start ffmpeg when stopped.
* }
* };
*
* Usage 2: stop by thread itself.
* user can create thread which stop itself,
* generally only need to provides a start method,
* the object will destroy itself then terminate the thread, @see SrsConnection
* 1. create SrsThread field, with joinable false.
* 2. owner stop thread loop, destroy itself when thread stop.
* for example:
* class SrsConnection : public ISrsThreadHandler {
* public: SrsConnection() { pthread = new SrsThread("conn", this, 0, false); }
* public: virtual int start() { return pthread->start(); }
* public: virtual int cycle() {
* // serve client.
* // set loop to stop to quit, stop thread itself.
* pthread->stop_loop();
* }
* public: virtual int on_thread_stop() {
* // remove the connection in thread itself.
* server->remove(this);
* }
* };
*
* Usage 3: loop in the cycle method.
* user can use loop code in the cycle method, @see SrsForwarder
* 1. create SrsThread field, with or without joinable is ok.
* 2. loop code in cycle method, check the can_loop() for thread to quit.
* for example:
* class SrsForwarder : public ISrsThreadHandler {
* public: virtual int cycle() {
* while (pthread->can_loop()) {
* // read msgs from queue and forward to server.
* }
* }
* };
*
* @remark why should check can_loop() in cycle method?
* when thread interrupt, the socket maybe not got EINT,
* espectially on st_usleep(), so the cycle must check the loop,
* when handler->cycle() has loop itself, for example:
* while (true):
* if (read_from_socket(skt) < 0) break;
* if thread stop when read_from_socket, it's ok, the loop will break,
* but when thread stop interrupt the s_usleep(0), then the loop is
* death loop.
* in a word, the handler->cycle() must:
* while (pthread->can_loop()):
* if (read_from_socket(skt) < 0) break;
* check the loop, then it works.
*
* @remark why should use stop_loop() to terminate thread in itself?
* in the thread itself, that is the cycle method,
* if itself want to terminate the thread, should never use stop(),
* but use stop_loop() to set the loop to false and terminate normally.
*
* @remark when should set the interval_us, and when not?
* the cycle will invoke util cannot loop, eventhough the return code of cycle is error,
* so the interval_us used to sleep for each cycle.
*/
* the handler for the thread, callback interface.
* the thread model defines as:
* handler->on_thread_start()
* while loop:
* handler->on_before_cycle()
* handler->cycle()
* handler->on_end_cycle()
* if !loop then break for user stop thread.
* sleep(CycleIntervalMilliseconds)
* handler->on_thread_stop()
* when stop, the thread will interrupt the st_thread,
* which will cause the socket to return error and
* terminate the cycle thread.
*
* Usage 1: loop thread never quit.
* user can create thread always running util server terminate.
* the step to create a thread never stop:
* 1. create SrsThread field, with joinable false.
* for example:
* class SrsStreamCache : public ISrsThreadHandler {
* public: SrsStreamCache() { pthread = new SrsThread("http-stream", this, SRS_AUTO_STREAM_SLEEP_US, false); }
* public: virtual int cycle() {
* // check status, start ffmpeg when stopped.
* }
* }
*
* Usage 2: stop by other thread.
* user can create thread and stop then start again and again,
* generally must provides a start and stop method, @see SrsIngester.
* the step to create a thread stop by other thread:
* 1. create SrsThread field, with joinable true.
* 2. must use stop to stop and join the thread.
* for example:
* class SrsIngester : public ISrsThreadHandler {
* public: SrsIngester() { pthread = new SrsThread("ingest", this, SRS_AUTO_INGESTER_SLEEP_US, true); }
* public: virtual int start() { return pthread->start(); }
* public: virtual void stop() { pthread->stop(); }
* public: virtual int cycle() {
* // check status, start ffmpeg when stopped.
* }
* };
*
* Usage 3: stop by thread itself.
* user can create thread which stop itself,
* generally only need to provides a start method,
* the object will destroy itself then terminate the thread, @see SrsConnection
* 1. create SrsThread field, with joinable false.
* 2. owner stop thread loop, destroy itself when thread stop.
* for example:
* class SrsConnection : public ISrsThreadHandler {
* public: SrsConnection() { pthread = new SrsThread("conn", this, 0, false); }
* public: virtual int start() { return pthread->start(); }
* public: virtual int cycle() {
* // serve client.
* // set loop to stop to quit, stop thread itself.
* pthread->stop_loop();
* }
* public: virtual int on_thread_stop() {
* // remove the connection in thread itself.
* server->remove(this);
* }
* };
*
* Usage 4: loop in the cycle method.
* user can use loop code in the cycle method, @see SrsForwarder
* 1. create SrsThread field, with or without joinable is ok.
* 2. loop code in cycle method, check the can_loop() for thread to quit.
* for example:
* class SrsForwarder : public ISrsThreadHandler {
* public: virtual int cycle() {
* while (pthread->can_loop()) {
* // read msgs from queue and forward to server.
* }
* }
* };
*
* @remark why should check can_loop() in cycle method?
* when thread interrupt, the socket maybe not got EINT,
* espectially on st_usleep(), so the cycle must check the loop,
* when handler->cycle() has loop itself, for example:
* while (true):
* if (read_from_socket(skt) < 0) break;
* if thread stop when read_from_socket, it's ok, the loop will break,
* but when thread stop interrupt the s_usleep(0), then the loop is
* death loop.
* in a word, the handler->cycle() must:
* while (pthread->can_loop()):
* if (read_from_socket(skt) < 0) break;
* check the loop, then it works.
*
* @remark why should use stop_loop() to terminate thread in itself?
* in the thread itself, that is the cycle method,
* if itself want to terminate the thread, should never use stop(),
* but use stop_loop() to set the loop to false and terminate normally.
*
* @remark when should set the interval_us, and when not?
* the cycle will invoke util cannot loop, eventhough the return code of cycle is error,
* so the interval_us used to sleep for each cycle.
*/
class ISrsThreadHandler
{
public:

View file

@ -418,15 +418,13 @@ bool get_proc_self_stat(SrsProcSelfStat& r)
void srs_update_proc_stat()
{
// always assert the USER_HZ is 1/100ths
// @see: http://stackoverflow.com/questions/7298646/calculating-user-nice-sys-idle-iowait-irq-and-sirq-from-proc-stat/7298711
static bool user_hz_assert = false;
if (!user_hz_assert) {
user_hz_assert = true;
int USER_HZ = sysconf(_SC_CLK_TCK);
srs_trace("USER_HZ=%d", USER_HZ);
srs_assert(USER_HZ == 100);
// @see https://github.com/simple-rtmp-server/srs/issues/397
static int user_hz = 0;
if (user_hz <= 0) {
user_hz = sysconf(_SC_CLK_TCK);
srs_trace("USER_HZ=%d", user_hz);
srs_assert(user_hz > 0);
}
// system cpu stat
@ -471,7 +469,7 @@ void srs_update_proc_stat()
int64_t total = r.sample_time - o.sample_time;
int64_t usage = (r.utime + r.stime) - (o.utime + o.stime);
if (total > 0) {
r.percent = (float)(usage * 1000 / (double)total / 100);
r.percent = (float)(usage * 1000 / (double)total / user_hz);
}
// upate cache.

View file

@ -201,6 +201,9 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
// query string seprator
#define SRS_CONSTS_HTTP_QUERY_SEP '?'
// the default recv timeout.
#define SRS_HTTP_RECV_TIMEOUT_US 60 * 1000 * 1000
// 6.1.1 Status Code and Reason Phrase
#define SRS_CONSTS_HTTP_Continue 100
#define SRS_CONSTS_HTTP_SwitchingProtocols 101

View file

@ -33,6 +33,7 @@ bool srs_is_client_gracefully_close(int error_code)
{
return error_code == ERROR_SOCKET_READ
|| error_code == ERROR_SOCKET_READ_FULLY
|| error_code == ERROR_SOCKET_WRITE;
|| error_code == ERROR_SOCKET_WRITE
|| error_code == ERROR_SOCKET_TIMEOUT;
}

View file

@ -255,7 +255,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#define ERROR_HTTP_INVALID_CHUNK_HEADER 4026
#define ERROR_AVC_NALU_UEV 4027
#define ERROR_AAC_BYTES_INVALID 4028
#define ERROR_HTTP_REQUEST_EOF 4029
#define ERROR_HTTP_REQUEST_EOF 4029
///////////////////////////////////////////////////////
// user-define error.

View file

@ -723,8 +723,8 @@ int SrsIngestSrsOutput::on_ts_message(SrsTsMessage* msg)
// because when audio stream_number is 0, the elementary is ADTS(aac-mp4a-format-ISO_IEC_14496-3+2001.pdf, page 75, 1.A.2.2 ADTS).
// about the bytes of PES_packet_data_byte, defined in hls-mpeg-ts-iso13818-1.pdf, page 58
// PES_packet_data_byte "C PES_packet_data_bytes shall be contiguous bytes of data from the elementary stream
// indicated by the packets stream_id or PID. When the elementary stream data conforms to ITU-T
// PES_packet_data_byte ¨C PES_packet_data_bytes shall be contiguous bytes of data from the elementary stream
// indicated by the packet¡¯s stream_id or PID. When the elementary stream data conforms to ITU-T
// Rec. H.262 | ISO/IEC 13818-2 or ISO/IEC 13818-3, the PES_packet_data_bytes shall be byte aligned to the bytes of this
// Recommendation | International Standard. The byte-order of the elementary stream shall be preserved. The number of
// PES_packet_data_bytes, N, is specified by the PES_packet_length field. N shall be equal to the value indicated in the
@ -735,12 +735,12 @@ int SrsIngestSrsOutput::on_ts_message(SrsTsMessage* msg)
// PES_packet_data_byte field are user definable and will not be specified by ITU-T | ISO/IEC in the future.
// about the bytes of stream_id, define in hls-mpeg-ts-iso13818-1.pdf, page 49
// stream_id "C In Program Streams, the stream_id specifies the type and number of the elementary stream as defined by the
// stream_id ¨C In Program Streams, the stream_id specifies the type and number of the elementary stream as defined by the
// stream_id Table 2-18. In Transport Streams, the stream_id may be set to any valid value which correctly describes the
// elementary stream type as defined in Table 2-18. In Transport Streams, the elementary stream type is specified in the
// Program Specific Information as specified in 2.4.4.
// about the stream_id table, define in Table 2-18 "C Stream_id assignments, hls-mpeg-ts-iso13818-1.pdf, page 52.
// about the stream_id table, define in Table 2-18 ¨C Stream_id assignments, hls-mpeg-ts-iso13818-1.pdf, page 52.
//
// 110x xxxx
// ISO/IEC 13818-3 or ISO/IEC 11172-3 or ISO/IEC 13818-7 or ISO/IEC

View file

@ -344,6 +344,10 @@ int run_master()
{
int ret = ERROR_SUCCESS;
if ((ret = _srs_server->initialize_st()) != ERROR_SUCCESS) {
return ret;
}
if ((ret = _srs_server->initialize_signal()) != ERROR_SUCCESS) {
return ret;
}
@ -352,10 +356,6 @@ int run_master()
return ret;
}
if ((ret = _srs_server->initialize_st()) != ERROR_SUCCESS) {
return ret;
}
if ((ret = _srs_server->listen()) != ERROR_SUCCESS) {
return ret;
}

View file

@ -38,36 +38,6 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
using namespace std;
/**
* the signature for packets to client.
*/
#define RTMP_SIG_FMS_VER "3,5,3,888"
#define RTMP_SIG_AMF0_VER 0
#define RTMP_SIG_CLIENT_ID "ASAICiss"
/**
* onStatus consts.
*/
#define StatusLevel "level"
#define StatusCode "code"
#define StatusDescription "description"
#define StatusDetails "details"
#define StatusClientId "clientid"
// status value
#define StatusLevelStatus "status"
// status error
#define StatusLevelError "error"
// code value
#define StatusCodeConnectSuccess "NetConnection.Connect.Success"
#define StatusCodeConnectRejected "NetConnection.Connect.Rejected"
#define StatusCodeStreamReset "NetStream.Play.Reset"
#define StatusCodeStreamStart "NetStream.Play.Start"
#define StatusCodeStreamPause "NetStream.Pause.Notify"
#define StatusCodeStreamUnpause "NetStream.Unpause.Notify"
#define StatusCodePublishStart "NetStream.Publish.Start"
#define StatusCodeDataStart "NetStream.Data.Start"
#define StatusCodeUnpublishSuccess "NetStream.Unpublish.Success"
// FMLE
#define RTMP_AMF0_COMMAND_ON_FC_PUBLISH "onFCPublish"
#define RTMP_AMF0_COMMAND_ON_FC_UNPUBLISH "onFCUnpublish"
@ -129,15 +99,7 @@ void SrsRequest::update_auth(SrsRequest* req)
string SrsRequest::get_stream_url()
{
std::string url = "";
url += vhost;
url += "/";
url += app;
url += "/";
url += stream;
return url;
return srs_generate_stream_url(vhost, app, stream);
}
void SrsRequest::strip()

View file

@ -48,6 +48,36 @@ class SrsPacket;
class SrsAmf0Object;
class IMergeReadHandler;
/**
* the signature for packets to client.
*/
#define RTMP_SIG_FMS_VER "3,5,3,888"
#define RTMP_SIG_AMF0_VER 0
#define RTMP_SIG_CLIENT_ID "ASAICiss"
/**
* onStatus consts.
*/
#define StatusLevel "level"
#define StatusCode "code"
#define StatusDescription "description"
#define StatusDetails "details"
#define StatusClientId "clientid"
// status value
#define StatusLevelStatus "status"
// status error
#define StatusLevelError "error"
// code value
#define StatusCodeConnectSuccess "NetConnection.Connect.Success"
#define StatusCodeConnectRejected "NetConnection.Connect.Rejected"
#define StatusCodeStreamReset "NetStream.Play.Reset"
#define StatusCodeStreamStart "NetStream.Play.Start"
#define StatusCodeStreamPause "NetStream.Pause.Notify"
#define StatusCodeStreamUnpause "NetStream.Unpause.Notify"
#define StatusCodePublishStart "NetStream.Publish.Start"
#define StatusCodeDataStart "NetStream.Data.Start"
#define StatusCodeUnpublishSuccess "NetStream.Unpublish.Success"
/**
* the original request from client.
*/

View file

@ -43,89 +43,6 @@ using namespace std;
// increase recv timeout to got an entire message.
#define SRS_MIN_RECV_TIMEOUT_US (int64_t)(60*1000*1000LL)
/****************************************************************************
*****************************************************************************
****************************************************************************/
/**
5. Protocol Control Messages
RTMP reserves message type IDs 1-7 for protocol control messages.
These messages contain information needed by the RTM Chunk Stream
protocol or RTMP itself. Protocol messages with IDs 1 & 2 are
reserved for usage with RTM Chunk Stream protocol. Protocol messages
with IDs 3-6 are reserved for usage of RTMP. Protocol message with ID
7 is used between edge server and origin server.
*/
#define RTMP_MSG_SetChunkSize 0x01
#define RTMP_MSG_AbortMessage 0x02
#define RTMP_MSG_Acknowledgement 0x03
#define RTMP_MSG_UserControlMessage 0x04
#define RTMP_MSG_WindowAcknowledgementSize 0x05
#define RTMP_MSG_SetPeerBandwidth 0x06
#define RTMP_MSG_EdgeAndOriginServerCommand 0x07
/**
3. Types of messages
The server and the client send messages over the network to
communicate with each other. The messages can be of any type which
includes audio messages, video messages, command messages, shared
object messages, data messages, and user control messages.
3.1. Command message
Command messages carry the AMF-encoded commands between the client
and the server. These messages have been assigned message type value
of 20 for AMF0 encoding and message type value of 17 for AMF3
encoding. These messages are sent to perform some operations like
connect, createStream, publish, play, pause on the peer. Command
messages like onstatus, result etc. are used to inform the sender
about the status of the requested commands. A command message
consists of command name, transaction ID, and command object that
contains related parameters. A client or a server can request Remote
Procedure Calls (RPC) over streams that are communicated using the
command messages to the peer.
*/
#define RTMP_MSG_AMF3CommandMessage 17 // 0x11
#define RTMP_MSG_AMF0CommandMessage 20 // 0x14
/**
3.2. Data message
The client or the server sends this message to send Metadata or any
user data to the peer. Metadata includes details about the
data(audio, video etc.) like creation time, duration, theme and so
on. These messages have been assigned message type value of 18 for
AMF0 and message type value of 15 for AMF3.
*/
#define RTMP_MSG_AMF0DataMessage 18 // 0x12
#define RTMP_MSG_AMF3DataMessage 15 // 0x0F
/**
3.3. Shared object message
A shared object is a Flash object (a collection of name value pairs)
that are in synchronization across multiple clients, instances, and
so on. The message types kMsgContainer=19 for AMF0 and
kMsgContainerEx=16 for AMF3 are reserved for shared object events.
Each message can contain multiple events.
*/
#define RTMP_MSG_AMF3SharedObject 16 // 0x10
#define RTMP_MSG_AMF0SharedObject 19 // 0x13
/**
3.4. Audio message
The client or the server sends this message to send audio data to the
peer. The message type value of 8 is reserved for audio messages.
*/
#define RTMP_MSG_AudioMessage 8 // 0x08
/* *
3.5. Video message
The client or the server sends this message to send video data to the
peer. The message type value of 9 is reserved for video messages.
These messages are large and can delay the sending of other type of
messages. To avoid such a situation, the video message is assigned
the lowest priority.
*/
#define RTMP_MSG_VideoMessage 9 // 0x09
/**
3.6. Aggregate message
An aggregate message is a single message that contains a list of submessages.
The message type value of 22 is reserved for aggregate
messages.
*/
#define RTMP_MSG_AggregateMessage 22 // 0x16
/****************************************************************************
*****************************************************************************
****************************************************************************/
@ -172,24 +89,6 @@ messages.
/****************************************************************************
*****************************************************************************
****************************************************************************/
/**
* amf0 command message, command name macros
*/
#define RTMP_AMF0_COMMAND_CONNECT "connect"
#define RTMP_AMF0_COMMAND_CREATE_STREAM "createStream"
#define RTMP_AMF0_COMMAND_CLOSE_STREAM "closeStream"
#define RTMP_AMF0_COMMAND_PLAY "play"
#define RTMP_AMF0_COMMAND_PAUSE "pause"
#define RTMP_AMF0_COMMAND_ON_BW_DONE "onBWDone"
#define RTMP_AMF0_COMMAND_ON_STATUS "onStatus"
#define RTMP_AMF0_COMMAND_RESULT "_result"
#define RTMP_AMF0_COMMAND_ERROR "_error"
#define RTMP_AMF0_COMMAND_RELEASE_STREAM "releaseStream"
#define RTMP_AMF0_COMMAND_FC_PUBLISH "FCPublish"
#define RTMP_AMF0_COMMAND_UNPUBLISH "FCUnpublish"
#define RTMP_AMF0_COMMAND_PUBLISH "publish"
#define RTMP_AMF0_DATA_SAMPLE_ACCESS "|RtmpSampleAccess"
/**
* band width check method name, which will be invoked by client.
* band width check mothods use SrsBandwidthPacket as its internal packet type,

View file

@ -56,6 +56,110 @@ class SrsChunkStream;
class SrsSharedPtrMessage;
class IMergeReadHandler;
/****************************************************************************
*****************************************************************************
****************************************************************************/
/**
5. Protocol Control Messages
RTMP reserves message type IDs 1-7 for protocol control messages.
These messages contain information needed by the RTM Chunk Stream
protocol or RTMP itself. Protocol messages with IDs 1 & 2 are
reserved for usage with RTM Chunk Stream protocol. Protocol messages
with IDs 3-6 are reserved for usage of RTMP. Protocol message with ID
7 is used between edge server and origin server.
*/
#define RTMP_MSG_SetChunkSize 0x01
#define RTMP_MSG_AbortMessage 0x02
#define RTMP_MSG_Acknowledgement 0x03
#define RTMP_MSG_UserControlMessage 0x04
#define RTMP_MSG_WindowAcknowledgementSize 0x05
#define RTMP_MSG_SetPeerBandwidth 0x06
#define RTMP_MSG_EdgeAndOriginServerCommand 0x07
/**
3. Types of messages
The server and the client send messages over the network to
communicate with each other. The messages can be of any type which
includes audio messages, video messages, command messages, shared
object messages, data messages, and user control messages.
3.1. Command message
Command messages carry the AMF-encoded commands between the client
and the server. These messages have been assigned message type value
of 20 for AMF0 encoding and message type value of 17 for AMF3
encoding. These messages are sent to perform some operations like
connect, createStream, publish, play, pause on the peer. Command
messages like onstatus, result etc. are used to inform the sender
about the status of the requested commands. A command message
consists of command name, transaction ID, and command object that
contains related parameters. A client or a server can request Remote
Procedure Calls (RPC) over streams that are communicated using the
command messages to the peer.
*/
#define RTMP_MSG_AMF3CommandMessage 17 // 0x11
#define RTMP_MSG_AMF0CommandMessage 20 // 0x14
/**
3.2. Data message
The client or the server sends this message to send Metadata or any
user data to the peer. Metadata includes details about the
data(audio, video etc.) like creation time, duration, theme and so
on. These messages have been assigned message type value of 18 for
AMF0 and message type value of 15 for AMF3.
*/
#define RTMP_MSG_AMF0DataMessage 18 // 0x12
#define RTMP_MSG_AMF3DataMessage 15 // 0x0F
/**
3.3. Shared object message
A shared object is a Flash object (a collection of name value pairs)
that are in synchronization across multiple clients, instances, and
so on. The message types kMsgContainer=19 for AMF0 and
kMsgContainerEx=16 for AMF3 are reserved for shared object events.
Each message can contain multiple events.
*/
#define RTMP_MSG_AMF3SharedObject 16 // 0x10
#define RTMP_MSG_AMF0SharedObject 19 // 0x13
/**
3.4. Audio message
The client or the server sends this message to send audio data to the
peer. The message type value of 8 is reserved for audio messages.
*/
#define RTMP_MSG_AudioMessage 8 // 0x08
/* *
3.5. Video message
The client or the server sends this message to send video data to the
peer. The message type value of 9 is reserved for video messages.
These messages are large and can delay the sending of other type of
messages. To avoid such a situation, the video message is assigned
the lowest priority.
*/
#define RTMP_MSG_VideoMessage 9 // 0x09
/**
3.6. Aggregate message
An aggregate message is a single message that contains a list of submessages.
The message type value of 22 is reserved for aggregate
messages.
*/
#define RTMP_MSG_AggregateMessage 22 // 0x16
/****************************************************************************
*****************************************************************************
****************************************************************************/
/**
* amf0 command message, command name macros
*/
#define RTMP_AMF0_COMMAND_CONNECT "connect"
#define RTMP_AMF0_COMMAND_CREATE_STREAM "createStream"
#define RTMP_AMF0_COMMAND_CLOSE_STREAM "closeStream"
#define RTMP_AMF0_COMMAND_PLAY "play"
#define RTMP_AMF0_COMMAND_PAUSE "pause"
#define RTMP_AMF0_COMMAND_ON_BW_DONE "onBWDone"
#define RTMP_AMF0_COMMAND_ON_STATUS "onStatus"
#define RTMP_AMF0_COMMAND_RESULT "_result"
#define RTMP_AMF0_COMMAND_ERROR "_error"
#define RTMP_AMF0_COMMAND_RELEASE_STREAM "releaseStream"
#define RTMP_AMF0_COMMAND_FC_PUBLISH "FCPublish"
#define RTMP_AMF0_COMMAND_UNPUBLISH "FCUnpublish"
#define RTMP_AMF0_COMMAND_PUBLISH "publish"
#define RTMP_AMF0_DATA_SAMPLE_ACCESS "|RtmpSampleAccess"
/****************************************************************************
*****************************************************************************
****************************************************************************/

View file

@ -31,6 +31,7 @@ using namespace std;
#include <srs_kernel_stream.hpp>
#include <srs_rtmp_stack.hpp>
#include <srs_kernel_codec.hpp>
#include <srs_kernel_consts.hpp>
void srs_discovery_tc_url(
string tcUrl,
@ -78,22 +79,22 @@ void srs_vhost_resolve(string& vhost, string& app, string& param)
app = srs_string_replace(app, "&&", "?");
app = srs_string_replace(app, "=", "?");
if ((pos = app.find("?")) == std::string::npos) {
return;
}
if ((pos = app.find("?")) != std::string::npos) {
std::string query = app.substr(pos + 1);
app = app.substr(0, pos);
std::string query = app.substr(pos + 1);
app = app.substr(0, pos);
if ((pos = query.find("vhost?")) != std::string::npos) {
query = query.substr(pos + 6);
if (!query.empty()) {
vhost = query;
}
if ((pos = vhost.find("?")) != std::string::npos) {
vhost = vhost.substr(0, pos);
if ((pos = query.find("vhost?")) != std::string::npos) {
query = query.substr(pos + 6);
if (!query.empty()) {
vhost = query;
}
if ((pos = vhost.find("?")) != std::string::npos) {
vhost = vhost.substr(0, pos);
}
}
}
/* others */
}
void srs_random_generate(char* bytes, int size)
@ -346,3 +347,18 @@ int srs_rtmp_create_msg(char type, u_int32_t timestamp, char* data, int size, in
return ret;
}
std::string srs_generate_stream_url(std::string vhost, std::string app, std::string stream)
{
std::string url = "";
if (SRS_CONSTS_RTMP_DEFAULT_VHOST != vhost){
url += vhost;
}
url += "/";
url += app;
url += "/";
url += stream;
return url;
}

View file

@ -63,7 +63,9 @@ extern void srs_discovery_tc_url(
* app...vhost...request_vhost
* @param param, the query, for example, ?vhost=xxx
*/
extern void srs_vhost_resolve(std::string& vhost, std::string& app, std::string& param);
extern void srs_vhost_resolve(
std::string& vhost, std::string& app, std::string& param
);
/**
* generate ramdom data for handshake.
@ -118,5 +120,8 @@ extern int srs_chunk_header_c3(
*/
extern int srs_rtmp_create_msg(char type, u_int32_t timestamp, char* data, int size, int stream_id, SrsSharedPtrMessage** ppmsg);
// get the stream identify, vhost/app/stream.
extern std::string srs_generate_stream_url(std::string vhost, std::string app, std::string stream);
#endif