1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

for #742, use ms for application clock tbn.

This commit is contained in:
winlin 2017-01-17 12:25:30 +08:00
parent dca9749f37
commit 3fe338d1c5
43 changed files with 437 additions and 435 deletions

View file

@ -28,8 +28,8 @@ using namespace std;
#include <srs_kernel_error.hpp>
#include <srs_kernel_log.hpp>
// the sleep interval for http async callback.
#define SRS_AUTO_ASYNC_CALLBACL_SLEEP_US 300000
// the sleep interval in ms for http async callback.
#define SRS_AUTO_ASYNC_CALLBACL_CIMS 30
ISrsAsyncCallTask::ISrsAsyncCallTask()
{
@ -41,7 +41,7 @@ ISrsAsyncCallTask::~ISrsAsyncCallTask()
SrsAsyncCallWorker::SrsAsyncCallWorker()
{
pthread = new SrsReusableThread("async", this, SRS_AUTO_ASYNC_CALLBACL_SLEEP_US);
pthread = new SrsReusableThread("async", this, SRS_AUTO_ASYNC_CALLBACL_CIMS);
wait = st_cond_new();
}

View file

@ -190,8 +190,8 @@ int SrsBandwidth::do_bandwidth_check(SrsKbpsLimit* limit)
SrsBandwidthSample publish_sample;
// timeout for a packet.
_rtmp->set_send_timeout(play_sample.duration_ms * 1000 * 2);
_rtmp->set_recv_timeout(publish_sample.duration_ms * 1000 * 2);
_rtmp->set_send_timeout(play_sample.duration_ms * 2);
_rtmp->set_recv_timeout(publish_sample.duration_ms * 2);
// start test.
srs_update_system_time_ms();

View file

@ -183,9 +183,9 @@ int SrsDynamicHttpConn::do_proxy(ISrsHttpResponseReader* rr, SrsFlvDecoder* dec)
srs_freep(sdk);
int64_t cto = SRS_CONSTS_RTMP_TIMEOUT_US;
int64_t sto = SRS_CONSTS_RTMP_PULSE_TIMEOUT_US;
sdk = new SrsSimpleRtmpClient(output, cto / 1000, sto / 1000);
int64_t cto = SRS_CONSTS_RTMP_TMMS;
int64_t sto = SRS_CONSTS_RTMP_PULSE_TMMS;
sdk = new SrsSimpleRtmpClient(output, cto, sto);
if ((ret = sdk->connect()) != ERROR_SUCCESS) {
srs_error("flv: connect %s failed, cto=%"PRId64", sto=%"PRId64". ret=%d", output.c_str(), cto, sto, ret);

View file

@ -48,19 +48,16 @@ using namespace std;
#include <srs_app_rtmp_conn.hpp>
// when error, edge ingester sleep for a while and retry.
#define SRS_EDGE_INGESTER_SLEEP_US (int64_t)(3*1000*1000LL)
#define SRS_EDGE_INGESTER_CIMS (3*1000)
// when edge timeout, retry next.
#define SRS_EDGE_INGESTER_TIMEOUT_US (int64_t)(5*1000*1000LL)
#define SRS_EDGE_INGESTER_TMMS (5*1000)
// when error, edge ingester sleep for a while and retry.
#define SRS_EDGE_FORWARDER_SLEEP_US (int64_t)(3*1000*1000LL)
// when edge timeout, retry next.
#define SRS_EDGE_FORWARDER_TIMEOUT_US (int64_t)(5*1000*1000LL)
#define SRS_EDGE_FORWARDER_CIMS (3*1000)
// when edge error, wait for quit
#define SRS_EDGE_FORWARDER_ERROR_US (int64_t)(50*1000LL)
#define SRS_EDGE_FORWARDER_TMMS (150)
SrsEdgeUpstream::SrsEdgeUpstream()
{
@ -125,9 +122,9 @@ int SrsEdgeRtmpUpstream::connect(SrsRequest* r, SrsLbRoundRobin* lb)
}
srs_freep(sdk);
int64_t cto = SRS_EDGE_INGESTER_TIMEOUT_US;
int64_t sto = SRS_CONSTS_RTMP_PULSE_TIMEOUT_US;
sdk = new SrsSimpleRtmpClient(url, cto/1000, sto/1000);
int64_t cto = SRS_EDGE_INGESTER_TMMS;
int64_t sto = SRS_CONSTS_RTMP_PULSE_TMMS;
sdk = new SrsSimpleRtmpClient(url, cto, sto);
if ((ret = sdk->connect()) != ERROR_SUCCESS) {
srs_error("edge pull %s failed, cto=%"PRId64", sto=%"PRId64". ret=%d", url.c_str(), cto, sto, ret);
@ -157,9 +154,9 @@ void SrsEdgeRtmpUpstream::close()
srs_freep(sdk);
}
void SrsEdgeRtmpUpstream::set_recv_timeout(int64_t timeout)
void SrsEdgeRtmpUpstream::set_recv_timeout(int64_t tm)
{
sdk->set_recv_timeout(timeout);
sdk->set_recv_timeout(tm);
}
void SrsEdgeRtmpUpstream::kbps_sample(const char* label, int64_t age)
@ -175,7 +172,7 @@ SrsEdgeIngester::SrsEdgeIngester()
upstream = new SrsEdgeRtmpUpstream(redirect);
lb = new SrsLbRoundRobin();
pthread = new SrsReusableThread2("edge-igs", this, SRS_EDGE_INGESTER_SLEEP_US);
pthread = new SrsReusableThread2("edge-igs", this, SRS_EDGE_INGESTER_CIMS);
}
SrsEdgeIngester::~SrsEdgeIngester()
@ -274,7 +271,7 @@ int SrsEdgeIngester::ingest()
SrsAutoFree(SrsPithyPrint, pprint);
// set to larger timeout to read av data from origin.
upstream->set_recv_timeout(SRS_EDGE_INGESTER_TIMEOUT_US);
upstream->set_recv_timeout(SRS_EDGE_INGESTER_TMMS);
while (!pthread->interrupted()) {
pprint->elapse();
@ -409,7 +406,7 @@ SrsEdgeForwarder::SrsEdgeForwarder()
sdk = NULL;
lb = new SrsLbRoundRobin();
pthread = new SrsReusableThread2("edge-fwr", this, SRS_EDGE_FORWARDER_SLEEP_US);
pthread = new SrsReusableThread2("edge-fwr", this, SRS_EDGE_FORWARDER_CIMS);
queue = new SrsMessageQueue();
}
@ -465,9 +462,9 @@ int SrsEdgeForwarder::start()
// open socket.
srs_freep(sdk);
int64_t cto = SRS_EDGE_FORWARDER_TIMEOUT_US;
int64_t sto = SRS_CONSTS_RTMP_TIMEOUT_US;
sdk = new SrsSimpleRtmpClient(url, cto/1000, sto/1000);
int64_t cto = SRS_EDGE_FORWARDER_TMMS;
int64_t sto = SRS_CONSTS_RTMP_TMMS;
sdk = new SrsSimpleRtmpClient(url, cto, sto);
if ((ret = sdk->connect()) != ERROR_SUCCESS) {
srs_warn("edge push %s failed, cto=%"PRId64", sto=%"PRId64". ret=%d", url.c_str(), cto, sto, ret);
@ -496,7 +493,7 @@ int SrsEdgeForwarder::cycle()
{
int ret = ERROR_SUCCESS;
sdk->set_recv_timeout(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US);
sdk->set_recv_timeout(SRS_CONSTS_RTMP_PULSE_TMMS);
SrsPithyPrint* pprint = SrsPithyPrint::create_edge();
SrsAutoFree(SrsPithyPrint, pprint);
@ -505,7 +502,7 @@ int SrsEdgeForwarder::cycle()
while (!pthread->interrupted()) {
if (send_error_code != ERROR_SUCCESS) {
st_usleep(SRS_EDGE_FORWARDER_ERROR_US);
st_usleep(SRS_EDGE_FORWARDER_TMMS * 1000);
continue;
}

View file

@ -90,7 +90,7 @@ public:
virtual int decode_message(SrsCommonMessage* msg, SrsPacket** ppacket) = 0;
virtual void close() = 0;
public:
virtual void set_recv_timeout(int64_t timeout) = 0;
virtual void set_recv_timeout(int64_t tm) = 0;
virtual void kbps_sample(const char* label, int64_t age) = 0;
};
@ -111,7 +111,7 @@ public:
virtual int decode_message(SrsCommonMessage* msg, SrsPacket** ppacket);
virtual void close();
public:
virtual void set_recv_timeout(int64_t timeout);
virtual void set_recv_timeout(int64_t tm);
virtual void kbps_sample(const char* label, int64_t age);
};

View file

@ -37,14 +37,14 @@ using namespace std;
#ifdef SRS_AUTO_TRANSCODE
// when error, encoder sleep for a while and retry.
#define SRS_RTMP_ENCODER_SLEEP_US (int64_t)(3*1000*1000LL)
#define SRS_RTMP_ENCODER_CIMS (3000)
// for encoder to detect the dead loop
static std::vector<std::string> _transcoded_url;
SrsEncoder::SrsEncoder()
{
pthread = new SrsReusableThread("encoder", this, SRS_RTMP_ENCODER_SLEEP_US);
pthread = new SrsReusableThread("encoder", this, SRS_RTMP_ENCODER_CIMS);
pprint = SrsPithyPrint::create_encoder();
}

View file

@ -48,7 +48,7 @@ using namespace std;
#include <srs_app_rtmp_conn.hpp>
// when error, forwarder sleep for a while and retry.
#define SRS_FORWARDER_SLEEP_US (int64_t)(3*1000*1000LL)
#define SRS_FORWARDER_CIMS (3000)
SrsForwarder::SrsForwarder(SrsSource* s)
{
@ -58,7 +58,7 @@ SrsForwarder::SrsForwarder(SrsSource* s)
sh_video = sh_audio = NULL;
sdk = NULL;
pthread = new SrsReusableThread2("forward", this, SRS_FORWARDER_SLEEP_US);
pthread = new SrsReusableThread2("forward", this, SRS_FORWARDER_CIMS);
queue = new SrsMessageQueue();
jitter = new SrsRtmpJitter();
}
@ -237,8 +237,8 @@ int SrsForwarder::cycle()
}
srs_freep(sdk);
int64_t cto = SRS_FORWARDER_SLEEP_US;
int64_t sto = SRS_CONSTS_RTMP_TIMEOUT_US;
int64_t cto = SRS_FORWARDER_CIMS;
int64_t sto = SRS_CONSTS_RTMP_TMMS;
sdk = new SrsSimpleRtmpClient(url, cto, sto);
if ((ret = sdk->connect()) != ERROR_SUCCESS) {
@ -267,7 +267,7 @@ int SrsForwarder::forward()
{
int ret = ERROR_SUCCESS;
sdk->set_recv_timeout(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US);
sdk->set_recv_timeout(SRS_CONSTS_RTMP_PULSE_TMMS);
SrsPithyPrint* pprint = SrsPithyPrint::create_forwarder();
SrsAutoFree(SrsPithyPrint, pprint);

View file

@ -1365,7 +1365,7 @@ int SrsHttpApi::do_cycle()
// set the recv timeout, for some clients never disconnect the connection.
// @see https://github.com/ossrs/srs/issues/398
skt.set_recv_timeout(SRS_HTTP_RECV_TIMEOUT_US);
skt.set_recv_timeout(SRS_HTTP_RECV_TMMS);
// initialize the cors, which will proxy to mux.
bool crossdomain_enabled = _srs_config->get_http_api_crossdomain();

View file

@ -43,7 +43,7 @@ SrsHttpClient::SrsHttpClient()
transport = NULL;
kbps = new SrsKbps();
parser = NULL;
timeout_us = 0;
timeout = SRS_CONSTS_NO_TMMS;
port = 0;
}
@ -56,7 +56,7 @@ SrsHttpClient::~SrsHttpClient()
}
// TODO: FIXME: use ms for timeout.
int SrsHttpClient::initialize(string h, int p, int64_t t_us)
int SrsHttpClient::initialize(string h, int p, int64_t tm)
{
int ret = ERROR_SUCCESS;
@ -71,7 +71,7 @@ int SrsHttpClient::initialize(string h, int p, int64_t t_us)
// Always disconnect the transport.
host = h;
port = p;
timeout_us = t_us;
timeout = tm;
disconnect();
// ep used for host in header.
@ -196,9 +196,9 @@ int SrsHttpClient::get(string path, string req, ISrsHttpMessage** ppmsg)
return ret;
}
void SrsHttpClient::set_recv_timeout(int64_t timeout)
void SrsHttpClient::set_recv_timeout(int64_t tm)
{
transport->set_recv_timeout(timeout);
transport->set_recv_timeout(tm);
}
void SrsHttpClient::kbps_sample(const char* label, int64_t age)
@ -232,17 +232,17 @@ int SrsHttpClient::connect()
return ret;
}
transport = new SrsTcpClient(host, port, timeout_us / 1000);
transport = new SrsTcpClient(host, port, timeout);
if ((ret = transport->connect()) != ERROR_SUCCESS) {
disconnect();
srs_warn("http client failed, server=%s, port=%d, timeout=%"PRId64", ret=%d", host.c_str(), port, timeout_us, ret);
srs_warn("http client failed, server=%s, port=%d, timeout=%"PRId64", ret=%d", host.c_str(), port, timeout, ret);
return ret;
}
srs_info("connect to server success. server=%s, port=%d", host.c_str(), port);
// set the recv/send timeout in us.
transport->set_recv_timeout(timeout_us);
transport->set_send_timeout(timeout_us);
// Set the recv/send timeout in ms.
transport->set_recv_timeout(timeout);
transport->set_send_timeout(timeout);
kbps->set_io(transport, transport);

View file

@ -43,7 +43,7 @@ class SrsStSocket;
class SrsKbps;
// the default timeout for http client.
#define SRS_HTTP_CLIENT_TIMEOUT_US (int64_t)(30*1000*1000LL)
#define SRS_HTTP_CLIENT_TMMS (30*1000)
/**
* The client to GET/POST/PUT/DELETE over HTTP.
@ -64,8 +64,9 @@ private:
std::map<std::string, std::string> headers;
SrsKbps* kbps;
private:
int64_t timeout_us;
// host name or ip.
// The timeout in ms.
int64_t timeout;
// The host name or ip.
std::string host;
int port;
public:
@ -74,9 +75,10 @@ public:
public:
/**
* Initliaze the client, disconnect the transport, renew the HTTP parser.
* @param tm The underlayer TCP transport timeout in ms.
* @remark we will set default values in headers, which can be override by set_header.
*/
virtual int initialize(std::string h, int p, int64_t t_us = SRS_HTTP_CLIENT_TIMEOUT_US);
virtual int initialize(std::string h, int p, int64_t tm = SRS_HTTP_CLIENT_TMMS);
/**
* Set HTTP request header in header[k]=v.
* @return the HTTP client itself.
@ -99,8 +101,9 @@ public:
* @remark user must free the ppmsg if not NULL.
*/
virtual int get(std::string path, std::string req, ISrsHttpMessage** ppmsg);
private:
virtual void set_recv_timeout(int64_t tm);
public:
virtual void set_recv_timeout(int64_t timeout);
virtual void kbps_sample(const char* label, int64_t age);
private:
virtual void disconnect();

View file

@ -1137,7 +1137,7 @@ int SrsHttpConn::do_cycle()
// set the recv timeout, for some clients never disconnect the connection.
// @see https://github.com/ossrs/srs/issues/398
skt->set_recv_timeout(SRS_HTTP_RECV_TIMEOUT_US);
skt->set_recv_timeout(SRS_HTTP_RECV_TMMS);
SrsRequest* last_req = NULL;
SrsAutoFree(SrsRequest, last_req);

View file

@ -43,12 +43,12 @@ using namespace std;
#define SRS_HTTP_RESPONSE_OK SRS_XSTR(ERROR_SUCCESS)
#define SRS_HTTP_HEADER_BUFFER 1024
#define SRS_HTTP_READ_BUFFER 4096
#define SRS_HTTP_BODY_BUFFER 32 * 1024
#define SRS_HTTP_HEADER_BUFFER 1024
#define SRS_HTTP_READ_BUFFER 4096
#define SRS_HTTP_BODY_BUFFER (32 * 1024)
// the timeout for hls notify, in us.
#define SRS_HLS_NOTIFY_TIMEOUT_US (int64_t)(10*1000*1000LL)
// the timeout for hls notify, in ms.
#define SRS_HLS_NOTIFY_TMMS (10 * 1000)
SrsHttpHooks::SrsHttpHooks()
{
@ -383,7 +383,7 @@ int SrsHttpHooks::on_hls_notify(int cid, std::string url, SrsRequest* req, std::
}
SrsHttpClient http;
if ((ret = http.initialize(uri.get_host(), uri.get_port(), SRS_HLS_NOTIFY_TIMEOUT_US)) != ERROR_SUCCESS) {
if ((ret = http.initialize(uri.get_host(), uri.get_port(), SRS_HLS_NOTIFY_TMMS)) != ERROR_SUCCESS) {
return ret;
}

View file

@ -155,9 +155,9 @@ int SrsBufferCache::cycle()
}
if (count <= 0) {
srs_info("http: sleep %dms for no msg", SRS_CONSTS_RTMP_PULSE_TIMEOUT_US);
srs_info("http: sleep %dms for no msg", SRS_CONSTS_RTMP_PULSE_TMMS);
// directly use sleep, donot use consumer wait.
st_usleep(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US);
st_usleep(SRS_CONSTS_RTMP_PULSE_TMMS * 1000);
// ignore when nothing got.
continue;
@ -165,7 +165,7 @@ int SrsBufferCache::cycle()
if (pprint->can_print()) {
srs_trace("-> "SRS_CONSTS_LOG_HTTP_STREAM_CACHE" http: got %d msgs, age=%d, min=%d, mw=%d",
count, pprint->age(), SRS_PERF_MW_MIN_MSGS, SRS_CONSTS_RTMP_PULSE_TIMEOUT_US / 1000);
count, pprint->age(), SRS_PERF_MW_MIN_MSGS, SRS_CONSTS_RTMP_PULSE_TMMS);
}
// free the messages.
@ -557,9 +557,9 @@ int SrsLiveStream::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r)
}
if (count <= 0) {
srs_info("http: sleep %dms for no msg", SRS_CONSTS_RTMP_PULSE_TIMEOUT_US);
srs_info("http: sleep %dms for no msg", SRS_CONSTS_RTMP_PULSE_TMMS);
// directly use sleep, donot use consumer wait.
st_usleep(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US);
st_usleep(SRS_CONSTS_RTMP_PULSE_TMMS);
// ignore when nothing got.
continue;
@ -567,7 +567,7 @@ int SrsLiveStream::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r)
if (pprint->can_print()) {
srs_info("-> "SRS_CONSTS_LOG_HTTP_STREAM" http: got %d msgs, age=%d, min=%d, mw=%d",
count, pprint->age(), SRS_PERF_MW_MIN_MSGS, SRS_CONSTS_RTMP_PULSE_TIMEOUT_US / 1000);
count, pprint->age(), SRS_PERF_MW_MIN_MSGS, SRS_CONSTS_RTMP_PULSE_TMMS);
}
// sendout all messages.

View file

@ -39,7 +39,7 @@ using namespace std;
// when error, ingester sleep for a while and retry.
// ingest never sleep a long time, for we must start the stream ASAP.
#define SRS_AUTO_INGESTER_SLEEP_US (int64_t)(3*1000*1000LL)
#define SRS_AUTO_INGESTER_CIMS (3000)
SrsIngesterFFMPEG::SrsIngesterFFMPEG()
{
@ -109,7 +109,7 @@ SrsIngester::SrsIngester()
expired = false;
pthread = new SrsReusableThread("ingest", this, SRS_AUTO_INGESTER_SLEEP_US);
pthread = new SrsReusableThread("ingest", this, SRS_AUTO_INGESTER_CIMS);
pprint = SrsPithyPrint::create_ingester();
}

View file

@ -40,7 +40,7 @@ using namespace std;
#ifdef SRS_AUTO_KAFKA
#define SRS_KAKFA_CYCLE_INTERVAL_MS 3000
#define SRS_KAKFA_CIMS 3000
#define SRS_KAFKA_PRODUCER_TIMEOUT 30000
#define SRS_KAFKA_PRODUCER_AGGREGATE_SIZE 1
@ -366,7 +366,7 @@ SrsKafkaProducer::SrsKafkaProducer()
metadata_expired = st_cond_new();
lock = st_mutex_new();
pthread = new SrsReusableThread("kafka", this, SRS_KAKFA_CYCLE_INTERVAL_MS * 1000);
pthread = new SrsReusableThread("kafka", this, SRS_KAKFA_CIMS);
worker = new SrsAsyncCallWorker();
cache = new SrsKafkaCache();
@ -585,7 +585,7 @@ int SrsKafkaProducer::request_metadata()
senabled.c_str(), sbrokers.c_str(), lb->current(), server.c_str(), port, topic.c_str());
}
SrsTcpClient* transport = new SrsTcpClient(server, port, SRS_CONSTS_KAFKA_TIMEOUT_US / 1000);
SrsTcpClient* transport = new SrsTcpClient(server, port, SRS_CONSTS_KAFKA_TMMS);
SrsAutoFree(SrsTcpClient, transport);
SrsKafkaClient* kafka = new SrsKafkaClient(transport);

View file

@ -616,9 +616,9 @@ int SrsMpegtsOverUdp::connect()
return ret;
}
int64_t cto = SRS_CONSTS_RTMP_TIMEOUT_US;
int64_t sto = SRS_CONSTS_RTMP_PULSE_TIMEOUT_US;
sdk = new SrsSimpleRtmpClient(output, cto/1000, sto/1000);
int64_t cto = SRS_CONSTS_RTMP_TMMS;
int64_t sto = SRS_CONSTS_RTMP_PULSE_TMMS;
sdk = new SrsSimpleRtmpClient(output, cto, sto);
if ((ret = sdk->connect()) != ERROR_SUCCESS) {
close();

View file

@ -37,11 +37,11 @@ using namespace std;
#include <srs_protocol_utility.hpp>
// when error, ng-exec sleep for a while and retry.
#define SRS_RTMP_EXEC_SLEEP_US (int64_t)(3*1000*1000LL)
#define SRS_RTMP_EXEC_CIMS (3000)
SrsNgExec::SrsNgExec()
{
pthread = new SrsReusableThread("encoder", this, SRS_RTMP_EXEC_SLEEP_US);
pthread = new SrsReusableThread("encoder", this, SRS_RTMP_EXEC_CIMS);
pprint = SrsPithyPrint::create_exec();
}

View file

@ -45,9 +45,9 @@ ISrsMessageHandler::~ISrsMessageHandler()
{
}
SrsRecvThread::SrsRecvThread(ISrsMessageHandler* msg_handler, SrsRtmpServer* rtmp_sdk, int timeout_ms)
SrsRecvThread::SrsRecvThread(ISrsMessageHandler* msg_handler, SrsRtmpServer* rtmp_sdk, int tm)
{
timeout = timeout_ms;
timeout = tm;
handler = msg_handler;
rtmp = rtmp_sdk;
trd = new SrsReusableThread2("recv", this);
@ -126,7 +126,7 @@ void SrsRecvThread::on_thread_start()
// to use isolate thread to recv, can improve about 33% performance.
// @see https://github.com/ossrs/srs/issues/194
// @see: https://github.com/ossrs/srs/issues/217
rtmp->set_recv_timeout(ST_UTIME_NO_TIMEOUT);
rtmp->set_recv_timeout(SRS_CONSTS_NO_TMMS);
handler->on_thread_start();
}

View file

@ -86,9 +86,12 @@ protected:
SrsReusableThread2* trd;
ISrsMessageHandler* handler;
SrsRtmpServer* rtmp;
// The recv timeout in ms.
int timeout;
public:
SrsRecvThread(ISrsMessageHandler* msg_handler, SrsRtmpServer* rtmp_sdk, int timeout_ms);
// Constructor.
// @param tm The receive timeout in ms.
SrsRecvThread(ISrsMessageHandler* msg_handler, SrsRtmpServer* rtmp_sdk, int tm);
virtual ~SrsRecvThread();
public:
virtual int cid();

View file

@ -60,22 +60,22 @@ using namespace std;
// when stream is busy, for example, streaming is already
// publishing, when a new client to request to publish,
// sleep a while and close the connection.
#define SRS_STREAM_BUSY_SLEEP_US (int64_t)(3*1000*1000LL)
#define SRS_STREAM_BUSY_CIMS (3000)
// the timeout to wait encoder to republish
// the timeout in ms to wait encoder to republish
// if timeout, close the connection.
#define SRS_REPUBLISH_SEND_TIMEOUT_US (int64_t)(3*60*1000*1000LL)
#define SRS_REPUBLISH_SEND_TMMS (3 * 60 * 1000)
// if timeout, close the connection.
#define SRS_REPUBLISH_RECV_TIMEOUT_US (int64_t)(3*60*1000*1000LL)
#define SRS_REPUBLISH_RECV_TMMS (3 * 60 * 1000)
// the timeout to wait client data, when client paused
// the timeout in ms to wait client data, when client paused
// if timeout, close the connection.
#define SRS_PAUSED_SEND_TIMEOUT_US (int64_t)(30*60*1000*1000LL)
#define SRS_PAUSED_SEND_TMMS (3 * 60 * 1000)
// if timeout, close the connection.
#define SRS_PAUSED_RECV_TIMEOUT_US (int64_t)(30*60*1000*1000LL)
#define SRS_PAUSED_RECV_TMMS (3 * 60 * 1000)
// when edge timeout, retry next.
#define SRS_EDGE_TOKEN_TRAVERSE_TIMEOUT_US (int64_t)(3*1000*1000LL)
#define SRS_EDGE_TOKEN_TRAVERSE_TMMS (3000)
SrsSimpleRtmpClient::SrsSimpleRtmpClient(string u, int64_t ctm, int64_t stm)
{
@ -348,8 +348,8 @@ int SrsRtmpConn::do_cycle()
}
#endif
rtmp->set_recv_timeout(SRS_CONSTS_RTMP_TIMEOUT_US);
rtmp->set_send_timeout(SRS_CONSTS_RTMP_TIMEOUT_US);
rtmp->set_recv_timeout(SRS_CONSTS_RTMP_TMMS);
rtmp->set_send_timeout(SRS_CONSTS_RTMP_TMMS);
if ((ret = rtmp->handshake()) != ERROR_SUCCESS) {
srs_error("rtmp handshake failed. ret=%d", ret);
@ -634,8 +634,8 @@ int SrsRtmpConn::service_cycle()
// for republish, continue service
if (ret == ERROR_CONTROL_REPUBLISH) {
// set timeout to a larger value, wait for encoder to republish.
rtmp->set_send_timeout(SRS_REPUBLISH_RECV_TIMEOUT_US);
rtmp->set_recv_timeout(SRS_REPUBLISH_SEND_TIMEOUT_US);
rtmp->set_send_timeout(SRS_REPUBLISH_RECV_TMMS);
rtmp->set_recv_timeout(SRS_REPUBLISH_SEND_TMMS);
srs_trace("control message(unpublish) accept, retry stream service.");
continue;
@ -647,8 +647,8 @@ int SrsRtmpConn::service_cycle()
// TODO: FIXME: use ping message to anti-death of socket.
// @see: https://github.com/ossrs/srs/issues/39
// set timeout to a larger value, for user paused.
rtmp->set_recv_timeout(SRS_PAUSED_RECV_TIMEOUT_US);
rtmp->set_send_timeout(SRS_PAUSED_SEND_TIMEOUT_US);
rtmp->set_recv_timeout(SRS_PAUSED_RECV_TMMS);
rtmp->set_send_timeout(SRS_PAUSED_SEND_TMMS);
srs_trace("control message(close) accept, retry stream service.");
continue;
@ -685,8 +685,8 @@ int SrsRtmpConn::stream_service_cycle()
srs_info("security check ok");
// client is identified, set the timeout to service timeout.
rtmp->set_recv_timeout(SRS_CONSTS_RTMP_TIMEOUT_US);
rtmp->set_send_timeout(SRS_CONSTS_RTMP_TIMEOUT_US);
rtmp->set_recv_timeout(SRS_CONSTS_RTMP_TMMS);
rtmp->set_send_timeout(SRS_CONSTS_RTMP_TMMS);
// find a source to serve.
SrsSource* source = NULL;
@ -1465,7 +1465,7 @@ int SrsRtmpConn::check_edge_token_traverse_auth()
int port = SRS_CONSTS_RTMP_DEFAULT_PORT;
srs_parse_hostport(hostport, server, port);
SrsTcpClient* transport = new SrsTcpClient(server, port, SRS_EDGE_TOKEN_TRAVERSE_TIMEOUT_US / 1000);
SrsTcpClient* transport = new SrsTcpClient(server, port, SRS_EDGE_TOKEN_TRAVERSE_TMMS);
SrsAutoFree(SrsTcpClient, transport);
if ((ret = transport->connect()) != ERROR_SUCCESS) {
@ -1490,8 +1490,8 @@ int SrsRtmpConn::do_token_traverse_auth(SrsRtmpClient* client)
srs_assert(client);
client->set_recv_timeout(SRS_CONSTS_RTMP_TIMEOUT_US);
client->set_send_timeout(SRS_CONSTS_RTMP_TIMEOUT_US);
client->set_recv_timeout(SRS_CONSTS_RTMP_TMMS);
client->set_send_timeout(SRS_CONSTS_RTMP_TMMS);
if ((ret = client->handshake()) != ERROR_SUCCESS) {
srs_error("handshake with server failed. ret=%d", ret);

View file

@ -669,9 +669,9 @@ int SrsRtspConn::connect()
}
// connect host.
int64_t cto = SRS_CONSTS_RTMP_TIMEOUT_US;
int64_t sto = SRS_CONSTS_RTMP_PULSE_TIMEOUT_US;
sdk = new SrsSimpleRtmpClient(url, cto/1000, sto/1000);
int64_t cto = SRS_CONSTS_RTMP_TMMS;
int64_t sto = SRS_CONSTS_RTMP_PULSE_TMMS;
sdk = new SrsSimpleRtmpClient(url, cto, sto);
if ((ret = sdk->connect()) != ERROR_SUCCESS) {
close();

View file

@ -62,7 +62,6 @@ using namespace std;
// update time interval:
// SRS_SYS_CYCLE_INTERVAL * SRS_SYS_TIME_RESOLUTION_MS_TIMES
// @see SYS_TIME_RESOLUTION_US
#define SRS_SYS_TIME_RESOLUTION_MS_TIMES 1
// update rusage interval:

View file

@ -538,7 +538,7 @@ int SrsConsumer::dump_packets(SrsMessageArray* msgs, int& count)
void SrsConsumer::wait(int nb_msgs, int duration)
{
if (paused) {
st_usleep(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US);
st_usleep(SRS_CONSTS_RTMP_PULSE_TMMS);
return;
}

View file

@ -61,11 +61,11 @@ namespace internal
{
}
SrsThread::SrsThread(const char* name, ISrsThreadHandler* thread_handler, int64_t interval_us, bool joinable)
SrsThread::SrsThread(const char* name, ISrsThreadHandler* thread_handler, int64_t ims, bool joinable)
{
_name = name;
handler = thread_handler;
cycle_interval_us = interval_us;
cims = ims;
tid = NULL;
loop = false;
@ -231,10 +231,11 @@ namespace internal
break;
}
// Should never use no timeout, just ignore it.
// to improve performance, donot sleep when interval is zero.
// @see: https://github.com/ossrs/srs/issues/237
if (cycle_interval_us != 0) {
st_usleep(cycle_interval_us);
if (cims != 0 && cims != SRS_CONSTS_NO_TMMS) {
st_usleep(cims * 1000);
}
}
@ -268,54 +269,60 @@ namespace internal
SrsStSocket::SrsStSocket(st_netfd_t client_stfd)
{
stfd = client_stfd;
send_timeout = recv_timeout = ST_UTIME_NO_TIMEOUT;
recv_bytes = send_bytes = 0;
stm = rtm = SRS_CONSTS_NO_TMMS;
rbytes = sbytes = 0;
}
SrsStSocket::~SrsStSocket()
{
}
bool SrsStSocket::is_never_timeout(int64_t timeout_us)
bool SrsStSocket::is_never_timeout(int64_t tm)
{
return timeout_us == (int64_t)ST_UTIME_NO_TIMEOUT;
return tm == SRS_CONSTS_NO_TMMS;
}
void SrsStSocket::set_recv_timeout(int64_t timeout_us)
void SrsStSocket::set_recv_timeout(int64_t tm)
{
recv_timeout = timeout_us;
rtm = tm;
}
int64_t SrsStSocket::get_recv_timeout()
{
return recv_timeout;
return rtm;
}
void SrsStSocket::set_send_timeout(int64_t timeout_us)
void SrsStSocket::set_send_timeout(int64_t tm)
{
send_timeout = timeout_us;
stm = tm;
}
int64_t SrsStSocket::get_send_timeout()
{
return send_timeout;
return stm;
}
int64_t SrsStSocket::get_recv_bytes()
{
return recv_bytes;
return rbytes;
}
int64_t SrsStSocket::get_send_bytes()
{
return send_bytes;
return sbytes;
}
int SrsStSocket::read(void* buf, size_t size, ssize_t* nread)
{
int ret = ERROR_SUCCESS;
ssize_t nb_read = st_read(stfd, buf, size, recv_timeout);
ssize_t nb_read;
if (rtm == SRS_CONSTS_NO_TMMS) {
nb_read = st_read(stfd, buf, size, ST_UTIME_NO_TIMEOUT);
} else {
nb_read = st_read(stfd, buf, size, rtm);
}
if (nread) {
*nread = nb_read;
}
@ -336,7 +343,7 @@ int SrsStSocket::read(void* buf, size_t size, ssize_t* nread)
return ERROR_SOCKET_READ;
}
recv_bytes += nb_read;
rbytes += nb_read;
return ret;
}
@ -345,7 +352,13 @@ int SrsStSocket::read_fully(void* buf, size_t size, ssize_t* nread)
{
int ret = ERROR_SUCCESS;
ssize_t nb_read = st_read_fully(stfd, buf, size, recv_timeout);
ssize_t nb_read;
if (rtm == SRS_CONSTS_NO_TMMS) {
nb_read = st_read_fully(stfd, buf, size, ST_UTIME_NO_TIMEOUT);
} else {
nb_read = st_read_fully(stfd, buf, size, rtm);
}
if (nread) {
*nread = nb_read;
}
@ -366,7 +379,7 @@ int SrsStSocket::read_fully(void* buf, size_t size, ssize_t* nread)
return ERROR_SOCKET_READ_FULLY;
}
recv_bytes += nb_read;
rbytes += nb_read;
return ret;
}
@ -375,7 +388,13 @@ int SrsStSocket::write(void* buf, size_t size, ssize_t* nwrite)
{
int ret = ERROR_SUCCESS;
ssize_t nb_write = st_write(stfd, buf, size, send_timeout);
ssize_t nb_write;
if (stm == SRS_CONSTS_NO_TMMS) {
nb_write = st_write(stfd, buf, size, ST_UTIME_NO_TIMEOUT);
} else {
nb_write = st_write(stfd, buf, size, stm);
}
if (nwrite) {
*nwrite = nb_write;
}
@ -391,7 +410,7 @@ int SrsStSocket::write(void* buf, size_t size, ssize_t* nwrite)
return ERROR_SOCKET_WRITE;
}
send_bytes += nb_write;
sbytes += nb_write;
return ret;
}
@ -400,7 +419,13 @@ int SrsStSocket::writev(const iovec *iov, int iov_size, ssize_t* nwrite)
{
int ret = ERROR_SUCCESS;
ssize_t nb_write = st_writev(stfd, iov, iov_size, send_timeout);
ssize_t nb_write;
if (stm == SRS_CONSTS_NO_TMMS) {
nb_write = st_writev(stfd, iov, iov_size, ST_UTIME_NO_TIMEOUT);
} else {
nb_write = st_writev(stfd, iov, iov_size, stm);
}
if (nwrite) {
*nwrite = nb_write;
}
@ -416,7 +441,7 @@ int SrsStSocket::writev(const iovec *iov, int iov_size, ssize_t* nwrite)
return ERROR_SOCKET_WRITE;
}
send_bytes += nb_write;
sbytes += nb_write;
return ret;
}
@ -443,7 +468,7 @@ int SrsTcpClient::connect()
close();
srs_assert(stfd == NULL);
if ((ret = srs_socket_connect(host, port, timeout * 1000, &stfd)) != ERROR_SUCCESS) {
if ((ret = srs_socket_connect(host, port, timeout, &stfd)) != ERROR_SUCCESS) {
srs_error("connect tcp://%s:%d failed, to=%"PRId64"ms. ret=%d", host.c_str(), port, timeout, ret);
return ret;
}
@ -465,14 +490,14 @@ void SrsTcpClient::close()
srs_close_stfd(stfd);
}
bool SrsTcpClient::is_never_timeout(int64_t timeout_us)
bool SrsTcpClient::is_never_timeout(int64_t tm)
{
return io->is_never_timeout(timeout_us);
return io->is_never_timeout(tm);
}
void SrsTcpClient::set_recv_timeout(int64_t timeout_us)
void SrsTcpClient::set_recv_timeout(int64_t tm)
{
io->set_recv_timeout(timeout_us);
io->set_recv_timeout(tm);
}
int64_t SrsTcpClient::get_recv_timeout()
@ -480,9 +505,9 @@ int64_t SrsTcpClient::get_recv_timeout()
return io->get_recv_timeout();
}
void SrsTcpClient::set_send_timeout(int64_t timeout_us)
void SrsTcpClient::set_send_timeout(int64_t tm)
{
io->set_send_timeout(timeout_us);
io->set_send_timeout(tm);
}
int64_t SrsTcpClient::get_send_timeout()

View file

@ -110,13 +110,14 @@ namespace internal
bool disposed;
private:
ISrsThreadHandler* handler;
int64_t cycle_interval_us;
// The cycle interval in ms.
int64_t cims;
public:
/**
* initialize the thread.
* @param name, human readable name for st debug.
* @param thread_handler, the cycle handler for the thread.
* @param interval_us, the sleep interval when cycle finished.
* @param ims, the sleep interval in ms when cycle finished.
* @param joinable, if joinable, other thread must stop the thread.
* @remark if joinable, thread never quit itself, or memory leak.
* @see: https://github.com/ossrs/srs/issues/78
@ -126,7 +127,7 @@ namespace internal
* TODO: FIXME: maybe all thread must be reap by others threads,
* @see: https://github.com/ossrs/srs/issues/77
*/
SrsThread(const char* name, ISrsThreadHandler* thread_handler, int64_t interval_us, bool joinable);
SrsThread(const char* name, ISrsThreadHandler* thread_handler, int64_t ims, bool joinable);
virtual ~SrsThread();
public:
/**
@ -175,19 +176,23 @@ namespace internal
class SrsStSocket : public ISrsProtocolReaderWriter
{
private:
int64_t recv_timeout;
int64_t send_timeout;
int64_t recv_bytes;
int64_t send_bytes;
// The recv/send timeout in ms.
// @remark Use SRS_CONSTS_NO_TMMS for never timeout in ms.
int64_t rtm;
int64_t stm;
// The recv/send data in bytes
int64_t rbytes;
int64_t sbytes;
// The underlayer st fd.
st_netfd_t stfd;
public:
SrsStSocket(st_netfd_t client_stfd);
virtual ~SrsStSocket();
public:
virtual bool is_never_timeout(int64_t timeout_us);
virtual void set_recv_timeout(int64_t timeout_us);
virtual bool is_never_timeout(int64_t tm);
virtual void set_recv_timeout(int64_t tm);
virtual int64_t get_recv_timeout();
virtual void set_send_timeout(int64_t timeout_us);
virtual void set_send_timeout(int64_t tm);
virtual int64_t get_send_timeout();
virtual int64_t get_recv_bytes();
virtual int64_t get_send_bytes();
@ -221,6 +226,7 @@ private:
private:
std::string host;
int port;
// The timeout in ms.
int64_t timeout;
public:
/**
@ -244,10 +250,10 @@ public:
virtual void close();
// interface ISrsProtocolReaderWriter
public:
virtual bool is_never_timeout(int64_t timeout_us);
virtual void set_recv_timeout(int64_t timeout_us);
virtual bool is_never_timeout(int64_t tm);
virtual void set_recv_timeout(int64_t tm);
virtual int64_t get_recv_timeout();
virtual void set_send_timeout(int64_t timeout_us);
virtual void set_send_timeout(int64_t tm);
virtual int64_t get_send_timeout();
virtual int64_t get_recv_bytes();
virtual int64_t get_send_bytes();

View file

@ -190,10 +190,10 @@ void ISrsReusableThreadHandler::on_thread_stop()
{
}
SrsReusableThread::SrsReusableThread(const char* n, ISrsReusableThreadHandler* h, int64_t interval_us)
SrsReusableThread::SrsReusableThread(const char* n, ISrsReusableThreadHandler* h, int64_t cims)
{
handler = h;
pthread = new internal::SrsThread(n, this, interval_us, true);
pthread = new internal::SrsThread(n, this, cims, true);
}
SrsReusableThread::~SrsReusableThread()
@ -273,10 +273,10 @@ void ISrsReusableThread2Handler::on_thread_stop()
{
}
SrsReusableThread2::SrsReusableThread2(const char* n, ISrsReusableThread2Handler* h, int64_t interval_us)
SrsReusableThread2::SrsReusableThread2(const char* n, ISrsReusableThread2Handler* h, int64_t cims)
{
handler = h;
pthread = new internal::SrsThread(n, this, interval_us, true);
pthread = new internal::SrsThread(n, this, cims, true);
}
SrsReusableThread2::~SrsReusableThread2()

View file

@ -162,7 +162,7 @@ public:
* 2. must manually stop the thread when started it.
* for example:
* class SrsIngester : public ISrsReusableThreadHandler {
* public: SrsIngester() { pthread = new SrsReusableThread("ingest", this, SRS_AUTO_INGESTER_SLEEP_US); }
* public: SrsIngester() { pthread = new SrsReusableThread("ingest", this, SRS_AUTO_INGESTER_CIMS); }
* public: virtual int start() { return pthread->start(); }
* public: virtual void stop() { pthread->stop(); }
* public: virtual int cycle() {
@ -198,7 +198,7 @@ private:
internal::SrsThread* pthread;
ISrsReusableThreadHandler* handler;
public:
SrsReusableThread(const char* n, ISrsReusableThreadHandler* h, int64_t interval_us = 0);
SrsReusableThread(const char* n, ISrsReusableThreadHandler* h, int64_t cims = 0);
virtual ~SrsReusableThread();
public:
/**
@ -244,7 +244,7 @@ public:
* 2. must manually stop the thread when started it.
* for example:
* class SrsIngester : public ISrsReusableThreadHandler {
* public: SrsIngester() { pthread = new SrsReusableThread("ingest", this, SRS_AUTO_INGESTER_SLEEP_US); }
* public: SrsIngester() { pthread = new SrsReusableThread("ingest", this, SRS_AUTO_INGESTER_CIMS); }
* public: virtual int start() { return pthread->start(); }
* public: virtual void stop() { pthread->stop(); }
* public: virtual int cycle() {
@ -287,7 +287,7 @@ private:
internal::SrsThread* pthread;
ISrsReusableThread2Handler* handler;
public:
SrsReusableThread2(const char* n, ISrsReusableThread2Handler* h, int64_t interval_us = 0);
SrsReusableThread2(const char* n, ISrsReusableThread2Handler* h, int64_t cims = 0);
virtual ~SrsReusableThread2();
public:
/**

View file

@ -53,10 +53,15 @@ using namespace std;
// the longest time to wait for a process to quit.
#define SRS_PROCESS_QUIT_TIMEOUT_MS 1000
int srs_socket_connect(string server, int port, int64_t timeout, st_netfd_t* pstfd)
int srs_socket_connect(string server, int port, int64_t tm, st_netfd_t* pstfd)
{
int ret = ERROR_SUCCESS;
st_utime_t timeout = ST_UTIME_NO_TIMEOUT;
if (tm != SRS_CONSTS_NO_TMMS) {
timeout = (st_utime_t)(tm * 1000);
}
*pstfd = NULL;
st_netfd_t stfd = NULL;
sockaddr_in addr;

View file

@ -44,7 +44,8 @@ class SrsBuffer;
class SrsJsonObject;
// client open socket and connect to server.
extern int srs_socket_connect(std::string server, int port, int64_t timeout, st_netfd_t* pstfd);
// @param tm The timeout in ms.
extern int srs_socket_connect(std::string server, int port, int64_t tm, st_netfd_t* pstfd);
/**
* convert level in string to log level in int.