1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

For #1657: Refine TCP connections arch

This commit is contained in:
winlin 2020-11-05 11:47:24 +08:00
parent 3038dd473d
commit 4ba66b388b
11 changed files with 630 additions and 50 deletions

View file

@ -104,10 +104,20 @@ SrsClientInfo::~SrsClientInfo()
srs_freep(res);
}
SrsRtmpConn::SrsRtmpConn(SrsServer* svr, srs_netfd_t c, string cip, int port)
: SrsTcpConnection(svr, c, cip, port)
SrsRtmpConn::SrsRtmpConn(SrsServer* svr, srs_netfd_t c, string cip, int cport)
{
server = svr;
stfd = c;
skt = new SrsTcpConnection2(c);
manager = svr;
ip = cip;
port = cport;
create_time = srsu2ms(srs_get_system_time());
clk = new SrsWallClock();
kbps = new SrsKbps(clk);
kbps->set_io(skt, skt);
trd = new SrsSTCoroutine("rtmp", this);
rtmp = new SrsRtmpServer(skt);
refer = new SrsRefer();
@ -132,6 +142,17 @@ SrsRtmpConn::SrsRtmpConn(SrsServer* svr, srs_netfd_t c, string cip, int port)
SrsRtmpConn::~SrsRtmpConn()
{
_srs_config->unsubscribe(this);
trd->interrupt();
// wakeup the handler which need to notice.
if (wakable) {
wakable->wakeup();
}
srs_freep(trd);
srs_freep(kbps);
srs_freep(clk);
srs_freep(skt);
srs_freep(info);
srs_freep(rtmp);
@ -145,16 +166,6 @@ std::string SrsRtmpConn::desc()
return "RtmpConn";
}
void SrsRtmpConn::dispose()
{
SrsTcpConnection::dispose();
// wakeup the handler which need to notice.
if (wakable) {
wakable->wakeup();
}
}
// TODO: return detail message when error for client.
srs_error_t SrsRtmpConn::do_cycle()
{
@ -276,7 +287,7 @@ srs_error_t SrsRtmpConn::on_reload_vhost_play(string vhost)
mw_msgs = _srs_config->get_mw_msgs(req->vhost, realtime);
mw_sleep = _srs_config->get_mw_sleep(req->vhost);
set_socket_buffer(mw_sleep);
skt->set_socket_buffer(mw_sleep);
return err;
}
@ -314,7 +325,7 @@ srs_error_t SrsRtmpConn::on_reload_vhost_realtime(string vhost)
mw_msgs = _srs_config->get_mw_msgs(req->vhost, realtime);
mw_sleep = _srs_config->get_mw_sleep(req->vhost);
set_socket_buffer(mw_sleep);
skt->set_socket_buffer(mw_sleep);
return err;
}
@ -716,7 +727,7 @@ srs_error_t SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, Sr
// when mw_sleep changed, resize the socket send buffer.
mw_msgs = _srs_config->get_mw_msgs(req->vhost, realtime);
mw_sleep = _srs_config->get_mw_sleep(req->vhost);
set_socket_buffer(mw_sleep);
skt->set_socket_buffer(mw_sleep);
// initialize the send_min_interval
send_min_interval = _srs_config->get_send_min_interval(req->vhost);
@ -1137,7 +1148,7 @@ void SrsRtmpConn::set_sock_options()
if (nvalue != tcp_nodelay) {
tcp_nodelay = nvalue;
srs_error_t err = set_tcp_nodelay(tcp_nodelay);
srs_error_t err = skt->set_tcp_nodelay(tcp_nodelay);
if (err != srs_success) {
srs_warn("ignore err %s", srs_error_desc(err).c_str());
srs_freep(err);
@ -1411,3 +1422,67 @@ void SrsRtmpConn::http_hooks_on_stop()
return;
}
srs_error_t SrsRtmpConn::start()
{
srs_error_t err = srs_success;
if ((err = skt->initialize()) != srs_success) {
return srs_error_wrap(err, "init socket");
}
if ((err = trd->start()) != srs_success) {
return srs_error_wrap(err, "coroutine");
}
return err;
}
srs_error_t SrsRtmpConn::cycle()
{
srs_error_t err = do_cycle();
// Notify manager to remove it.
manager->remove(this);
// success.
if (err == srs_success) {
srs_trace("client finished.");
return err;
}
// It maybe success with message.
if (srs_error_code(err) == ERROR_SUCCESS) {
srs_trace("client finished%s.", srs_error_summary(err).c_str());
srs_freep(err);
return err;
}
// client close peer.
// TODO: FIXME: Only reset the error when client closed it.
if (srs_is_client_gracefully_close(err)) {
srs_warn("client disconnect peer. ret=%d", srs_error_code(err));
} else if (srs_is_server_gracefully_close(err)) {
srs_warn("server disconnect. ret=%d", srs_error_code(err));
} else {
srs_error("serve error %s", srs_error_desc(err).c_str());
}
srs_freep(err);
return srs_success;
}
string SrsRtmpConn::remote_ip()
{
return ip;
}
const SrsContextId& SrsRtmpConn::get_id()
{
return trd->cid();
}
void SrsRtmpConn::expire()
{
trd->interrupt();
}