1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

Merge branch '2.0release' into develop

This commit is contained in:
winlin 2015-08-22 13:57:45 +08:00
commit 22485ce5e9
14 changed files with 116 additions and 72 deletions

View file

@ -41,6 +41,7 @@ SrsConnection::SrsConnection(IConnectionManager* cm, st_netfd_t c)
manager = cm; manager = cm;
stfd = c; stfd = c;
disposed = false; disposed = false;
expired = false;
// the client thread should reap itself, // the client thread should reap itself,
// so we never use joinable. // so we never use joinable.
@ -116,4 +117,9 @@ int SrsConnection::srs_id()
return id; return id;
} }
void SrsConnection::expire()
{
expired = true;
}

View file

@ -88,6 +88,11 @@ protected:
* when disposed, connection should stop cycle and cleanup itself. * when disposed, connection should stop cycle and cleanup itself.
*/ */
bool disposed; bool disposed;
/**
* whether connection is expired, application definition.
* when expired, the connection must never be served and quit ASAP.
*/
bool expired;
public: public:
SrsConnection(IConnectionManager* cm, st_netfd_t c); SrsConnection(IConnectionManager* cm, st_netfd_t c);
virtual ~SrsConnection(); virtual ~SrsConnection();
@ -125,6 +130,10 @@ public:
* get the srs id which identify the client. * get the srs id which identify the client.
*/ */
virtual int srs_id(); virtual int srs_id();
/**
* set connection to expired.
*/
virtual void expire();
protected: protected:
/** /**
* for concrete connection to do the cycle. * for concrete connection to do the cycle.

View file

@ -661,21 +661,7 @@ int SrsGoApiStreams::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r)
return srs_http_response_code(w, ret); return srs_http_response_code(w, ret);
} }
if (r->is_http_delete()) { if (r->is_http_get()) {
srs_assert(stream);
SrsSource* source = SrsSource::fetch(stream->vhost->vhost, stream->app, stream->stream);
if (!source) {
ret = ERROR_SOURCE_NOT_FOUND;
srs_warn("source not found for sid=%d", sid);
return srs_http_response_code(w, ret);
}
source->set_expired();
srs_warn("disconnent stream=%d successfully. vhost=%s, app=%s, stream=%s.",
sid, stream->vhost->vhost.c_str(), stream->app.c_str(), stream->stream.c_str());
return srs_http_response_code(w, ret);
} else if (r->is_http_get()) {
std::stringstream data; std::stringstream data;
if (!stream) { if (!stream) {
@ -726,10 +712,15 @@ int SrsGoApiClients::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r)
ret = ERROR_RTMP_STREAM_NOT_FOUND; ret = ERROR_RTMP_STREAM_NOT_FOUND;
srs_error("stream client_id=%d not found. ret=%d", cid, ret); srs_error("stream client_id=%d not found. ret=%d", cid, ret);
return srs_http_response_code(w, ret); return srs_http_response_code(w, ret);
} }
if (r->is_http_get()) { if (r->is_http_delete()) {
srs_assert(client);
client->conn->expire();
srs_warn("delete client=%d ok", cid);
return srs_http_response_code(w, ret);
} else if (r->is_http_get()) {
std::stringstream data; std::stringstream data;
if (!client) { if (!client) {
@ -751,6 +742,8 @@ int SrsGoApiClients::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r)
} }
return srs_http_response_json(w, ss.str()); return srs_http_response_json(w, ss.str());
} else {
return srs_go_http_error(w, SRS_CONSTS_HTTP_MethodNotAllowed);
} }
return ret; return ret;

View file

@ -495,7 +495,7 @@ int SrsRtmpConn::stream_service_cycle()
// update the statistic when source disconveried. // update the statistic when source disconveried.
SrsStatistic* stat = SrsStatistic::instance(); SrsStatistic* stat = SrsStatistic::instance();
if ((ret = stat->on_client(_srs_context->get_id(), req)) != ERROR_SUCCESS) { if ((ret = stat->on_client(_srs_context->get_id(), req, this, type)) != ERROR_SUCCESS) {
srs_error("stat client failed. ret=%d", ret); srs_error("stat client failed. ret=%d", ret);
return ret; return ret;
} }
@ -672,6 +672,13 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRe
// collect elapse for pithy print. // collect elapse for pithy print.
pprint->elapse(); pprint->elapse();
// when source is set to expired, disconnect it.
if (expired) {
ret = ERROR_USER_DISCONNECT;
srs_error("connection expired. ret=%d", ret);
return ret;
}
// to use isolate thread to recv, can improve about 33% performance. // to use isolate thread to recv, can improve about 33% performance.
// @see: https://github.com/simple-rtmp-server/srs/issues/196 // @see: https://github.com/simple-rtmp-server/srs/issues/196
// @see: https://github.com/simple-rtmp-server/srs/issues/217 // @see: https://github.com/simple-rtmp-server/srs/issues/217
@ -875,9 +882,9 @@ int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd)
pprint->elapse(); pprint->elapse();
// when source is set to expired, disconnect it. // when source is set to expired, disconnect it.
if (source->expired()) { if (expired) {
ret = ERROR_USER_DISCONNECT; ret = ERROR_USER_DISCONNECT;
srs_error("source is expired. ret=%d", ret); srs_error("connection expired. ret=%d", ret);
return ret; return ret;
} }

View file

@ -898,7 +898,6 @@ SrsSource::SrsSource()
jitter_algorithm = SrsRtmpJitterAlgorithmOFF; jitter_algorithm = SrsRtmpJitterAlgorithmOFF;
mix_correct = false; mix_correct = false;
mix_queue = new SrsMixQueue(); mix_queue = new SrsMixQueue();
is_expired = false;
#ifdef SRS_AUTO_HLS #ifdef SRS_AUTO_HLS
hls = new SrsHls(); hls = new SrsHls();
@ -2066,7 +2065,7 @@ int SrsSource::on_publish()
return ret; return ret;
} }
SrsStatistic* stat = SrsStatistic::instance(); SrsStatistic* stat = SrsStatistic::instance();
stat->on_stream_publish(_req); stat->on_stream_publish(_req, _source_id);
return ret; return ret;
} }
@ -2101,7 +2100,6 @@ void SrsSource::on_unpublish()
_can_publish = true; _can_publish = true;
_source_id = -1; _source_id = -1;
is_expired = false;
// notify the handler. // notify the handler.
srs_assert(handler); srs_assert(handler);
@ -2260,13 +2258,3 @@ void SrsSource::destroy_forwarders()
forwarders.clear(); forwarders.clear();
} }
bool SrsSource::expired()
{
return is_expired;
}
void SrsSource::set_expired()
{
is_expired = true;
}

View file

@ -453,8 +453,6 @@ private:
// whether use interlaced/mixed algorithm to correct timestamp. // whether use interlaced/mixed algorithm to correct timestamp.
bool mix_correct; bool mix_correct;
SrsMixQueue* mix_queue; SrsMixQueue* mix_queue;
// the flag of source expired or not.
bool is_expired;
// whether stream is monotonically increase. // whether stream is monotonically increase.
bool is_monotonically_increase; bool is_monotonically_increase;
int64_t last_packet_time; int64_t last_packet_time;
@ -586,12 +584,6 @@ public:
private: private:
virtual int create_forwarders(); virtual int create_forwarders();
virtual void destroy_forwarders(); virtual void destroy_forwarders();
public:
virtual bool expired();
/**
* set source expired.
*/
virtual void set_expired();
}; };
#endif #endif

View file

@ -72,8 +72,8 @@ int SrsStatisticVhost::dumps(stringstream& ss)
<< SRS_JFIELD_ORG("send_bytes", kbps->get_send_bytes()) << SRS_JFIELD_CONT << SRS_JFIELD_ORG("send_bytes", kbps->get_send_bytes()) << SRS_JFIELD_CONT
<< SRS_JFIELD_ORG("recv_bytes", kbps->get_recv_bytes()) << SRS_JFIELD_CONT << SRS_JFIELD_ORG("recv_bytes", kbps->get_recv_bytes()) << SRS_JFIELD_CONT
<< SRS_JFIELD_OBJ("kbps") << SRS_JFIELD_OBJ("kbps")
<< SRS_JFIELD_ORG("r30s", kbps->get_recv_kbps_30s()) << SRS_JFIELD_CONT << SRS_JFIELD_ORG("recv_30s", kbps->get_recv_kbps_30s()) << SRS_JFIELD_CONT
<< SRS_JFIELD_ORG("s30s", kbps->get_send_kbps_30s()) << SRS_JFIELD_ORG("send_30s", kbps->get_send_kbps_30s())
<< SRS_JOBJECT_END << SRS_JFIELD_CONT << SRS_JOBJECT_END << SRS_JFIELD_CONT
<< SRS_JFIELD_NAME("hls") << SRS_JOBJECT_START << SRS_JFIELD_NAME("hls") << SRS_JOBJECT_START
<< SRS_JFIELD_BOOL("enabled", hls_enabled); << SRS_JFIELD_BOOL("enabled", hls_enabled);
@ -91,7 +91,8 @@ SrsStatisticStream::SrsStatisticStream()
{ {
id = srs_generate_id(); id = srs_generate_id();
vhost = NULL; vhost = NULL;
status = STATISTIC_STREAM_STATUS_IDLING; active = false;
connection_cid = -1;
has_video = false; has_video = false;
vcodec = SrsCodecVideoReserved; vcodec = SrsCodecVideoReserved;
@ -124,15 +125,18 @@ int SrsStatisticStream::dumps(stringstream& ss)
<< SRS_JFIELD_STR("name", stream) << SRS_JFIELD_CONT << SRS_JFIELD_STR("name", stream) << SRS_JFIELD_CONT
<< SRS_JFIELD_ORG("vhost", vhost->id) << SRS_JFIELD_CONT << SRS_JFIELD_ORG("vhost", vhost->id) << SRS_JFIELD_CONT
<< SRS_JFIELD_STR("app", app) << SRS_JFIELD_CONT << SRS_JFIELD_STR("app", app) << SRS_JFIELD_CONT
<< SRS_JFIELD_ORG("live_ms", srs_get_system_time_ms()) << SRS_JFIELD_CONT
<< SRS_JFIELD_ORG("clients", nb_clients) << SRS_JFIELD_CONT << SRS_JFIELD_ORG("clients", nb_clients) << SRS_JFIELD_CONT
<< SRS_JFIELD_ORG("send_bytes", kbps->get_send_bytes()) << SRS_JFIELD_CONT << SRS_JFIELD_ORG("send_bytes", kbps->get_send_bytes()) << SRS_JFIELD_CONT
<< SRS_JFIELD_ORG("recv_bytes", kbps->get_recv_bytes()) << SRS_JFIELD_CONT << SRS_JFIELD_ORG("recv_bytes", kbps->get_recv_bytes()) << SRS_JFIELD_CONT
<< SRS_JFIELD_OBJ("kbps") << SRS_JFIELD_OBJ("kbps")
<< SRS_JFIELD_ORG("r30s", kbps->get_recv_kbps_30s()) << SRS_JFIELD_CONT << SRS_JFIELD_ORG("recv_30s", kbps->get_recv_kbps_30s()) << SRS_JFIELD_CONT
<< SRS_JFIELD_ORG("s30s", kbps->get_send_kbps_30s()) << SRS_JFIELD_ORG("send_30s", kbps->get_send_kbps_30s())
<< SRS_JOBJECT_END << SRS_JFIELD_CONT << SRS_JOBJECT_END << SRS_JFIELD_CONT
<< SRS_JFIELD_ORG("live_ms", srs_get_system_time_ms()) << SRS_JFIELD_CONT << SRS_JFIELD_OBJ("publish")
<< SRS_JFIELD_STR("status", status) << SRS_JFIELD_CONT; << SRS_JFIELD_BOOL("active", active) << SRS_JFIELD_CONT
<< SRS_JFIELD_ORG("cid", connection_cid)
<< SRS_JOBJECT_END << SRS_JFIELD_CONT;
if (!has_video) { if (!has_video) {
ss << SRS_JFIELD_NULL("video") << SRS_JFIELD_CONT; ss << SRS_JFIELD_NULL("video") << SRS_JFIELD_CONT;
@ -161,21 +165,27 @@ int SrsStatisticStream::dumps(stringstream& ss)
return ret; return ret;
} }
void SrsStatisticStream::publish() void SrsStatisticStream::publish(int cid)
{ {
status = STATISTIC_STREAM_STATUS_PUBLISHING; connection_cid = cid;
active = true;
} }
void SrsStatisticStream::close() void SrsStatisticStream::close()
{ {
has_video = false; has_video = false;
has_audio = false; has_audio = false;
status = STATISTIC_STREAM_STATUS_IDLING; active = false;
} }
SrsStatisticClient::SrsStatisticClient() SrsStatisticClient::SrsStatisticClient()
{ {
id = 0; id = 0;
stream = NULL;
conn = NULL;
req = NULL;
type = SrsRtmpConnUnknown;
create = srs_get_system_time_ms();
} }
SrsStatisticClient::~SrsStatisticClient() SrsStatisticClient::~SrsStatisticClient()
@ -187,7 +197,17 @@ int SrsStatisticClient::dumps(stringstream& ss)
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
ss << SRS_JOBJECT_START ss << SRS_JOBJECT_START
<< SRS_JFIELD_ORG("id", id) << SRS_JFIELD_ORG("id", id) << SRS_JFIELD_CONT
<< SRS_JFIELD_ORG("vhost", stream->vhost->id) << SRS_JFIELD_CONT
<< SRS_JFIELD_ORG("stream", stream->id) << SRS_JFIELD_CONT
<< SRS_JFIELD_STR("ip", req->ip) << SRS_JFIELD_CONT
<< SRS_JFIELD_STR("pageUrl", req->pageUrl) << SRS_JFIELD_CONT
<< SRS_JFIELD_STR("swfUrl", req->swfUrl) << SRS_JFIELD_CONT
<< SRS_JFIELD_STR("tcUrl", req->tcUrl) << SRS_JFIELD_CONT
<< SRS_JFIELD_STR("url", req->get_stream_url()) << SRS_JFIELD_CONT
<< SRS_JFIELD_STR("type", srs_client_type_string(type)) << SRS_JFIELD_CONT
<< SRS_JFIELD_BOOL("publish", srs_client_type_is_publish(type)) << SRS_JFIELD_CONT
<< SRS_JFIELD_ORG("alive", srs_get_system_time_ms() - create)
<< SRS_JOBJECT_END; << SRS_JOBJECT_END;
return ret; return ret;
@ -301,12 +321,12 @@ int SrsStatistic::on_audio_info(SrsRequest* req,
return ret; return ret;
} }
void SrsStatistic::on_stream_publish(SrsRequest* req) void SrsStatistic::on_stream_publish(SrsRequest* req, int cid)
{ {
SrsStatisticVhost* vhost = create_vhost(req); SrsStatisticVhost* vhost = create_vhost(req);
SrsStatisticStream* stream = create_stream(vhost, req); SrsStatisticStream* stream = create_stream(vhost, req);
stream->publish(); stream->publish(cid);
} }
void SrsStatistic::on_stream_close(SrsRequest* req) void SrsStatistic::on_stream_close(SrsRequest* req)
@ -317,7 +337,7 @@ void SrsStatistic::on_stream_close(SrsRequest* req)
stream->close(); stream->close();
} }
int SrsStatistic::on_client(int id, SrsRequest* req) int SrsStatistic::on_client(int id, SrsRequest* req, SrsConnection* conn, SrsRtmpConnType type)
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
@ -336,6 +356,9 @@ int SrsStatistic::on_client(int id, SrsRequest* req)
} }
// got client. // got client.
client->conn = conn;
client->req = req;
client->type = type;
stream->nb_clients++; stream->nb_clients++;
vhost->nb_clients++; vhost->nb_clients++;
@ -458,7 +481,7 @@ int SrsStatistic::dumps_clients(stringstream& ss, int start, int count)
ss << SRS_JARRAY_START; ss << SRS_JARRAY_START;
std::map<int, SrsStatisticClient*>::iterator it = clients.begin(); std::map<int, SrsStatisticClient*>::iterator it = clients.begin();
for (int i = 0; i < count && it != clients.end(); it++) { for (int i = 0; i < start + count && it != clients.end(); it++, i++) {
if (i < start) { if (i < start) {
continue; continue;
} }

View file

@ -34,9 +34,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <string> #include <string>
#include <srs_kernel_codec.hpp> #include <srs_kernel_codec.hpp>
#include <srs_rtmp_stack.hpp>
#define STATISTIC_STREAM_STATUS_PUBLISHING "publishing"
#define STATISTIC_STREAM_STATUS_IDLING "idling"
class SrsKbps; class SrsKbps;
class SrsRequest; class SrsRequest;
@ -68,7 +66,8 @@ public:
std::string app; std::string app;
std::string stream; std::string stream;
std::string url; std::string url;
std::string status; bool active;
int connection_cid;
int nb_clients; int nb_clients;
public: public:
/** /**
@ -103,7 +102,7 @@ public:
/** /**
* publish the stream. * publish the stream.
*/ */
virtual void publish(); virtual void publish(int cid);
/** /**
* close the stream. * close the stream.
*/ */
@ -114,7 +113,11 @@ struct SrsStatisticClient
{ {
public: public:
SrsStatisticStream* stream; SrsStatisticStream* stream;
SrsConnection* conn;
SrsRequest* req;
SrsRtmpConnType type;
int id; int id;
int64_t create;
public: public:
SrsStatisticClient(); SrsStatisticClient();
virtual ~SrsStatisticClient(); virtual ~SrsStatisticClient();
@ -169,9 +172,11 @@ public:
SrsAacObjectType aac_object SrsAacObjectType aac_object
); );
/** /**
* when publish stream. * when publish stream.
*/ * @param req the request object of publish connection.
virtual void on_stream_publish(SrsRequest* req); * @param cid the cid of publish connection.
*/
virtual void on_stream_publish(SrsRequest* req, int cid);
/** /**
* when close stream. * when close stream.
*/ */
@ -181,8 +186,10 @@ public:
* when got a client to publish/play stream, * when got a client to publish/play stream,
* @param id, the client srs id. * @param id, the client srs id.
* @param req, the client request object. * @param req, the client request object.
* @param conn, the physical absract connection object.
* @param type, the type of connection.
*/ */
virtual int on_client(int id, SrsRequest* req); virtual int on_client(int id, SrsRequest* req, SrsConnection* conn, SrsRtmpConnType type);
/** /**
* client disconnect * client disconnect
* @remark the on_disconnect always call, while the on_client is call when * @remark the on_disconnect always call, while the on_client is call when

View file

@ -263,6 +263,11 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#define ERROR_AAC_BYTES_INVALID 4028 #define ERROR_AAC_BYTES_INVALID 4028
#define ERROR_HTTP_REQUEST_EOF 4029 #define ERROR_HTTP_REQUEST_EOF 4029
///////////////////////////////////////////////////////
// HTTP API error.
///////////////////////////////////////////////////////
//#define ERROR_API_METHOD_NOT_ALLOWD
/////////////////////////////////////////////////////// ///////////////////////////////////////////////////////
// user-define error. // user-define error.
/////////////////////////////////////////////////////// ///////////////////////////////////////////////////////

View file

@ -120,8 +120,11 @@ string srs_go_http_detect(char* data, int size)
return "application/octet-stream"; // fallback return "application/octet-stream"; // fallback
} }
// Error replies to the request with the specified error message and HTTP code. int srs_go_http_error(ISrsHttpResponseWriter* w, int code)
// The error message should be plain text. {
return srs_go_http_error(w, code, srs_generate_http_status_text(code));
}
int srs_go_http_error(ISrsHttpResponseWriter* w, int code, string error) int srs_go_http_error(ISrsHttpResponseWriter* w, int code, string error)
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
@ -287,7 +290,7 @@ bool SrsHttpNotFoundHandler::is_not_found()
int SrsHttpNotFoundHandler::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) int SrsHttpNotFoundHandler::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r)
{ {
return srs_go_http_error(w, SRS_CONSTS_HTTP_NotFound, SRS_CONSTS_HTTP_NotFound_str); return srs_go_http_error(w, SRS_CONSTS_HTTP_NotFound);
} }
SrsHttpFileServer::SrsHttpFileServer(string root_dir) SrsHttpFileServer::SrsHttpFileServer(string root_dir)

View file

@ -76,6 +76,11 @@ class ISrsHttpResponseWriter;
#define SRS_CONSTS_HTTP_PUT HTTP_PUT #define SRS_CONSTS_HTTP_PUT HTTP_PUT
#define SRS_CONSTS_HTTP_DELETE HTTP_DELETE #define SRS_CONSTS_HTTP_DELETE HTTP_DELETE
// Error replies to the request with the specified error message and HTTP code.
// The error message should be plain text.
extern int srs_go_http_error(ISrsHttpResponseWriter* w, int code);
extern int srs_go_http_error(ISrsHttpResponseWriter* w, int code, std::string error);
// helper function: response in json format. // helper function: response in json format.
extern int srs_http_response_json(ISrsHttpResponseWriter* w, std::string data); extern int srs_http_response_json(ISrsHttpResponseWriter* w, std::string data);
/** /**

View file

@ -1747,12 +1747,17 @@ string srs_client_type_string(SrsRtmpConnType type)
{ {
switch (type) { switch (type) {
case SrsRtmpConnPlay: return "Play"; case SrsRtmpConnPlay: return "Play";
case SrsRtmpConnFlashPublish: return "publish(FlashPublish)"; case SrsRtmpConnFlashPublish: return "flash-publish)";
case SrsRtmpConnFMLEPublish: return "publish(FMLEPublish)"; case SrsRtmpConnFMLEPublish: return "fmle-publish";
default: return "Unknown"; default: return "Unknown";
} }
} }
bool srs_client_type_is_publish(SrsRtmpConnType type)
{
return type != SrsRtmpConnPlay;
}
SrsHandshakeBytes::SrsHandshakeBytes() SrsHandshakeBytes::SrsHandshakeBytes()
{ {
c0c1 = s0s1s2 = c2 = NULL; c0c1 = s0s1s2 = c2 = NULL;

View file

@ -622,6 +622,7 @@ enum SrsRtmpConnType
SrsRtmpConnFlashPublish, SrsRtmpConnFlashPublish,
}; };
std::string srs_client_type_string(SrsRtmpConnType type); std::string srs_client_type_string(SrsRtmpConnType type);
bool srs_client_type_is_publish(SrsRtmpConnType type);
/** /**
* store the handshake bytes, * store the handshake bytes,

View file

@ -247,7 +247,7 @@ int srs_write_large_iovs(ISrsProtocolReaderWriter* skt, iovec* iovs, int size, s
// for srs-librtmp, @see https://github.com/simple-rtmp-server/srs/issues/213 // for srs-librtmp, @see https://github.com/simple-rtmp-server/srs/issues/213
#ifndef _WIN32 #ifndef _WIN32
// for linux, generally it's 1024. // for linux, generally it's 1024.
static int limits = sysconf(_SC_IOV_MAX); static int limits = (int)sysconf(_SC_IOV_MAX);
#else #else
static int limits = 1024; static int limits = 1024;
#endif #endif