diff --git a/trunk/src/app/srs_app_conn.cpp b/trunk/src/app/srs_app_conn.cpp index 8cedcfec3..3aa88fbdb 100644 --- a/trunk/src/app/srs_app_conn.cpp +++ b/trunk/src/app/srs_app_conn.cpp @@ -41,6 +41,7 @@ SrsConnection::SrsConnection(IConnectionManager* cm, st_netfd_t c) manager = cm; stfd = c; disposed = false; + expired = false; // the client thread should reap itself, // so we never use joinable. @@ -116,4 +117,9 @@ int SrsConnection::srs_id() return id; } +void SrsConnection::expire() +{ + expired = true; +} + diff --git a/trunk/src/app/srs_app_conn.hpp b/trunk/src/app/srs_app_conn.hpp index d5130bf71..ae48421ba 100644 --- a/trunk/src/app/srs_app_conn.hpp +++ b/trunk/src/app/srs_app_conn.hpp @@ -88,6 +88,11 @@ protected: * when disposed, connection should stop cycle and cleanup itself. */ bool disposed; + /** + * whether connection is expired, application definition. + * when expired, the connection must never be served and quit ASAP. + */ + bool expired; public: SrsConnection(IConnectionManager* cm, st_netfd_t c); virtual ~SrsConnection(); @@ -125,6 +130,10 @@ public: * get the srs id which identify the client. */ virtual int srs_id(); + /** + * set connection to expired. + */ + virtual void expire(); protected: /** * for concrete connection to do the cycle. diff --git a/trunk/src/app/srs_app_http_api.cpp b/trunk/src/app/srs_app_http_api.cpp index fd9eb5449..46e862ab9 100755 --- a/trunk/src/app/srs_app_http_api.cpp +++ b/trunk/src/app/srs_app_http_api.cpp @@ -660,22 +660,8 @@ int SrsGoApiStreams::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) srs_error("stream stream_id=%d not found. ret=%d", sid, ret); return srs_http_response_code(w, ret); } - - if (r->is_http_delete()) { - srs_assert(stream); - - SrsSource* source = SrsSource::fetch(stream->vhost->vhost, stream->app, stream->stream); - if (!source) { - ret = ERROR_SOURCE_NOT_FOUND; - srs_warn("source not found for sid=%d", sid); - return srs_http_response_code(w, ret); - } - - source->set_expired(); - srs_warn("disconnent stream=%d successfully. vhost=%s, app=%s, stream=%s.", - sid, stream->vhost->vhost.c_str(), stream->app.c_str(), stream->stream.c_str()); - return srs_http_response_code(w, ret); - } else if (r->is_http_get()) { + + if (r->is_http_get()) { std::stringstream data; if (!stream) { @@ -726,10 +712,15 @@ int SrsGoApiClients::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) ret = ERROR_RTMP_STREAM_NOT_FOUND; srs_error("stream client_id=%d not found. ret=%d", cid, ret); return srs_http_response_code(w, ret); - } - if (r->is_http_get()) { + if (r->is_http_delete()) { + srs_assert(client); + + client->conn->expire(); + srs_warn("delete client=%d ok", cid); + return srs_http_response_code(w, ret); + } else if (r->is_http_get()) { std::stringstream data; if (!client) { @@ -751,6 +742,8 @@ int SrsGoApiClients::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) } return srs_http_response_json(w, ss.str()); + } else { + return srs_go_http_error(w, SRS_CONSTS_HTTP_MethodNotAllowed); } return ret; diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index e4651eecd..b4da91406 100755 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -495,7 +495,7 @@ int SrsRtmpConn::stream_service_cycle() // update the statistic when source disconveried. SrsStatistic* stat = SrsStatistic::instance(); - if ((ret = stat->on_client(_srs_context->get_id(), req)) != ERROR_SUCCESS) { + if ((ret = stat->on_client(_srs_context->get_id(), req, this, type)) != ERROR_SUCCESS) { srs_error("stat client failed. ret=%d", ret); return ret; } @@ -671,6 +671,13 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRe while (!disposed) { // collect elapse for pithy print. pprint->elapse(); + + // when source is set to expired, disconnect it. + if (expired) { + ret = ERROR_USER_DISCONNECT; + srs_error("connection expired. ret=%d", ret); + return ret; + } // to use isolate thread to recv, can improve about 33% performance. // @see: https://github.com/simple-rtmp-server/srs/issues/196 @@ -875,9 +882,9 @@ int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd) pprint->elapse(); // when source is set to expired, disconnect it. - if (source->expired()) { + if (expired) { ret = ERROR_USER_DISCONNECT; - srs_error("source is expired. ret=%d", ret); + srs_error("connection expired. ret=%d", ret); return ret; } diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index f60f28887..ec00978cc 100755 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -898,7 +898,6 @@ SrsSource::SrsSource() jitter_algorithm = SrsRtmpJitterAlgorithmOFF; mix_correct = false; mix_queue = new SrsMixQueue(); - is_expired = false; #ifdef SRS_AUTO_HLS hls = new SrsHls(); @@ -2066,7 +2065,7 @@ int SrsSource::on_publish() return ret; } SrsStatistic* stat = SrsStatistic::instance(); - stat->on_stream_publish(_req); + stat->on_stream_publish(_req, _source_id); return ret; } @@ -2101,7 +2100,6 @@ void SrsSource::on_unpublish() _can_publish = true; _source_id = -1; - is_expired = false; // notify the handler. srs_assert(handler); @@ -2260,13 +2258,3 @@ void SrsSource::destroy_forwarders() forwarders.clear(); } -bool SrsSource::expired() -{ - return is_expired; -} - -void SrsSource::set_expired() -{ - is_expired = true; -} - diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index 18cdab86c..7c626b7eb 100755 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -453,8 +453,6 @@ private: // whether use interlaced/mixed algorithm to correct timestamp. bool mix_correct; SrsMixQueue* mix_queue; - // the flag of source expired or not. - bool is_expired; // whether stream is monotonically increase. bool is_monotonically_increase; int64_t last_packet_time; @@ -586,12 +584,6 @@ public: private: virtual int create_forwarders(); virtual void destroy_forwarders(); -public: - virtual bool expired(); - /** - * set source expired. - */ - virtual void set_expired(); }; #endif diff --git a/trunk/src/app/srs_app_statistic.cpp b/trunk/src/app/srs_app_statistic.cpp index 6d7b459bb..2d375b067 100755 --- a/trunk/src/app/srs_app_statistic.cpp +++ b/trunk/src/app/srs_app_statistic.cpp @@ -72,8 +72,8 @@ int SrsStatisticVhost::dumps(stringstream& ss) << SRS_JFIELD_ORG("send_bytes", kbps->get_send_bytes()) << SRS_JFIELD_CONT << SRS_JFIELD_ORG("recv_bytes", kbps->get_recv_bytes()) << SRS_JFIELD_CONT << SRS_JFIELD_OBJ("kbps") - << SRS_JFIELD_ORG("r30s", kbps->get_recv_kbps_30s()) << SRS_JFIELD_CONT - << SRS_JFIELD_ORG("s30s", kbps->get_send_kbps_30s()) + << SRS_JFIELD_ORG("recv_30s", kbps->get_recv_kbps_30s()) << SRS_JFIELD_CONT + << SRS_JFIELD_ORG("send_30s", kbps->get_send_kbps_30s()) << SRS_JOBJECT_END << SRS_JFIELD_CONT << SRS_JFIELD_NAME("hls") << SRS_JOBJECT_START << SRS_JFIELD_BOOL("enabled", hls_enabled); @@ -91,7 +91,8 @@ SrsStatisticStream::SrsStatisticStream() { id = srs_generate_id(); vhost = NULL; - status = STATISTIC_STREAM_STATUS_IDLING; + active = false; + connection_cid = -1; has_video = false; vcodec = SrsCodecVideoReserved; @@ -124,15 +125,18 @@ int SrsStatisticStream::dumps(stringstream& ss) << SRS_JFIELD_STR("name", stream) << SRS_JFIELD_CONT << SRS_JFIELD_ORG("vhost", vhost->id) << SRS_JFIELD_CONT << SRS_JFIELD_STR("app", app) << SRS_JFIELD_CONT + << SRS_JFIELD_ORG("live_ms", srs_get_system_time_ms()) << SRS_JFIELD_CONT << SRS_JFIELD_ORG("clients", nb_clients) << SRS_JFIELD_CONT << SRS_JFIELD_ORG("send_bytes", kbps->get_send_bytes()) << SRS_JFIELD_CONT << SRS_JFIELD_ORG("recv_bytes", kbps->get_recv_bytes()) << SRS_JFIELD_CONT << SRS_JFIELD_OBJ("kbps") - << SRS_JFIELD_ORG("r30s", kbps->get_recv_kbps_30s()) << SRS_JFIELD_CONT - << SRS_JFIELD_ORG("s30s", kbps->get_send_kbps_30s()) + << SRS_JFIELD_ORG("recv_30s", kbps->get_recv_kbps_30s()) << SRS_JFIELD_CONT + << SRS_JFIELD_ORG("send_30s", kbps->get_send_kbps_30s()) << SRS_JOBJECT_END << SRS_JFIELD_CONT - << SRS_JFIELD_ORG("live_ms", srs_get_system_time_ms()) << SRS_JFIELD_CONT - << SRS_JFIELD_STR("status", status) << SRS_JFIELD_CONT; + << SRS_JFIELD_OBJ("publish") + << SRS_JFIELD_BOOL("active", active) << SRS_JFIELD_CONT + << SRS_JFIELD_ORG("cid", connection_cid) + << SRS_JOBJECT_END << SRS_JFIELD_CONT; if (!has_video) { ss << SRS_JFIELD_NULL("video") << SRS_JFIELD_CONT; @@ -161,21 +165,27 @@ int SrsStatisticStream::dumps(stringstream& ss) return ret; } -void SrsStatisticStream::publish() +void SrsStatisticStream::publish(int cid) { - status = STATISTIC_STREAM_STATUS_PUBLISHING; + connection_cid = cid; + active = true; } void SrsStatisticStream::close() { has_video = false; has_audio = false; - status = STATISTIC_STREAM_STATUS_IDLING; + active = false; } SrsStatisticClient::SrsStatisticClient() { id = 0; + stream = NULL; + conn = NULL; + req = NULL; + type = SrsRtmpConnUnknown; + create = srs_get_system_time_ms(); } SrsStatisticClient::~SrsStatisticClient() @@ -187,7 +197,17 @@ int SrsStatisticClient::dumps(stringstream& ss) int ret = ERROR_SUCCESS; ss << SRS_JOBJECT_START - << SRS_JFIELD_ORG("id", id) + << SRS_JFIELD_ORG("id", id) << SRS_JFIELD_CONT + << SRS_JFIELD_ORG("vhost", stream->vhost->id) << SRS_JFIELD_CONT + << SRS_JFIELD_ORG("stream", stream->id) << SRS_JFIELD_CONT + << SRS_JFIELD_STR("ip", req->ip) << SRS_JFIELD_CONT + << SRS_JFIELD_STR("pageUrl", req->pageUrl) << SRS_JFIELD_CONT + << SRS_JFIELD_STR("swfUrl", req->swfUrl) << SRS_JFIELD_CONT + << SRS_JFIELD_STR("tcUrl", req->tcUrl) << SRS_JFIELD_CONT + << SRS_JFIELD_STR("url", req->get_stream_url()) << SRS_JFIELD_CONT + << SRS_JFIELD_STR("type", srs_client_type_string(type)) << SRS_JFIELD_CONT + << SRS_JFIELD_BOOL("publish", srs_client_type_is_publish(type)) << SRS_JFIELD_CONT + << SRS_JFIELD_ORG("alive", srs_get_system_time_ms() - create) << SRS_JOBJECT_END; return ret; @@ -301,12 +321,12 @@ int SrsStatistic::on_audio_info(SrsRequest* req, return ret; } -void SrsStatistic::on_stream_publish(SrsRequest* req) +void SrsStatistic::on_stream_publish(SrsRequest* req, int cid) { SrsStatisticVhost* vhost = create_vhost(req); SrsStatisticStream* stream = create_stream(vhost, req); - stream->publish(); + stream->publish(cid); } void SrsStatistic::on_stream_close(SrsRequest* req) @@ -317,7 +337,7 @@ void SrsStatistic::on_stream_close(SrsRequest* req) stream->close(); } -int SrsStatistic::on_client(int id, SrsRequest* req) +int SrsStatistic::on_client(int id, SrsRequest* req, SrsConnection* conn, SrsRtmpConnType type) { int ret = ERROR_SUCCESS; @@ -336,6 +356,9 @@ int SrsStatistic::on_client(int id, SrsRequest* req) } // got client. + client->conn = conn; + client->req = req; + client->type = type; stream->nb_clients++; vhost->nb_clients++; @@ -458,7 +481,7 @@ int SrsStatistic::dumps_clients(stringstream& ss, int start, int count) ss << SRS_JARRAY_START; std::map::iterator it = clients.begin(); - for (int i = 0; i < count && it != clients.end(); it++) { + for (int i = 0; i < start + count && it != clients.end(); it++, i++) { if (i < start) { continue; } diff --git a/trunk/src/app/srs_app_statistic.hpp b/trunk/src/app/srs_app_statistic.hpp index 9a8483245..9505ed497 100755 --- a/trunk/src/app/srs_app_statistic.hpp +++ b/trunk/src/app/srs_app_statistic.hpp @@ -34,9 +34,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include - -#define STATISTIC_STREAM_STATUS_PUBLISHING "publishing" -#define STATISTIC_STREAM_STATUS_IDLING "idling" +#include class SrsKbps; class SrsRequest; @@ -68,7 +66,8 @@ public: std::string app; std::string stream; std::string url; - std::string status; + bool active; + int connection_cid; int nb_clients; public: /** @@ -103,7 +102,7 @@ public: /** * publish the stream. */ - virtual void publish(); + virtual void publish(int cid); /** * close the stream. */ @@ -114,7 +113,11 @@ struct SrsStatisticClient { public: SrsStatisticStream* stream; + SrsConnection* conn; + SrsRequest* req; + SrsRtmpConnType type; int id; + int64_t create; public: SrsStatisticClient(); virtual ~SrsStatisticClient(); @@ -169,9 +172,11 @@ public: SrsAacObjectType aac_object ); /** - * when publish stream. - */ - virtual void on_stream_publish(SrsRequest* req); + * when publish stream. + * @param req the request object of publish connection. + * @param cid the cid of publish connection. + */ + virtual void on_stream_publish(SrsRequest* req, int cid); /** * when close stream. */ @@ -181,8 +186,10 @@ public: * when got a client to publish/play stream, * @param id, the client srs id. * @param req, the client request object. + * @param conn, the physical absract connection object. + * @param type, the type of connection. */ - virtual int on_client(int id, SrsRequest* req); + virtual int on_client(int id, SrsRequest* req, SrsConnection* conn, SrsRtmpConnType type); /** * client disconnect * @remark the on_disconnect always call, while the on_client is call when diff --git a/trunk/src/kernel/srs_kernel_error.hpp b/trunk/src/kernel/srs_kernel_error.hpp index 47c68c9b1..bb7fddcef 100755 --- a/trunk/src/kernel/srs_kernel_error.hpp +++ b/trunk/src/kernel/srs_kernel_error.hpp @@ -263,6 +263,11 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #define ERROR_AAC_BYTES_INVALID 4028 #define ERROR_HTTP_REQUEST_EOF 4029 +/////////////////////////////////////////////////////// +// HTTP API error. +/////////////////////////////////////////////////////// +//#define ERROR_API_METHOD_NOT_ALLOWD + /////////////////////////////////////////////////////// // user-define error. /////////////////////////////////////////////////////// diff --git a/trunk/src/protocol/srs_http_stack.cpp b/trunk/src/protocol/srs_http_stack.cpp index 75c55e5e3..8f767279b 100755 --- a/trunk/src/protocol/srs_http_stack.cpp +++ b/trunk/src/protocol/srs_http_stack.cpp @@ -120,8 +120,11 @@ string srs_go_http_detect(char* data, int size) return "application/octet-stream"; // fallback } -// Error replies to the request with the specified error message and HTTP code. -// The error message should be plain text. +int srs_go_http_error(ISrsHttpResponseWriter* w, int code) +{ + return srs_go_http_error(w, code, srs_generate_http_status_text(code)); +} + int srs_go_http_error(ISrsHttpResponseWriter* w, int code, string error) { int ret = ERROR_SUCCESS; @@ -287,7 +290,7 @@ bool SrsHttpNotFoundHandler::is_not_found() int SrsHttpNotFoundHandler::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) { - return srs_go_http_error(w, SRS_CONSTS_HTTP_NotFound, SRS_CONSTS_HTTP_NotFound_str); + return srs_go_http_error(w, SRS_CONSTS_HTTP_NotFound); } SrsHttpFileServer::SrsHttpFileServer(string root_dir) diff --git a/trunk/src/protocol/srs_http_stack.hpp b/trunk/src/protocol/srs_http_stack.hpp index 2888ce186..4e4fed7a2 100644 --- a/trunk/src/protocol/srs_http_stack.hpp +++ b/trunk/src/protocol/srs_http_stack.hpp @@ -76,6 +76,11 @@ class ISrsHttpResponseWriter; #define SRS_CONSTS_HTTP_PUT HTTP_PUT #define SRS_CONSTS_HTTP_DELETE HTTP_DELETE +// Error replies to the request with the specified error message and HTTP code. +// The error message should be plain text. +extern int srs_go_http_error(ISrsHttpResponseWriter* w, int code); +extern int srs_go_http_error(ISrsHttpResponseWriter* w, int code, std::string error); + // helper function: response in json format. extern int srs_http_response_json(ISrsHttpResponseWriter* w, std::string data); /** diff --git a/trunk/src/protocol/srs_rtmp_stack.cpp b/trunk/src/protocol/srs_rtmp_stack.cpp index e7d15876d..277309a62 100644 --- a/trunk/src/protocol/srs_rtmp_stack.cpp +++ b/trunk/src/protocol/srs_rtmp_stack.cpp @@ -1747,12 +1747,17 @@ string srs_client_type_string(SrsRtmpConnType type) { switch (type) { case SrsRtmpConnPlay: return "Play"; - case SrsRtmpConnFlashPublish: return "publish(FlashPublish)"; - case SrsRtmpConnFMLEPublish: return "publish(FMLEPublish)"; + case SrsRtmpConnFlashPublish: return "flash-publish)"; + case SrsRtmpConnFMLEPublish: return "fmle-publish"; default: return "Unknown"; } } +bool srs_client_type_is_publish(SrsRtmpConnType type) +{ + return type != SrsRtmpConnPlay; +} + SrsHandshakeBytes::SrsHandshakeBytes() { c0c1 = s0s1s2 = c2 = NULL; diff --git a/trunk/src/protocol/srs_rtmp_stack.hpp b/trunk/src/protocol/srs_rtmp_stack.hpp index f2c837fc8..ec50bf78a 100644 --- a/trunk/src/protocol/srs_rtmp_stack.hpp +++ b/trunk/src/protocol/srs_rtmp_stack.hpp @@ -622,6 +622,7 @@ enum SrsRtmpConnType SrsRtmpConnFlashPublish, }; std::string srs_client_type_string(SrsRtmpConnType type); +bool srs_client_type_is_publish(SrsRtmpConnType type); /** * store the handshake bytes, diff --git a/trunk/src/protocol/srs_rtmp_utility.cpp b/trunk/src/protocol/srs_rtmp_utility.cpp index 4d346ed0c..c28d5cf32 100644 --- a/trunk/src/protocol/srs_rtmp_utility.cpp +++ b/trunk/src/protocol/srs_rtmp_utility.cpp @@ -247,7 +247,7 @@ int srs_write_large_iovs(ISrsProtocolReaderWriter* skt, iovec* iovs, int size, s // for srs-librtmp, @see https://github.com/simple-rtmp-server/srs/issues/213 #ifndef _WIN32 // for linux, generally it's 1024. - static int limits = sysconf(_SC_IOV_MAX); + static int limits = (int)sysconf(_SC_IOV_MAX); #else static int limits = 1024; #endif