1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-02-14 12:21:55 +00:00

For #907, Wrap ST, only use in service ST.

This commit is contained in:
winlin 2017-05-30 09:05:02 +08:00
parent 54411e0768
commit 1bf99e8f3e
49 changed files with 340 additions and 513 deletions

View file

@ -186,6 +186,7 @@ Please select your language:
- [ ] Support HLS+, please read [#466][bug #466] and [#468][bug #468].
### Change Logs
<a name="history"></a>
### V3 changes

View file

@ -39,7 +39,7 @@ ISrsAsyncCallTask::~ISrsAsyncCallTask()
SrsAsyncCallWorker::SrsAsyncCallWorker()
{
trd = NULL;
wait = st_cond_new();
wait = srs_cond_new();
}
SrsAsyncCallWorker::~SrsAsyncCallWorker()
@ -53,7 +53,7 @@ SrsAsyncCallWorker::~SrsAsyncCallWorker()
}
tasks.clear();
st_cond_destroy(wait);
srs_cond_destroy(wait);
}
int SrsAsyncCallWorker::execute(ISrsAsyncCallTask* t)
@ -61,7 +61,7 @@ int SrsAsyncCallWorker::execute(ISrsAsyncCallTask* t)
int ret = ERROR_SUCCESS;
tasks.push_back(t);
st_cond_signal(wait);
srs_cond_signal(wait);
return ret;
}
@ -80,7 +80,7 @@ int SrsAsyncCallWorker::start()
void SrsAsyncCallWorker::stop()
{
st_cond_signal(wait);
srs_cond_signal(wait);
trd->stop();
}
@ -90,7 +90,7 @@ int SrsAsyncCallWorker::cycle()
while (!trd->pull()) {
if (tasks.empty()) {
st_cond_wait(wait);
srs_cond_wait(wait);
}
std::vector<ISrsAsyncCallTask*> copy = tasks;

View file

@ -69,7 +69,7 @@ private:
SrsCoroutine* trd;
protected:
std::vector<ISrsAsyncCallTask*> tasks;
st_cond_t wait;
srs_cond_t wait;
public:
SrsAsyncCallWorker();
virtual ~SrsAsyncCallWorker();

View file

@ -245,7 +245,7 @@ int SrsBandwidth::do_bandwidth_check(SrsKbpsLimit* limit)
return ret;
}
st_usleep(_SRS_BANDWIDTH_FINAL_WAIT_MS * 1000);
srs_usleep(_SRS_BANDWIDTH_FINAL_WAIT_MS * 1000);
srs_info("BW check finished.");
return ret;
@ -291,7 +291,7 @@ int SrsBandwidth::play_checking(SrsBandwidthSample* sample, SrsKbpsLimit* limit)
srs_update_system_time_ms();
int64_t starttime = srs_get_system_time_ms();
while ((srs_get_system_time_ms() - starttime) < sample->duration_ms) {
st_usleep(sample->interval_ms);
srs_usleep(sample->interval_ms);
// TODO: FIXME: use shared ptr message.
SrsBandwidthPacket* pkt = SrsBandwidthPacket::create_playing();
@ -499,7 +499,7 @@ void SrsKbpsLimit::recv_limit()
while (_kbps->get_recv_kbps() > _limit_kbps) {
_kbps->sample();
st_usleep(_SRS_BANDWIDTH_LIMIT_INTERVAL_MS * 1000);
srs_usleep(_SRS_BANDWIDTH_LIMIT_INTERVAL_MS * 1000);
}
}
@ -510,7 +510,7 @@ void SrsKbpsLimit::send_limit()
while (_kbps->get_send_kbps() > _limit_kbps) {
_kbps->sample();
st_usleep(_SRS_BANDWIDTH_LIMIT_INTERVAL_MS * 1000);
srs_usleep(_SRS_BANDWIDTH_LIMIT_INTERVAL_MS * 1000);
}
}

View file

@ -75,11 +75,11 @@ int SrsAppCasterFlv::initialize()
return ret;
}
int SrsAppCasterFlv::on_tcp_client(st_netfd_t stfd)
int SrsAppCasterFlv::on_tcp_client(srs_netfd_t stfd)
{
int ret = ERROR_SUCCESS;
string ip = srs_get_peer_ip(st_netfd_fileno(stfd));
string ip = srs_get_peer_ip(srs_netfd_fileno(stfd));
SrsHttpConn* conn = new SrsDynamicHttpConn(this, stfd, http_mux, ip);
conns.push_back(conn);
@ -131,7 +131,7 @@ int SrsAppCasterFlv::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r)
return conn->proxy(w, r, o);
}
SrsDynamicHttpConn::SrsDynamicHttpConn(IConnectionManager* cm, st_netfd_t fd, SrsHttpServeMux* m, string cip)
SrsDynamicHttpConn::SrsDynamicHttpConn(IConnectionManager* cm, srs_netfd_t fd, SrsHttpServeMux* m, string cip)
: SrsHttpConn(cm, fd, m, cip)
{
sdk = NULL;

View file

@ -66,7 +66,7 @@ public:
virtual int initialize();
// ISrsTcpHandler
public:
virtual int on_tcp_client(st_netfd_t stfd);
virtual int on_tcp_client(srs_netfd_t stfd);
// IConnectionManager
public:
virtual void remove(ISrsConnection* c);
@ -85,7 +85,7 @@ private:
SrsPithyPrint* pprint;
SrsSimpleRtmpClient* sdk;
public:
SrsDynamicHttpConn(IConnectionManager* cm, st_netfd_t fd, SrsHttpServeMux* m, std::string cip);
SrsDynamicHttpConn(IConnectionManager* cm, srs_netfd_t fd, SrsHttpServeMux* m, std::string cip);
virtual ~SrsDynamicHttpConn();
public:
virtual int on_got_http_message(ISrsHttpMessage* msg);

View file

@ -30,7 +30,7 @@ using namespace std;
#include <srs_app_utility.hpp>
#include <srs_kernel_utility.hpp>
SrsConnection::SrsConnection(IConnectionManager* cm, st_netfd_t c, string cip)
SrsConnection::SrsConnection(IConnectionManager* cm, srs_netfd_t c, string cip)
{
manager = cm;
stfd = c;

View file

@ -55,7 +55,7 @@ protected:
/**
* the underlayer st fd handler.
*/
st_netfd_t stfd;
srs_netfd_t stfd;
/**
* the ip of client.
*/
@ -77,7 +77,7 @@ protected:
*/
int64_t create_time;
public:
SrsConnection(IConnectionManager* cm, st_netfd_t c, std::string cip);
SrsConnection(IConnectionManager* cm, srs_netfd_t c, std::string cip);
virtual ~SrsConnection();
// interface IKbpsDelta
public:

View file

@ -232,7 +232,7 @@ int SrsEdgeIngester::cycle()
}
if (!trd->pull()) {
st_usleep(SRS_EDGE_INGESTER_CIMS * 1000);
srs_usleep(SRS_EDGE_INGESTER_CIMS * 1000);
}
}
return ret;
@ -517,7 +517,7 @@ int SrsEdgeForwarder::cycle()
}
if (!trd->pull()) {
st_usleep(SRS_EDGE_FORWARDER_CIMS * 1000);
srs_usleep(SRS_EDGE_FORWARDER_CIMS * 1000);
}
}
return ret;
@ -538,7 +538,7 @@ int SrsEdgeForwarder::do_cycle()
while (!trd->pull()) {
if (send_error_code != ERROR_SUCCESS) {
st_usleep(SRS_EDGE_FORWARDER_TMMS * 1000);
srs_usleep(SRS_EDGE_FORWARDER_TMMS * 1000);
continue;
}

View file

@ -102,7 +102,7 @@ int SrsEncoder::cycle()
}
if (!trd->pull()) {
st_usleep(SRS_RTMP_ENCODER_CIMS * 1000);
srs_usleep(SRS_RTMP_ENCODER_CIMS * 1000);
}
}

View file

@ -231,7 +231,7 @@ int SrsForwarder::cycle()
}
if (!trd->pull()) {
st_usleep(SRS_FORWARDER_CIMS * 1000);
srs_usleep(SRS_FORWARDER_CIMS * 1000);
}
}

View file

@ -29,7 +29,7 @@
#include <stdlib.h>
#include <string.h>
#include <math.h>
#include <unistd.h>
#include <algorithm>
#include <sstream>
using namespace std;

View file

@ -80,7 +80,7 @@ int SrsHourGlass::cycle()
}
total_elapse += resolution;
st_usleep(resolution * 1000);
srs_usleep(resolution * 1000);
return ret;
}

View file

@ -1290,7 +1290,7 @@ int SrsGoApiError::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r)
return srs_api_response_code(w, r, 100);
}
SrsHttpApi::SrsHttpApi(IConnectionManager* cm, st_netfd_t fd, SrsHttpServeMux* m, string cip)
SrsHttpApi::SrsHttpApi(IConnectionManager* cm, srs_netfd_t fd, SrsHttpServeMux* m, string cip)
: SrsConnection(cm, fd, cip)
{
mux = m;

View file

@ -208,7 +208,7 @@ private:
SrsHttpCorsMux* cors;
SrsHttpServeMux* mux;
public:
SrsHttpApi(IConnectionManager* cm, st_netfd_t fd, SrsHttpServeMux* m, std::string cip);
SrsHttpApi(IConnectionManager* cm, srs_netfd_t fd, SrsHttpServeMux* m, std::string cip);
virtual ~SrsHttpApi();
// interface IKbpsDelta
public:

View file

@ -59,7 +59,7 @@ using namespace std;
#include <srs_app_utility.hpp>
#include <srs_app_st.hpp>
SrsHttpConn::SrsHttpConn(IConnectionManager* cm, st_netfd_t fd, ISrsHttpServeMux* m, string cip)
SrsHttpConn::SrsHttpConn(IConnectionManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, string cip)
: SrsConnection(cm, fd, cip)
{
parser = new SrsHttpParser();
@ -204,7 +204,7 @@ int SrsHttpConn::on_reload_http_stream_crossdomain()
return ret;
}
SrsResponseOnlyHttpConn::SrsResponseOnlyHttpConn(IConnectionManager* cm, st_netfd_t fd, ISrsHttpServeMux* m, string cip)
SrsResponseOnlyHttpConn::SrsResponseOnlyHttpConn(IConnectionManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, string cip)
: SrsHttpConn(cm, fd, m, cip)
{
}

View file

@ -65,7 +65,7 @@ protected:
ISrsHttpServeMux* http_mux;
SrsHttpCorsMux* cors;
public:
SrsHttpConn(IConnectionManager* cm, st_netfd_t fd, ISrsHttpServeMux* m, std::string cip);
SrsHttpConn(IConnectionManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, std::string cip);
virtual ~SrsHttpConn();
// interface IKbpsDelta
public:
@ -99,7 +99,7 @@ public:
class SrsResponseOnlyHttpConn : public SrsHttpConn
{
public:
SrsResponseOnlyHttpConn(IConnectionManager* cm, st_netfd_t fd, ISrsHttpServeMux* m, std::string cip);
SrsResponseOnlyHttpConn(IConnectionManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, std::string cip);
virtual ~SrsResponseOnlyHttpConn();
public:
// Directly read a HTTP request message.

View file

@ -116,7 +116,7 @@ int SrsBufferCache::cycle()
// TODO: FIXME: support reload.
if (fast_cache <= 0) {
st_sleep(SRS_STREAM_CACHE_CYCLE_SECONDS);
srs_usleep(SRS_STREAM_CACHE_CYCLE_SECONDS * 1000 * 1000);
return ret;
}
@ -152,7 +152,7 @@ int SrsBufferCache::cycle()
if (count <= 0) {
srs_info("http: sleep %dms for no msg", SRS_CONSTS_RTMP_PULSE_TMMS);
// directly use sleep, donot use consumer wait.
st_usleep(SRS_CONSTS_RTMP_PULSE_TMMS * 1000);
srs_usleep(SRS_CONSTS_RTMP_PULSE_TMMS * 1000);
// ignore when nothing got.
continue;
@ -572,7 +572,7 @@ int SrsLiveStream::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r)
if (count <= 0) {
srs_info("http: sleep %dms for no msg", SRS_CONSTS_RTMP_PULSE_TMMS);
// directly use sleep, donot use consumer wait.
st_usleep(SRS_CONSTS_RTMP_PULSE_TMMS * 1000);
srs_usleep(SRS_CONSTS_RTMP_PULSE_TMMS * 1000);
// ignore when nothing got.
continue;

View file

@ -183,7 +183,7 @@ int SrsIngester::cycle()
}
if (!trd->pull()) {
st_usleep(SRS_AUTO_INGESTER_CIMS * 1000);
srs_usleep(SRS_AUTO_INGESTER_CIMS * 1000);
}
}

View file

@ -362,9 +362,9 @@ void srs_dispose_kafka()
SrsKafkaProducer::SrsKafkaProducer()
{
metadata_ok = false;
metadata_expired = st_cond_new();
metadata_expired = srs_cond_new();
lock = st_mutex_new();
lock = srs_mutex_new();
trd = NULL;
worker = new SrsAsyncCallWorker();
cache = new SrsKafkaCache();
@ -382,8 +382,8 @@ SrsKafkaProducer::~SrsKafkaProducer()
srs_freep(trd);
srs_freep(cache);
st_mutex_destroy(lock);
st_cond_destroy(metadata_expired);
srs_mutex_destroy(lock);
srs_cond_destroy(metadata_expired);
}
int SrsKafkaProducer::initialize()
@ -448,14 +448,14 @@ int SrsKafkaProducer::send(int key, SrsJsonObject* obj)
}
// sync with backgound metadata worker.
st_mutex_lock(lock);
srs_mutex_lock(lock);
// flush message when metadata is ok.
if (metadata_ok) {
ret = flush();
}
st_mutex_unlock(lock);
srs_mutex_unlock(lock);
return ret;
}
@ -503,7 +503,7 @@ int SrsKafkaProducer::cycle()
}
if (!trd->pull()) {
st_usleep(SRS_KAKFA_CIMS * 1000);
srs_usleep(SRS_KAKFA_CIMS * 1000);
}
}
@ -515,18 +515,18 @@ int SrsKafkaProducer::on_before_cycle()
// wait for the metadata expired.
// when metadata is ok, wait for it expired.
if (metadata_ok) {
st_cond_wait(metadata_expired);
srs_cond_wait(metadata_expired);
}
// request to lock to acquire the socket.
st_mutex_lock(lock);
srs_mutex_lock(lock);
return ERROR_SUCCESS;
}
int SrsKafkaProducer::on_end_cycle()
{
st_mutex_unlock(lock);
srs_mutex_unlock(lock);
return ERROR_SUCCESS;
}
@ -644,7 +644,7 @@ void SrsKafkaProducer::refresh_metadata()
clear_metadata();
metadata_ok = false;
st_cond_signal(metadata_expired);
srs_cond_signal(metadata_expired);
srs_trace("kafka async refresh metadata in background");
}

View file

@ -163,11 +163,11 @@ class SrsKafkaProducer : virtual public ISrsCoroutineHandler, virtual public ISr
private:
// TODO: FIXME: support reload.
bool enabled;
st_mutex_t lock;
srs_mutex_t lock;
SrsCoroutine* trd;
private:
bool metadata_ok;
st_cond_t metadata_expired;
srs_cond_t metadata_expired;
public:
std::vector<SrsKafkaPartition*> partitions;
SrsKafkaCache* cache;

View file

@ -30,6 +30,7 @@
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
using namespace std;
#include <srs_kernel_log.hpp>
@ -54,7 +55,7 @@ ISrsUdpHandler::~ISrsUdpHandler()
{
}
int ISrsUdpHandler::on_stfd_change(st_netfd_t /*fd*/)
int ISrsUdpHandler::on_stfd_change(srs_netfd_t /*fd*/)
{
return ERROR_SUCCESS;
}
@ -101,7 +102,7 @@ int SrsUdpListener::fd()
return _fd;
}
st_netfd_t SrsUdpListener::stfd()
srs_netfd_t SrsUdpListener::stfd()
{
return _stfd;
}
@ -131,7 +132,7 @@ int SrsUdpListener::listen()
}
srs_verbose("bind socket success. ep=%s:%d, fd=%d", ip.c_str(), port, _fd);
if ((_stfd = st_netfd_open_socket(_fd)) == NULL){
if ((_stfd = srs_netfd_open_socket(_fd)) == NULL){
ret = ERROR_ST_OPEN_SOCKET;
srs_error("st_netfd_open_socket open socket failed. ep=%s:%d, ret=%d", ip.c_str(), port, ret);
return ret;
@ -159,7 +160,7 @@ int SrsUdpListener::cycle()
int nb_from = sizeof(sockaddr_in);
int nread = 0;
if ((nread = st_recvfrom(_stfd, buf, nb_buf, (sockaddr*)&from, &nb_from, ST_UTIME_NO_TIMEOUT)) <= 0) {
if ((nread = srs_recvfrom(_stfd, buf, nb_buf, (sockaddr*)&from, &nb_from, SRS_UTIME_NO_TIMEOUT)) <= 0) {
srs_warn("ignore recv udp packet failed, nread=%d", nread);
return ret;
}
@ -170,7 +171,7 @@ int SrsUdpListener::cycle()
}
if (SRS_UDP_PACKET_RECV_CYCLE_INTERVAL_MS > 0) {
st_usleep(SRS_UDP_PACKET_RECV_CYCLE_INTERVAL_MS * 1000);
srs_usleep(SRS_UDP_PACKET_RECV_CYCLE_INTERVAL_MS * 1000);
}
}
@ -233,7 +234,7 @@ int SrsTcpListener::listen()
}
srs_verbose("listen socket success. ep=%s:%d, fd=%d", ip.c_str(), port, _fd);
if ((_stfd = st_netfd_open_socket(_fd)) == NULL){
if ((_stfd = srs_netfd_open_socket(_fd)) == NULL){
ret = ERROR_ST_OPEN_SOCKET;
srs_error("st_netfd_open_socket open socket failed. ep=%s:%d, ret=%d", ip.c_str(), port, ret);
return ret;
@ -256,8 +257,8 @@ int SrsTcpListener::cycle()
int ret = ERROR_SUCCESS;
while (!trd->pull()) {
st_netfd_t stfd = st_accept(_stfd, NULL, NULL, ST_UTIME_NO_TIMEOUT);
int fd = st_netfd_fileno(stfd);
srs_netfd_t stfd = srs_accept(_stfd, NULL, NULL, SRS_UTIME_NO_TIMEOUT);
int fd = srs_netfd_fileno(stfd);
srs_fd_close_exec(fd);

View file

@ -46,7 +46,7 @@ public:
* when fd changed, for instance, reload the listen port,
* notify the handler and user can do something.
*/
virtual int on_stfd_change(st_netfd_t fd);
virtual int on_stfd_change(srs_netfd_t fd);
public:
/**
* when udp listener got a udp packet, notice server to process it.
@ -72,7 +72,7 @@ public:
/**
* when got tcp client.
*/
virtual int on_tcp_client(st_netfd_t stfd) = 0;
virtual int on_tcp_client(srs_netfd_t stfd) = 0;
};
/**
@ -82,7 +82,7 @@ class SrsUdpListener : public ISrsCoroutineHandler
{
private:
int _fd;
st_netfd_t _stfd;
srs_netfd_t _stfd;
SrsCoroutine* trd;
private:
char* buf;
@ -96,7 +96,7 @@ public:
virtual ~SrsUdpListener();
public:
virtual int fd();
virtual st_netfd_t stfd();
virtual srs_netfd_t stfd();
public:
virtual int listen();
// interface ISrsReusableThreadHandler.
@ -111,7 +111,7 @@ class SrsTcpListener : public ISrsCoroutineHandler
{
private:
int _fd;
st_netfd_t _stfd;
srs_netfd_t _stfd;
SrsCoroutine* trd;
private:
ISrsTcpHandler* handler;

View file

@ -29,6 +29,7 @@
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <srs_app_config.hpp>
#include <srs_kernel_error.hpp>

View file

@ -88,7 +88,7 @@ int SrsNgExec::cycle()
}
if (!trd->pull()) {
st_usleep(SRS_RTMP_EXEC_CIMS * 1000);
srs_usleep(SRS_RTMP_EXEC_CIMS * 1000);
}
}

View file

@ -34,6 +34,7 @@
#include <srs_app_http_conn.hpp>
#include <srs_core_autofree.hpp>
#include <sys/socket.h>
using namespace std;
// the max small bytes to group
@ -120,7 +121,7 @@ int SrsRecvThread::do_cycle()
while (!trd->pull()) {
// When the pumper is interrupted, wait then retry.
if (pumper->interrupted()) {
st_usleep(timeout * 1000);
srs_usleep(timeout * 1000);
continue;
}
@ -265,7 +266,7 @@ SrsPublishRecvThread::SrsPublishRecvThread(SrsRtmpServer* rtmp_sdk, SrsRequest*
recv_error_code = ERROR_SUCCESS;
_nb_msgs = 0;
video_frames = 0;
error = st_cond_new();
error = srs_cond_new();
ncid = cid = 0;
req = _req;
@ -286,7 +287,7 @@ SrsPublishRecvThread::~SrsPublishRecvThread()
_srs_config->unsubscribe(this);
trd.stop();
st_cond_destroy(error);
srs_cond_destroy(error);
}
int SrsPublishRecvThread::wait(uint64_t timeout_ms)
@ -296,7 +297,7 @@ int SrsPublishRecvThread::wait(uint64_t timeout_ms)
}
// ignore any return of cond wait.
st_cond_timedwait(error, timeout_ms * 1000);
srs_cond_timedwait(error, timeout_ms * 1000);
return ERROR_SUCCESS;
}
@ -380,7 +381,7 @@ void SrsPublishRecvThread::interrupt(int ret)
// when recv thread error, signal the conn thread to process it.
// @see https://github.com/ossrs/srs/issues/244
st_cond_signal(error);
srs_cond_signal(error);
}
void SrsPublishRecvThread::on_start()
@ -407,7 +408,7 @@ void SrsPublishRecvThread::on_stop()
// when thread stop, signal the conn thread which wait.
// @see https://github.com/ossrs/srs/issues/244
st_cond_signal(error);
srs_cond_signal(error);
#ifdef SRS_PERF_MERGED_READ
if (mr) {
@ -436,7 +437,7 @@ void SrsPublishRecvThread::on_read(ssize_t nread)
* @see https://github.com/ossrs/srs/issues/241
*/
if (nread < SRS_MR_SMALL_BYTES) {
st_usleep(mr_sleep * 1000);
srs_usleep(mr_sleep * 1000);
}
}
#endif

View file

@ -183,7 +183,7 @@ private:
SrsSource* _source;
// the error timeout cond
// @see https://github.com/ossrs/srs/issues/244
st_cond_t error;
srs_cond_t error;
// merged context id.
int cid;
int ncid;

View file

@ -28,7 +28,7 @@
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netinet/tcp.h>
#include <unistd.h>
using namespace std;
#include <srs_kernel_error.hpp>
@ -110,7 +110,7 @@ SrsClientInfo::~SrsClientInfo()
srs_freep(res);
}
SrsRtmpConn::SrsRtmpConn(SrsServer* svr, st_netfd_t c, string cip)
SrsRtmpConn::SrsRtmpConn(SrsServer* svr, srs_netfd_t c, string cip)
: SrsConnection(svr, c, cip)
{
server = svr;
@ -161,7 +161,7 @@ int SrsRtmpConn::do_cycle()
{
int ret = ERROR_SUCCESS;
srs_trace("RTMP client ip=%s, fd=%d", ip.c_str(), st_netfd_fileno(stfd));
srs_trace("RTMP client ip=%s, fd=%d", ip.c_str(), srs_netfd_fileno(stfd));
// notify kafka cluster.
#ifdef SRS_AUTO_KAFKA
@ -407,7 +407,7 @@ int SrsRtmpConn::service_cycle()
srs_verbose("set peer bandwidth success");
// get the ip which client connected.
std::string local_ip = srs_get_local_ip(st_netfd_fileno(stfd));
std::string local_ip = srs_get_local_ip(srs_netfd_fileno(stfd));
// do bandwidth test if connect to the vhost which is for bandwidth check.
if (_srs_config->get_bw_check_enabled(req->vhost)) {
@ -818,7 +818,7 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRe
if (count <= 0) {
#ifndef SRS_PERF_QUEUE_COND_WAIT
srs_info("mw sleep %dms for no msg", mw_sleep);
st_usleep(mw_sleep * 1000);
srs_usleep(mw_sleep * 1000);
#else
srs_verbose("mw wait %dms and got nothing.", mw_sleep);
#endif
@ -864,7 +864,7 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRe
// apply the minimal interval for delivery stream in ms.
if (send_min_interval > 0) {
st_usleep((int64_t)(send_min_interval * 1000));
srs_usleep((int64_t)(send_min_interval * 1000));
}
}
@ -893,7 +893,7 @@ int SrsRtmpConn::publishing(SrsSource* source)
if ((ret = acquire_publish(source)) == ERROR_SUCCESS) {
// use isolate thread to recv,
// @see: https://github.com/ossrs/srs/issues/237
SrsPublishRecvThread rtrd(rtmp, req, st_netfd_fileno(stfd), 0, this, source);
SrsPublishRecvThread rtrd(rtmp, req, srs_netfd_fileno(stfd), 0, this, source);
srs_info("start to publish stream %s success", req->stream.c_str());
ret = do_publishing(source, &rtrd);
@ -1243,7 +1243,7 @@ void SrsRtmpConn::change_mw_sleep(int sleep_ms)
}
// get the sock buffer size.
int fd = st_netfd_fileno(stfd);
int fd = srs_netfd_fileno(stfd);
int onb_sbuf = 0;
socklen_t sock_buf_size = sizeof(int);
getsockopt(fd, SOL_SOCKET, SO_SNDBUF, &onb_sbuf, &sock_buf_size);
@ -1295,7 +1295,7 @@ void SrsRtmpConn::set_sock_options()
if (nvalue != tcp_nodelay) {
tcp_nodelay = nvalue;
#ifdef SRS_PERF_TCP_NODELAY
int fd = st_netfd_fileno(stfd);
int fd = srs_netfd_fileno(stfd);
socklen_t nb_v = sizeof(int);

View file

@ -127,7 +127,7 @@ private:
// About the rtmp client.
SrsClientInfo* info;
public:
SrsRtmpConn(SrsServer* svr, st_netfd_t c, std::string cip);
SrsRtmpConn(SrsServer* svr, srs_netfd_t c, std::string cip);
virtual ~SrsRtmpConn();
public:
virtual void dispose();

View file

@ -183,7 +183,7 @@ int SrsRtspJitter::correct(int64_t& ts)
return ret;
}
SrsRtspConn::SrsRtspConn(SrsRtspCaster* c, st_netfd_t fd, std::string o)
SrsRtspConn::SrsRtspConn(SrsRtspCaster* c, srs_netfd_t fd, std::string o)
{
output_template = o;
@ -245,7 +245,7 @@ int SrsRtspConn::do_cycle()
int ret = ERROR_SUCCESS;
// retrieve ip of client.
std::string ip = srs_get_peer_ip(st_netfd_fileno(stfd));
std::string ip = srs_get_peer_ip(srs_netfd_fileno(stfd));
srs_trace("rtsp: serve %s", ip.c_str());
// consume all rtsp messages.
@ -746,7 +746,7 @@ void SrsRtspCaster::free_port(int lpmin, int lpmax)
srs_trace("rtsp: free rtp port=%d-%d", lpmin, lpmax);
}
int SrsRtspCaster::on_tcp_client(st_netfd_t stfd)
int SrsRtspCaster::on_tcp_client(srs_netfd_t stfd)
{
int ret = ERROR_SUCCESS;

View file

@ -129,7 +129,7 @@ private:
int audio_channel;
SrsRtpConn* audio_rtp;
private:
st_netfd_t stfd;
srs_netfd_t stfd;
SrsStSocket* skt;
SrsRtspStack* rtsp;
SrsRtspCaster* caster;
@ -149,7 +149,7 @@ private:
std::string aac_specific_config;
SrsRtspAudioCache* acache;
public:
SrsRtspConn(SrsRtspCaster* c, st_netfd_t fd, std::string o);
SrsRtspConn(SrsRtspCaster* c, srs_netfd_t fd, std::string o);
virtual ~SrsRtspConn();
public:
virtual int serve();
@ -206,7 +206,7 @@ public:
virtual void free_port(int lpmin, int lpmax);
// interface ISrsTcpHandler
public:
virtual int on_tcp_client(st_netfd_t stfd);
virtual int on_tcp_client(srs_netfd_t stfd);
// internal methods.
public:
virtual void remove(SrsRtspConn* conn);

View file

@ -28,7 +28,7 @@
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <algorithm>
using namespace std;
@ -163,7 +163,7 @@ int SrsBufferListener::listen(string i, int p)
return ret;
}
int SrsBufferListener::on_tcp_client(st_netfd_t stfd)
int SrsBufferListener::on_tcp_client(srs_netfd_t stfd)
{
int ret = ERROR_SUCCESS;
@ -219,7 +219,7 @@ int SrsRtspListener::listen(string i, int p)
return ret;
}
int SrsRtspListener::on_tcp_client(st_netfd_t stfd)
int SrsRtspListener::on_tcp_client(srs_netfd_t stfd)
{
int ret = ERROR_SUCCESS;
@ -279,7 +279,7 @@ int SrsHttpFlvListener::listen(string i, int p)
return ret;
}
int SrsHttpFlvListener::on_tcp_client(st_netfd_t stfd)
int SrsHttpFlvListener::on_tcp_client(srs_netfd_t stfd)
{
int ret = ERROR_SUCCESS;
@ -391,7 +391,7 @@ int SrsSignalManager::initialize()
return ret;
}
if ((signal_read_stfd = st_netfd_open(sig_pipe[0])) == NULL) {
if ((signal_read_stfd = srs_netfd_open(sig_pipe[0])) == NULL) {
ret = ERROR_SYSTEM_CREATE_PIPE;
srs_error("create signal manage st pipe failed. ret=%d", ret);
return ret;
@ -444,7 +444,7 @@ int SrsSignalManager::cycle()
int signo;
/* Read the next signal from the pipe */
st_read(signal_read_stfd, &signo, sizeof(int), ST_UTIME_NO_TIMEOUT);
srs_read(signal_read_stfd, &signo, sizeof(int), SRS_UTIME_NO_TIMEOUT);
/* Process signal synchronously */
server->on_signal(signo);
@ -863,7 +863,7 @@ int SrsServer::cycle()
// remark, for gmc, never invoke the exit().
srs_warn("sleep a long time for system st-threads to cleanup.");
st_usleep(3 * 1000 * 1000);
srs_usleep(3 * 1000 * 1000);
srs_warn("system quit");
#else
// normally quit with neccessary cleanup by dispose().
@ -966,7 +966,7 @@ int SrsServer::do_cycle()
int dynamic_max = srs_max(max, heartbeat_max_resolution);
for (int i = 0; i < dynamic_max; i++) {
st_usleep(SRS_SYS_CYCLE_INTERVAL * 1000);
srs_usleep(SRS_SYS_CYCLE_INTERVAL * 1000);
// asprocess check.
if (asprocess && ::getppid() != ppid) {
@ -1235,7 +1235,7 @@ void SrsServer::resample_kbps()
srs_update_rtmp_server((int)conns.size(), kbps);
}
int SrsServer::accept_client(SrsListenerType type, st_netfd_t stfd)
int SrsServer::accept_client(SrsListenerType type, srs_netfd_t stfd)
{
int ret = ERROR_SUCCESS;
@ -1260,11 +1260,11 @@ int SrsServer::accept_client(SrsListenerType type, st_netfd_t stfd)
return ret;
}
SrsConnection* SrsServer::fd2conn(SrsListenerType type, st_netfd_t stfd)
SrsConnection* SrsServer::fd2conn(SrsListenerType type, srs_netfd_t stfd)
{
int ret = ERROR_SUCCESS;
int fd = st_netfd_fileno(stfd);
int fd = srs_netfd_fileno(stfd);
string ip = srs_get_peer_ip(fd);
// for some keep alive application, for example, the keepalived,

View file

@ -35,6 +35,7 @@
#include <srs_app_hls.hpp>
#include <srs_app_listener.hpp>
#include <srs_app_conn.hpp>
#include <srs_service_st.hpp>
class SrsServer;
class SrsConnection;
@ -107,7 +108,7 @@ public:
virtual int listen(std::string ip, int port);
// ISrsTcpHandler
public:
virtual int on_tcp_client(st_netfd_t stfd);
virtual int on_tcp_client(srs_netfd_t stfd);
};
#ifdef SRS_AUTO_STREAM_CASTER
@ -126,7 +127,7 @@ public:
virtual int listen(std::string i, int p);
// ISrsTcpHandler
public:
virtual int on_tcp_client(st_netfd_t stfd);
virtual int on_tcp_client(srs_netfd_t stfd);
};
/**
@ -144,7 +145,7 @@ public:
virtual int listen(std::string i, int p);
// ISrsTcpHandler
public:
virtual int on_tcp_client(st_netfd_t stfd);
virtual int on_tcp_client(srs_netfd_t stfd);
};
#endif
@ -185,7 +186,7 @@ private:
/* Per-process pipe which is used as a signal queue. */
/* Up to PIPE_BUF/sizeof(int) signals can be queued up. */
int sig_pipe[2];
st_netfd_t signal_read_stfd;
srs_netfd_t signal_read_stfd;
private:
SrsServer* server;
SrsCoroutine* trd;
@ -357,9 +358,9 @@ public:
* for instance RTMP connection to serve client.
* @param stfd, the client fd in st boxed, the underlayer fd.
*/
virtual int accept_client(SrsListenerType type, st_netfd_t stfd);
virtual int accept_client(SrsListenerType type, srs_netfd_t stfd);
private:
virtual SrsConnection* fd2conn(SrsListenerType type, st_netfd_t stfd);
virtual SrsConnection* fd2conn(SrsListenerType type, srs_netfd_t stfd);
// IConnectionManager
public:
/**

View file

@ -434,7 +434,7 @@ SrsConsumer::SrsConsumer(SrsSource* s, SrsConnection* c)
should_update_source_id = false;
#ifdef SRS_PERF_QUEUE_COND_WAIT
mw_wait = st_cond_new();
mw_wait = srs_cond_new();
mw_min_msgs = 0;
mw_duration = 0;
mw_waiting = false;
@ -448,7 +448,7 @@ SrsConsumer::~SrsConsumer()
srs_freep(queue);
#ifdef SRS_PERF_QUEUE_COND_WAIT
st_cond_destroy(mw_wait);
srs_cond_destroy(mw_wait);
#endif
}
@ -497,14 +497,14 @@ int SrsConsumer::enqueue(SrsSharedPtrMessage* shared_msg, bool atc, SrsRtmpJitte
// when encoder republish or overflow.
// @see https://github.com/ossrs/srs/pull/749
if (atc && duration_ms < 0) {
st_cond_signal(mw_wait);
srs_cond_signal(mw_wait);
mw_waiting = false;
return ret;
}
// when duration ok, signal to flush.
if (match_min_msgs && duration_ms > mw_duration) {
st_cond_signal(mw_wait);
srs_cond_signal(mw_wait);
mw_waiting = false;
return ret;
}
@ -550,7 +550,7 @@ int SrsConsumer::dump_packets(SrsMessageArray* msgs, int& count)
void SrsConsumer::wait(int nb_msgs, int duration)
{
if (paused) {
st_usleep(SRS_CONSTS_RTMP_PULSE_TMMS * 1000);
srs_usleep(SRS_CONSTS_RTMP_PULSE_TMMS * 1000);
return;
}
@ -569,7 +569,7 @@ void SrsConsumer::wait(int nb_msgs, int duration)
mw_waiting = true;
// use cond block wait for high performance mode.
st_cond_wait(mw_wait);
srs_cond_wait(mw_wait);
}
#endif
@ -587,7 +587,7 @@ void SrsConsumer::wakeup()
{
#ifdef SRS_PERF_QUEUE_COND_WAIT
if (mw_waiting) {
st_cond_signal(mw_wait);
srs_cond_signal(mw_wait);
mw_waiting = false;
}
#endif

View file

@ -228,7 +228,7 @@ private:
#ifdef SRS_PERF_QUEUE_COND_WAIT
// the cond wait for mw.
// @see https://github.com/ossrs/srs/issues/251
st_cond_t mw_wait;
srs_cond_t mw_wait;
bool mw_waiting;
int mw_min_msgs;
int mw_duration;

View file

@ -23,6 +23,7 @@
#include <srs_app_st.hpp>
#include <st.h>
#include <string>
using namespace std;
@ -65,7 +66,7 @@ int SrsCoroutine::start()
return ret;
}
if((trd = st_thread_create(pfn, this, 1, 0)) == NULL){
if((trd = (srs_thread_t)st_thread_create(pfn, this, 1, 0)) == NULL){
ret = ERROR_ST_CREATE_CYCLE_THREAD;
srs_error("Thread.start: Create thread failed. ret=%d", ret);
return ret;
@ -86,7 +87,7 @@ void SrsCoroutine::stop()
interrupt();
void* res = NULL;
int ret = st_thread_join(trd, &res);
int ret = st_thread_join((st_thread_t)trd, &res);
srs_info("Thread.stop: Terminated, ret=%d, err=%d", ret, err);
srs_assert(!ret);
@ -109,7 +110,7 @@ void SrsCoroutine::interrupt()
srs_info("Thread.interrupt: Interrupt thread, err=%d", err);
err = (err == ERROR_SUCCESS? ERROR_THREAD_INTERRUPED:err);
st_thread_interrupt(trd);
st_thread_interrupt((st_thread_t)trd);
}
int SrsCoroutine::pull()
@ -145,180 +146,3 @@ void* SrsCoroutine::pfn(void* arg)
return res;
}
namespace internal
{
ISrsThreadHandler::ISrsThreadHandler()
{
}
ISrsThreadHandler::~ISrsThreadHandler()
{
}
void ISrsThreadHandler::on_thread_start()
{
}
int ISrsThreadHandler::on_before_cycle()
{
int ret = ERROR_SUCCESS;
return ret;
}
int ISrsThreadHandler::on_end_cycle()
{
int ret = ERROR_SUCCESS;
return ret;
}
void ISrsThreadHandler::on_thread_stop()
{
}
SrsThread::SrsThread(const char* n, ISrsThreadHandler* h, int64_t ims, bool j)
{
name = n;
handler = h;
cims = ims;
trd = NULL;
loop = false;
context_id = -1;
joinable = j;
}
SrsThread::~SrsThread()
{
stop();
}
int SrsThread::cid()
{
return context_id;
}
int SrsThread::start()
{
int ret = ERROR_SUCCESS;
if(trd) {
srs_info("thread %s already running.", name);
return ret;
}
loop = true;
if((trd = st_thread_create(pfn, this, (joinable? 1:0), 0)) == NULL){
ret = ERROR_ST_CREATE_CYCLE_THREAD;
srs_error("st_thread_create failed. ret=%d", ret);
return ret;
}
return ret;
}
void SrsThread::stop()
{
if (!trd) {
return;
}
// notify the cycle to stop loop.
loop = false;
// the interrupt will cause the socket to read/write error,
// which will terminate the cycle thread.
st_thread_interrupt(trd);
// when joinable, wait util quit.
if (joinable) {
// wait the thread to exit.
int ret = st_thread_join(trd, NULL);
srs_assert(ret == ERROR_SUCCESS);
}
trd = NULL;
}
bool SrsThread::can_loop()
{
return loop;
}
void SrsThread::stop_loop()
{
loop = false;
}
void SrsThread::cycle()
{
int ret = ERROR_SUCCESS;
// TODO: FIXME: it's better for user to specifies the cid,
// because sometimes we need to merge cid, for example,
// the publish thread should use the same cid of connection.
_srs_context->generate_id();
srs_info("thread %s cycle start", name);
context_id = _srs_context->get_id();
srs_assert(handler);
handler->on_thread_start();
while (loop) {
if ((ret = handler->on_before_cycle()) != ERROR_SUCCESS) {
srs_warn("thread %s on before cycle failed, ignored and retry, ret=%d", name, ret);
goto failed;
}
srs_info("thread %s on before cycle success", name);
if ((ret = handler->cycle()) != ERROR_SUCCESS) {
if (!srs_is_client_gracefully_close(ret) && !srs_is_system_control_error(ret)) {
srs_warn("thread %s cycle failed, ignored and retry, ret=%d", name, ret);
}
goto failed;
}
srs_info("thread %s cycle success", name);
if ((ret = handler->on_end_cycle()) != ERROR_SUCCESS) {
srs_warn("thread %s on end cycle failed, ignored and retry, ret=%d", name, ret);
goto failed;
}
srs_info("thread %s on end cycle success", name);
failed:
if (!loop) {
break;
}
// Should never use no timeout, just ignore it.
// to improve performance, donot sleep when interval is zero.
// @see: https://github.com/ossrs/srs/issues/237
if (cims != 0 && cims != SRS_CONSTS_NO_TMMS) {
st_usleep(cims * 1000);
}
}
srs_info("thread %s cycle finished", name);
// @remark in this callback, user may delete this, so never use this->xxx anymore.
handler->on_thread_stop();
}
void* SrsThread::pfn(void* arg)
{
SrsThread* obj = (SrsThread*)arg;
srs_assert(obj);
obj->cycle();
// delete cid for valgrind to detect memory leak.
SrsThreadContext* ctx = dynamic_cast<SrsThreadContext*>(_srs_context);
if (ctx) {
ctx->clear_cid();
}
st_thread_exit(NULL);
return NULL;
}
}

View file

@ -87,7 +87,7 @@ private:
std::string name;
ISrsCoroutineHandler* handler;
private:
st_thread_t trd;
srs_thread_t trd;
int context;
int err;
private:
@ -135,133 +135,5 @@ private:
static void* pfn(void* arg);
};
// the internal classes, user should never use it.
// user should use the public classes at the bellow:
// @see SrsEndlessThread, SrsOneCycleThread, SrsReusableThread
namespace internal
{
/**
* the handler for the thread, callback interface.
* the thread model defines as:
* handler->on_thread_start()
* while loop:
* handler->on_before_cycle()
* handler->cycle()
* handler->on_end_cycle()
* if !loop then break for user stop thread.
* sleep(CycleIntervalMilliseconds)
* handler->on_thread_stop()
* when stop, the thread will interrupt the st_thread,
* which will cause the socket to return error and
* terminate the cycle thread.
*
* @remark why should check can_loop() in cycle method?
* when thread interrupt, the socket maybe not got EINT,
* espectially on st_usleep(), so the cycle must check the loop,
* when handler->cycle() has loop itself, for example:
* while (true):
* if (read_from_socket(skt) < 0) break;
* if thread stop when read_from_socket, it's ok, the loop will break,
* but when thread stop interrupt the s_usleep(0), then the loop is
* death loop.
* in a word, the handler->cycle() must:
* while (pthread->can_loop()):
* if (read_from_socket(skt) < 0) break;
* check the loop, then it works.
*
* @remark why should use stop_loop() to terminate thread in itself?
* in the thread itself, that is the cycle method,
* if itself want to terminate the thread, should never use stop(),
* but use stop_loop() to set the loop to false and terminate normally.
*
* @remark when should set the interval_us, and when not?
* the cycle will invoke util cannot loop, eventhough the return code of cycle is error,
* so the interval_us used to sleep for each cycle.
*/
class ISrsThreadHandler
{
public:
ISrsThreadHandler();
virtual ~ISrsThreadHandler();
public:
virtual void on_thread_start();
virtual int on_before_cycle();
virtual int cycle() = 0;
virtual int on_end_cycle();
virtual void on_thread_stop();
};
/**
* provides servies from st_thread_t,
* for common thread usage.
*/
class SrsThread
{
private:
st_thread_t trd;
int context_id;
bool loop;
bool joinable;
const char* name;
private:
ISrsThreadHandler* handler;
// The cycle interval in ms.
int64_t cims;
public:
/**
* initialize the thread.
* @param n, human readable name for st debug.
* @param h, the cycle handler for the thread.
* @param ims, the sleep interval in ms when cycle finished.
* @param j, if joinable, other thread must stop the thread.
* @remark if joinable, thread never quit itself, or memory leak.
* @see: https://github.com/ossrs/srs/issues/78
* @remark about st debug, see st-1.9/README, _st_iterate_threads_flag
*/
/**
* TODO: FIXME: maybe all thread must be reap by others threads,
* @see: https://github.com/ossrs/srs/issues/77
*/
SrsThread(const char* n, ISrsThreadHandler* h, int64_t ims, bool j);
virtual ~SrsThread();
public:
/**
* get the context id. @see: ISrsThreadContext.get_id().
* used for parent thread to get the id.
* @remark when start thread, parent thread will block and wait for this id ready.
*/
virtual int cid();
/**
* start the thread, invoke the cycle of handler util
* user stop the thread.
* @remark ignore any error of cycle of handler.
* @remark user can start multiple times, ignore if already started.
* @remark wait for the cid is set by thread pfn.
*/
virtual int start();
/**
* stop the thread, wait for the thread to terminate.
* @remark user can stop multiple times, ignore if already stopped.
*/
virtual void stop();
public:
/**
* whether the thread should loop,
* used for handler->cycle() which has a loop method,
* to check this method, break if false.
*/
virtual bool can_loop();
/**
* for the loop thread to stop the loop.
* other thread can directly use stop() to stop loop and wait for quit.
* this stop loop method only set loop to false.
*/
virtual void stop_loop();
private:
virtual void cycle();
static void* pfn(void* arg);
};
}
#endif

View file

@ -31,14 +31,14 @@ using namespace std;
SrsCoroutineManager::SrsCoroutineManager()
{
cond = st_cond_new();
cond = srs_cond_new();
trd = new SrsCoroutine("manager", this);
}
SrsCoroutineManager::~SrsCoroutineManager()
{
srs_freep(trd);
st_cond_destroy(cond);
srs_cond_destroy(cond);
clear();
}
@ -51,7 +51,7 @@ int SrsCoroutineManager::start()
int SrsCoroutineManager::cycle()
{
while (!trd->pull()) {
st_cond_wait(cond);
srs_cond_wait(cond);
clear();
}
@ -61,7 +61,7 @@ int SrsCoroutineManager::cycle()
void SrsCoroutineManager::remove(ISrsConnection* c)
{
conns.push_back(c);
st_cond_signal(cond);
srs_cond_signal(cond);
}
void SrsCoroutineManager::clear()

View file

@ -42,7 +42,7 @@ class SrsCoroutineManager : virtual public ISrsCoroutineHandler, virtual public
private:
SrsCoroutine* trd;
std::vector<ISrsConnection*> conns;
st_cond_t cond;
srs_cond_t cond;
public:
SrsCoroutineManager();
virtual ~SrsCoroutineManager();

View file

@ -179,7 +179,7 @@ int srs_kill_forced(int& pid)
// 0 is not quit yet.
if (qpid == 0) {
st_usleep(10 * 1000);
srs_usleep(10 * 1000);
continue;
}
@ -204,7 +204,7 @@ int srs_kill_forced(int& pid)
// @remark when we use SIGKILL to kill process, it must be killed,
// so we always wait it to quit by infinite loop.
while (waitpid(pid, &status, 0) < 0) {
st_usleep(10 * 1000);
srs_usleep(10 * 1000);
continue;
}

View file

@ -216,7 +216,7 @@ int SrsIngestHlsInput::connect()
int64_t now = srs_update_system_time_ms();
if (now < next_connect_time) {
srs_trace("input hls wait for %dms", next_connect_time - now);
st_usleep((next_connect_time - now) * 1000);
srs_usleep((next_connect_time - now) * 1000);
}
// set all ts to dirty.

View file

@ -37,6 +37,7 @@ using namespace std;
#include <gperftools/profiler.h>
#endif
#include <unistd.h>
using namespace std;
#include <srs_kernel_error.hpp>

View file

@ -25,6 +25,7 @@
#include <stdarg.h>
#include <sys/time.h>
#include <unistd.h>
using namespace std;
#include <srs_kernel_error.hpp>
@ -45,18 +46,18 @@ int SrsThreadContext::generate_id()
static int id = 100;
int gid = id++;
cache[st_thread_self()] = gid;
cache[srs_thread_self()] = gid;
return gid;
}
int SrsThreadContext::get_id()
{
return cache[st_thread_self()];
return cache[srs_thread_self()];
}
int SrsThreadContext::set_id(int v)
{
st_thread_t self = st_thread_self();
srs_thread_t self = srs_thread_self();
int ov = 0;
if (cache.find(self) != cache.end()) {
@ -70,8 +71,8 @@ int SrsThreadContext::set_id(int v)
void SrsThreadContext::clear_cid()
{
st_thread_t self = st_thread_self();
std::map<st_thread_t, int>::iterator it = cache.find(self);
srs_thread_t self = srs_thread_self();
std::map<srs_thread_t, int>::iterator it = cache.find(self);
if (it != cache.end()) {
cache.erase(it);
}

View file

@ -38,7 +38,7 @@
class SrsThreadContext : public ISrsThreadContext
{
private:
std::map<st_thread_t, int> cache;
std::map<srs_thread_t, int> cache;
public:
SrsThreadContext();
virtual ~SrsThreadContext();

View file

@ -23,6 +23,7 @@
#include <srs_service_rtmp_conn.hpp>
#include <unistd.h>
using namespace std;
#include <srs_protocol_kbps.hpp>

View file

@ -23,6 +23,7 @@
#include <srs_service_st.hpp>
#include <st.h>
#include <fcntl.h>
#include <sys/socket.h>
using namespace std;
@ -30,6 +31,7 @@ using namespace std;
#include <srs_kernel_error.hpp>
#include <srs_kernel_log.hpp>
#include <srs_service_utility.hpp>
#include <srs_kernel_utility.hpp>
#ifdef __linux__
#include <sys/epoll.h>
@ -80,11 +82,11 @@ int srs_st_init()
return ret;
}
void srs_close_stfd(st_netfd_t& stfd)
void srs_close_stfd(srs_netfd_t& stfd)
{
if (stfd) {
// we must ensure the close is ok.
int err = st_netfd_close(stfd);
int err = st_netfd_close((st_netfd_t)stfd);
srs_assert(err != -1);
stfd = NULL;
}
@ -103,6 +105,150 @@ void srs_socket_reuse_addr(int fd)
setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &v, sizeof(int));
}
srs_thread_t srs_thread_self()
{
return (srs_thread_t)st_thread_self();
}
int srs_socket_connect(string server, int port, int64_t tm, srs_netfd_t* pstfd)
{
int ret = ERROR_SUCCESS;
st_utime_t timeout = ST_UTIME_NO_TIMEOUT;
if (tm != SRS_CONSTS_NO_TMMS) {
timeout = (st_utime_t)(tm * 1000);
}
*pstfd = NULL;
srs_netfd_t stfd = NULL;
sockaddr_in addr;
int sock = socket(AF_INET, SOCK_STREAM, 0);
if(sock == -1){
ret = ERROR_SOCKET_CREATE;
srs_error("create socket error. ret=%d", ret);
return ret;
}
srs_fd_close_exec(sock);
srs_assert(!stfd);
stfd = st_netfd_open_socket(sock);
if(stfd == NULL){
ret = ERROR_ST_OPEN_SOCKET;
srs_error("st_netfd_open_socket failed. ret=%d", ret);
return ret;
}
// connect to server.
std::string ip = srs_dns_resolve(server);
if (ip.empty()) {
ret = ERROR_SYSTEM_IP_INVALID;
srs_error("dns resolve server error, ip empty. ret=%d", ret);
goto failed;
}
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = inet_addr(ip.c_str());
if (st_connect((st_netfd_t)stfd, (const struct sockaddr*)&addr, sizeof(sockaddr_in), timeout) == -1){
ret = ERROR_ST_CONNECT;
srs_error("connect to server error. ip=%s, port=%d, ret=%d", ip.c_str(), port, ret);
goto failed;
}
srs_info("connect ok. server=%s, ip=%s, port=%d", server.c_str(), ip.c_str(), port);
*pstfd = stfd;
return ret;
failed:
if (stfd) {
srs_close_stfd(stfd);
}
return ret;
}
srs_cond_t srs_cond_new()
{
return (srs_cond_t)st_cond_new();
}
int srs_cond_destroy(srs_cond_t cond)
{
return st_cond_destroy((st_cond_t)cond);
}
int srs_cond_wait(srs_cond_t cond)
{
return st_cond_wait((st_cond_t)cond);
}
int srs_cond_timedwait(srs_cond_t cond, srs_utime_t timeout)
{
return st_cond_timedwait((st_cond_t)cond, (st_utime_t)timeout);
}
int srs_cond_signal(srs_cond_t cond)
{
return st_cond_signal((st_cond_t)cond);
}
srs_mutex_t srs_mutex_new()
{
return (srs_mutex_t)st_mutex_new();
}
int srs_mutex_destroy(srs_mutex_t mutex)
{
return st_mutex_destroy((st_mutex_t)mutex);
}
int srs_mutex_lock(srs_mutex_t mutex)
{
return st_mutex_lock((st_mutex_t)mutex);
}
int srs_mutex_unlock(srs_mutex_t mutex)
{
return st_mutex_unlock((st_mutex_t)mutex);
}
int srs_netfd_fileno(srs_netfd_t stfd)
{
return st_netfd_fileno((st_netfd_t)stfd);
}
int srs_usleep(srs_utime_t usecs)
{
return st_usleep((st_utime_t)usecs);
}
srs_netfd_t srs_netfd_open_socket(int osfd)
{
return (srs_netfd_t)st_netfd_open_socket(osfd);
}
srs_netfd_t srs_netfd_open(int osfd)
{
return (srs_netfd_t)st_netfd_open(osfd);
}
int srs_recvfrom(srs_netfd_t stfd, void *buf, int len, struct sockaddr *from, int *fromlen, srs_utime_t timeout)
{
return st_recvfrom((st_netfd_t)stfd, buf, len, from, fromlen, (st_utime_t)timeout);
}
srs_netfd_t srs_accept(srs_netfd_t stfd, struct sockaddr *addr, int *addrlen, srs_utime_t timeout)
{
return (srs_netfd_t)st_accept((st_netfd_t)stfd, addr, addrlen, (st_utime_t)timeout);
}
ssize_t srs_read(srs_netfd_t stfd, void *buf, size_t nbyte, srs_utime_t timeout)
{
return st_read((st_netfd_t)stfd, buf, nbyte, (st_utime_t)timeout);
}
SrsStSocket::SrsStSocket()
{
stfd = NULL;
@ -114,7 +260,7 @@ SrsStSocket::~SrsStSocket()
{
}
int SrsStSocket::initialize(st_netfd_t fd)
int SrsStSocket::initialize(srs_netfd_t fd)
{
stfd = fd;
return ERROR_SUCCESS;
@ -161,9 +307,9 @@ int SrsStSocket::read(void* buf, size_t size, ssize_t* nread)
ssize_t nb_read;
if (rtm == SRS_CONSTS_NO_TMMS) {
nb_read = st_read(stfd, buf, size, ST_UTIME_NO_TIMEOUT);
nb_read = st_read((st_netfd_t)stfd, buf, size, ST_UTIME_NO_TIMEOUT);
} else {
nb_read = st_read(stfd, buf, size, rtm * 1000);
nb_read = st_read((st_netfd_t)stfd, buf, size, rtm * 1000);
}
if (nread) {
@ -197,9 +343,9 @@ int SrsStSocket::read_fully(void* buf, size_t size, ssize_t* nread)
ssize_t nb_read;
if (rtm == SRS_CONSTS_NO_TMMS) {
nb_read = st_read_fully(stfd, buf, size, ST_UTIME_NO_TIMEOUT);
nb_read = st_read_fully((st_netfd_t)stfd, buf, size, ST_UTIME_NO_TIMEOUT);
} else {
nb_read = st_read_fully(stfd, buf, size, rtm * 1000);
nb_read = st_read_fully((st_netfd_t)stfd, buf, size, rtm * 1000);
}
if (nread) {
@ -233,9 +379,9 @@ int SrsStSocket::write(void* buf, size_t size, ssize_t* nwrite)
ssize_t nb_write;
if (stm == SRS_CONSTS_NO_TMMS) {
nb_write = st_write(stfd, buf, size, ST_UTIME_NO_TIMEOUT);
nb_write = st_write((st_netfd_t)stfd, buf, size, ST_UTIME_NO_TIMEOUT);
} else {
nb_write = st_write(stfd, buf, size, stm * 1000);
nb_write = st_write((st_netfd_t)stfd, buf, size, stm * 1000);
}
if (nwrite) {
@ -264,9 +410,9 @@ int SrsStSocket::writev(const iovec *iov, int iov_size, ssize_t* nwrite)
ssize_t nb_write;
if (stm == SRS_CONSTS_NO_TMMS) {
nb_write = st_writev(stfd, iov, iov_size, ST_UTIME_NO_TIMEOUT);
nb_write = st_writev((st_netfd_t)stfd, iov, iov_size, ST_UTIME_NO_TIMEOUT);
} else {
nb_write = st_writev(stfd, iov, iov_size, stm * 1000);
nb_write = st_writev((st_netfd_t)stfd, iov, iov_size, stm * 1000);
}
if (nwrite) {

View file

@ -27,16 +27,24 @@
#include <srs_core.hpp>
#include <string>
#include <st.h>
#include <srs_protocol_io.hpp>
// Wrap for coroutine.
typedef void* srs_netfd_t;
typedef void* srs_thread_t;
typedef void* srs_cond_t;
typedef void* srs_mutex_t;
typedef uint64_t srs_utime_t;
#define SRS_UTIME_NO_TIMEOUT ((srs_utime_t) -1LL)
// initialize st, requires epoll.
extern int srs_st_init();
// close the netfd, and close the underlayer fd.
// @remark when close, user must ensure io completed.
extern void srs_close_stfd(st_netfd_t& stfd);
extern void srs_close_stfd(srs_netfd_t& stfd);
// Set the FD_CLOEXEC of FD.
extern void srs_fd_close_exec(int fd);
@ -44,6 +52,38 @@ extern void srs_fd_close_exec(int fd);
// Set the SO_REUSEADDR of socket.
extern void srs_socket_reuse_addr(int fd);
// Get current coroutine/thread.
extern srs_thread_t srs_thread_self();
// client open socket and connect to server.
// @param tm The timeout in ms.
extern int srs_socket_connect(std::string server, int port, int64_t tm, srs_netfd_t* pstfd);
// Wrap for coroutine.
extern srs_cond_t srs_cond_new();
extern int srs_cond_destroy(srs_cond_t cond);
extern int srs_cond_wait(srs_cond_t cond);
extern int srs_cond_timedwait(srs_cond_t cond, srs_utime_t timeout);
extern int srs_cond_signal(srs_cond_t cond);
extern srs_mutex_t srs_mutex_new();
extern int srs_mutex_destroy(srs_mutex_t mutex);
extern int srs_mutex_lock(srs_mutex_t mutex);
extern int srs_mutex_unlock(srs_mutex_t mutex);
extern int srs_netfd_fileno(srs_netfd_t stfd);
extern int srs_usleep(srs_utime_t usecs);
extern srs_netfd_t srs_netfd_open_socket(int osfd);
extern srs_netfd_t srs_netfd_open(int osfd);
extern int srs_recvfrom(srs_netfd_t stfd, void *buf, int len, struct sockaddr *from, int *fromlen, srs_utime_t timeout);
extern srs_netfd_t srs_accept(srs_netfd_t stfd, struct sockaddr *addr, int *addrlen, srs_utime_t timeout);
extern ssize_t srs_read(srs_netfd_t stfd, void *buf, size_t nbyte, srs_utime_t timeout);
/**
* the socket provides TCP socket over st,
* that is, the sync socket mechanism.
@ -59,13 +99,13 @@ private:
int64_t rbytes;
int64_t sbytes;
// The underlayer st fd.
st_netfd_t stfd;
srs_netfd_t stfd;
public:
SrsStSocket();
virtual ~SrsStSocket();
public:
// Initialize the socket with stfd, user must manage it.
virtual int initialize(st_netfd_t fd);
virtual int initialize(srs_netfd_t fd);
public:
virtual bool is_never_timeout(int64_t tm);
virtual void set_recv_timeout(int64_t tm);
@ -100,7 +140,7 @@ public:
class SrsTcpClient : public ISrsProtocolReaderWriter
{
private:
st_netfd_t stfd;
srs_netfd_t stfd;
SrsStSocket* io;
private:
std::string host;

View file

@ -36,65 +36,6 @@ using namespace std;
#include <srs_kernel_log.hpp>
#include <srs_kernel_utility.hpp>
int srs_socket_connect(string server, int port, int64_t tm, st_netfd_t* pstfd)
{
int ret = ERROR_SUCCESS;
st_utime_t timeout = ST_UTIME_NO_TIMEOUT;
if (tm != SRS_CONSTS_NO_TMMS) {
timeout = (st_utime_t)(tm * 1000);
}
*pstfd = NULL;
st_netfd_t stfd = NULL;
sockaddr_in addr;
int sock = socket(AF_INET, SOCK_STREAM, 0);
if(sock == -1){
ret = ERROR_SOCKET_CREATE;
srs_error("create socket error. ret=%d", ret);
return ret;
}
srs_fd_close_exec(sock);
srs_assert(!stfd);
stfd = st_netfd_open_socket(sock);
if(stfd == NULL){
ret = ERROR_ST_OPEN_SOCKET;
srs_error("st_netfd_open_socket failed. ret=%d", ret);
return ret;
}
// connect to server.
std::string ip = srs_dns_resolve(server);
if (ip.empty()) {
ret = ERROR_SYSTEM_IP_INVALID;
srs_error("dns resolve server error, ip empty. ret=%d", ret);
goto failed;
}
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = inet_addr(ip.c_str());
if (st_connect(stfd, (const struct sockaddr*)&addr, sizeof(sockaddr_in), timeout) == -1){
ret = ERROR_ST_CONNECT;
srs_error("connect to server error. ip=%s, port=%d, ret=%d", ip.c_str(), port, ret);
goto failed;
}
srs_info("connect ok. server=%s, ip=%s, port=%d", server.c_str(), ip.c_str(), port);
*pstfd = stfd;
return ret;
failed:
if (stfd) {
srs_close_stfd(stfd);
}
return ret;
}
bool srs_string_is_http(string url)
{
return srs_string_starts_with(url, "http://", "https://");

View file

@ -32,10 +32,6 @@
#include <srs_service_st.hpp>
// client open socket and connect to server.
// @param tm The timeout in ms.
extern int srs_socket_connect(std::string server, int port, int64_t tm, st_netfd_t* pstfd);
// whether the url is starts with http:// or https://
extern bool srs_string_is_http(std::string url);
extern bool srs_string_is_rtmp(std::string url);