1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

for #742, refine the rtmp conn, use info as data cluster.

This commit is contained in:
winlin 2017-01-23 17:12:04 +08:00
parent 39aee2b318
commit 79def81792
4 changed files with 111 additions and 56 deletions

View file

@ -254,15 +254,13 @@ void SrsQueueRecvThread::on_stop()
SrsPublishRecvThread::SrsPublishRecvThread( SrsPublishRecvThread::SrsPublishRecvThread(
SrsRtmpServer* rtmp_sdk, SrsRtmpServer* rtmp_sdk,
SrsRequest* _req, int mr_sock_fd, int timeout_ms, SrsRequest* _req, int mr_sock_fd, int timeout_ms,
SrsRtmpConn* conn, SrsSource* source, bool is_fmle, bool is_edge SrsRtmpConn* conn, SrsSource* source
): trd(this, rtmp_sdk, timeout_ms) ): trd(this, rtmp_sdk, timeout_ms)
{ {
rtmp = rtmp_sdk; rtmp = rtmp_sdk;
_conn = conn; _conn = conn;
_source = source; _source = source;
_is_fmle = is_fmle;
_is_edge = is_edge;
recv_error_code = ERROR_SUCCESS; recv_error_code = ERROR_SUCCESS;
_nb_msgs = 0; _nb_msgs = 0;
@ -351,7 +349,7 @@ int SrsPublishRecvThread::consume(SrsCommonMessage* msg)
srs_update_system_time_ms(), msg->header.timestamp, msg->size); srs_update_system_time_ms(), msg->header.timestamp, msg->size);
// the rtmp connection will handle this message // the rtmp connection will handle this message
ret = _conn->handle_publish_message(_source, msg, _is_fmle, _is_edge); ret = _conn->handle_publish_message(_source, msg);
// must always free it, // must always free it,
// the source will copy it if need to use. // the source will copy it if need to use.

View file

@ -181,8 +181,6 @@ private:
SrsRtmpConn* _conn; SrsRtmpConn* _conn;
// the params for conn callback. // the params for conn callback.
SrsSource* _source; SrsSource* _source;
bool _is_fmle;
bool _is_edge;
// the error timeout cond // the error timeout cond
// @see https://github.com/ossrs/srs/issues/244 // @see https://github.com/ossrs/srs/issues/244
st_cond_t error; st_cond_t error;
@ -192,7 +190,7 @@ private:
public: public:
SrsPublishRecvThread(SrsRtmpServer* rtmp_sdk, SrsPublishRecvThread(SrsRtmpServer* rtmp_sdk,
SrsRequest* _req, int mr_sock_fd, int timeout_ms, SrsRequest* _req, int mr_sock_fd, int timeout_ms,
SrsRtmpConn* conn, SrsSource* source, bool is_fmle, bool is_edge); SrsRtmpConn* conn, SrsSource* source);
virtual ~SrsPublishRecvThread(); virtual ~SrsPublishRecvThread();
public: public:
/** /**

View file

@ -283,13 +283,25 @@ void SrsSimpleRtmpClient::set_recv_timeout(int64_t timeout)
transport->set_recv_timeout(timeout); transport->set_recv_timeout(timeout);
} }
SrsClientInfo::SrsClientInfo()
{
edge = false;
req = new SrsRequest();
res = new SrsResponse();
type = SrsRtmpConnUnknown;
}
SrsClientInfo::~SrsClientInfo()
{
srs_freep(req);
srs_freep(res);
}
SrsRtmpConn::SrsRtmpConn(SrsServer* svr, st_netfd_t c, string cip) SrsRtmpConn::SrsRtmpConn(SrsServer* svr, st_netfd_t c, string cip)
: SrsConnection(svr, c, cip) : SrsConnection(svr, c, cip)
{ {
server = svr; server = svr;
req = new SrsRequest();
res = new SrsResponse();
skt = new SrsStSocket(c); skt = new SrsStSocket(c);
rtmp = new SrsRtmpServer(skt); rtmp = new SrsRtmpServer(skt);
refer = new SrsRefer(); refer = new SrsRefer();
@ -305,7 +317,7 @@ SrsRtmpConn::SrsRtmpConn(SrsServer* svr, st_netfd_t c, string cip)
realtime = SRS_PERF_MIN_LATENCY_ENABLED; realtime = SRS_PERF_MIN_LATENCY_ENABLED;
send_min_interval = 0; send_min_interval = 0;
tcp_nodelay = false; tcp_nodelay = false;
client_type = SrsRtmpConnUnknown; info = new SrsClientInfo();
_srs_config->subscribe(this); _srs_config->subscribe(this);
} }
@ -314,8 +326,7 @@ SrsRtmpConn::~SrsRtmpConn()
{ {
_srs_config->unsubscribe(this); _srs_config->unsubscribe(this);
srs_freep(req); srs_freep(info);
srs_freep(res);
srs_freep(rtmp); srs_freep(rtmp);
srs_freep(skt); srs_freep(skt);
srs_freep(refer); srs_freep(refer);
@ -358,6 +369,7 @@ int SrsRtmpConn::do_cycle()
} }
srs_verbose("rtmp handshake success"); srs_verbose("rtmp handshake success");
SrsRequest* req = info->req;
if ((ret = rtmp->connect_app(req)) != ERROR_SUCCESS) { if ((ret = rtmp->connect_app(req)) != ERROR_SUCCESS) {
srs_error("rtmp connect vhost/app failed. ret=%d", ret); srs_error("rtmp connect vhost/app failed. ret=%d", ret);
return ret; return ret;
@ -440,6 +452,8 @@ int SrsRtmpConn::on_reload_vhost_removed(string vhost)
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
SrsRequest* req = info->req;
if (req->vhost != vhost) { if (req->vhost != vhost) {
return ret; return ret;
} }
@ -460,6 +474,8 @@ int SrsRtmpConn::on_reload_vhost_play(string vhost)
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
SrsRequest* req = info->req;
if (req->vhost != vhost) { if (req->vhost != vhost) {
return ret; return ret;
} }
@ -480,6 +496,8 @@ int SrsRtmpConn::on_reload_vhost_tcp_nodelay(string vhost)
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
SrsRequest* req = info->req;
if (req->vhost != vhost) { if (req->vhost != vhost) {
return ret; return ret;
} }
@ -493,6 +511,8 @@ int SrsRtmpConn::on_reload_vhost_realtime(string vhost)
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
SrsRequest* req = info->req;
if (req->vhost != vhost) { if (req->vhost != vhost) {
return ret; return ret;
} }
@ -510,6 +530,8 @@ int SrsRtmpConn::on_reload_vhost_publish(string vhost)
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
SrsRequest* req = info->req;
if (req->vhost != vhost) { if (req->vhost != vhost) {
return ret; return ret;
} }
@ -553,6 +575,8 @@ int SrsRtmpConn::service_cycle()
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
SrsRequest* req = info->req;
int out_ack_size = _srs_config->get_out_ack_size(req->vhost); int out_ack_size = _srs_config->get_out_ack_size(req->vhost);
if (out_ack_size && (ret = rtmp->set_window_ack_size(out_ack_size)) != ERROR_SUCCESS) { if (out_ack_size && (ret = rtmp->set_window_ack_size(out_ack_size)) != ERROR_SUCCESS) {
srs_error("set output window acknowledgement size failed. ret=%d", ret); srs_error("set output window acknowledgement size failed. ret=%d", ret);
@ -582,9 +606,9 @@ int SrsRtmpConn::service_cycle()
// do token traverse before serve it. // do token traverse before serve it.
// @see https://github.com/ossrs/srs/pull/239 // @see https://github.com/ossrs/srs/pull/239
if (true) { if (true) {
bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost); info->edge = _srs_config->get_vhost_is_edge(req->vhost);
bool edge_traverse = _srs_config->get_vhost_edge_token_traverse(req->vhost); bool edge_traverse = _srs_config->get_vhost_edge_token_traverse(req->vhost);
if (vhost_is_edge && edge_traverse) { if (info->edge && edge_traverse) {
if ((ret = check_edge_token_traverse_auth()) != ERROR_SUCCESS) { if ((ret = check_edge_token_traverse_auth()) != ERROR_SUCCESS) {
srs_warn("token auth failed, ret=%d", ret); srs_warn("token auth failed, ret=%d", ret);
return ret; return ret;
@ -667,8 +691,9 @@ int SrsRtmpConn::stream_service_cycle()
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
SrsRtmpConnType type; SrsRequest* req = info->req;
if ((ret = rtmp->identify_client(res->stream_id, type, req->stream, req->duration)) != ERROR_SUCCESS) {
if ((ret = rtmp->identify_client(info->res->stream_id, info->type, req->stream, req->duration)) != ERROR_SUCCESS) {
if (!srs_is_client_gracefully_close(ret)) { if (!srs_is_client_gracefully_close(ret)) {
srs_error("identify client failed. ret=%d", ret); srs_error("identify client failed. ret=%d", ret);
} }
@ -676,10 +701,10 @@ int SrsRtmpConn::stream_service_cycle()
} }
req->strip(); req->strip();
srs_trace("client identified, type=%s, stream_name=%s, duration=%.2f", srs_trace("client identified, type=%s, stream_name=%s, duration=%.2f",
srs_client_type_string(type).c_str(), req->stream.c_str(), req->duration); srs_client_type_string(info->type).c_str(), req->stream.c_str(), req->duration);
// security check // security check
if ((ret = security->check(type, ip, req)) != ERROR_SUCCESS) { if ((ret = security->check(info->type, ip, req)) != ERROR_SUCCESS) {
srs_error("security check failed. ret=%d", ret); srs_error("security check failed. ret=%d", ret);
return ret; return ret;
} }
@ -698,25 +723,23 @@ int SrsRtmpConn::stream_service_cycle()
// update the statistic when source disconveried. // update the statistic when source disconveried.
SrsStatistic* stat = SrsStatistic::instance(); SrsStatistic* stat = SrsStatistic::instance();
if ((ret = stat->on_client(_srs_context->get_id(), req, this, type)) != ERROR_SUCCESS) { if ((ret = stat->on_client(_srs_context->get_id(), req, this, info->type)) != ERROR_SUCCESS) {
srs_error("stat client failed. ret=%d", ret); srs_error("stat client failed. ret=%d", ret);
return ret; return ret;
} }
bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost);
bool enabled_cache = _srs_config->get_gop_cache(req->vhost); bool enabled_cache = _srs_config->get_gop_cache(req->vhost);
srs_trace("source url=%s, ip=%s, cache=%d, is_edge=%d, source_id=%d[%d]", srs_trace("source url=%s, ip=%s, cache=%d, is_edge=%d, source_id=%d[%d]",
req->get_stream_url().c_str(), ip.c_str(), enabled_cache, vhost_is_edge, req->get_stream_url().c_str(), ip.c_str(), enabled_cache, info->edge,
source->source_id(), source->source_id()); source->source_id(), source->source_id());
source->set_cache(enabled_cache); source->set_cache(enabled_cache);
client_type = type; switch (info->type) {
switch (type) {
case SrsRtmpConnPlay: { case SrsRtmpConnPlay: {
srs_verbose("start to play stream %s.", req->stream.c_str()); srs_verbose("start to play stream %s.", req->stream.c_str());
// response connection start play // response connection start play
if ((ret = rtmp->start_play(res->stream_id)) != ERROR_SUCCESS) { if ((ret = rtmp->start_play(info->res->stream_id)) != ERROR_SUCCESS) {
srs_error("start to play stream failed. ret=%d", ret); srs_error("start to play stream failed. ret=%d", ret);
return ret; return ret;
} }
@ -734,7 +757,7 @@ int SrsRtmpConn::stream_service_cycle()
case SrsRtmpConnFMLEPublish: { case SrsRtmpConnFMLEPublish: {
srs_verbose("FMLE start to publish stream %s.", req->stream.c_str()); srs_verbose("FMLE start to publish stream %s.", req->stream.c_str());
if ((ret = rtmp->start_fmle_publish(res->stream_id)) != ERROR_SUCCESS) { if ((ret = rtmp->start_fmle_publish(info->res->stream_id)) != ERROR_SUCCESS) {
srs_error("start to publish stream failed. ret=%d", ret); srs_error("start to publish stream failed. ret=%d", ret);
return ret; return ret;
} }
@ -744,7 +767,7 @@ int SrsRtmpConn::stream_service_cycle()
case SrsRtmpConnFlashPublish: { case SrsRtmpConnFlashPublish: {
srs_verbose("flash start to publish stream %s.", req->stream.c_str()); srs_verbose("flash start to publish stream %s.", req->stream.c_str());
if ((ret = rtmp->start_flash_publish(res->stream_id)) != ERROR_SUCCESS) { if ((ret = rtmp->start_flash_publish(info->res->stream_id)) != ERROR_SUCCESS) {
srs_error("flash start to publish stream failed. ret=%d", ret); srs_error("flash start to publish stream failed. ret=%d", ret);
return ret; return ret;
} }
@ -765,6 +788,7 @@ int SrsRtmpConn::check_vhost(bool try_default_vhost)
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
SrsRequest* req = info->req;
srs_assert(req != NULL); srs_assert(req != NULL);
SrsConfDirective* vhost = _srs_config->get_vhost(req->vhost, try_default_vhost); SrsConfDirective* vhost = _srs_config->get_vhost(req->vhost, try_default_vhost);
@ -845,6 +869,7 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRe
srs_assert(consumer != NULL); srs_assert(consumer != NULL);
SrsRequest* req = info->req;
if (_srs_config->get_refer_enabled(req->vhost)) { if (_srs_config->get_refer_enabled(req->vhost)) {
if ((ret = refer->check(req->pageUrl, _srs_config->get_refer_play(req->vhost))) != ERROR_SUCCESS) { if ((ret = refer->check(req->pageUrl, _srs_config->get_refer_play(req->vhost))) != ERROR_SUCCESS) {
srs_error("check play_refer failed. ret=%d", ret); srs_error("check play_refer failed. ret=%d", ret);
@ -991,7 +1016,7 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRe
// sendout messages, all messages are freed by send_and_free_messages(). // sendout messages, all messages are freed by send_and_free_messages().
// no need to assert msg, for the rtmp will assert it. // no need to assert msg, for the rtmp will assert it.
if (count > 0 && (ret = rtmp->send_and_free_messages(msgs.msgs, count, res->stream_id)) != ERROR_SUCCESS) { if (count > 0 && (ret = rtmp->send_and_free_messages(msgs.msgs, count, info->res->stream_id)) != ERROR_SUCCESS) {
if (!srs_is_client_gracefully_close(ret)) { if (!srs_is_client_gracefully_close(ret)) {
srs_error("send messages to client failed. ret=%d", ret); srs_error("send messages to client failed. ret=%d", ret);
} }
@ -1021,6 +1046,8 @@ int SrsRtmpConn::publishing(SrsSource* source)
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
SrsRequest* req = info->req;
if (_srs_config->get_refer_enabled(req->vhost)) { if (_srs_config->get_refer_enabled(req->vhost)) {
if ((ret = refer->check(req->pageUrl, _srs_config->get_refer_publish(req->vhost))) != ERROR_SUCCESS) { if ((ret = refer->check(req->pageUrl, _srs_config->get_refer_publish(req->vhost))) != ERROR_SUCCESS) {
srs_error("check publish_refer failed. ret=%d", ret); srs_error("check publish_refer failed. ret=%d", ret);
@ -1034,14 +1061,10 @@ int SrsRtmpConn::publishing(SrsSource* source)
return ret; return ret;
} }
bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost); if ((ret = acquire_publish(source)) == ERROR_SUCCESS) {
if ((ret = acquire_publish(source, vhost_is_edge)) == ERROR_SUCCESS) {
// use isolate thread to recv, // use isolate thread to recv,
// @see: https://github.com/ossrs/srs/issues/237 // @see: https://github.com/ossrs/srs/issues/237
SrsPublishRecvThread trd(rtmp, req, SrsPublishRecvThread trd(rtmp, req, st_netfd_fileno(stfd), 0, this, source);
st_netfd_fileno(stfd), 0, this, source,
client_type == SrsRtmpConnFMLEPublish,
vhost_is_edge);
srs_info("start to publish stream %s success", req->stream.c_str()); srs_info("start to publish stream %s success", req->stream.c_str());
ret = do_publishing(source, &trd); ret = do_publishing(source, &trd);
@ -1056,7 +1079,7 @@ int SrsRtmpConn::publishing(SrsSource* source)
// @see https://github.com/ossrs/srs/issues/474 // @see https://github.com/ossrs/srs/issues/474
// @remark when stream is busy, should never release it. // @remark when stream is busy, should never release it.
if (ret != ERROR_SYSTEM_STREAM_BUSY) { if (ret != ERROR_SYSTEM_STREAM_BUSY) {
release_publish(source, vhost_is_edge); release_publish(source);
} }
http_hooks_on_unpublish(); http_hooks_on_unpublish();
@ -1068,6 +1091,7 @@ int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd)
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
SrsRequest* req = info->req;
SrsPithyPrint* pprint = SrsPithyPrint::create_rtmp_publish(); SrsPithyPrint* pprint = SrsPithyPrint::create_rtmp_publish();
SrsAutoFree(SrsPithyPrint, pprint); SrsAutoFree(SrsPithyPrint, pprint);
@ -1150,11 +1174,13 @@ int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd)
return ret; return ret;
} }
int SrsRtmpConn::acquire_publish(SrsSource* source, bool is_edge) int SrsRtmpConn::acquire_publish(SrsSource* source)
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
if (!source->can_publish(is_edge)) { SrsRequest* req = info->req;
if (!source->can_publish(info->edge)) {
ret = ERROR_SYSTEM_STREAM_BUSY; ret = ERROR_SYSTEM_STREAM_BUSY;
srs_warn("stream %s is already publishing. ret=%d", srs_warn("stream %s is already publishing. ret=%d",
req->get_stream_url().c_str(), ret); req->get_stream_url().c_str(), ret);
@ -1162,7 +1188,7 @@ int SrsRtmpConn::acquire_publish(SrsSource* source, bool is_edge)
} }
// when edge, ignore the publish event, directly proxy it. // when edge, ignore the publish event, directly proxy it.
if (is_edge) { if (info->edge) {
if ((ret = source->on_edge_start_publish()) != ERROR_SUCCESS) { if ((ret = source->on_edge_start_publish()) != ERROR_SUCCESS) {
srs_error("notice edge start publish stream failed. ret=%d", ret); srs_error("notice edge start publish stream failed. ret=%d", ret);
return ret; return ret;
@ -1177,18 +1203,18 @@ int SrsRtmpConn::acquire_publish(SrsSource* source, bool is_edge)
return ret; return ret;
} }
void SrsRtmpConn::release_publish(SrsSource* source, bool is_edge) void SrsRtmpConn::release_publish(SrsSource* source)
{ {
// when edge, notice edge to change state. // when edge, notice edge to change state.
// when origin, notice all service to unpublish. // when origin, notice all service to unpublish.
if (is_edge) { if (info->edge) {
source->on_edge_proxy_unpublish(); source->on_edge_proxy_unpublish();
} else { } else {
source->on_unpublish(); source->on_unpublish();
} }
} }
int SrsRtmpConn::handle_publish_message(SrsSource* source, SrsCommonMessage* msg, bool is_fmle, bool vhost_is_edge) int SrsRtmpConn::handle_publish_message(SrsSource* source, SrsCommonMessage* msg)
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
@ -1202,7 +1228,7 @@ int SrsRtmpConn::handle_publish_message(SrsSource* source, SrsCommonMessage* msg
SrsAutoFree(SrsPacket, pkt); SrsAutoFree(SrsPacket, pkt);
// for flash, any packet is republish. // for flash, any packet is republish.
if (!is_fmle) { if (info->type == SrsRtmpConnFlashPublish) {
// flash unpublish. // flash unpublish.
// TODO: maybe need to support republish. // TODO: maybe need to support republish.
srs_trace("flash flash publish finished."); srs_trace("flash flash publish finished.");
@ -1212,7 +1238,7 @@ int SrsRtmpConn::handle_publish_message(SrsSource* source, SrsCommonMessage* msg
// for fmle, drop others except the fmle start packet. // for fmle, drop others except the fmle start packet.
if (dynamic_cast<SrsFMLEStartPacket*>(pkt)) { if (dynamic_cast<SrsFMLEStartPacket*>(pkt)) {
SrsFMLEStartPacket* unpublish = dynamic_cast<SrsFMLEStartPacket*>(pkt); SrsFMLEStartPacket* unpublish = dynamic_cast<SrsFMLEStartPacket*>(pkt);
if ((ret = rtmp->fmle_unpublish(res->stream_id, unpublish->transaction_id)) != ERROR_SUCCESS) { if ((ret = rtmp->fmle_unpublish(info->res->stream_id, unpublish->transaction_id)) != ERROR_SUCCESS) {
return ret; return ret;
} }
return ERROR_CONTROL_REPUBLISH; return ERROR_CONTROL_REPUBLISH;
@ -1223,7 +1249,7 @@ int SrsRtmpConn::handle_publish_message(SrsSource* source, SrsCommonMessage* msg
} }
// video, audio, data message // video, audio, data message
if ((ret = process_publish_message(source, msg, vhost_is_edge)) != ERROR_SUCCESS) { if ((ret = process_publish_message(source, msg)) != ERROR_SUCCESS) {
srs_error("fmle process publish message failed. ret=%d", ret); srs_error("fmle process publish message failed. ret=%d", ret);
return ret; return ret;
} }
@ -1231,12 +1257,12 @@ int SrsRtmpConn::handle_publish_message(SrsSource* source, SrsCommonMessage* msg
return ret; return ret;
} }
int SrsRtmpConn::process_publish_message(SrsSource* source, SrsCommonMessage* msg, bool vhost_is_edge) int SrsRtmpConn::process_publish_message(SrsSource* source, SrsCommonMessage* msg)
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
// for edge, directly proxy message to origin. // for edge, directly proxy message to origin.
if (vhost_is_edge) { if (info->edge) {
if ((ret = source->on_edge_proxy_publish(msg)) != ERROR_SUCCESS) { if ((ret = source->on_edge_proxy_publish(msg)) != ERROR_SUCCESS) {
srs_error("edge publish proxy msg failed. ret=%d", ret); srs_error("edge publish proxy msg failed. ret=%d", ret);
return ret; return ret;
@ -1354,7 +1380,7 @@ int SrsRtmpConn::process_play_control_msg(SrsConsumer* consumer, SrsCommonMessag
// pause // pause
SrsPausePacket* pause = dynamic_cast<SrsPausePacket*>(pkt); SrsPausePacket* pause = dynamic_cast<SrsPausePacket*>(pkt);
if (pause) { if (pause) {
if ((ret = rtmp->on_play_client_pause(res->stream_id, pause->is_pause)) != ERROR_SUCCESS) { if ((ret = rtmp->on_play_client_pause(info->res->stream_id, pause->is_pause)) != ERROR_SUCCESS) {
srs_error("rtmp process play client pause failed. ret=%d", ret); srs_error("rtmp process play client pause failed. ret=%d", ret);
return ret; return ret;
} }
@ -1425,6 +1451,8 @@ void SrsRtmpConn::change_mw_sleep(int sleep_ms)
void SrsRtmpConn::set_sock_options() void SrsRtmpConn::set_sock_options()
{ {
SrsRequest* req = info->req;
bool nvalue = _srs_config->get_tcp_nodelay(req->vhost); bool nvalue = _srs_config->get_tcp_nodelay(req->vhost);
if (nvalue != tcp_nodelay) { if (nvalue != tcp_nodelay) {
tcp_nodelay = nvalue; tcp_nodelay = nvalue;
@ -1454,6 +1482,7 @@ int SrsRtmpConn::check_edge_token_traverse_auth()
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
SrsRequest* req = info->req;
srs_assert(req); srs_assert(req);
vector<string> args = _srs_config->get_vhost_edge_origin(req->vhost)->args; vector<string> args = _srs_config->get_vhost_edge_origin(req->vhost)->args;
@ -1492,6 +1521,7 @@ int SrsRtmpConn::do_token_traverse_auth(SrsRtmpClient* client)
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
SrsRequest* req = info->req;
srs_assert(client); srs_assert(client);
client->set_recv_timeout(SRS_CONSTS_RTMP_TMMS); client->set_recv_timeout(SRS_CONSTS_RTMP_TMMS);
@ -1535,6 +1565,8 @@ int SrsRtmpConn::http_hooks_on_connect()
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
SrsRequest* req = info->req;
#ifdef SRS_AUTO_HTTP_CALLBACK #ifdef SRS_AUTO_HTTP_CALLBACK
if (!_srs_config->get_vhost_http_hooks_enabled(req->vhost)) { if (!_srs_config->get_vhost_http_hooks_enabled(req->vhost)) {
return ret; return ret;
@ -1571,6 +1603,8 @@ int SrsRtmpConn::http_hooks_on_connect()
void SrsRtmpConn::http_hooks_on_close() void SrsRtmpConn::http_hooks_on_close()
{ {
#ifdef SRS_AUTO_HTTP_CALLBACK #ifdef SRS_AUTO_HTTP_CALLBACK
SrsRequest* req = info->req;
if (!_srs_config->get_vhost_http_hooks_enabled(req->vhost)) { if (!_srs_config->get_vhost_http_hooks_enabled(req->vhost)) {
return; return;
} }
@ -1603,6 +1637,8 @@ int SrsRtmpConn::http_hooks_on_publish()
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
#ifdef SRS_AUTO_HTTP_CALLBACK #ifdef SRS_AUTO_HTTP_CALLBACK
SrsRequest* req = info->req;
if (!_srs_config->get_vhost_http_hooks_enabled(req->vhost)) { if (!_srs_config->get_vhost_http_hooks_enabled(req->vhost)) {
return ret; return ret;
} }
@ -1638,6 +1674,8 @@ int SrsRtmpConn::http_hooks_on_publish()
void SrsRtmpConn::http_hooks_on_unpublish() void SrsRtmpConn::http_hooks_on_unpublish()
{ {
#ifdef SRS_AUTO_HTTP_CALLBACK #ifdef SRS_AUTO_HTTP_CALLBACK
SrsRequest* req = info->req;
if (!_srs_config->get_vhost_http_hooks_enabled(req->vhost)) { if (!_srs_config->get_vhost_http_hooks_enabled(req->vhost)) {
return; return;
} }
@ -1670,6 +1708,8 @@ int SrsRtmpConn::http_hooks_on_play()
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
#ifdef SRS_AUTO_HTTP_CALLBACK #ifdef SRS_AUTO_HTTP_CALLBACK
SrsRequest* req = info->req;
if (!_srs_config->get_vhost_http_hooks_enabled(req->vhost)) { if (!_srs_config->get_vhost_http_hooks_enabled(req->vhost)) {
return ret; return ret;
} }
@ -1705,6 +1745,8 @@ int SrsRtmpConn::http_hooks_on_play()
void SrsRtmpConn::http_hooks_on_stop() void SrsRtmpConn::http_hooks_on_stop()
{ {
#ifdef SRS_AUTO_HTTP_CALLBACK #ifdef SRS_AUTO_HTTP_CALLBACK
SrsRequest* req = info->req;
if (!_srs_config->get_vhost_http_hooks_enabled(req->vhost)) { if (!_srs_config->get_vhost_http_hooks_enabled(req->vhost)) {
return; return;
} }

View file

@ -113,6 +113,25 @@ public:
virtual void set_recv_timeout(int64_t timeout); virtual void set_recv_timeout(int64_t timeout);
}; };
/**
* Some information of client.
*/
class SrsClientInfo
{
public:
// The type of client, play or publish.
SrsRtmpConnType type;
// Whether the client connected at the edge server.
bool edge;
// Original request object from client.
SrsRequest* req;
// Response object to client.
SrsResponse* res;
public:
SrsClientInfo();
virtual ~SrsClientInfo();
};
/** /**
* the client provides the main logic control for RTMP clients. * the client provides the main logic control for RTMP clients.
*/ */
@ -122,8 +141,6 @@ class SrsRtmpConn : public virtual SrsConnection, public virtual ISrsReloadHandl
friend class SrsPublishRecvThread; friend class SrsPublishRecvThread;
private: private:
SrsServer* server; SrsServer* server;
SrsRequest* req;
SrsResponse* res;
SrsStSocket* skt; SrsStSocket* skt;
SrsRtmpServer* rtmp; SrsRtmpServer* rtmp;
SrsRefer* refer; SrsRefer* refer;
@ -151,8 +168,8 @@ private:
int publish_normal_timeout; int publish_normal_timeout;
// whether enable the tcp_nodelay. // whether enable the tcp_nodelay.
bool tcp_nodelay; bool tcp_nodelay;
// The type of client, play or publish. // About the rtmp client.
SrsRtmpConnType client_type; SrsClientInfo* info;
public: public:
SrsRtmpConn(SrsServer* svr, st_netfd_t c, std::string cip); SrsRtmpConn(SrsServer* svr, st_netfd_t c, std::string cip);
virtual ~SrsRtmpConn(); virtual ~SrsRtmpConn();
@ -183,10 +200,10 @@ private:
virtual int do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRecvThread* trd); virtual int do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRecvThread* trd);
virtual int publishing(SrsSource* source); virtual int publishing(SrsSource* source);
virtual int do_publishing(SrsSource* source, SrsPublishRecvThread* trd); virtual int do_publishing(SrsSource* source, SrsPublishRecvThread* trd);
virtual int acquire_publish(SrsSource* source, bool is_edge); virtual int acquire_publish(SrsSource* source);
virtual void release_publish(SrsSource* source, bool is_edge); virtual void release_publish(SrsSource* source);
virtual int handle_publish_message(SrsSource* source, SrsCommonMessage* msg, bool is_fmle, bool vhost_is_edge); virtual int handle_publish_message(SrsSource* source, SrsCommonMessage* msg);
virtual int process_publish_message(SrsSource* source, SrsCommonMessage* msg, bool vhost_is_edge); virtual int process_publish_message(SrsSource* source, SrsCommonMessage* msg);
virtual int process_play_control_msg(SrsConsumer* consumer, SrsCommonMessage* msg); virtual int process_play_control_msg(SrsConsumer* consumer, SrsCommonMessage* msg);
virtual void change_mw_sleep(int sleep_ms); virtual void change_mw_sleep(int sleep_ms);
virtual void set_sock_options(); virtual void set_sock_options();