1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

Merge service to protocol

This commit is contained in:
winlin 2020-06-02 15:02:59 +08:00
parent f86706e0a2
commit 95f656b46d
17 changed files with 21 additions and 35 deletions

View file

@ -0,0 +1,41 @@
/**
* The MIT License (MIT)
*
* Copyright (c) 2013-2020 Winlin
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#include <srs_service_conn.hpp>
ISrsConnection::ISrsConnection()
{
}
ISrsConnection::~ISrsConnection()
{
}
IConnectionManager::IConnectionManager()
{
}
IConnectionManager::~IConnectionManager()
{
}

View file

@ -0,0 +1,54 @@
/**
* The MIT License (MIT)
*
* Copyright (c) 2013-2020 Winlin
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#ifndef SRS_SERVICE_CONN_HPP
#define SRS_SERVICE_CONN_HPP
#include <srs_core.hpp>
#include <string>
// The connection interface for all HTTP/RTMP/RTSP object.
class ISrsConnection
{
public:
ISrsConnection();
virtual ~ISrsConnection();
public:
// Get remote ip address.
virtual std::string remote_ip() = 0;
};
// The manager for connection.
class IConnectionManager
{
public:
IConnectionManager();
virtual ~IConnectionManager();
public:
// Remove then free the specified connection.
virtual void remove(ISrsConnection* c) = 0;
};
#endif

View file

@ -0,0 +1,237 @@
/**
* The MIT License (MIT)
*
* Copyright (c) 2013-2020 Winlin
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#include <srs_service_http_client.hpp>
#include <arpa/inet.h>
#include <sstream>
using namespace std;
#include <srs_protocol_kbps.hpp>
#include <srs_kernel_utility.hpp>
#include <srs_kernel_consts.hpp>
#include <srs_kernel_error.hpp>
#include <srs_kernel_log.hpp>
#include <srs_core_autofree.hpp>
#include <srs_service_http_conn.hpp>
SrsHttpClient::SrsHttpClient()
{
transport = NULL;
clk = new SrsWallClock();
kbps = new SrsKbps(clk);
parser = NULL;
recv_timeout = timeout = SRS_UTIME_NO_TIMEOUT;
port = 0;
}
SrsHttpClient::~SrsHttpClient()
{
disconnect();
srs_freep(kbps);
srs_freep(clk);
srs_freep(parser);
}
srs_error_t SrsHttpClient::initialize(string h, int p, srs_utime_t tm)
{
srs_error_t err = srs_success;
srs_freep(parser);
parser = new SrsHttpParser();
if ((err = parser->initialize(HTTP_RESPONSE, false)) != srs_success) {
return srs_error_wrap(err, "http: init parser");
}
// Always disconnect the transport.
host = h;
port = p;
recv_timeout = timeout = tm;
disconnect();
// ep used for host in header.
string ep = host;
if (port > 0 && port != SRS_CONSTS_HTTP_DEFAULT_PORT) {
ep += ":" + srs_int2str(port);
}
// Set default value for headers.
headers["Host"] = ep;
headers["Connection"] = "Keep-Alive";
headers["User-Agent"] = RTMP_SIG_SRS_SERVER;
headers["Content-Type"] = "application/json";
return err;
}
SrsHttpClient* SrsHttpClient::set_header(string k, string v)
{
headers[k] = v;
return this;
}
srs_error_t SrsHttpClient::post(string path, string req, ISrsHttpMessage** ppmsg)
{
*ppmsg = NULL;
srs_error_t err = srs_success;
// always set the content length.
headers["Content-Length"] = srs_int2str(req.length());
if ((err = connect()) != srs_success) {
return srs_error_wrap(err, "http: connect server");
}
// send POST request to uri
// POST %s HTTP/1.1\r\nHost: %s\r\nContent-Length: %d\r\n\r\n%s
std::stringstream ss;
ss << "POST " << path << " " << "HTTP/1.1" << SRS_HTTP_CRLF;
for (map<string, string>::iterator it = headers.begin(); it != headers.end(); ++it) {
string key = it->first;
string value = it->second;
ss << key << ": " << value << SRS_HTTP_CRLF;
}
ss << SRS_HTTP_CRLF << req;
std::string data = ss.str();
if ((err = transport->write((void*)data.c_str(), data.length(), NULL)) != srs_success) {
// Disconnect the transport when channel error, reconnect for next operation.
disconnect();
return srs_error_wrap(err, "http: write");
}
ISrsHttpMessage* msg = NULL;
if ((err = parser->parse_message(transport, &msg)) != srs_success) {
return srs_error_wrap(err, "http: parse response");
}
srs_assert(msg);
if (ppmsg) {
*ppmsg = msg;
} else {
srs_freep(msg);
}
return err;
}
srs_error_t SrsHttpClient::get(string path, string req, ISrsHttpMessage** ppmsg)
{
*ppmsg = NULL;
srs_error_t err = srs_success;
// always set the content length.
headers["Content-Length"] = srs_int2str(req.length());
if ((err = connect()) != srs_success) {
return srs_error_wrap(err, "http: connect server");
}
// send POST request to uri
// GET %s HTTP/1.1\r\nHost: %s\r\nContent-Length: %d\r\n\r\n%s
std::stringstream ss;
ss << "GET " << path << " " << "HTTP/1.1" << SRS_HTTP_CRLF;
for (map<string, string>::iterator it = headers.begin(); it != headers.end(); ++it) {
string key = it->first;
string value = it->second;
ss << key << ": " << value << SRS_HTTP_CRLF;
}
ss << SRS_HTTP_CRLF << req;
std::string data = ss.str();
if ((err = transport->write((void*)data.c_str(), data.length(), NULL)) != srs_success) {
// Disconnect the transport when channel error, reconnect for next operation.
disconnect();
return srs_error_wrap(err, "http: write");
}
ISrsHttpMessage* msg = NULL;
if ((err = parser->parse_message(transport, &msg)) != srs_success) {
return srs_error_wrap(err, "http: parse response");
}
srs_assert(msg);
if (ppmsg) {
*ppmsg = msg;
} else {
srs_freep(msg);
}
return err;
}
void SrsHttpClient::set_recv_timeout(srs_utime_t tm)
{
recv_timeout = tm;
}
void SrsHttpClient::kbps_sample(const char* label, int64_t age)
{
kbps->sample();
int sr = kbps->get_send_kbps();
int sr30s = kbps->get_send_kbps_30s();
int sr5m = kbps->get_send_kbps_5m();
int rr = kbps->get_recv_kbps();
int rr30s = kbps->get_recv_kbps_30s();
int rr5m = kbps->get_recv_kbps_5m();
srs_trace("<- %s time=%" PRId64 ", okbps=%d,%d,%d, ikbps=%d,%d,%d", label, age, sr, sr30s, sr5m, rr, rr30s, rr5m);
}
void SrsHttpClient::disconnect()
{
kbps->set_io(NULL, NULL);
srs_freep(transport);
}
srs_error_t SrsHttpClient::connect()
{
srs_error_t err = srs_success;
// When transport connected, ignore.
if (transport) {
return err;
}
transport = new SrsTcpClient(host, port, timeout);
if ((err = transport->connect()) != srs_success) {
disconnect();
return srs_error_wrap(err, "http: tcp connect %s:%d to=%dms, rto=%dms",
host.c_str(), port, srsu2msi(timeout), srsu2msi(recv_timeout));
}
// Set the recv/send timeout in srs_utime_t.
transport->set_recv_timeout(recv_timeout);
transport->set_send_timeout(timeout);
kbps->set_io(transport, transport);
return err;
}

View file

@ -0,0 +1,104 @@
/**
* The MIT License (MIT)
*
* Copyright (c) 2013-2020 Winlin
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#ifndef SRS_SERVICE_HTTP_CLIENT_HPP
#define SRS_SERVICE_HTTP_CLIENT_HPP
#include <srs_core.hpp>
#include <string>
#include <map>
#include <srs_service_st.hpp>
#include <srs_http_stack.hpp>
class SrsHttpUri;
class SrsHttpParser;
class ISrsHttpMessage;
class SrsStSocket;
class SrsKbps;
class SrsWallClock;
class SrsTcpClient;
// The default timeout for http client.
#define SRS_HTTP_CLIENT_TIMEOUT (30 * SRS_UTIME_SECONDS)
// The client to GET/POST/PUT/DELETE over HTTP.
// @remark We will reuse the TCP transport until initialize or channel error,
// such as send/recv failed.
// Usage:
// SrsHttpClient hc;
// hc.initialize("127.0.0.1", 80, 9000);
// hc.post("/api/v1/version", "Hello world!", NULL);
class SrsHttpClient
{
private:
// The underlayer TCP transport, set to NULL when disconnect, or never not NULL when connected.
// We will disconnect transport when initialize or channel error, such as send/recv error.
SrsTcpClient* transport;
SrsHttpParser* parser;
std::map<std::string, std::string> headers;
SrsKbps* kbps;
SrsWallClock* clk;
private:
// The timeout in srs_utime_t.
srs_utime_t timeout;
srs_utime_t recv_timeout;
// The host name or ip.
std::string host;
int port;
public:
SrsHttpClient();
virtual ~SrsHttpClient();
public:
// Initliaze the client, disconnect the transport, renew the HTTP parser.
// @param tm The underlayer TCP transport timeout in srs_utime_t.
// @remark we will set default values in headers, which can be override by set_header.
virtual srs_error_t initialize(std::string h, int p, srs_utime_t tm = SRS_HTTP_CLIENT_TIMEOUT);
// Set HTTP request header in header[k]=v.
// @return the HTTP client itself.
virtual SrsHttpClient* set_header(std::string k, std::string v);
public:
// Post data to the uri.
// @param the path to request on.
// @param req the data post to uri. empty string to ignore.
// @param ppmsg output the http message to read the response.
// @remark user must free the ppmsg if not NULL.
virtual srs_error_t post(std::string path, std::string req, ISrsHttpMessage** ppmsg);
// Get data from the uri.
// @param the path to request on.
// @param req the data post to uri. empty string to ignore.
// @param ppmsg output the http message to read the response.
// @remark user must free the ppmsg if not NULL.
virtual srs_error_t get(std::string path, std::string req, ISrsHttpMessage** ppmsg);
private:
virtual void set_recv_timeout(srs_utime_t tm);
public:
virtual void kbps_sample(const char* label, int64_t age);
private:
virtual void disconnect();
virtual srs_error_t connect();
};
#endif

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,284 @@
/**
* The MIT License (MIT)
*
* Copyright (c) 2013-2020 Winlin
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#ifndef SRS_SERVICE_HTTP_CONN_HPP
#define SRS_SERVICE_HTTP_CONN_HPP
#include <srs_core.hpp>
#include <string>
#include <srs_http_stack.hpp>
class ISrsConnection;
class SrsFastStream;
class SrsRequest;
class ISrsReader;
class SrsHttpResponseReader;
class ISrsProtocolReadWriter;
// A wrapper for http-parser,
// provides HTTP message originted service.
class SrsHttpParser
{
private:
http_parser_settings settings;
http_parser parser;
// The global parse buffer.
SrsFastStream* buffer;
// Whether allow jsonp parse.
bool jsonp;
private:
std::string field_name;
std::string field_value;
SrsHttpParseState state;
http_parser hp_header;
std::string url;
SrsHttpHeader* header;
private:
// Point to the start of body.
const char* p_body_start;
// To discover the length of header, point to the last few bytes in header.
const char* p_header_tail;
public:
SrsHttpParser();
virtual ~SrsHttpParser();
public:
// initialize the http parser with specified type,
// one parser can only parse request or response messages.
// @param allow_jsonp whether allow jsonp parser, which indicates the method in query string.
virtual srs_error_t initialize(enum http_parser_type type, bool allow_jsonp = false);
// always parse a http message,
// that is, the *ppmsg always NOT-NULL when return success.
// or error and *ppmsg must be NULL.
// @remark, if success, *ppmsg always NOT-NULL, *ppmsg always is_complete().
// @remark user must free the ppmsg if not NULL.
virtual srs_error_t parse_message(ISrsReader* reader, ISrsHttpMessage** ppmsg);
private:
// parse the HTTP message to member field: msg.
virtual srs_error_t parse_message_imp(ISrsReader* reader);
private:
static int on_message_begin(http_parser* parser);
static int on_headers_complete(http_parser* parser);
static int on_message_complete(http_parser* parser);
static int on_url(http_parser* parser, const char* at, size_t length);
static int on_header_field(http_parser* parser, const char* at, size_t length);
static int on_header_value(http_parser* parser, const char* at, size_t length);
static int on_body(http_parser* parser, const char* at, size_t length);
};
// A Request represents an HTTP request received by a server
// or to be sent by a client.
//
// The field semantics differ slightly between client and server
// usage. In addition to the notes on the fields below, see the
// documentation for Request.Write and RoundTripper.
class SrsHttpMessage : public ISrsHttpMessage
{
private:
// The body object, reader object.
// @remark, user can get body in string by get_body().
SrsHttpResponseReader* _body;
// Whether the body is infinite chunked.
bool infinite_chunked;
// Use a buffer to read and send ts file.
// The transport connection, can be NULL.
ISrsConnection* owner_conn;
private:
uint8_t _method;
uint16_t _status;
int64_t _content_length;
private:
// The http headers
SrsHttpHeader _header;
// Whether the request indicates should keep alive for the http connection.
bool _keep_alive;
// Whether the body is chunked.
bool chunked;
private:
// The parsed url.
std::string _url;
// The extension of file, for example, .flv
std::string _ext;
// The uri parser
SrsHttpUri* _uri;
// The query map
std::map<std::string, std::string> _query;
private:
// Whether request is jsonp.
bool jsonp;
// The method in QueryString will override the HTTP method.
std::string jsonp_method;
public:
SrsHttpMessage(ISrsReader* reader = NULL, SrsFastStream* buffer = NULL);
virtual ~SrsHttpMessage();
public:
// Set the basic information for HTTP request.
// @remark User must call set_basic before set_header, because the content_length will be overwrite by header.
virtual void set_basic(uint8_t method, uint16_t status, int64_t content_length);
// Set HTTP header and whether the request require keep alive.
// @remark User must call set_header before set_url, because the Host in header is used for url.
virtual void set_header(SrsHttpHeader* header, bool keep_alive);
// set the original messages, then update the message.
virtual srs_error_t set_url(std::string url, bool allow_jsonp);
public:
// Get the owner connection, maybe NULL.
virtual ISrsConnection* connection();
virtual void set_connection(ISrsConnection* conn);
public:
virtual uint8_t method();
virtual uint16_t status_code();
// The method helpers.
virtual std::string method_str();
virtual bool is_http_get();
virtual bool is_http_put();
virtual bool is_http_post();
virtual bool is_http_delete();
virtual bool is_http_options();
// Whether body is chunked encoding, for reader only.
virtual bool is_chunked();
// Whether body is infinite chunked encoding.
// @remark set by enter_infinite_chunked.
virtual bool is_infinite_chunked();
// Whether should keep the connection alive.
virtual bool is_keep_alive();
// The uri contains the host and path.
virtual std::string uri();
// The url maybe the path.
virtual std::string url();
virtual std::string host();
virtual int port();
virtual std::string path();
virtual std::string query();
virtual std::string ext();
// Get the RESTful matched id.
virtual int parse_rest_id(std::string pattern);
public:
virtual srs_error_t enter_infinite_chunked();
public:
// Read body to string.
// @remark for small http body.
virtual srs_error_t body_read_all(std::string& body);
// Get the body reader, to read one by one.
// @remark when body is very large, or chunked, use this.
virtual ISrsHttpResponseReader* body_reader();
// The content length, -1 for chunked or not set.
virtual int64_t content_length();
// Get the param in query string, for instance, query is "start=100&end=200",
// then query_get("start") is "100", and query_get("end") is "200"
virtual std::string query_get(std::string key);
// Get the headers.
virtual SrsHttpHeader* header();
public:
// Convert the http message to a request.
// @remark user must free the return request.
virtual SrsRequest* to_request(std::string vhost);
public:
virtual bool is_jsonp();
};
// The http chunked header size,
// for writev, there always one chunk to send it.
#define SRS_HTTP_HEADER_CACHE_SIZE 64
class ISrsHttpHeaderFilter
{
public:
ISrsHttpHeaderFilter();
virtual ~ISrsHttpHeaderFilter();
public:
// Filter the HTTP header h.
virtual srs_error_t filter(SrsHttpHeader* h) = 0;
};
// Response writer use st socket
class SrsHttpResponseWriter : public ISrsHttpResponseWriter
{
private:
ISrsProtocolReadWriter* skt;
SrsHttpHeader* hdr;
// Before writing header, there is a chance to filter it,
// such as remove some headers or inject new.
ISrsHttpHeaderFilter* hf;
private:
char header_cache[SRS_HTTP_HEADER_CACHE_SIZE];
iovec* iovss_cache;
int nb_iovss_cache;
private:
// Reply header has been (logically) written
bool header_wrote;
// The status code passed to WriteHeader
int status;
private:
// The explicitly-declared Content-Length; or -1
int64_t content_length;
// The number of bytes written in body
int64_t written;
private:
// The wroteHeader tells whether the header's been written to "the
// wire" (or rather: w.conn.buf). this is unlike
// (*response).wroteHeader, which tells only whether it was
// logically written.
bool header_sent;
public:
SrsHttpResponseWriter(ISrsProtocolReadWriter* io);
virtual ~SrsHttpResponseWriter();
public:
virtual srs_error_t final_request();
virtual SrsHttpHeader* header();
virtual srs_error_t write(char* data, int size);
virtual srs_error_t writev(const iovec* iov, int iovcnt, ssize_t* pnwrite);
virtual void write_header(int code);
virtual srs_error_t send_header(char* data, int size);
};
// Response reader use st socket.
class SrsHttpResponseReader : virtual public ISrsHttpResponseReader
{
private:
ISrsReader* skt;
SrsHttpMessage* owner;
SrsFastStream* buffer;
bool is_eof;
// The left bytes in chunk.
size_t nb_left_chunk;
// The number of bytes of current chunk.
size_t nb_chunk;
// Already read total bytes.
int64_t nb_total_read;
public:
// Generally the reader is the under-layer io such as socket,
// while buffer is a fast cache which may have cached some data from reader.
SrsHttpResponseReader(SrsHttpMessage* msg, ISrsReader* reader, SrsFastStream* buffer);
virtual ~SrsHttpResponseReader();
// Interface ISrsHttpResponseReader
public:
virtual bool eof();
virtual srs_error_t read(void* buf, size_t size, ssize_t* nread);
private:
virtual srs_error_t read_chunked(void* buf, size_t size, ssize_t* nread);
virtual srs_error_t read_specified(void* buf, size_t size, ssize_t* nread);
};
#endif

View file

@ -0,0 +1,276 @@
/**
* The MIT License (MIT)
*
* Copyright (c) 2013-2020 Winlin
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#include <srs_service_log.hpp>
#include <stdarg.h>
#include <sys/time.h>
#include <unistd.h>
using namespace std;
#include <srs_kernel_error.hpp>
#include <srs_kernel_utility.hpp>
#define SRS_BASIC_LOG_SIZE 1024
SrsThreadContext::SrsThreadContext()
{
}
SrsThreadContext::~SrsThreadContext()
{
}
int SrsThreadContext::generate_id()
{
static int id = 0;
if (id == 0) {
id = (100 + ((uint32_t)(int64_t)this)%1000);
}
int gid = id++;
cache[srs_thread_self()] = gid;
return gid;
}
int SrsThreadContext::get_id()
{
return cache[srs_thread_self()];
}
int SrsThreadContext::set_id(int v)
{
srs_thread_t self = srs_thread_self();
int ov = 0;
if (cache.find(self) != cache.end()) {
ov = cache[self];
}
cache[self] = v;
return ov;
}
void SrsThreadContext::clear_cid()
{
srs_thread_t self = srs_thread_self();
std::map<srs_thread_t, int>::iterator it = cache.find(self);
if (it != cache.end()) {
cache.erase(it);
}
}
// LCOV_EXCL_START
SrsConsoleLog::SrsConsoleLog(SrsLogLevel l, bool u)
{
level = l;
utc = u;
buffer = new char[SRS_BASIC_LOG_SIZE];
}
SrsConsoleLog::~SrsConsoleLog()
{
srs_freepa(buffer);
}
srs_error_t SrsConsoleLog::initialize()
{
return srs_success;
}
void SrsConsoleLog::reopen()
{
}
void SrsConsoleLog::verbose(const char* tag, int context_id, const char* fmt, ...)
{
if (level > SrsLogLevelVerbose) {
return;
}
int size = 0;
if (!srs_log_header(buffer, SRS_BASIC_LOG_SIZE, utc, false, tag, context_id, "Verb", &size)) {
return;
}
va_list ap;
va_start(ap, fmt);
// we reserved 1 bytes for the new line.
size += vsnprintf(buffer + size, SRS_BASIC_LOG_SIZE - size, fmt, ap);
va_end(ap);
fprintf(stdout, "%s\n", buffer);
}
void SrsConsoleLog::info(const char* tag, int context_id, const char* fmt, ...)
{
if (level > SrsLogLevelInfo) {
return;
}
int size = 0;
if (!srs_log_header(buffer, SRS_BASIC_LOG_SIZE, utc, false, tag, context_id, "Debug", &size)) {
return;
}
va_list ap;
va_start(ap, fmt);
// we reserved 1 bytes for the new line.
size += vsnprintf(buffer + size, SRS_BASIC_LOG_SIZE - size, fmt, ap);
va_end(ap);
fprintf(stdout, "%s\n", buffer);
}
void SrsConsoleLog::trace(const char* tag, int context_id, const char* fmt, ...)
{
if (level > SrsLogLevelTrace) {
return;
}
int size = 0;
if (!srs_log_header(buffer, SRS_BASIC_LOG_SIZE, utc, false, tag, context_id, "Trace", &size)) {
return;
}
va_list ap;
va_start(ap, fmt);
// we reserved 1 bytes for the new line.
size += vsnprintf(buffer + size, SRS_BASIC_LOG_SIZE - size, fmt, ap);
va_end(ap);
fprintf(stdout, "%s\n", buffer);
}
void SrsConsoleLog::warn(const char* tag, int context_id, const char* fmt, ...)
{
if (level > SrsLogLevelWarn) {
return;
}
int size = 0;
if (!srs_log_header(buffer, SRS_BASIC_LOG_SIZE, utc, true, tag, context_id, "Warn", &size)) {
return;
}
va_list ap;
va_start(ap, fmt);
// we reserved 1 bytes for the new line.
size += vsnprintf(buffer + size, SRS_BASIC_LOG_SIZE - size, fmt, ap);
va_end(ap);
fprintf(stderr, "%s\n", buffer);
}
void SrsConsoleLog::error(const char* tag, int context_id, const char* fmt, ...)
{
if (level > SrsLogLevelError) {
return;
}
int size = 0;
if (!srs_log_header(buffer, SRS_BASIC_LOG_SIZE, utc, true, tag, context_id, "Error", &size)) {
return;
}
va_list ap;
va_start(ap, fmt);
// we reserved 1 bytes for the new line.
size += vsnprintf(buffer + size, SRS_BASIC_LOG_SIZE - size, fmt, ap);
va_end(ap);
// add strerror() to error msg.
if (errno != 0) {
size += snprintf(buffer + size, SRS_BASIC_LOG_SIZE - size, "(%s)", strerror(errno));
}
fprintf(stderr, "%s\n", buffer);
}
// LCOV_EXCL_STOP
bool srs_log_header(char* buffer, int size, bool utc, bool dangerous, const char* tag, int cid, const char* level, int* psize)
{
// clock time
timeval tv;
if (gettimeofday(&tv, NULL) == -1) {
return false;
}
// to calendar time
struct tm* tm;
if (utc) {
if ((tm = gmtime(&tv.tv_sec)) == NULL) {
return false;
}
} else {
if ((tm = localtime(&tv.tv_sec)) == NULL) {
return false;
}
}
int written = -1;
if (dangerous) {
if (tag) {
written = snprintf(buffer, size,
"[%d-%02d-%02d %02d:%02d:%02d.%03d][%s][%s][%d][%d][%d] ",
1900 + tm->tm_year, 1 + tm->tm_mon, tm->tm_mday, tm->tm_hour, tm->tm_min, tm->tm_sec, (int)(tv.tv_usec / 1000),
level, tag, getpid(), cid, errno);
} else {
written = snprintf(buffer, size,
"[%d-%02d-%02d %02d:%02d:%02d.%03d][%s][%d][%d][%d] ",
1900 + tm->tm_year, 1 + tm->tm_mon, tm->tm_mday, tm->tm_hour, tm->tm_min, tm->tm_sec, (int)(tv.tv_usec / 1000),
level, getpid(), cid, errno);
}
} else {
if (tag) {
written = snprintf(buffer, size,
"[%d-%02d-%02d %02d:%02d:%02d.%03d][%s][%s][%d][%d] ",
1900 + tm->tm_year, 1 + tm->tm_mon, tm->tm_mday, tm->tm_hour, tm->tm_min, tm->tm_sec, (int)(tv.tv_usec / 1000),
level, tag, getpid(), cid);
} else {
written = snprintf(buffer, size,
"[%d-%02d-%02d %02d:%02d:%02d.%03d][%s][%d][%d] ",
1900 + tm->tm_year, 1 + tm->tm_mon, tm->tm_mday, tm->tm_hour, tm->tm_min, tm->tm_sec, (int)(tv.tv_usec / 1000),
level, getpid(), cid);
}
}
// Exceed the size, ignore this log.
// Check size to avoid security issue https://github.com/ossrs/srs/issues/1229
if (written >= size) {
return false;
}
if (written == -1) {
return false;
}
// write the header size.
*psize = written;
return true;
}

View file

@ -0,0 +1,80 @@
/**
* The MIT License (MIT)
*
* Copyright (c) 2013-2020 Winlin
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#ifndef SRS_SERVICE_LOG_HPP
#define SRS_SERVICE_LOG_HPP
#include <srs_core.hpp>
#include <map>
#include <srs_service_st.hpp>
#include <srs_kernel_log.hpp>
// The st thread context, get_id will get the st-thread id,
// which identify the client.
class SrsThreadContext : public ISrsThreadContext
{
private:
std::map<srs_thread_t, int> cache;
public:
SrsThreadContext();
virtual ~SrsThreadContext();
public:
virtual int generate_id();
virtual int get_id();
virtual int set_id(int v);
public:
virtual void clear_cid();
};
// The basic console log, which write log to console.
class SrsConsoleLog : public ISrsLog
{
private:
SrsLogLevel level;
bool utc;
private:
char* buffer;
public:
SrsConsoleLog(SrsLogLevel l, bool u);
virtual ~SrsConsoleLog();
// Interface ISrsLog
public:
virtual srs_error_t initialize();
virtual void reopen();
virtual void verbose(const char* tag, int context_id, const char* fmt, ...);
virtual void info(const char* tag, int context_id, const char* fmt, ...);
virtual void trace(const char* tag, int context_id, const char* fmt, ...);
virtual void warn(const char* tag, int context_id, const char* fmt, ...);
virtual void error(const char* tag, int context_id, const char* fmt, ...);
};
// Generate the log header.
// @param dangerous Whether log is warning or error, log the errno if true.
// @param utc Whether use UTC time format in the log header.
// @param psize Output the actual header size.
// @remark It's a internal API.
bool srs_log_header(char* buffer, int size, bool utc, bool dangerous, const char* tag, int cid, const char* level, int* psize);
#endif

View file

@ -0,0 +1,236 @@
/**
* The MIT License (MIT)
*
* Copyright (c) 2013-2020 Winlin
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#include <srs_service_rtmp_conn.hpp>
#include <unistd.h>
using namespace std;
#include <srs_protocol_kbps.hpp>
#include <srs_rtmp_stack.hpp>
#include <srs_service_st.hpp>
#include <srs_protocol_amf0.hpp>
#include <srs_protocol_utility.hpp>
#include <srs_service_utility.hpp>
SrsBasicRtmpClient::SrsBasicRtmpClient(string r, srs_utime_t ctm, srs_utime_t stm)
{
clk = new SrsWallClock();
kbps = new SrsKbps(clk);
url = r;
connect_timeout = ctm;
stream_timeout = stm;
req = new SrsRequest();
srs_parse_rtmp_url(url, req->tcUrl, req->stream);
srs_discovery_tc_url(req->tcUrl, req->schema, req->host, req->vhost, req->app, req->stream, req->port, req->param);
transport = NULL;
client = NULL;
stream_id = 0;
}
SrsBasicRtmpClient::~SrsBasicRtmpClient()
{
close();
srs_freep(kbps);
srs_freep(clk);
}
srs_error_t SrsBasicRtmpClient::connect()
{
srs_error_t err = srs_success;
close();
transport = new SrsTcpClient(req->host, req->port, srs_utime_t(connect_timeout));
client = new SrsRtmpClient(transport);
kbps->set_io(transport, transport);
if ((err = transport->connect()) != srs_success) {
close();
return srs_error_wrap(err, "connect");
}
client->set_recv_timeout(stream_timeout);
client->set_send_timeout(stream_timeout);
// connect to vhost/app
if ((err = client->handshake()) != srs_success) {
return srs_error_wrap(err, "handshake");
}
if ((err = connect_app()) != srs_success) {
return srs_error_wrap(err, "connect app");
}
if ((err = client->create_stream(stream_id)) != srs_success) {
return srs_error_wrap(err, "create stream_id=%d", stream_id);
}
return err;
}
void SrsBasicRtmpClient::close()
{
kbps->set_io(NULL, NULL);
srs_freep(client);
srs_freep(transport);
}
srs_error_t SrsBasicRtmpClient::connect_app()
{
return do_connect_app(srs_get_public_internet_address(), false);
}
srs_error_t SrsBasicRtmpClient::do_connect_app(string local_ip, bool debug)
{
srs_error_t err = srs_success;
// args of request takes the srs info.
if (req->args == NULL) {
req->args = SrsAmf0Any::object();
}
// notify server the edge identity,
// @see https://github.com/ossrs/srs/issues/147
SrsAmf0Object* data = req->args;
data->set("srs_sig", SrsAmf0Any::str(RTMP_SIG_SRS_KEY));
data->set("srs_server", SrsAmf0Any::str(RTMP_SIG_SRS_SERVER));
data->set("srs_license", SrsAmf0Any::str(RTMP_SIG_SRS_LICENSE));
data->set("srs_url", SrsAmf0Any::str(RTMP_SIG_SRS_URL));
data->set("srs_version", SrsAmf0Any::str(RTMP_SIG_SRS_VERSION));
// for edge to directly get the id of client.
data->set("srs_pid", SrsAmf0Any::number(getpid()));
data->set("srs_id", SrsAmf0Any::number(_srs_context->get_id()));
// local ip of edge
data->set("srs_server_ip", SrsAmf0Any::str(local_ip.c_str()));
// generate the tcUrl
std::string param = "";
std::string target_vhost = req->vhost;
std::string tc_url = srs_generate_tc_url(req->host, req->vhost, req->app, req->port);
// replace the tcUrl in request,
// which will replace the tc_url in client.connect_app().
req->tcUrl = tc_url;
// upnode server identity will show in the connect_app of client.
// @see https://github.com/ossrs/srs/issues/160
// the debug_srs_upnode is config in vhost and default to true.
SrsServerInfo si;
if ((err = client->connect_app(req->app, tc_url, req, debug, &si)) != srs_success) {
return srs_error_wrap(err, "connect app tcUrl=%s, debug=%d", tc_url.c_str(), debug);
}
return err;
}
srs_error_t SrsBasicRtmpClient::publish(int chunk_size)
{
srs_error_t err = srs_success;
// Pass params in stream, @see https://github.com/ossrs/srs/issues/1031#issuecomment-409745733
string stream = srs_generate_stream_with_query(req->host, req->vhost, req->stream, req->param);
// publish.
if ((err = client->publish(stream, stream_id, chunk_size)) != srs_success) {
return srs_error_wrap(err, "publish failed, stream=%s, stream_id=%d", stream.c_str(), stream_id);
}
return err;
}
srs_error_t SrsBasicRtmpClient::play(int chunk_size)
{
srs_error_t err = srs_success;
// Pass params in stream, @see https://github.com/ossrs/srs/issues/1031#issuecomment-409745733
string stream = srs_generate_stream_with_query(req->host, req->vhost, req->stream, req->param);
if ((err = client->play(stream, stream_id, chunk_size)) != srs_success) {
return srs_error_wrap(err, "connect with server failed, stream=%s, stream_id=%d", stream.c_str(), stream_id);
}
return err;
}
void SrsBasicRtmpClient::kbps_sample(const char* label, int64_t age)
{
kbps->sample();
int sr = kbps->get_send_kbps();
int sr30s = kbps->get_send_kbps_30s();
int sr5m = kbps->get_send_kbps_5m();
int rr = kbps->get_recv_kbps();
int rr30s = kbps->get_recv_kbps_30s();
int rr5m = kbps->get_recv_kbps_5m();
srs_trace("<- %s time=%" PRId64 ", okbps=%d,%d,%d, ikbps=%d,%d,%d", label, age, sr, sr30s, sr5m, rr, rr30s, rr5m);
}
void SrsBasicRtmpClient::kbps_sample(const char* label, int64_t age, int msgs)
{
kbps->sample();
int sr = kbps->get_send_kbps();
int sr30s = kbps->get_send_kbps_30s();
int sr5m = kbps->get_send_kbps_5m();
int rr = kbps->get_recv_kbps();
int rr30s = kbps->get_recv_kbps_30s();
int rr5m = kbps->get_recv_kbps_5m();
srs_trace("<- %s time=%" PRId64 ", msgs=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d", label, age, msgs, sr, sr30s, sr5m, rr, rr30s, rr5m);
}
int SrsBasicRtmpClient::sid()
{
return stream_id;
}
srs_error_t SrsBasicRtmpClient::recv_message(SrsCommonMessage** pmsg)
{
return client->recv_message(pmsg);
}
srs_error_t SrsBasicRtmpClient::decode_message(SrsCommonMessage* msg, SrsPacket** ppacket)
{
return client->decode_message(msg, ppacket);
}
srs_error_t SrsBasicRtmpClient::send_and_free_messages(SrsSharedPtrMessage** msgs, int nb_msgs)
{
return client->send_and_free_messages(msgs, nb_msgs, stream_id);
}
srs_error_t SrsBasicRtmpClient::send_and_free_message(SrsSharedPtrMessage* msg)
{
return client->send_and_free_message(msg, stream_id);
}
void SrsBasicRtmpClient::set_recv_timeout(srs_utime_t timeout)
{
transport->set_recv_timeout(timeout);
}

View file

@ -0,0 +1,92 @@
/**
* The MIT License (MIT)
*
* Copyright (c) 2013-2020 Winlin
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#ifndef SRS_SERVICE_RTMP_CONN_HPP
#define SRS_SERVICE_RTMP_CONN_HPP
#include <srs_core.hpp>
#include <string>
class SrsRequest;
class SrsTcpClient;
class SrsRtmpClient;
class SrsCommonMessage;
class SrsSharedPtrMessage;
class SrsPacket;
class SrsKbps;
class SrsWallClock;
// The simple RTMP client, provides friendly APIs.
// @remark Should never use client when closed.
// Usage:
// SrsBasicRtmpClient client("rtmp://127.0.0.1:1935/live/livestream", 3000, 9000);
// client.connect();
// client.play();
// client.close();
class SrsBasicRtmpClient
{
private:
std::string url;
srs_utime_t connect_timeout;
srs_utime_t stream_timeout;
protected:
SrsRequest* req;
private:
SrsTcpClient* transport;
SrsRtmpClient* client;
SrsKbps* kbps;
SrsWallClock* clk;
int stream_id;
public:
// Constructor.
// @param r The RTMP url, for example, rtmp://ip:port/app/stream?domain=vhost
// @param ctm The timeout in srs_utime_t to connect to server.
// @param stm The timeout in srs_utime_t to delivery A/V stream.
SrsBasicRtmpClient(std::string r, srs_utime_t ctm, srs_utime_t stm);
virtual ~SrsBasicRtmpClient();
public:
// Connect, handshake and connect app to RTMP server.
// @remark We always close the transport.
virtual srs_error_t connect();
virtual void close();
protected:
virtual srs_error_t connect_app();
virtual srs_error_t do_connect_app(std::string local_ip, bool debug);
public:
virtual srs_error_t publish(int chunk_size);
virtual srs_error_t play(int chunk_size);
virtual void kbps_sample(const char* label, int64_t age);
virtual void kbps_sample(const char* label, int64_t age, int msgs);
virtual int sid();
public:
virtual srs_error_t recv_message(SrsCommonMessage** pmsg);
virtual srs_error_t decode_message(SrsCommonMessage* msg, SrsPacket** ppacket);
virtual srs_error_t send_and_free_messages(SrsSharedPtrMessage** msgs, int nb_msgs);
virtual srs_error_t send_and_free_message(SrsSharedPtrMessage* msg);
public:
virtual void set_recv_timeout(srs_utime_t timeout);
};
#endif

View file

@ -0,0 +1,710 @@
/**
* The MIT License (MIT)
*
* Copyright (c) 2013-2020 Winlin
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#include <srs_service_st.hpp>
#include <st.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <netdb.h>
using namespace std;
#include <srs_core_autofree.hpp>
#include <srs_kernel_error.hpp>
#include <srs_kernel_log.hpp>
#include <srs_service_utility.hpp>
#include <srs_kernel_utility.hpp>
// nginx also set to 512
#define SERVER_LISTEN_BACKLOG 512
#ifdef __linux__
#include <sys/epoll.h>
bool srs_st_epoll_is_supported(void)
{
struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.ptr = NULL;
/* Guaranteed to fail */
epoll_ctl(-1, EPOLL_CTL_ADD, -1, &ev);
return (errno != ENOSYS);
}
#endif
srs_error_t srs_st_init()
{
#ifdef __linux__
// check epoll, some old linux donot support epoll.
// @see https://github.com/ossrs/srs/issues/162
if (!srs_st_epoll_is_supported()) {
return srs_error_new(ERROR_ST_SET_EPOLL, "linux epoll disabled");
}
#endif
// Select the best event system available on the OS. In Linux this is
// epoll(). On BSD it will be kqueue.
if (st_set_eventsys(ST_EVENTSYS_ALT) == -1) {
return srs_error_new(ERROR_ST_SET_EPOLL, "st enable st failed, current is %s", st_get_eventsys_name());
}
int r0 = 0;
if((r0 = st_init()) != 0){
return srs_error_new(ERROR_ST_INITIALIZE, "st initialize failed, r0=%d", r0);
}
srs_trace("st_init success, use %s", st_get_eventsys_name());
return srs_success;
}
void srs_close_stfd(srs_netfd_t& stfd)
{
if (stfd) {
// we must ensure the close is ok.
int err = st_netfd_close((st_netfd_t)stfd);
srs_assert(err != -1);
stfd = NULL;
}
}
srs_error_t srs_fd_closeexec(int fd)
{
int flags = fcntl(fd, F_GETFD);
flags |= FD_CLOEXEC;
if (fcntl(fd, F_SETFD, flags) == -1) {
return srs_error_new(ERROR_SOCKET_SETCLOSEEXEC, "FD_CLOEXEC fd=%d", fd);
}
return srs_success;
}
srs_error_t srs_fd_reuseaddr(int fd)
{
int v = 1;
if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &v, sizeof(int)) == -1) {
return srs_error_new(ERROR_SOCKET_SETREUSEADDR, "SO_REUSEADDR fd=%d", fd);
}
return srs_success;
}
srs_error_t srs_fd_reuseport(int fd)
{
#if defined(SO_REUSEPORT)
int v = 1;
if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &v, sizeof(int)) == -1) {
#ifdef SRS_CROSSBUILD
srs_warn("SO_REUSEPORT disabled for crossbuild");
return srs_success;
#else
return srs_error_new(ERROR_SOCKET_SETREUSEADDR, "SO_REUSEPORT fd=%d", fd);
#endif
}
#else
#warning "SO_REUSEPORT is not supported by your OS"
srs_warn("SO_REUSEPORT is not supported util Linux kernel 3.9");
#endif
return srs_success;
}
srs_error_t srs_fd_keepalive(int fd)
{
#ifdef SO_KEEPALIVE
int v = 1;
if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &v, sizeof(int)) == -1) {
return srs_error_new(ERROR_SOCKET_SETKEEPALIVE, "SO_KEEPALIVE fd=%d", fd);
}
#endif
return srs_success;
}
srs_thread_t srs_thread_self()
{
return (srs_thread_t)st_thread_self();
}
srs_error_t srs_tcp_connect(string server, int port, srs_utime_t tm, srs_netfd_t* pstfd)
{
st_utime_t timeout = ST_UTIME_NO_TIMEOUT;
if (tm != SRS_UTIME_NO_TIMEOUT) {
timeout = tm;
}
*pstfd = NULL;
srs_netfd_t stfd = NULL;
char sport[8];
snprintf(sport, sizeof(sport), "%d", port);
addrinfo hints;
memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
addrinfo* r = NULL;
SrsAutoFree(addrinfo, r);
if(getaddrinfo(server.c_str(), sport, (const addrinfo*)&hints, &r)) {
return srs_error_new(ERROR_SYSTEM_IP_INVALID, "get address info");
}
int sock = socket(r->ai_family, r->ai_socktype, r->ai_protocol);
if(sock == -1){
return srs_error_new(ERROR_SOCKET_CREATE, "create socket");
}
srs_assert(!stfd);
stfd = st_netfd_open_socket(sock);
if(stfd == NULL){
::close(sock);
return srs_error_new(ERROR_ST_OPEN_SOCKET, "open socket");
}
if (st_connect((st_netfd_t)stfd, r->ai_addr, r->ai_addrlen, timeout) == -1){
srs_close_stfd(stfd);
return srs_error_new(ERROR_ST_CONNECT, "connect to %s:%d", server.c_str(), port);
}
*pstfd = stfd;
return srs_success;
}
srs_error_t do_srs_tcp_listen(int fd, addrinfo* r, srs_netfd_t* pfd)
{
srs_error_t err = srs_success;
// Detect alive for TCP connection.
// @see https://github.com/ossrs/srs/issues/1044
if ((err = srs_fd_keepalive(fd)) != srs_success) {
return srs_error_wrap(err, "set keepalive");
}
if ((err = srs_fd_closeexec(fd)) != srs_success) {
return srs_error_wrap(err, "set closeexec");
}
if ((err = srs_fd_reuseaddr(fd)) != srs_success) {
return srs_error_wrap(err, "set reuseaddr");
}
if ((err = srs_fd_reuseport(fd)) != srs_success) {
return srs_error_wrap(err, "set reuseport");
}
if (bind(fd, r->ai_addr, r->ai_addrlen) == -1) {
return srs_error_new(ERROR_SOCKET_BIND, "bind");
}
if (::listen(fd, SERVER_LISTEN_BACKLOG) == -1) {
return srs_error_new(ERROR_SOCKET_LISTEN, "listen");
}
if ((*pfd = srs_netfd_open_socket(fd)) == NULL){
return srs_error_new(ERROR_ST_OPEN_SOCKET, "st open");
}
return err;
}
srs_error_t srs_tcp_listen(std::string ip, int port, srs_netfd_t* pfd)
{
srs_error_t err = srs_success;
char sport[8];
snprintf(sport, sizeof(sport), "%d", port);
addrinfo hints;
memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = AI_NUMERICHOST;
addrinfo* r = NULL;
SrsAutoFree(addrinfo, r);
if(getaddrinfo(ip.c_str(), sport, (const addrinfo*)&hints, &r)) {
return srs_error_new(ERROR_SYSTEM_IP_INVALID, "getaddrinfo hints=(%d,%d,%d)",
hints.ai_family, hints.ai_socktype, hints.ai_flags);
}
int fd = 0;
if ((fd = socket(r->ai_family, r->ai_socktype, r->ai_protocol)) == -1) {
return srs_error_new(ERROR_SOCKET_CREATE, "socket domain=%d, type=%d, protocol=%d",
r->ai_family, r->ai_socktype, r->ai_protocol);
}
if ((err = do_srs_tcp_listen(fd, r, pfd)) != srs_success) {
::close(fd);
return srs_error_wrap(err, "fd=%d", fd);
}
return err;
}
srs_error_t do_srs_udp_listen(int fd, addrinfo* r, srs_netfd_t* pfd)
{
srs_error_t err = srs_success;
if ((err = srs_fd_closeexec(fd)) != srs_success) {
return srs_error_wrap(err, "set closeexec");
}
if ((err = srs_fd_reuseaddr(fd)) != srs_success) {
return srs_error_wrap(err, "set reuseaddr");
}
if ((err = srs_fd_reuseport(fd)) != srs_success) {
return srs_error_wrap(err, "set reuseport");
}
if (bind(fd, r->ai_addr, r->ai_addrlen) == -1) {
return srs_error_new(ERROR_SOCKET_BIND, "bind");
}
if ((*pfd = srs_netfd_open_socket(fd)) == NULL){
return srs_error_new(ERROR_ST_OPEN_SOCKET, "st open");
}
return err;
}
srs_error_t srs_udp_listen(std::string ip, int port, srs_netfd_t* pfd)
{
srs_error_t err = srs_success;
char sport[8];
snprintf(sport, sizeof(sport), "%d", port);
addrinfo hints;
memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_DGRAM;
hints.ai_flags = AI_NUMERICHOST;
addrinfo* r = NULL;
SrsAutoFree(addrinfo, r);
if(getaddrinfo(ip.c_str(), sport, (const addrinfo*)&hints, &r)) {
return srs_error_new(ERROR_SYSTEM_IP_INVALID, "getaddrinfo hints=(%d,%d,%d)",
hints.ai_family, hints.ai_socktype, hints.ai_flags);
}
int fd = 0;
if ((fd = socket(r->ai_family, r->ai_socktype, r->ai_protocol)) == -1) {
return srs_error_new(ERROR_SOCKET_CREATE, "socket domain=%d, type=%d, protocol=%d",
r->ai_family, r->ai_socktype, r->ai_protocol);
}
if ((err = do_srs_udp_listen(fd, r, pfd)) != srs_success) {
::close(fd);
return srs_error_wrap(err, "fd=%d", fd);
}
return err;
}
srs_cond_t srs_cond_new()
{
return (srs_cond_t)st_cond_new();
}
int srs_cond_destroy(srs_cond_t cond)
{
return st_cond_destroy((st_cond_t)cond);
}
int srs_cond_wait(srs_cond_t cond)
{
return st_cond_wait((st_cond_t)cond);
}
int srs_cond_timedwait(srs_cond_t cond, srs_utime_t timeout)
{
return st_cond_timedwait((st_cond_t)cond, (st_utime_t)timeout);
}
int srs_cond_signal(srs_cond_t cond)
{
return st_cond_signal((st_cond_t)cond);
}
srs_mutex_t srs_mutex_new()
{
return (srs_mutex_t)st_mutex_new();
}
int srs_mutex_destroy(srs_mutex_t mutex)
{
if (!mutex) {
return 0;
}
return st_mutex_destroy((st_mutex_t)mutex);
}
int srs_mutex_lock(srs_mutex_t mutex)
{
return st_mutex_lock((st_mutex_t)mutex);
}
int srs_mutex_unlock(srs_mutex_t mutex)
{
return st_mutex_unlock((st_mutex_t)mutex);
}
int srs_netfd_fileno(srs_netfd_t stfd)
{
return st_netfd_fileno((st_netfd_t)stfd);
}
int srs_usleep(srs_utime_t usecs)
{
return st_usleep((st_utime_t)usecs);
}
srs_netfd_t srs_netfd_open_socket(int osfd)
{
return (srs_netfd_t)st_netfd_open_socket(osfd);
}
srs_netfd_t srs_netfd_open(int osfd)
{
return (srs_netfd_t)st_netfd_open(osfd);
}
int srs_recvfrom(srs_netfd_t stfd, void *buf, int len, struct sockaddr *from, int *fromlen, srs_utime_t timeout)
{
return st_recvfrom((st_netfd_t)stfd, buf, len, from, fromlen, (st_utime_t)timeout);
}
int srs_sendto(srs_netfd_t stfd, void *buf, int len, const struct sockaddr * to, int tolen, srs_utime_t timeout)
{
return st_sendto((st_netfd_t)stfd, buf, len, to, tolen, (st_utime_t)timeout);
}
int srs_recvmsg(srs_netfd_t stfd, struct msghdr *msg, int flags, srs_utime_t timeout)
{
return st_recvmsg((st_netfd_t)stfd, msg, flags, (st_utime_t)timeout);
}
int srs_sendmsg(srs_netfd_t stfd, const struct msghdr *msg, int flags, srs_utime_t timeout)
{
return st_sendmsg((st_netfd_t)stfd, msg, flags, (st_utime_t)timeout);
}
int srs_sendmmsg(srs_netfd_t stfd, struct srs_mmsghdr *msgvec, unsigned int vlen, int flags, srs_utime_t timeout)
{
return st_sendmmsg((st_netfd_t)stfd, (struct st_mmsghdr*)msgvec, vlen, flags, (st_utime_t)timeout);
}
srs_netfd_t srs_accept(srs_netfd_t stfd, struct sockaddr *addr, int *addrlen, srs_utime_t timeout)
{
return (srs_netfd_t)st_accept((st_netfd_t)stfd, addr, addrlen, (st_utime_t)timeout);
}
ssize_t srs_read(srs_netfd_t stfd, void *buf, size_t nbyte, srs_utime_t timeout)
{
return st_read((st_netfd_t)stfd, buf, nbyte, (st_utime_t)timeout);
}
bool srs_is_never_timeout(srs_utime_t tm)
{
return tm == SRS_UTIME_NO_TIMEOUT;
}
SrsStSocket::SrsStSocket()
{
stfd = NULL;
stm = rtm = SRS_UTIME_NO_TIMEOUT;
rbytes = sbytes = 0;
}
SrsStSocket::~SrsStSocket()
{
}
srs_error_t SrsStSocket::initialize(srs_netfd_t fd)
{
stfd = fd;
return srs_success;
}
void SrsStSocket::set_recv_timeout(srs_utime_t tm)
{
rtm = tm;
}
srs_utime_t SrsStSocket::get_recv_timeout()
{
return rtm;
}
void SrsStSocket::set_send_timeout(srs_utime_t tm)
{
stm = tm;
}
srs_utime_t SrsStSocket::get_send_timeout()
{
return stm;
}
int64_t SrsStSocket::get_recv_bytes()
{
return rbytes;
}
int64_t SrsStSocket::get_send_bytes()
{
return sbytes;
}
srs_error_t SrsStSocket::read(void* buf, size_t size, ssize_t* nread)
{
srs_error_t err = srs_success;
ssize_t nb_read;
if (rtm == SRS_UTIME_NO_TIMEOUT) {
nb_read = st_read((st_netfd_t)stfd, buf, size, ST_UTIME_NO_TIMEOUT);
} else {
nb_read = st_read((st_netfd_t)stfd, buf, size, rtm);
}
if (nread) {
*nread = nb_read;
}
// On success a non-negative integer indicating the number of bytes actually read is returned
// (a value of 0 means the network connection is closed or end of file is reached).
// Otherwise, a value of -1 is returned and errno is set to indicate the error.
if (nb_read <= 0) {
// @see https://github.com/ossrs/srs/issues/200
if (nb_read < 0 && errno == ETIME) {
return srs_error_new(ERROR_SOCKET_TIMEOUT, "timeout %d ms", srsu2msi(rtm));
}
if (nb_read == 0) {
errno = ECONNRESET;
}
return srs_error_new(ERROR_SOCKET_READ, "read");
}
rbytes += nb_read;
return err;
}
srs_error_t SrsStSocket::read_fully(void* buf, size_t size, ssize_t* nread)
{
srs_error_t err = srs_success;
ssize_t nb_read;
if (rtm == SRS_UTIME_NO_TIMEOUT) {
nb_read = st_read_fully((st_netfd_t)stfd, buf, size, ST_UTIME_NO_TIMEOUT);
} else {
nb_read = st_read_fully((st_netfd_t)stfd, buf, size, rtm);
}
if (nread) {
*nread = nb_read;
}
// On success a non-negative integer indicating the number of bytes actually read is returned
// (a value less than nbyte means the network connection is closed or end of file is reached)
// Otherwise, a value of -1 is returned and errno is set to indicate the error.
if (nb_read != (ssize_t)size) {
// @see https://github.com/ossrs/srs/issues/200
if (nb_read < 0 && errno == ETIME) {
return srs_error_new(ERROR_SOCKET_TIMEOUT, "timeout %d ms", srsu2msi(rtm));
}
if (nb_read >= 0) {
errno = ECONNRESET;
}
return srs_error_new(ERROR_SOCKET_READ_FULLY, "read fully");
}
rbytes += nb_read;
return err;
}
srs_error_t SrsStSocket::write(void* buf, size_t size, ssize_t* nwrite)
{
srs_error_t err = srs_success;
ssize_t nb_write;
if (stm == SRS_UTIME_NO_TIMEOUT) {
nb_write = st_write((st_netfd_t)stfd, buf, size, ST_UTIME_NO_TIMEOUT);
} else {
nb_write = st_write((st_netfd_t)stfd, buf, size, stm);
}
if (nwrite) {
*nwrite = nb_write;
}
// On success a non-negative integer equal to nbyte is returned.
// Otherwise, a value of -1 is returned and errno is set to indicate the error.
if (nb_write <= 0) {
// @see https://github.com/ossrs/srs/issues/200
if (nb_write < 0 && errno == ETIME) {
return srs_error_new(ERROR_SOCKET_TIMEOUT, "write timeout %d ms", srsu2msi(stm));
}
return srs_error_new(ERROR_SOCKET_WRITE, "write");
}
sbytes += nb_write;
return err;
}
srs_error_t SrsStSocket::writev(const iovec *iov, int iov_size, ssize_t* nwrite)
{
srs_error_t err = srs_success;
ssize_t nb_write;
if (stm == SRS_UTIME_NO_TIMEOUT) {
nb_write = st_writev((st_netfd_t)stfd, iov, iov_size, ST_UTIME_NO_TIMEOUT);
} else {
nb_write = st_writev((st_netfd_t)stfd, iov, iov_size, stm);
}
if (nwrite) {
*nwrite = nb_write;
}
// On success a non-negative integer equal to nbyte is returned.
// Otherwise, a value of -1 is returned and errno is set to indicate the error.
if (nb_write <= 0) {
// @see https://github.com/ossrs/srs/issues/200
if (nb_write < 0 && errno == ETIME) {
return srs_error_new(ERROR_SOCKET_TIMEOUT, "writev timeout %d ms", srsu2msi(stm));
}
return srs_error_new(ERROR_SOCKET_WRITE, "writev");
}
sbytes += nb_write;
return err;
}
SrsTcpClient::SrsTcpClient(string h, int p, srs_utime_t tm)
{
stfd = NULL;
io = new SrsStSocket();
host = h;
port = p;
timeout = tm;
}
SrsTcpClient::~SrsTcpClient()
{
close();
srs_freep(io);
}
srs_error_t SrsTcpClient::connect()
{
srs_error_t err = srs_success;
close();
srs_assert(stfd == NULL);
if ((err = srs_tcp_connect(host, port, timeout, &stfd)) != srs_success) {
return srs_error_wrap(err, "tcp: connect %s:%d to=%dms", host.c_str(), port, srsu2msi(timeout));
}
if ((err = io->initialize(stfd)) != srs_success) {
return srs_error_wrap(err, "tcp: init socket object");
}
return err;
}
void SrsTcpClient::close()
{
// Ignore when already closed.
if (!io) {
return;
}
srs_close_stfd(stfd);
}
void SrsTcpClient::set_recv_timeout(srs_utime_t tm)
{
io->set_recv_timeout(tm);
}
srs_utime_t SrsTcpClient::get_recv_timeout()
{
return io->get_recv_timeout();
}
void SrsTcpClient::set_send_timeout(srs_utime_t tm)
{
io->set_send_timeout(tm);
}
srs_utime_t SrsTcpClient::get_send_timeout()
{
return io->get_send_timeout();
}
int64_t SrsTcpClient::get_recv_bytes()
{
return io->get_recv_bytes();
}
int64_t SrsTcpClient::get_send_bytes()
{
return io->get_send_bytes();
}
srs_error_t SrsTcpClient::read(void* buf, size_t size, ssize_t* nread)
{
return io->read(buf, size, nread);
}
srs_error_t SrsTcpClient::read_fully(void* buf, size_t size, ssize_t* nread)
{
return io->read_fully(buf, size, nread);
}
srs_error_t SrsTcpClient::write(void* buf, size_t size, ssize_t* nwrite)
{
return io->write(buf, size, nwrite);
}
srs_error_t SrsTcpClient::writev(const iovec *iov, int iov_size, ssize_t* nwrite)
{
return io->writev(iov, iov_size, nwrite);
}

View file

@ -0,0 +1,213 @@
/**
* The MIT License (MIT)
*
* Copyright (c) 2013-2020 Winlin
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#ifndef SRS_SERVICE_ST_HPP
#define SRS_SERVICE_ST_HPP
#include <srs_core.hpp>
#include <string>
#include <srs_protocol_io.hpp>
// Wrap for coroutine.
typedef void* srs_netfd_t;
typedef void* srs_thread_t;
typedef void* srs_cond_t;
typedef void* srs_mutex_t;
// Initialize st, requires epoll.
extern srs_error_t srs_st_init();
// Close the netfd, and close the underlayer fd.
// @remark when close, user must ensure io completed.
extern void srs_close_stfd(srs_netfd_t& stfd);
// Set the FD_CLOEXEC of FD.
extern srs_error_t srs_fd_closeexec(int fd);
// Set the SO_REUSEADDR of fd.
extern srs_error_t srs_fd_reuseaddr(int fd);
// Set the SO_REUSEPORT of fd.
extern srs_error_t srs_fd_reuseport(int fd);
// Set the SO_KEEPALIVE of fd.
extern srs_error_t srs_fd_keepalive(int fd);
// Get current coroutine/thread.
extern srs_thread_t srs_thread_self();
// For client, to open socket and connect to server.
// @param tm The timeout in srs_utime_t.
extern srs_error_t srs_tcp_connect(std::string server, int port, srs_utime_t tm, srs_netfd_t* pstfd);
// For server, listen at TCP endpoint.
extern srs_error_t srs_tcp_listen(std::string ip, int port, srs_netfd_t* pfd);
// For server, listen at UDP endpoint.
extern srs_error_t srs_udp_listen(std::string ip, int port, srs_netfd_t* pfd);
// Wrap for coroutine.
extern srs_cond_t srs_cond_new();
extern int srs_cond_destroy(srs_cond_t cond);
extern int srs_cond_wait(srs_cond_t cond);
extern int srs_cond_timedwait(srs_cond_t cond, srs_utime_t timeout);
extern int srs_cond_signal(srs_cond_t cond);
extern srs_mutex_t srs_mutex_new();
extern int srs_mutex_destroy(srs_mutex_t mutex);
extern int srs_mutex_lock(srs_mutex_t mutex);
extern int srs_mutex_unlock(srs_mutex_t mutex);
extern int srs_netfd_fileno(srs_netfd_t stfd);
extern int srs_usleep(srs_utime_t usecs);
extern srs_netfd_t srs_netfd_open_socket(int osfd);
extern srs_netfd_t srs_netfd_open(int osfd);
extern int srs_recvfrom(srs_netfd_t stfd, void *buf, int len, struct sockaddr *from, int *fromlen, srs_utime_t timeout);
extern int srs_sendto(srs_netfd_t stfd, void *buf, int len, const struct sockaddr *to, int tolen, srs_utime_t timeout);
extern int srs_recvmsg(srs_netfd_t stfd, struct msghdr *msg, int flags, srs_utime_t timeout);
extern int srs_sendmsg(srs_netfd_t stfd, const struct msghdr *msg, int flags, srs_utime_t timeout);
// @see http://man7.org/linux/man-pages/man2/sendmmsg.2.html
#include <sys/socket.h>
struct srs_mmsghdr {
struct msghdr msg_hdr; /* Message header */
unsigned int msg_len; /* Number of bytes transmitted */
};
extern int srs_sendmmsg(srs_netfd_t stfd, struct srs_mmsghdr *msgvec, unsigned int vlen, int flags, srs_utime_t timeout);
extern srs_netfd_t srs_accept(srs_netfd_t stfd, struct sockaddr *addr, int *addrlen, srs_utime_t timeout);
extern ssize_t srs_read(srs_netfd_t stfd, void *buf, size_t nbyte, srs_utime_t timeout);
extern bool srs_is_never_timeout(srs_utime_t tm);
// The mutex locker.
#define SrsLocker(instance) \
impl__SrsLocker _SRS_free_##instance(&instance)
class impl__SrsLocker
{
private:
srs_mutex_t* lock;
public:
impl__SrsLocker(srs_mutex_t* l) {
lock = l;
int r0 = srs_mutex_lock(*lock);
srs_assert(!r0);
}
virtual ~impl__SrsLocker() {
int r0 = srs_mutex_unlock(*lock);
srs_assert(!r0);
}
};
// the socket provides TCP socket over st,
// that is, the sync socket mechanism.
class SrsStSocket : public ISrsProtocolReadWriter
{
private:
// The recv/send timeout in srs_utime_t.
// @remark Use SRS_UTIME_NO_TIMEOUT for never timeout.
srs_utime_t rtm;
srs_utime_t stm;
// The recv/send data in bytes
int64_t rbytes;
int64_t sbytes;
// The underlayer st fd.
srs_netfd_t stfd;
public:
SrsStSocket();
virtual ~SrsStSocket();
public:
// Initialize the socket with stfd, user must manage it.
virtual srs_error_t initialize(srs_netfd_t fd);
public:
virtual void set_recv_timeout(srs_utime_t tm);
virtual srs_utime_t get_recv_timeout();
virtual void set_send_timeout(srs_utime_t tm);
virtual srs_utime_t get_send_timeout();
virtual int64_t get_recv_bytes();
virtual int64_t get_send_bytes();
public:
// @param nread, the actual read bytes, ignore if NULL.
virtual srs_error_t read(void* buf, size_t size, ssize_t* nread);
virtual srs_error_t read_fully(void* buf, size_t size, ssize_t* nread);
// @param nwrite, the actual write bytes, ignore if NULL.
virtual srs_error_t write(void* buf, size_t size, ssize_t* nwrite);
virtual srs_error_t writev(const iovec *iov, int iov_size, ssize_t* nwrite);
};
// The client to connect to server over TCP.
// User must never reuse the client when close it.
// Usage:
// SrsTcpClient client("127.0.0.1", 1935, 9 * SRS_UTIME_SECONDS);
// client.connect();
// client.write("Hello world!", 12, NULL);
// client.read(buf, 4096, NULL);
// @remark User can directly free the object, which will close the fd.
class SrsTcpClient : public ISrsProtocolReadWriter
{
private:
srs_netfd_t stfd;
SrsStSocket* io;
private:
std::string host;
int port;
// The timeout in srs_utime_t.
srs_utime_t timeout;
public:
// Constructor.
// @param h the ip or hostname of server.
// @param p the port to connect to.
// @param tm the timeout in srs_utime_t.
SrsTcpClient(std::string h, int p, srs_utime_t tm);
virtual ~SrsTcpClient();
public:
// Connect to server over TCP.
// @remark We will close the exists connection before do connect.
virtual srs_error_t connect();
private:
// Close the connection to server.
// @remark User should never use the client when close it.
virtual void close();
// Interface ISrsProtocolReadWriter
public:
virtual void set_recv_timeout(srs_utime_t tm);
virtual srs_utime_t get_recv_timeout();
virtual void set_send_timeout(srs_utime_t tm);
virtual srs_utime_t get_send_timeout();
virtual int64_t get_recv_bytes();
virtual int64_t get_send_bytes();
virtual srs_error_t read(void* buf, size_t size, ssize_t* nread);
virtual srs_error_t read_fully(void* buf, size_t size, ssize_t* nread);
virtual srs_error_t write(void* buf, size_t size, ssize_t* nwrite);
virtual srs_error_t writev(const iovec *iov, int iov_size, ssize_t* nwrite);
};
#endif

View file

@ -0,0 +1,384 @@
/**
* The MIT License (MIT)
*
* Copyright (c) 2013-2020 Winlin
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#include <srs_service_utility.hpp>
#include <unistd.h>
#include <arpa/inet.h>
#include <net/if.h>
#include <ifaddrs.h>
#include <netdb.h>
#include <math.h>
#include <stdlib.h>
#include <map>
#include <sstream>
using namespace std;
#include <srs_service_st.hpp>
#include <srs_kernel_error.hpp>
#include <srs_kernel_consts.hpp>
#include <srs_kernel_log.hpp>
#include <srs_kernel_utility.hpp>
#include <srs_http_stack.hpp>
bool srs_string_is_http(string url)
{
return srs_string_starts_with(url, "http://", "https://");
}
bool srs_string_is_rtmp(string url)
{
return srs_string_starts_with(url, "rtmp://");
}
bool srs_is_digit_number(string str)
{
if (str.empty()) {
return false;
}
const char* p = str.c_str();
const char* p_end = str.data() + str.length();
for (; p < p_end; p++) {
if (*p != '0') {
break;
}
}
if (p == p_end) {
return true;
}
int64_t v = ::atoll(p);
int64_t powv = (int64_t)pow(10, p_end - p - 1);
return v / powv >= 1 && v / powv <= 9;
}
// we detect all network device as internet or intranet device, by its ip address.
// key is device name, for instance, eth0
// value is whether internet, for instance, true.
static std::map<std::string, bool> _srs_device_ifs;
bool srs_net_device_is_internet(string ifname)
{
srs_info("check ifname=%s", ifname.c_str());
if (_srs_device_ifs.find(ifname) == _srs_device_ifs.end()) {
return false;
}
return _srs_device_ifs[ifname];
}
bool srs_net_device_is_internet(const sockaddr* addr)
{
if(addr->sa_family == AF_INET) {
const in_addr inaddr = ((sockaddr_in*)addr)->sin_addr;
const uint32_t addr_h = ntohl(inaddr.s_addr);
// lo, 127.0.0.0-127.0.0.1
if (addr_h >= 0x7f000000 && addr_h <= 0x7f000001) {
return false;
}
// Class A 10.0.0.0-10.255.255.255
if (addr_h >= 0x0a000000 && addr_h <= 0x0affffff) {
return false;
}
// Class B 172.16.0.0-172.31.255.255
if (addr_h >= 0xac100000 && addr_h <= 0xac1fffff) {
return false;
}
// Class C 192.168.0.0-192.168.255.255
if (addr_h >= 0xc0a80000 && addr_h <= 0xc0a8ffff) {
return false;
}
} else if(addr->sa_family == AF_INET6) {
const sockaddr_in6* a6 = (const sockaddr_in6*)addr;
// IPv6 loopback is ::1
if (IN6_IS_ADDR_LOOPBACK(&a6->sin6_addr)) {
return false;
}
// IPv6 unspecified is ::
if (IN6_IS_ADDR_UNSPECIFIED(&a6->sin6_addr)) {
return false;
}
// From IPv4, you might know APIPA (Automatic Private IP Addressing) or AutoNet.
// Whenever automatic IP configuration through DHCP fails.
// The prefix of a site-local address is FE80::/10.
if (IN6_IS_ADDR_LINKLOCAL(&a6->sin6_addr)) {
return false;
}
// Site-local addresses are equivalent to private IP addresses in IPv4.
// The prefix of a site-local address is FEC0::/10.
// https://4sysops.com/archives/ipv6-tutorial-part-6-site-local-addresses-and-link-local-addresses/
if (IN6_IS_ADDR_SITELOCAL(&a6->sin6_addr)) {
return false;
}
// Others.
if (IN6_IS_ADDR_MULTICAST(&a6->sin6_addr)) {
return false;
}
if (IN6_IS_ADDR_MC_NODELOCAL(&a6->sin6_addr)) {
return false;
}
if (IN6_IS_ADDR_MC_LINKLOCAL(&a6->sin6_addr)) {
return false;
}
if (IN6_IS_ADDR_MC_SITELOCAL(&a6->sin6_addr)) {
return false;
}
if (IN6_IS_ADDR_MC_ORGLOCAL(&a6->sin6_addr)) {
return false;
}
if (IN6_IS_ADDR_MC_GLOBAL(&a6->sin6_addr)) {
return false;
}
}
return true;
}
vector<SrsIPAddress*> _srs_system_ips;
void discover_network_iface(ifaddrs* cur, vector<SrsIPAddress*>& ips, stringstream& ss0, stringstream& ss1, bool ipv6, bool loopback)
{
char saddr[64];
char* h = (char*)saddr;
socklen_t nbh = (socklen_t)sizeof(saddr);
const int r0 = getnameinfo(cur->ifa_addr, sizeof(sockaddr_storage), h, nbh, NULL, 0, NI_NUMERICHOST);
if(r0) {
srs_warn("convert local ip failed: %s", gai_strerror(r0));
return;
}
std::string ip(saddr, strlen(saddr));
ss0 << ", iface[" << (int)ips.size() << "] " << cur->ifa_name << " " << (ipv6? "ipv6":"ipv4")
<< " 0x" << std::hex << cur->ifa_flags << std::dec << " " << ip;
SrsIPAddress* ip_address = new SrsIPAddress();
ip_address->ip = ip;
ip_address->is_ipv4 = !ipv6;
ip_address->is_loopback = loopback;
ip_address->ifname = cur->ifa_name;
ip_address->is_internet = srs_net_device_is_internet(cur->ifa_addr);
ips.push_back(ip_address);
// set the device internet status.
if (!ip_address->is_internet) {
ss1 << ", intranet ";
_srs_device_ifs[cur->ifa_name] = false;
} else {
ss1 << ", internet ";
_srs_device_ifs[cur->ifa_name] = true;
}
ss1 << cur->ifa_name << " " << ip;
}
void retrieve_local_ips()
{
vector<SrsIPAddress*>& ips = _srs_system_ips;
// Release previous IPs.
for (int i = 0; i < (int)ips.size(); i++) {
SrsIPAddress* ip = ips[i];
srs_freep(ip);
}
ips.clear();
// Get the addresses.
ifaddrs* ifap;
if (getifaddrs(&ifap) == -1) {
srs_warn("retrieve local ips, getifaddrs failed.");
return;
}
stringstream ss0;
ss0 << "ips";
stringstream ss1;
ss1 << "devices";
// Discover IPv4 first.
for (ifaddrs* p = ifap; p ; p = p->ifa_next) {
ifaddrs* cur = p;
// Ignore if no address for this interface.
// @see https://github.com/ossrs/srs/issues/1087#issuecomment-408847115
if (!cur->ifa_addr) {
continue;
}
// retrieve IP address, ignore the tun0 network device, whose addr is NULL.
// @see: https://github.com/ossrs/srs/issues/141
bool ipv4 = (cur->ifa_addr->sa_family == AF_INET);
bool ready = (cur->ifa_flags & IFF_UP) && (cur->ifa_flags & IFF_RUNNING);
// Ignore IFF_PROMISC(Interface is in promiscuous mode), which may be set by Wireshark.
bool ignored = (!cur->ifa_addr) || (cur->ifa_flags & IFF_LOOPBACK) || (cur->ifa_flags & IFF_POINTOPOINT);
bool loopback = (cur->ifa_flags & IFF_LOOPBACK);
if (ipv4 && ready && !ignored) {
discover_network_iface(cur, ips, ss0, ss1, false, loopback);
}
}
// Then, discover IPv6 addresses.
for (ifaddrs* p = ifap; p ; p = p->ifa_next) {
ifaddrs* cur = p;
// Ignore if no address for this interface.
// @see https://github.com/ossrs/srs/issues/1087#issuecomment-408847115
if (!cur->ifa_addr) {
continue;
}
// retrieve IP address, ignore the tun0 network device, whose addr is NULL.
// @see: https://github.com/ossrs/srs/issues/141
bool ipv6 = (cur->ifa_addr->sa_family == AF_INET6);
bool ready = (cur->ifa_flags & IFF_UP) && (cur->ifa_flags & IFF_RUNNING);
bool ignored = (!cur->ifa_addr) || (cur->ifa_flags & IFF_POINTOPOINT) || (cur->ifa_flags & IFF_PROMISC) || (cur->ifa_flags & IFF_LOOPBACK);
bool loopback = (cur->ifa_flags & IFF_LOOPBACK);
if (ipv6 && ready && !ignored) {
discover_network_iface(cur, ips, ss0, ss1, true, loopback);
}
}
// If empty, disover IPv4 loopback.
if (ips.empty()) {
for (ifaddrs* p = ifap; p ; p = p->ifa_next) {
ifaddrs* cur = p;
// Ignore if no address for this interface.
// @see https://github.com/ossrs/srs/issues/1087#issuecomment-408847115
if (!cur->ifa_addr) {
continue;
}
// retrieve IP address, ignore the tun0 network device, whose addr is NULL.
// @see: https://github.com/ossrs/srs/issues/141
bool ipv4 = (cur->ifa_addr->sa_family == AF_INET);
bool ready = (cur->ifa_flags & IFF_UP) && (cur->ifa_flags & IFF_RUNNING);
bool ignored = (!cur->ifa_addr) || (cur->ifa_flags & IFF_POINTOPOINT) || (cur->ifa_flags & IFF_PROMISC);
bool loopback = (cur->ifa_flags & IFF_LOOPBACK);
if (ipv4 && ready && !ignored) {
discover_network_iface(cur, ips, ss0, ss1, false, loopback);
}
}
}
srs_trace(ss0.str().c_str());
srs_trace(ss1.str().c_str());
freeifaddrs(ifap);
}
vector<SrsIPAddress*>& srs_get_local_ips()
{
if (_srs_system_ips.empty()) {
retrieve_local_ips();
}
return _srs_system_ips;
}
std::string _public_internet_address;
string srs_get_public_internet_address(bool ipv4_only)
{
if (!_public_internet_address.empty()) {
return _public_internet_address;
}
std::vector<SrsIPAddress*>& ips = srs_get_local_ips();
// find the best match public address.
for (int i = 0; i < (int)ips.size(); i++) {
SrsIPAddress* ip = ips[i];
if (!ip->is_internet) {
continue;
}
if (ipv4_only && !ip->is_ipv4) {
continue;
}
srs_warn("use public address as ip: %s, ifname=%s", ip->ip.c_str(), ip->ifname.c_str());
_public_internet_address = ip->ip;
return ip->ip;
}
// no public address, use private address.
for (int i = 0; i < (int)ips.size(); i++) {
SrsIPAddress* ip = ips[i];
if (ip->is_loopback) {
continue;
}
if (ipv4_only && !ip->is_ipv4) {
continue;
}
srs_warn("use private address as ip: %s, ifname=%s", ip->ip.c_str(), ip->ifname.c_str());
_public_internet_address = ip->ip;
return ip->ip;
}
// Finally, use first whatever kind of address.
if (!ips.empty() && _public_internet_address.empty()) {
SrsIPAddress* ip = ips[0];
srs_warn("use first address as ip: %s, ifname=%s", ip->ip.c_str(), ip->ifname.c_str());
_public_internet_address = ip->ip;
return ip->ip;
}
return "";
}
string srs_get_original_ip(ISrsHttpMessage* r)
{
SrsHttpHeader* h = r->header();
string x_forwarded_for = h->get("X-Forwarded-For");
if (!x_forwarded_for.empty()) {
size_t pos = string::npos;
if ((pos = x_forwarded_for.find(",")) == string::npos) {
return x_forwarded_for;
}
return x_forwarded_for.substr(0, pos);
}
string x_real_ip = h->get("X-Real-IP");
if (!x_real_ip.empty()) {
size_t pos = string::npos;
if ((pos = x_real_ip.find(":")) == string::npos) {
return x_real_ip;
}
return x_real_ip.substr(0, pos);
}
return "";
}

View file

@ -0,0 +1,80 @@
/**
* The MIT License (MIT)
*
* Copyright (c) 2013-2020 Winlin
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#ifndef SRS_SERVICE_UTILITY_HPP
#define SRS_SERVICE_UTILITY_HPP
#include <srs_core.hpp>
#include <arpa/inet.h>
#include <string>
#include <vector>
#include <srs_service_st.hpp>
class ISrsHttpMessage;
// Whether the url is starts with http:// or https://
extern bool srs_string_is_http(std::string url);
extern bool srs_string_is_rtmp(std::string url);
// Whether string is digit number
// is_digit("0") === true
// is_digit("0000000000") === true
// is_digit("1234567890") === true
// is_digit("0123456789") === true
// is_digit("1234567890a") === false
// is_digit("a1234567890") === false
// is_digit("10e3") === false
// is_digit("!1234567890") === false
// is_digit("") === false
extern bool srs_is_digit_number(std::string str);
// Get local ip, fill to @param ips
struct SrsIPAddress
{
// The network interface name, such as eth0, en0, eth1.
std::string ifname;
// The IP v4 or v6 address.
std::string ip;
// Whether the ip is IPv4 address.
bool is_ipv4;
// Whether the ip is internet public IP address.
bool is_internet;
// Whether the ip is loopback, such as 127.0.0.1
bool is_loopback;
};
extern std::vector<SrsIPAddress*>& srs_get_local_ips();
// Get local public ip, empty string if no public internet address found.
extern std::string srs_get_public_internet_address(bool ipv4_only = false);
// Detect whether specified device is internet public address.
extern bool srs_net_device_is_internet(std::string ifname);
extern bool srs_net_device_is_internet(const sockaddr* addr);
// Get the original ip from query and header by proxy.
extern std::string srs_get_original_ip(ISrsHttpMessage* r);
#endif