mirror of
https://github.com/ossrs/srs.git
synced 2025-03-09 15:49:59 +00:00
refine the order.
This commit is contained in:
parent
7b2b11e932
commit
1fd83d9314
30 changed files with 985 additions and 751 deletions
File diff suppressed because it is too large
Load diff
|
@ -31,9 +31,12 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
|||
|
||||
#include <vector>
|
||||
#include <string>
|
||||
#include <map>
|
||||
#include <sstream>
|
||||
|
||||
#include <srs_app_reload.hpp>
|
||||
#include <srs_app_async_call.hpp>
|
||||
#include <srs_app_thread.hpp>
|
||||
|
||||
class SrsRequest;
|
||||
class SrsFileWriter;
|
||||
|
@ -41,10 +44,89 @@ class SrsAmf0Object;
|
|||
class SrsAmf0StrictArray;
|
||||
class SrsAmf0Any;
|
||||
|
||||
class SrsConfig;
|
||||
class SrsRequest;
|
||||
class SrsJsonArray;
|
||||
class SrsConfDirective;
|
||||
|
||||
|
||||
namespace _srs_internal
|
||||
{
|
||||
class SrsConfigBuffer;
|
||||
}
|
||||
/**
|
||||
* the buffer of config content.
|
||||
*/
|
||||
class SrsConfigBuffer
|
||||
{
|
||||
protected:
|
||||
// last available position.
|
||||
char* last;
|
||||
// end of buffer.
|
||||
char* end;
|
||||
// start of buffer.
|
||||
char* start;
|
||||
public:
|
||||
// current consumed position.
|
||||
char* pos;
|
||||
// current parsed line.
|
||||
int line;
|
||||
public:
|
||||
SrsConfigBuffer();
|
||||
virtual ~SrsConfigBuffer();
|
||||
public:
|
||||
/**
|
||||
* fullfill the buffer with content of file specified by filename.
|
||||
*/
|
||||
virtual int fullfill(const char* filename);
|
||||
/**
|
||||
* whether buffer is empty.
|
||||
*/
|
||||
virtual bool empty();
|
||||
};
|
||||
};
|
||||
|
||||
/**
|
||||
* deep compare directive.
|
||||
*/
|
||||
extern bool srs_directive_equals(SrsConfDirective* a, SrsConfDirective* b);
|
||||
extern bool srs_directive_equals(SrsConfDirective* a, SrsConfDirective* b, std::string except);
|
||||
|
||||
/**
|
||||
* helper utilities, used for compare the consts values.
|
||||
*/
|
||||
extern bool srs_config_hls_is_on_error_ignore(std::string strategy);
|
||||
extern bool srs_config_hls_is_on_error_continue(std::string strategy);
|
||||
extern bool srs_config_ingest_is_file(std::string type);
|
||||
extern bool srs_config_ingest_is_stream(std::string type);
|
||||
extern bool srs_config_dvr_is_plan_segment(std::string plan);
|
||||
extern bool srs_config_dvr_is_plan_session(std::string plan);
|
||||
extern bool srs_config_dvr_is_plan_append(std::string plan);
|
||||
extern bool srs_stream_caster_is_udp(std::string caster);
|
||||
extern bool srs_stream_caster_is_rtsp(std::string caster);
|
||||
extern bool srs_stream_caster_is_flv(std::string caster);
|
||||
// whether the dvr_apply active the stream specified by req.
|
||||
extern bool srs_config_apply_filter(SrsConfDirective* dvr_apply, SrsRequest* req);
|
||||
|
||||
/**
|
||||
* convert bool in str to on/off
|
||||
*/
|
||||
extern std::string srs_config_bool2switch(const std::string& sbool);
|
||||
|
||||
/**
|
||||
* parse loaded vhost directives to compatible mode.
|
||||
* for exmaple, SRS1/2 use the follow refer style:
|
||||
* refer a.domain.com b.domain.com;
|
||||
* while SRS3 use the following:
|
||||
* refer {
|
||||
* enabled on;
|
||||
* all a.domain.com b.domain.com;
|
||||
* }
|
||||
* so we must transform the vhost directive anytime load the config.
|
||||
* @param root the root directive to transform, in and out parameter.
|
||||
*/
|
||||
extern int srs_config_transform_vhost(SrsConfDirective* root);
|
||||
|
||||
// global config
|
||||
extern SrsConfig* _srs_config;
|
||||
|
||||
/**
|
||||
* the config directive.
|
||||
|
@ -103,6 +185,13 @@ public:
|
|||
public:
|
||||
SrsConfDirective();
|
||||
virtual ~SrsConfDirective();
|
||||
public:
|
||||
/**
|
||||
* deep copy the directive, for SrsConfig to use it to support reload in upyun cluster,
|
||||
* for when reload the upyun dynamic config, the root will be changed,
|
||||
* so need to copy it to an old root directive, and use the copy result to do reload.
|
||||
*/
|
||||
virtual SrsConfDirective* copy();
|
||||
// args
|
||||
public:
|
||||
/**
|
||||
|
@ -189,10 +278,6 @@ private:
|
|||
* 3. if ret flag indicates there are child-directives, read_conf(directive, block) recursively.
|
||||
*/
|
||||
virtual int parse_conf(_srs_internal::SrsConfigBuffer* buffer, SrsDirectiveType type);
|
||||
/**
|
||||
* deep copy the directive.
|
||||
*/
|
||||
virtual SrsConfDirective* copy();
|
||||
/**
|
||||
* read a token from buffer.
|
||||
* a token, is the directive args and a flag indicates whether has child-directives.
|
||||
|
@ -1291,83 +1376,5 @@ public:
|
|||
virtual SrsConfDirective* get_stats_disk_device();
|
||||
};
|
||||
|
||||
namespace _srs_internal
|
||||
{
|
||||
/**
|
||||
* the buffer of config content.
|
||||
*/
|
||||
class SrsConfigBuffer
|
||||
{
|
||||
protected:
|
||||
// last available position.
|
||||
char* last;
|
||||
// end of buffer.
|
||||
char* end;
|
||||
// start of buffer.
|
||||
char* start;
|
||||
public:
|
||||
// current consumed position.
|
||||
char* pos;
|
||||
// current parsed line.
|
||||
int line;
|
||||
public:
|
||||
SrsConfigBuffer();
|
||||
virtual ~SrsConfigBuffer();
|
||||
public:
|
||||
/**
|
||||
* fullfill the buffer with content of file specified by filename.
|
||||
*/
|
||||
virtual int fullfill(const char* filename);
|
||||
/**
|
||||
* whether buffer is empty.
|
||||
*/
|
||||
virtual bool empty();
|
||||
};
|
||||
};
|
||||
|
||||
/**
|
||||
* deep compare directive.
|
||||
*/
|
||||
extern bool srs_directive_equals(SrsConfDirective* a, SrsConfDirective* b);
|
||||
extern bool srs_directive_equals(SrsConfDirective* a, SrsConfDirective* b, std::string except);
|
||||
|
||||
/**
|
||||
* helper utilities, used for compare the consts values.
|
||||
*/
|
||||
extern bool srs_config_hls_is_on_error_ignore(std::string strategy);
|
||||
extern bool srs_config_hls_is_on_error_continue(std::string strategy);
|
||||
extern bool srs_config_ingest_is_file(std::string type);
|
||||
extern bool srs_config_ingest_is_stream(std::string type);
|
||||
extern bool srs_config_dvr_is_plan_segment(std::string plan);
|
||||
extern bool srs_config_dvr_is_plan_session(std::string plan);
|
||||
extern bool srs_config_dvr_is_plan_append(std::string plan);
|
||||
extern bool srs_stream_caster_is_udp(std::string caster);
|
||||
extern bool srs_stream_caster_is_rtsp(std::string caster);
|
||||
extern bool srs_stream_caster_is_flv(std::string caster);
|
||||
// whether the dvr_apply active the stream specified by req.
|
||||
extern bool srs_config_apply_filter(SrsConfDirective* dvr_apply, SrsRequest* req);
|
||||
|
||||
/**
|
||||
* convert bool in str to on/off
|
||||
*/
|
||||
extern std::string srs_config_bool2switch(const std::string& sbool);
|
||||
|
||||
/**
|
||||
* parse loaded vhost directives to compatible mode.
|
||||
* for exmaple, SRS1/2 use the follow refer style:
|
||||
* refer a.domain.com b.domain.com;
|
||||
* while SRS3 use the following:
|
||||
* refer {
|
||||
* enabled on;
|
||||
* all a.domain.com b.domain.com;
|
||||
* }
|
||||
* so we must transform the vhost directive anytime load the config.
|
||||
* @param root the root directive to transform, in and out parameter.
|
||||
*/
|
||||
extern int srs_config_transform_vhost(SrsConfDirective* root);
|
||||
|
||||
// global config
|
||||
extern SrsConfig* _srs_config;
|
||||
|
||||
#endif
|
||||
|
||||
|
|
|
@ -26,6 +26,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
|||
#include <srs_kernel_log.hpp>
|
||||
#include <srs_kernel_error.hpp>
|
||||
#include <srs_app_utility.hpp>
|
||||
#include <srs_kernel_utility.hpp>
|
||||
|
||||
IConnectionManager::IConnectionManager()
|
||||
{
|
||||
|
@ -42,7 +43,12 @@ SrsConnection::SrsConnection(IConnectionManager* cm, st_netfd_t c)
|
|||
stfd = c;
|
||||
disposed = false;
|
||||
expired = false;
|
||||
|
||||
create_time = srs_get_system_time_ms();
|
||||
|
||||
skt = new SrsStSocket(c);
|
||||
kbps = new SrsKbps();
|
||||
kbps->set_io(skt, skt);
|
||||
|
||||
// the client thread should reap itself,
|
||||
// so we never use joinable.
|
||||
// TODO: FIXME: maybe other thread need to stop it.
|
||||
|
@ -53,10 +59,32 @@ SrsConnection::SrsConnection(IConnectionManager* cm, st_netfd_t c)
|
|||
SrsConnection::~SrsConnection()
|
||||
{
|
||||
dispose();
|
||||
|
||||
|
||||
srs_freep(kbps);
|
||||
srs_freep(skt);
|
||||
srs_freep(pthread);
|
||||
}
|
||||
|
||||
void SrsConnection::resample()
|
||||
{
|
||||
kbps->resample();
|
||||
}
|
||||
|
||||
int64_t SrsConnection::get_send_bytes_delta()
|
||||
{
|
||||
return kbps->get_send_bytes_delta();
|
||||
}
|
||||
|
||||
int64_t SrsConnection::get_recv_bytes_delta()
|
||||
{
|
||||
return kbps->get_recv_bytes_delta();
|
||||
}
|
||||
|
||||
void SrsConnection::cleanup()
|
||||
{
|
||||
kbps->cleanup();
|
||||
}
|
||||
|
||||
void SrsConnection::dispose()
|
||||
{
|
||||
if (disposed) {
|
||||
|
@ -86,7 +114,7 @@ int SrsConnection::cycle()
|
|||
|
||||
ip = srs_get_peer_ip(st_netfd_fileno(stfd));
|
||||
|
||||
ret = do_cycle();
|
||||
int oret = ret = do_cycle();
|
||||
|
||||
// if socket io error, set to closed.
|
||||
if (srs_is_client_gracefully_close(ret)) {
|
||||
|
@ -100,7 +128,7 @@ int SrsConnection::cycle()
|
|||
|
||||
// client close peer.
|
||||
if (ret == ERROR_SOCKET_CLOSED) {
|
||||
srs_warn("client disconnect peer. ret=%d", ret);
|
||||
srs_warn("client disconnect peer. oret=%d, ret=%d", oret, ret);
|
||||
}
|
||||
|
||||
return ERROR_SUCCESS;
|
||||
|
|
|
@ -93,9 +93,31 @@ protected:
|
|||
* when expired, the connection must never be served and quit ASAP.
|
||||
*/
|
||||
bool expired;
|
||||
/**
|
||||
* the underlayer socket.
|
||||
*/
|
||||
SrsStSocket* skt;
|
||||
/**
|
||||
* connection total kbps.
|
||||
* not only the rtmp or http connection, all type of connection are
|
||||
* need to statistic the kbps of io.
|
||||
* the SrsStatistic will use it indirectly to statistic the bytes delta of current connection.
|
||||
*/
|
||||
SrsKbps* kbps;
|
||||
/**
|
||||
* the create time in milliseconds.
|
||||
* for current connection to log self create time and calculate the living time.
|
||||
*/
|
||||
int64_t create_time;
|
||||
public:
|
||||
SrsConnection(IConnectionManager* cm, st_netfd_t c);
|
||||
virtual ~SrsConnection();
|
||||
// interface IKbpsDelta
|
||||
public:
|
||||
virtual void resample();
|
||||
virtual int64_t get_send_bytes_delta();
|
||||
virtual int64_t get_recv_bytes_delta();
|
||||
virtual void cleanup();
|
||||
public:
|
||||
/**
|
||||
* to dipose the connection.
|
||||
|
|
|
@ -70,6 +70,7 @@ SrsEdgeIngester::SrsEdgeIngester()
|
|||
origin_index = 0;
|
||||
stream_id = 0;
|
||||
stfd = NULL;
|
||||
curr_origin_server = "";
|
||||
pthread = new SrsReusableThread2("edge-igs", this, SRS_EDGE_INGESTER_SLEEP_US);
|
||||
}
|
||||
|
||||
|
@ -118,6 +119,11 @@ void SrsEdgeIngester::stop()
|
|||
_source->on_unpublish();
|
||||
}
|
||||
|
||||
string SrsEdgeIngester::get_curr_origin()
|
||||
{
|
||||
return curr_origin_server;
|
||||
}
|
||||
|
||||
int SrsEdgeIngester::cycle()
|
||||
{
|
||||
int ret = ERROR_SUCCESS;
|
||||
|
@ -205,7 +211,7 @@ int SrsEdgeIngester::ingest()
|
|||
return ret;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
@ -352,9 +358,9 @@ int SrsEdgeIngester::connect_server(string& ep_server, string& ep_port)
|
|||
}
|
||||
|
||||
// select the origin.
|
||||
std::string server = conf->args.at(origin_index % conf->args.size());
|
||||
std::string server = curr_origin_server = conf->args.at(origin_index % conf->args.size());
|
||||
origin_index = (origin_index + 1) % conf->args.size();
|
||||
|
||||
|
||||
std::string s_port = SRS_CONSTS_RTMP_DEFAULT_PORT;
|
||||
int port = ::atoi(SRS_CONSTS_RTMP_DEFAULT_PORT);
|
||||
size_t pos = server.find(":");
|
||||
|
@ -754,6 +760,11 @@ void SrsPlayEdge::on_all_client_stop()
|
|||
}
|
||||
}
|
||||
|
||||
string SrsPlayEdge::get_curr_origin()
|
||||
{
|
||||
return ingester->get_curr_origin();
|
||||
}
|
||||
|
||||
int SrsPlayEdge::on_ingest_play()
|
||||
{
|
||||
int ret = ERROR_SUCCESS;
|
||||
|
|
|
@ -89,6 +89,8 @@ private:
|
|||
SrsKbps* kbps;
|
||||
SrsRtmpClient* client;
|
||||
int origin_index;
|
||||
// current origin server of current source.
|
||||
std::string curr_origin_server;
|
||||
public:
|
||||
SrsEdgeIngester();
|
||||
virtual ~SrsEdgeIngester();
|
||||
|
@ -96,6 +98,7 @@ public:
|
|||
virtual int initialize(SrsSource* source, SrsPlayEdge* edge, SrsRequest* req);
|
||||
virtual int start();
|
||||
virtual void stop();
|
||||
virtual std::string get_curr_origin();
|
||||
// interface ISrsReusableThread2Handler
|
||||
public:
|
||||
virtual int cycle();
|
||||
|
@ -182,6 +185,7 @@ public:
|
|||
* when all client stopped play, disconnect to origin.
|
||||
*/
|
||||
virtual void on_all_client_stop();
|
||||
virtual std::string get_curr_origin();
|
||||
public:
|
||||
/**
|
||||
* when ingester start to play stream.
|
||||
|
|
|
@ -387,13 +387,13 @@ int SrsHlsMuxer::deviation()
|
|||
int SrsHlsMuxer::initialize(ISrsHlsHandler* h)
|
||||
{
|
||||
int ret = ERROR_SUCCESS;
|
||||
|
||||
|
||||
handler = h;
|
||||
|
||||
if ((ret = async->start()) != ERROR_SUCCESS) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
@ -724,7 +724,7 @@ int SrsHlsMuxer::segment_close(string log_desc)
|
|||
// when too large, it maybe timestamp corrupt.
|
||||
if (current->duration * 1000 >= SRS_AUTO_HLS_SEGMENT_MIN_DURATION_MS && (int)current->duration <= max_td) {
|
||||
segments.push_back(current);
|
||||
|
||||
|
||||
// use async to call the http hooks, for it will cause thread switch.
|
||||
if ((ret = async->execute(new SrsDvrAsyncCallOnHls(
|
||||
_srs_context->get_id(), req,
|
||||
|
@ -733,12 +733,12 @@ int SrsHlsMuxer::segment_close(string log_desc)
|
|||
{
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
// use async to call the http hooks, for it will cause thread switch.
|
||||
if ((ret = async->execute(new SrsDvrAsyncCallOnHlsNotify(_srs_context->get_id(), req, current->uri))) != ERROR_SUCCESS) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
srs_info("%s reap ts segment, sequence_no=%d, uri=%s, duration=%.2f, start=%"PRId64,
|
||||
log_desc.c_str(), current->sequence_no, current->uri.c_str(), current->duration,
|
||||
current->segment_start_dts);
|
||||
|
@ -749,12 +749,12 @@ int SrsHlsMuxer::segment_close(string log_desc)
|
|||
srs_error("notify handler for update ts failed. ret=%d", ret);
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
// close the muxer of finished segment.
|
||||
srs_freep(current->muxer);
|
||||
std::string full_path = current->full_path;
|
||||
current = NULL;
|
||||
|
||||
|
||||
// rename from tmp to real path
|
||||
std::string tmp_file = full_path + ".tmp";
|
||||
if (should_write_file && rename(tmp_file.c_str(), full_path.c_str()) < 0) {
|
||||
|
@ -766,11 +766,11 @@ int SrsHlsMuxer::segment_close(string log_desc)
|
|||
} else {
|
||||
// reuse current segment index.
|
||||
_sequence_no--;
|
||||
|
||||
|
||||
srs_trace("%s drop ts segment, sequence_no=%d, uri=%s, duration=%.2f, start=%"PRId64"",
|
||||
log_desc.c_str(), current->sequence_no, current->uri.c_str(), current->duration,
|
||||
current->segment_start_dts);
|
||||
|
||||
|
||||
// rename from tmp to real path
|
||||
std::string tmp_file = current->full_path + ".tmp";
|
||||
if (should_write_file) {
|
||||
|
@ -778,13 +778,13 @@ int SrsHlsMuxer::segment_close(string log_desc)
|
|||
srs_warn("ignore unlink path failed, file=%s.", tmp_file.c_str());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
srs_freep(current);
|
||||
}
|
||||
|
||||
|
||||
// the segments to remove
|
||||
std::vector<SrsHlsSegment*> segment_to_remove;
|
||||
|
||||
|
||||
// shrink the segments.
|
||||
double duration = 0;
|
||||
int remove_index = -1;
|
||||
|
@ -802,7 +802,7 @@ int SrsHlsMuxer::segment_close(string log_desc)
|
|||
segments.erase(segments.begin());
|
||||
segment_to_remove.push_back(segment);
|
||||
}
|
||||
|
||||
|
||||
// refresh the m3u8, donot contains the removed ts
|
||||
ret = refresh_m3u8();
|
||||
|
||||
|
@ -815,24 +815,24 @@ int SrsHlsMuxer::segment_close(string log_desc)
|
|||
srs_warn("cleanup unlink path failed, file=%s.", segment->full_path.c_str());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
if (should_write_cache) {
|
||||
if ((ret = handler->on_remove_ts(req, segment->uri)) != ERROR_SUCCESS) {
|
||||
srs_warn("remove the ts from ram hls failed. ret=%d", ret);
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
srs_freep(segment);
|
||||
}
|
||||
segment_to_remove.clear();
|
||||
|
||||
|
||||
// check ret of refresh m3u8
|
||||
if (ret != ERROR_SUCCESS) {
|
||||
srs_error("refresh m3u8 failed. ret=%d", ret);
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
|
|
@ -44,6 +44,7 @@ SrsHttpClient::SrsHttpClient()
|
|||
skt = NULL;
|
||||
parser = NULL;
|
||||
timeout_us = 0;
|
||||
port = 0;
|
||||
}
|
||||
|
||||
SrsHttpClient::~SrsHttpClient()
|
||||
|
@ -56,6 +57,11 @@ int SrsHttpClient::initialize(string h, int p, int64_t t_us)
|
|||
{
|
||||
int ret = ERROR_SUCCESS;
|
||||
|
||||
// disconnect first when h:p changed.
|
||||
if ((!host.empty() && host != h) || (port != 0 && port != p)) {
|
||||
disconnect();
|
||||
}
|
||||
|
||||
srs_freep(parser);
|
||||
parser = new SrsHttpParser();
|
||||
|
||||
|
|
|
@ -55,6 +55,9 @@ using namespace std;
|
|||
#include <srs_app_http_static.hpp>
|
||||
#include <srs_app_http_stream.hpp>
|
||||
#include <srs_app_http_api.hpp>
|
||||
#include <srs_protocol_json.hpp>
|
||||
#include <srs_app_http_hooks.hpp>
|
||||
#include <srs_rtmp_amf0.hpp>
|
||||
|
||||
#endif
|
||||
|
||||
|
@ -1204,47 +1207,57 @@ int SrsHttpConn::do_cycle()
|
|||
srs_error("http initialize http parser failed. ret=%d", ret);
|
||||
return ret;
|
||||
}
|
||||
|
||||
// underlayer socket
|
||||
SrsStSocket skt(stfd);
|
||||
|
||||
|
||||
// set the recv timeout, for some clients never disconnect the connection.
|
||||
// @see https://github.com/simple-rtmp-server/srs/issues/398
|
||||
skt.set_recv_timeout(SRS_HTTP_RECV_TIMEOUT_US);
|
||||
|
||||
skt->set_recv_timeout(SRS_HTTP_RECV_TIMEOUT_US);
|
||||
|
||||
SrsRequest* last_req = NULL;
|
||||
SrsAutoFree(SrsRequest, last_req);
|
||||
|
||||
// process http messages.
|
||||
while (!disposed) {
|
||||
ISrsHttpMessage* req = NULL;
|
||||
|
||||
|
||||
// get a http message
|
||||
if ((ret = parser->parse_message(&skt, this, &req)) != ERROR_SUCCESS) {
|
||||
return ret;
|
||||
if ((ret = parser->parse_message(skt, this, &req)) != ERROR_SUCCESS) {
|
||||
break;
|
||||
}
|
||||
|
||||
// if SUCCESS, always NOT-NULL.
|
||||
srs_assert(req);
|
||||
|
||||
|
||||
// always free it in this scope.
|
||||
SrsAutoFree(ISrsHttpMessage, req);
|
||||
|
||||
|
||||
// get the last request, for report the info of request on connection disconnect.
|
||||
delete last_req;
|
||||
SrsHttpMessage* hreq = dynamic_cast<SrsHttpMessage*>(req);
|
||||
last_req = hreq->to_request(hreq->host());
|
||||
|
||||
// may should discard the body.
|
||||
if ((ret = on_got_http_message(req)) != ERROR_SUCCESS) {
|
||||
return ret;
|
||||
break;
|
||||
}
|
||||
|
||||
|
||||
// ok, handle http request.
|
||||
SrsHttpResponseWriter writer(&skt);
|
||||
SrsHttpResponseWriter writer(skt);
|
||||
if ((ret = process_request(&writer, req)) != ERROR_SUCCESS) {
|
||||
return ret;
|
||||
break;
|
||||
}
|
||||
|
||||
|
||||
// donot keep alive, disconnect it.
|
||||
// @see https://github.com/simple-rtmp-server/srs/issues/399
|
||||
if (!req->is_keep_alive()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
int disc_ret = ERROR_SUCCESS;
|
||||
if ((disc_ret = on_disconnect(last_req)) != ERROR_SUCCESS) {
|
||||
srs_warn("connection on disconnect peer failed, but ignore this error. disc_ret=%d, ret=%d", disc_ret, ret);
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
@ -1266,6 +1279,13 @@ int SrsHttpConn::process_request(ISrsHttpResponseWriter* w, ISrsHttpMessage* r)
|
|||
return ret;
|
||||
}
|
||||
|
||||
int SrsHttpConn::on_disconnect(SrsRequest* req)
|
||||
{
|
||||
int ret = ERROR_SUCCESS;
|
||||
// TODO: implements it.s
|
||||
return ret;
|
||||
}
|
||||
|
||||
SrsResponseOnlyHttpConn::SrsResponseOnlyHttpConn(IConnectionManager* cm, st_netfd_t fd, ISrsHttpServeMux* m)
|
||||
: SrsHttpConn(cm, fd, m)
|
||||
{
|
||||
|
|
|
@ -406,6 +406,12 @@ protected:
|
|||
virtual int on_got_http_message(ISrsHttpMessage* msg) = 0;
|
||||
private:
|
||||
virtual int process_request(ISrsHttpResponseWriter* w, ISrsHttpMessage* r);
|
||||
/**
|
||||
* when the connection disconnect, call this method.
|
||||
* e.g. log msg of connection and report to other system.
|
||||
* @param request: request which is converted by the last http message.
|
||||
*/
|
||||
virtual int on_disconnect(SrsRequest* req);
|
||||
};
|
||||
|
||||
/**
|
||||
|
|
|
@ -474,13 +474,13 @@ int SrsHttpHooks::do_post(SrsHttpClient* hc, std::string url, std::string req, i
|
|||
srs_error("res code error, ret=%d", ret);
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
if ((res_code->to_integer()) != ERROR_SUCCESS) {
|
||||
ret = ERROR_RESPONSE_CODE;
|
||||
srs_error("res code error, ret=%d, code=%d", ret, code);
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
|
|
@ -54,6 +54,7 @@ using namespace std;
|
|||
#include <srs_app_pithy_print.hpp>
|
||||
#include <srs_app_source.hpp>
|
||||
#include <srs_app_server.hpp>
|
||||
#include <srs_app_statistic.hpp>
|
||||
|
||||
#endif
|
||||
|
||||
|
@ -494,6 +495,13 @@ int SrsLiveStream::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r)
|
|||
SrsAutoFree(SrsPithyPrint, pprint);
|
||||
|
||||
SrsMessageArray msgs(SRS_PERF_MW_MSGS);
|
||||
|
||||
// update the statistic when source disconveried.
|
||||
SrsStatistic* stat = SrsStatistic::instance();
|
||||
if ((ret = stat->on_client(_srs_context->get_id(), req, NULL, SrsRtmpConnPlay)) != ERROR_SUCCESS) {
|
||||
srs_error("stat client failed. ret=%d", ret);
|
||||
return ret;
|
||||
}
|
||||
|
||||
// the memory writer.
|
||||
SrsStreamWriter writer(w);
|
||||
|
@ -1132,7 +1140,7 @@ int SrsHttpStreamServer::hijack(ISrsHttpMessage* request, ISrsHttpHandler** ph)
|
|||
if (ext.empty()) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
// find the actually request vhost.
|
||||
SrsConfDirective* vhost = _srs_config->get_vhost(request->host());
|
||||
if (!vhost || !_srs_config->get_vhost_enabled(vhost)) {
|
||||
|
@ -1179,7 +1187,7 @@ int SrsHttpStreamServer::hijack(ISrsHttpMessage* request, ISrsHttpHandler** ph)
|
|||
return ret;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// convert to concreate class.
|
||||
SrsHttpMessage* hreq = dynamic_cast<SrsHttpMessage*>(request);
|
||||
srs_assert(hreq);
|
||||
|
|
24
trunk/src/app/srs_app_rtmp_conn.cpp
Executable file → Normal file
24
trunk/src/app/srs_app_rtmp_conn.cpp
Executable file → Normal file
|
@ -53,6 +53,8 @@ using namespace std;
|
|||
#include <srs_kernel_utility.hpp>
|
||||
#include <srs_app_security.hpp>
|
||||
#include <srs_app_statistic.hpp>
|
||||
#include <srs_rtmp_utility.hpp>
|
||||
#include <srs_protocol_json.hpp>
|
||||
|
||||
// when stream is busy, for example, streaming is already
|
||||
// publishing, when a new client to request to publish,
|
||||
|
@ -89,13 +91,13 @@ SrsRtmpConn::SrsRtmpConn(SrsServer* svr, st_netfd_t c)
|
|||
kbps = new SrsKbps();
|
||||
kbps->set_io(skt, skt);
|
||||
wakable = NULL;
|
||||
|
||||
|
||||
mw_sleep = SRS_PERF_MW_SLEEP;
|
||||
mw_enabled = false;
|
||||
realtime = SRS_PERF_MIN_LATENCY_ENABLED;
|
||||
send_min_interval = 0;
|
||||
tcp_nodelay = false;
|
||||
|
||||
|
||||
_srs_config->subscribe(this);
|
||||
}
|
||||
|
||||
|
@ -208,8 +210,11 @@ int SrsRtmpConn::do_cycle()
|
|||
}
|
||||
|
||||
ret = service_cycle();
|
||||
|
||||
http_hooks_on_close();
|
||||
|
||||
int disc_ret = ERROR_SUCCESS;
|
||||
if ((disc_ret = on_disconnect()) != ERROR_SUCCESS) {
|
||||
srs_warn("connection on disconnect peer failed, but ignore this error. disc_ret=%d, ret=%d", disc_ret, ret);
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
@ -1308,6 +1313,17 @@ int SrsRtmpConn::do_token_traverse_auth(SrsRtmpClient* client)
|
|||
return ret;
|
||||
}
|
||||
|
||||
int SrsRtmpConn::on_disconnect()
|
||||
{
|
||||
int ret = ERROR_SUCCESS;
|
||||
|
||||
http_hooks_on_close();
|
||||
|
||||
// TODO: implements it.
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int SrsRtmpConn::http_hooks_on_connect()
|
||||
{
|
||||
int ret = ERROR_SUCCESS;
|
||||
|
|
5
trunk/src/app/srs_app_rtmp_conn.hpp
Executable file → Normal file
5
trunk/src/app/srs_app_rtmp_conn.hpp
Executable file → Normal file
|
@ -134,6 +134,11 @@ private:
|
|||
virtual int check_edge_token_traverse_auth();
|
||||
virtual int connect_server(int origin_index, st_netfd_t* pstsock);
|
||||
virtual int do_token_traverse_auth(SrsRtmpClient* client);
|
||||
/**
|
||||
* when the connection disconnect, call this method.
|
||||
* e.g. log msg of connection and report to other system.
|
||||
*/
|
||||
virtual int on_disconnect();
|
||||
private:
|
||||
virtual int http_hooks_on_connect();
|
||||
virtual void http_hooks_on_close();
|
||||
|
|
5
trunk/src/app/srs_app_source.cpp
Executable file → Normal file
5
trunk/src/app/srs_app_source.cpp
Executable file → Normal file
|
@ -2292,3 +2292,8 @@ void SrsSource::destroy_forwarders()
|
|||
forwarders.clear();
|
||||
}
|
||||
|
||||
string SrsSource::get_curr_origin()
|
||||
{
|
||||
return play_edge->get_curr_origin();
|
||||
}
|
||||
|
||||
|
|
2
trunk/src/app/srs_app_source.hpp
Executable file → Normal file
2
trunk/src/app/srs_app_source.hpp
Executable file → Normal file
|
@ -592,6 +592,8 @@ public:
|
|||
private:
|
||||
virtual int create_forwarders();
|
||||
virtual void destroy_forwarders();
|
||||
public:
|
||||
virtual std::string get_curr_origin();
|
||||
};
|
||||
|
||||
#endif
|
||||
|
|
|
@ -32,6 +32,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
|||
|
||||
#include <map>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
#include <srs_kernel_codec.hpp>
|
||||
#include <srs_rtmp_stack.hpp>
|
||||
|
|
|
@ -435,6 +435,11 @@ void SrsReusableThread::stop()
|
|||
pthread->stop();
|
||||
}
|
||||
|
||||
bool SrsReusableThread::can_loop()
|
||||
{
|
||||
return pthread->can_loop();
|
||||
}
|
||||
|
||||
int SrsReusableThread::cid()
|
||||
{
|
||||
return pthread->cid();
|
||||
|
|
|
@ -340,6 +340,12 @@ public:
|
|||
* @remark user can stop multiple times, ignore if already stopped.
|
||||
*/
|
||||
virtual void stop();
|
||||
/**
|
||||
* whether the thread should loop,
|
||||
* used for handler->cycle() which has a loop method,
|
||||
* to check this method, break if false.
|
||||
*/
|
||||
virtual bool can_loop();
|
||||
public:
|
||||
/**
|
||||
* get the context id. @see: ISrsThreadContext.get_id().
|
||||
|
|
|
@ -1483,3 +1483,17 @@ void srs_api_dump_summaries(SrsAmf0Object* obj)
|
|||
sys->set("conn_srs", SrsAmf0Any::number(nrs->nb_conn_srs));
|
||||
}
|
||||
|
||||
string srs_join_vector_string(vector<string>& vs, string separator)
|
||||
{
|
||||
string str = "";
|
||||
|
||||
for (int i = 0; i < (int)vs.size(); i++) {
|
||||
str += vs.at(i);
|
||||
if (i != (int)vs.size() - 1) {
|
||||
str += separator;
|
||||
}
|
||||
}
|
||||
|
||||
return str;
|
||||
}
|
||||
|
||||
|
|
|
@ -684,5 +684,8 @@ extern bool srs_is_boolean(const std::string& str);
|
|||
// dump summaries for /api/v1/summaries.
|
||||
extern void srs_api_dump_summaries(SrsAmf0Object* obj);
|
||||
|
||||
// join string in vector with indicated separator
|
||||
extern std::string srs_join_vector_string(std::vector<std::string>& vs, std::string separator);
|
||||
|
||||
#endif
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue