1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

Copy 4.0release

This commit is contained in:
winlin 2021-01-18 11:30:47 +08:00
parent 00395588bc
commit 5e3e013c60
183 changed files with 27373 additions and 13949 deletions

View file

@ -81,13 +81,13 @@ SrsSimpleRtmpClient::~SrsSimpleRtmpClient()
srs_error_t SrsSimpleRtmpClient::connect_app()
{
std::vector<std::string> ips = srs_get_local_ips();
std::vector<SrsIPAddress*>& ips = srs_get_local_ips();
assert(_srs_config->get_stats_network() < (int)ips.size());
std::string local_ip = ips[_srs_config->get_stats_network()];
SrsIPAddress* local_ip = ips[_srs_config->get_stats_network()];
bool debug_srs_upnode = _srs_config->get_debug_srs_upnode(req->vhost);
return do_connect_app(local_ip, debug_srs_upnode);
return do_connect_app(local_ip->ip, debug_srs_upnode);
}
SrsClientInfo::SrsClientInfo()
@ -104,9 +104,23 @@ SrsClientInfo::~SrsClientInfo()
srs_freep(res);
}
SrsRtmpConn::SrsRtmpConn(SrsServer* svr, srs_netfd_t c, string cip) : SrsConnection(svr, c, cip)
SrsRtmpConn::SrsRtmpConn(SrsServer* svr, srs_netfd_t c, string cip, int cport)
{
// Create a identify for this client.
_srs_context->set_id(_srs_context->generate_id());
server = svr;
stfd = c;
skt = new SrsTcpConnection(c);
manager = svr;
ip = cip;
port = cport;
create_time = srsu2ms(srs_get_system_time());
clk = new SrsWallClock();
kbps = new SrsKbps(clk);
kbps->set_io(skt, skt);
trd = new SrsSTCoroutine("rtmp", this, _srs_context->get_id());
rtmp = new SrsRtmpServer(skt);
refer = new SrsRefer();
@ -121,6 +135,9 @@ SrsRtmpConn::SrsRtmpConn(SrsServer* svr, srs_netfd_t c, string cip) : SrsConnect
send_min_interval = 0;
tcp_nodelay = false;
info = new SrsClientInfo();
publish_1stpkt_timeout = 0;
publish_normal_timeout = 0;
_srs_config->subscribe(this);
}
@ -128,6 +145,17 @@ SrsRtmpConn::SrsRtmpConn(SrsServer* svr, srs_netfd_t c, string cip) : SrsConnect
SrsRtmpConn::~SrsRtmpConn()
{
_srs_config->unsubscribe(this);
trd->interrupt();
// wakeup the handler which need to notice.
if (wakable) {
wakable->wakeup();
}
srs_freep(trd);
srs_freep(kbps);
srs_freep(clk);
srs_freep(skt);
srs_freep(info);
srs_freep(rtmp);
@ -136,14 +164,9 @@ SrsRtmpConn::~SrsRtmpConn()
srs_freep(security);
}
void SrsRtmpConn::dispose()
std::string SrsRtmpConn::desc()
{
SrsConnection::dispose();
// wakeup the handler which need to notice.
if (wakable) {
wakable->wakeup();
}
return "RtmpConn";
}
// TODO: return detail message when error for client.
@ -151,7 +174,7 @@ srs_error_t SrsRtmpConn::do_cycle()
{
srs_error_t err = srs_success;
srs_trace("RTMP client ip=%s, fd=%d", ip.c_str(), srs_netfd_fileno(stfd));
srs_trace("RTMP client ip=%s:%d, fd=%d", ip.c_str(), port, srs_netfd_fileno(stfd));
rtmp->set_recv_timeout(SRS_CONSTS_RTMP_TIMEOUT);
rtmp->set_send_timeout(SRS_CONSTS_RTMP_TIMEOUT);
@ -267,7 +290,7 @@ srs_error_t SrsRtmpConn::on_reload_vhost_play(string vhost)
mw_msgs = _srs_config->get_mw_msgs(req->vhost, realtime);
mw_sleep = _srs_config->get_mw_sleep(req->vhost);
set_socket_buffer(mw_sleep);
skt->set_socket_buffer(mw_sleep);
return err;
}
@ -305,7 +328,7 @@ srs_error_t SrsRtmpConn::on_reload_vhost_realtime(string vhost)
mw_msgs = _srs_config->get_mw_msgs(req->vhost, realtime);
mw_sleep = _srs_config->get_mw_sleep(req->vhost);
set_socket_buffer(mw_sleep);
skt->set_socket_buffer(mw_sleep);
return err;
}
@ -515,8 +538,8 @@ srs_error_t SrsRtmpConn::stream_service_cycle()
}
bool enabled_cache = _srs_config->get_gop_cache(req->vhost);
srs_trace("source url=%s, ip=%s, cache=%d, is_edge=%d, source_id=[%d][%d]",
req->get_stream_url().c_str(), ip.c_str(), enabled_cache, info->edge, ::getpid(), source->source_id());
srs_trace("source url=%s, ip=%s, cache=%d, is_edge=%d, source_id=%s/%s",
req->get_stream_url().c_str(), ip.c_str(), enabled_cache, info->edge, source->source_id().c_str(), source->pre_source_id().c_str());
source->set_cache(enabled_cache);
switch (info->type) {
@ -654,10 +677,13 @@ srs_error_t SrsRtmpConn::playing(SrsSource* source)
// Create a consumer of source.
SrsConsumer* consumer = NULL;
if ((err = source->create_consumer(this, consumer)) != srs_success) {
SrsAutoFree(SrsConsumer, consumer);
if ((err = source->create_consumer(consumer)) != srs_success) {
return srs_error_wrap(err, "rtmp: create consumer");
}
SrsAutoFree(SrsConsumer, consumer);
if ((err = source->consumer_dumps(consumer)) != srs_success) {
return srs_error_wrap(err, "rtmp: dumps consumer");
}
// Use receiving thread to receive packets from peer.
// @see: https://github.com/ossrs/srs/issues/217
@ -704,7 +730,7 @@ srs_error_t SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, Sr
// when mw_sleep changed, resize the socket send buffer.
mw_msgs = _srs_config->get_mw_msgs(req->vhost, realtime);
mw_sleep = _srs_config->get_mw_sleep(req->vhost);
set_socket_buffer(mw_sleep);
skt->set_socket_buffer(mw_sleep);
// initialize the send_min_interval
send_min_interval = _srs_config->get_send_min_interval(req->vhost);
@ -1125,7 +1151,7 @@ void SrsRtmpConn::set_sock_options()
if (nvalue != tcp_nodelay) {
tcp_nodelay = nvalue;
srs_error_t err = set_tcp_nodelay(tcp_nodelay);
srs_error_t err = skt->set_tcp_nodelay(tcp_nodelay);
if (err != srs_success) {
srs_warn("ignore err %s", srs_error_desc(err).c_str());
srs_freep(err);
@ -1399,3 +1425,68 @@ void SrsRtmpConn::http_hooks_on_stop()
return;
}
srs_error_t SrsRtmpConn::start()
{
srs_error_t err = srs_success;
if ((err = skt->initialize()) != srs_success) {
return srs_error_wrap(err, "init socket");
}
if ((err = trd->start()) != srs_success) {
return srs_error_wrap(err, "coroutine");
}
return err;
}
srs_error_t SrsRtmpConn::cycle()
{
srs_error_t err = do_cycle();
// Notify manager to remove it.
// Note that we create this object, so we use manager to remove it.
manager->remove(this);
// success.
if (err == srs_success) {
srs_trace("client finished.");
return err;
}
// It maybe success with message.
if (srs_error_code(err) == ERROR_SUCCESS) {
srs_trace("client finished%s.", srs_error_summary(err).c_str());
srs_freep(err);
return err;
}
// client close peer.
// TODO: FIXME: Only reset the error when client closed it.
if (srs_is_client_gracefully_close(err)) {
srs_warn("client disconnect peer. ret=%d", srs_error_code(err));
} else if (srs_is_server_gracefully_close(err)) {
srs_warn("server disconnect. ret=%d", srs_error_code(err));
} else {
srs_error("serve error %s", srs_error_desc(err).c_str());
}
srs_freep(err);
return srs_success;
}
string SrsRtmpConn::remote_ip()
{
return ip;
}
const SrsContextId& SrsRtmpConn::get_id()
{
return trd->cid();
}
void SrsRtmpConn::expire()
{
trd->interrupt();
}