1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-02-12 11:21:52 +00:00

For #820, modules use service only.

This commit is contained in:
winlin 2017-03-26 13:40:39 +08:00
parent ff822b55cd
commit e3526c0cf6
34 changed files with 3061 additions and 2620 deletions

View file

@ -41,7 +41,7 @@ echo -n "${INCS_NAME} = -I${MODULE_DIR} " >> ${FILE}
for item in ${MODULE_DEPENDS[*]}; do
DEP_INCS_NAME="${item}_INCS"do
DEP_INCS_NAME="${item}_MODULE_INCS"
echo -n "\$(${DEP_INCS_NAME}) " >> ${FILE}
echo -n "\$(${DEP_INCS_NAME})" >> ${FILE}
done
#
# depends library header files
@ -79,7 +79,8 @@ for item in ${MODULE_FILES[*]}; do
MODULE_OBJS="${MODULE_OBJS[@]} ${CPP_FILE}"
if [ -f ${CPP_FILE} ]; then
echo "${OBJ_FILE}: \$(${DEPS_NAME}) ${CPP_FILE} " >> ${FILE}
echo " \$(CXX) -c \$(CXXFLAGS) ${DEFINES} \$(${INCS_NAME})\\" >> ${FILE}
echo " \$(CXX) -c \$(CXXFLAGS) ${DEFINES}\\" >> ${FILE}
echo " \$(${INCS_NAME})\\" >> ${FILE}
echo " -o ${OBJ_FILE} ${CPP_FILE}" >> ${FILE}
fi
done

40
trunk/configure vendored
View file

@ -177,8 +177,10 @@ PROTOCOL_OBJS="${MODULE_OBJS[@]}"
if [ $SRS_EXPORT_LIBRTMP_PROJECT = NO ]; then
MODULE_ID="SERVICE"
MODULE_DEPENDS=("CORE" "KERNEL" "PROTOCOL")
ModuleLibIncs=(${LibSTRoot} ${LibSSLRoot} ${SRS_OBJS_DIR})
MODULE_FILES=("srs_service_log" "srs_service_st")
ModuleLibIncs=(${LibSTRoot} ${SRS_OBJS_DIR})
MODULE_FILES=("srs_service_log" "srs_service_st" "srs_service_http_client"
"srs_service_http_conn" "srs_service_rtmp_conn" "srs_service_utility"
"srs_service_conn")
DEFINES=""
SERVICE_INCS="src/service"; MODULE_DIR=${SERVICE_INCS} . auto/modules.sh
SERVICE_OBJS="${MODULE_OBJS[@]}"
@ -188,7 +190,7 @@ fi
if [ $SRS_EXPORT_LIBRTMP_PROJECT = NO ]; then
MODULE_ID="APP"
MODULE_DEPENDS=("CORE" "KERNEL" "PROTOCOL" "SERVICE")
ModuleLibIncs=(${LibSTRoot} ${LibSSLRoot} ${SRS_OBJS_DIR})
ModuleLibIncs=(${LibSTRoot} ${SRS_OBJS_DIR})
MODULE_FILES=("srs_app_server" "srs_app_conn" "srs_app_rtmp_conn" "srs_app_source"
"srs_app_refer" "srs_app_hls" "srs_app_forward" "srs_app_encoder" "srs_app_http_stream"
"srs_app_thread" "srs_app_bandwidth" "srs_app_st" "srs_app_log" "srs_app_config"
@ -218,12 +220,22 @@ MODULE_FILES=("srs_librtmp" "srs_lib_simple_socket" "srs_lib_bandwidth")
LIBS_INCS="src/libs"; MODULE_DIR=${LIBS_INCS} . auto/modules.sh
LIBS_OBJS="${MODULE_OBJS[@]}"
#
#Main Module
#Main Module, for SRS.
if [ $SRS_EXPORT_LIBRTMP_PROJECT = NO ]; then
MODULE_ID="MAIN"
MODULE_DEPENDS=("CORE" "KERNEL" "PROTOCOL" "SERVICE" "APP")
ModuleLibIncs=(${LibSTRoot} ${SRS_OBJS_DIR} ${LibGperfRoot} ${LibSSLRoot})
ModuleLibIncs=(${LibSTRoot} ${SRS_OBJS_DIR} ${LibGperfRoot})
MODULE_FILES=("srs_main_server")
MAIN_INCS="src/main"; MODULE_DIR=${MAIN_INCS} . auto/modules.sh
MAIN_OBJS="${MODULE_OBJS[@]}"
fi
#
#Main Module, for app from modules.
if [ $SRS_EXPORT_LIBRTMP_PROJECT = NO ]; then
MODULE_ID="MAIN2"
MODULE_DEPENDS=("CORE" "KERNEL" "PROTOCOL" "SERVICE")
ModuleLibIncs=(${LibSTRoot} ${SRS_OBJS_DIR} ${LibGperfRoot})
MODULE_FILES=()
DEFINES=""
# add each modules for main
for SRS_MODULE in ${SRS_MODULES[*]}; do
@ -231,24 +243,23 @@ if [ $SRS_EXPORT_LIBRTMP_PROJECT = NO ]; then
MODULE_FILES+=("${SRS_MODULE_MAIN[*]}")
DEFINES="${DEFINES} ${SRS_MODULE_DEFINES}"
done
MAIN_INCS="src/main"; MODULE_DIR=${MAIN_INCS} . auto/modules.sh
MAIN_OBJS="${MODULE_OBJS[@]}"
MAIN2_INCS="src/main"; MODULE_DIR=${MAIN2_INCS} . auto/modules.sh
MAIN2_OBJS="${MODULE_OBJS[@]}"
fi
#####################################################################################
# Binaries, main entrances, link the module and its depends modules,
# then link to a binary, for example, objs/srs
#
# disable all app when export librtmp
# Disable SRS application for exporting librtmp.
if [ $SRS_EXPORT_LIBRTMP_PROJECT = NO ]; then
# all main entrances
MAIN_ENTRANCES=("srs_main_server")
# add each modules for main
for SRS_MODULE in ${SRS_MODULES[*]}; do
. $SRS_MODULE/config
MAIN_ENTRANCES+=("${SRS_MODULE_MAIN[*]}")
done
#
#
# all depends libraries
ModuleLibFiles=(${LibSTfile} ${LibSSLfile} ${LibGperfFile})
# all depends objects
@ -257,12 +268,15 @@ if [ $SRS_EXPORT_LIBRTMP_PROJECT = NO ]; then
#
# srs: srs(simple rtmp server) over st(state-threads)
BUILD_KEY="srs" APP_MAIN="srs_main_server" APP_NAME="srs" . auto/apps.sh
# add each modules for application
#
# For modules, without the app module.
MODULE_OBJS="${CORE_OBJS[@]} ${KERNEL_OBJS[@]} ${PROTOCOL_OBJS[@]} ${SERVICE_OBJS[@]} ${MAIN2_OBJS[@]}"
#
for SRS_MODULE in ${SRS_MODULES[*]}; do
. $SRS_MODULE/config
# no SRS_MODULE_MAIN
# no SRS_MODULE_MAIN
if [[ 0 -eq ${#SRS_MODULE_MAIN[@]} ]]; then continue; fi
BUILD_KEY="$SRS_MODULE_NAME" APP_MAIN="$SRS_MODULE_MAIN" APP_NAME="$SRS_MODULE_NAME" . auto/apps.sh
BUILD_KEY="$SRS_MODULE_NAME" APP_MAIN="${SRS_MODULE_MAIN[0]}" APP_NAME="$SRS_MODULE_NAME" . auto/apps.sh
done
fi
# srs librtmp

View file

@ -127,6 +127,11 @@
3CE893B51E87508D000B742D /* srs_app_dvr.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 3CE893B31E87508D000B742D /* srs_app_dvr.cpp */; };
3CE893B91E8750A9000B742D /* srs_service_log.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 3CE893B71E8750A9000B742D /* srs_service_log.cpp */; };
3CE893BC1E875108000B742D /* srs_service_st.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 3CE893BA1E875108000B742D /* srs_service_st.cpp */; };
3CE893BF1E876A97000B742D /* srs_service_http_client.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 3CE893BD1E876A97000B742D /* srs_service_http_client.cpp */; };
3CE893C21E876B9E000B742D /* srs_service_http_conn.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 3CE893C01E876B9E000B742D /* srs_service_http_conn.cpp */; };
3CE893C51E876C39000B742D /* srs_service_rtmp_conn.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 3CE893C31E876C39000B742D /* srs_service_rtmp_conn.cpp */; };
3CE893C81E876D04000B742D /* srs_service_utility.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 3CE893C61E876D04000B742D /* srs_service_utility.cpp */; };
3CE893CB1E8770E2000B742D /* srs_service_conn.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 3CE893C91E8770E2000B742D /* srs_service_conn.cpp */; };
/* End PBXBuildFile section */
/* Begin PBXCopyFilesBuildPhase section */
@ -431,6 +436,16 @@
3CE893B81E8750A9000B742D /* srs_service_log.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; name = srs_service_log.hpp; path = ../../../src/service/srs_service_log.hpp; sourceTree = "<group>"; };
3CE893BA1E875108000B742D /* srs_service_st.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; name = srs_service_st.cpp; path = ../../../src/service/srs_service_st.cpp; sourceTree = "<group>"; };
3CE893BB1E875108000B742D /* srs_service_st.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; name = srs_service_st.hpp; path = ../../../src/service/srs_service_st.hpp; sourceTree = "<group>"; };
3CE893BD1E876A97000B742D /* srs_service_http_client.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; name = srs_service_http_client.cpp; path = ../../../src/service/srs_service_http_client.cpp; sourceTree = "<group>"; };
3CE893BE1E876A97000B742D /* srs_service_http_client.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; name = srs_service_http_client.hpp; path = ../../../src/service/srs_service_http_client.hpp; sourceTree = "<group>"; };
3CE893C01E876B9E000B742D /* srs_service_http_conn.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; name = srs_service_http_conn.cpp; path = ../../../src/service/srs_service_http_conn.cpp; sourceTree = "<group>"; };
3CE893C11E876B9E000B742D /* srs_service_http_conn.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; name = srs_service_http_conn.hpp; path = ../../../src/service/srs_service_http_conn.hpp; sourceTree = "<group>"; };
3CE893C31E876C39000B742D /* srs_service_rtmp_conn.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; name = srs_service_rtmp_conn.cpp; path = ../../../src/service/srs_service_rtmp_conn.cpp; sourceTree = "<group>"; };
3CE893C41E876C39000B742D /* srs_service_rtmp_conn.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; name = srs_service_rtmp_conn.hpp; path = ../../../src/service/srs_service_rtmp_conn.hpp; sourceTree = "<group>"; };
3CE893C61E876D04000B742D /* srs_service_utility.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; name = srs_service_utility.cpp; path = ../../../src/service/srs_service_utility.cpp; sourceTree = "<group>"; };
3CE893C71E876D04000B742D /* srs_service_utility.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; name = srs_service_utility.hpp; path = ../../../src/service/srs_service_utility.hpp; sourceTree = "<group>"; };
3CE893C91E8770E2000B742D /* srs_service_conn.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; name = srs_service_conn.cpp; path = ../../../src/service/srs_service_conn.cpp; sourceTree = "<group>"; };
3CE893CA1E8770E2000B742D /* srs_service_conn.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; name = srs_service_conn.hpp; path = ../../../src/service/srs_service_conn.hpp; sourceTree = "<group>"; };
/* End PBXFileReference section */
/* Begin PBXFrameworksBuildPhase section */
@ -475,9 +490,9 @@
3C1231EF1AAE651100CE8F6C /* core */,
3C1232071AAE814200CE8F6C /* kernel */,
3C12322C1AAE819900CE8F6C /* protocol */,
3CE893B61E875095000B742D /* service */,
3C12324B1AAE81CE00CE8F6C /* app */,
3C96ADC41B00A71000885304 /* modules */,
3CE893B61E875095000B742D /* service */,
3C1232041AAE80CB00CE8F6C /* main */,
3C36DB541ABD1CA70066CCAF /* libs */,
3C1231F91AAE670E00CE8F6C /* objs */,
@ -919,10 +934,20 @@
3CE893B61E875095000B742D /* service */ = {
isa = PBXGroup;
children = (
3CE893C91E8770E2000B742D /* srs_service_conn.cpp */,
3CE893CA1E8770E2000B742D /* srs_service_conn.hpp */,
3CE893BD1E876A97000B742D /* srs_service_http_client.cpp */,
3CE893BE1E876A97000B742D /* srs_service_http_client.hpp */,
3CE893C01E876B9E000B742D /* srs_service_http_conn.cpp */,
3CE893C11E876B9E000B742D /* srs_service_http_conn.hpp */,
3CE893B71E8750A9000B742D /* srs_service_log.cpp */,
3CE893B81E8750A9000B742D /* srs_service_log.hpp */,
3CE893C31E876C39000B742D /* srs_service_rtmp_conn.cpp */,
3CE893C41E876C39000B742D /* srs_service_rtmp_conn.hpp */,
3CE893BA1E875108000B742D /* srs_service_st.cpp */,
3CE893BB1E875108000B742D /* srs_service_st.hpp */,
3CE893C61E876D04000B742D /* srs_service_utility.cpp */,
3CE893C71E876D04000B742D /* srs_service_utility.hpp */,
);
name = service;
sourceTree = "<group>";
@ -1008,6 +1033,7 @@
3C12322B1AAE814D00CE8F6C /* srs_kernel_utility.cpp in Sources */,
3CA5F1411E65543700E442C7 /* event.c in Sources */,
3C12324A1AAE81A400CE8F6C /* srs_rtsp_stack.cpp in Sources */,
3CE893BF1E876A97000B742D /* srs_service_http_client.cpp in Sources */,
3C36DB5D1ABD1CB90066CCAF /* srs_librtmp.cpp in Sources */,
3CA5F1421E65543700E442C7 /* io.c in Sources */,
3C12329F1AAE81D900CE8F6C /* srs_app_http_api.cpp in Sources */,
@ -1027,6 +1053,7 @@
3C1232221AAE814D00CE8F6C /* srs_kernel_codec.cpp in Sources */,
3C1232B71AAE81D900CE8F6C /* srs_app_utility.cpp in Sources */,
3C1232AB1AAE81D900CE8F6C /* srs_app_recv_thread.cpp in Sources */,
3CE893C51E876C39000B742D /* srs_service_rtmp_conn.cpp in Sources */,
3CC52DDC1ACE4023006FEB01 /* srs_utest_protocol.cpp in Sources */,
3C663F151AB0155100286D8B /* srs_h264_raw_publish.c in Sources */,
3C1231F61AAE652D00CE8F6C /* srs_core_autofree.cpp in Sources */,
@ -1048,9 +1075,11 @@
3C12329C1AAE81D900CE8F6C /* srs_app_forward.cpp in Sources */,
3C1232251AAE814D00CE8F6C /* srs_kernel_file.cpp in Sources */,
3C1232AD1AAE81D900CE8F6C /* srs_app_reload.cpp in Sources */,
3CE893C81E876D04000B742D /* srs_service_utility.cpp in Sources */,
3C1231F81AAE652D00CE8F6C /* srs_core.cpp in Sources */,
3C1232A21AAE81D900CE8F6C /* srs_app_http_hooks.cpp in Sources */,
3C663F121AB0155100286D8B /* srs_detect_rtmp.c in Sources */,
3CE893CB1E8770E2000B742D /* srs_service_conn.cpp in Sources */,
3C1232B11AAE81D900CE8F6C /* srs_app_server.cpp in Sources */,
3C1232061AAE812C00CE8F6C /* srs_main_server.cpp in Sources */,
3C1232281AAE814D00CE8F6C /* srs_kernel_mp3.cpp in Sources */,
@ -1076,6 +1105,7 @@
3C28EDDF1AF5C43F00A3AEAC /* srs_app_caster_flv.cpp in Sources */,
3C1232241AAE814D00CE8F6C /* srs_kernel_error.cpp in Sources */,
3C036B561B2D0AC10078E2E0 /* srs_app_http_stream.cpp in Sources */,
3CE893C21E876B9E000B742D /* srs_service_http_conn.cpp in Sources */,
3C068D6D1B10175500AA722C /* srs_protocol_stream.cpp in Sources */,
3CB25C2A1BB269FD00C97A63 /* jmp_sp.cpp in Sources */,
3C068D6D1B10175500AA722C /* srs_protocol_stream.cpp in Sources */,

View file

@ -1,4 +1,4 @@
SRS Module Ruler(SRS模块规则)
SRS Module Rules(SRS模块规则)
1. Each module in its seperate home directory(一个模块一个目录).
2. There is a config file in home(目录下放一个config文件).
3. All variables in configure are available(所有的configure中的变量模块中可以使用).

View file

@ -83,10 +83,12 @@ int SrsAppCasterFlv::on_tcp_client(st_netfd_t stfd)
return ret;
}
void SrsAppCasterFlv::remove(SrsConnection* c)
void SrsAppCasterFlv::remove(ISrsConnection* c)
{
SrsConnection* conn = dynamic_cast<SrsConnection*>(c);
std::vector<SrsHttpConn*>::iterator it;
if ((it = std::find(conns.begin(), conns.end(), c)) != conns.end()) {
if ((it = std::find(conns.begin(), conns.end(), conn)) != conns.end()) {
conns.erase(it);
}
}

View file

@ -68,7 +68,7 @@ public:
virtual int on_tcp_client(st_netfd_t stfd);
// IConnectionManager
public:
virtual void remove(SrsConnection* c);
virtual void remove(ISrsConnection* c);
// ISrsHttpHandler
public:
virtual int serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r);

View file

@ -30,14 +30,6 @@ using namespace std;
#include <srs_app_utility.hpp>
#include <srs_kernel_utility.hpp>
IConnectionManager::IConnectionManager()
{
}
IConnectionManager::~IConnectionManager()
{
}
SrsConnection::SrsConnection(IConnectionManager* cm, st_netfd_t c, string cip)
{
id = 0;

View file

@ -32,30 +32,15 @@
#include <srs_app_thread.hpp>
#include <srs_protocol_kbps.hpp>
#include <srs_app_reload.hpp>
class SrsConnection;
/**
* the manager for connection.
*/
class IConnectionManager
{
public:
IConnectionManager();
virtual ~IConnectionManager();
public:
/**
* remove the specified connection.
*/
virtual void remove(SrsConnection* c) = 0;
};
#include <srs_service_conn.hpp>
/**
* the basic connection of SRS,
* all connections accept from listener must extends from this base class,
* server will add the connection to manager, and delete it when remove.
*/
class SrsConnection : virtual public ISrsOneCycleThreadHandler, virtual public IKbpsDelta, virtual public ISrsReloadHandler
class SrsConnection : virtual public ISrsConnection, virtual public ISrsOneCycleThreadHandler
, virtual public IKbpsDelta, virtual public ISrsReloadHandler
{
private:
/**

View file

@ -23,225 +23,3 @@
#include <srs_app_http_client.hpp>
#include <arpa/inet.h>
using namespace std;
#include <srs_kernel_error.hpp>
#include <srs_kernel_log.hpp>
#include <srs_app_st.hpp>
#include <srs_kernel_utility.hpp>
#include <srs_app_utility.hpp>
#include <srs_core_autofree.hpp>
#include <srs_app_http_conn.hpp>
#include <srs_protocol_kbps.hpp>
SrsHttpClient::SrsHttpClient()
{
transport = NULL;
kbps = new SrsKbps();
parser = NULL;
timeout = SRS_CONSTS_NO_TMMS;
port = 0;
}
SrsHttpClient::~SrsHttpClient()
{
disconnect();
srs_freep(kbps);
srs_freep(parser);
}
// TODO: FIXME: use ms for timeout.
int SrsHttpClient::initialize(string h, int p, int64_t tm)
{
int ret = ERROR_SUCCESS;
srs_freep(parser);
parser = new SrsHttpParser();
if ((ret = parser->initialize(HTTP_RESPONSE, false)) != ERROR_SUCCESS) {
srs_error("initialize parser failed. ret=%d", ret);
return ret;
}
// Always disconnect the transport.
host = h;
port = p;
timeout = tm;
disconnect();
// ep used for host in header.
string ep = host;
if (port > 0 && port != SRS_CONSTS_HTTP_DEFAULT_PORT) {
ep += ":" + srs_int2str(port);
}
// Set default value for headers.
headers["Host"] = ep;
headers["Connection"] = "Keep-Alive";
headers["User-Agent"] = RTMP_SIG_SRS_SERVER;
headers["Content-Type"] = "application/json";
return ret;
}
SrsHttpClient* SrsHttpClient::set_header(string k, string v)
{
headers[k] = v;
return this;
}
int SrsHttpClient::post(string path, string req, ISrsHttpMessage** ppmsg)
{
*ppmsg = NULL;
int ret = ERROR_SUCCESS;
// always set the content length.
headers["Content-Length"] = srs_int2str(req.length());
if ((ret = connect()) != ERROR_SUCCESS) {
srs_warn("http connect server failed. ret=%d", ret);
return ret;
}
// send POST request to uri
// POST %s HTTP/1.1\r\nHost: %s\r\nContent-Length: %d\r\n\r\n%s
std::stringstream ss;
ss << "POST " << path << " " << "HTTP/1.1" << SRS_HTTP_CRLF;
for (map<string, string>::iterator it = headers.begin(); it != headers.end(); ++it) {
string key = it->first;
string value = it->second;
ss << key << ": " << value << SRS_HTTP_CRLF;
}
ss << SRS_HTTP_CRLF << req;
std::string data = ss.str();
if ((ret = transport->write((void*)data.c_str(), data.length(), NULL)) != ERROR_SUCCESS) {
// Disconnect the transport when channel error, reconnect for next operation.
disconnect();
srs_error("write http post failed. ret=%d", ret);
return ret;
}
ISrsHttpMessage* msg = NULL;
if ((ret = parser->parse_message(transport, NULL, &msg)) != ERROR_SUCCESS) {
srs_error("parse http post response failed. ret=%d", ret);
return ret;
}
srs_assert(msg);
if (ppmsg) {
*ppmsg = msg;
} else {
srs_freep(msg);
}
srs_info("parse http post response success.");
return ret;
}
int SrsHttpClient::get(string path, string req, ISrsHttpMessage** ppmsg)
{
*ppmsg = NULL;
int ret = ERROR_SUCCESS;
// always set the content length.
headers["Content-Length"] = srs_int2str(req.length());
if ((ret = connect()) != ERROR_SUCCESS) {
srs_warn("http connect server failed. ret=%d", ret);
return ret;
}
// send POST request to uri
// GET %s HTTP/1.1\r\nHost: %s\r\nContent-Length: %d\r\n\r\n%s
std::stringstream ss;
ss << "GET " << path << " " << "HTTP/1.1" << SRS_HTTP_CRLF;
for (map<string, string>::iterator it = headers.begin(); it != headers.end(); ++it) {
string key = it->first;
string value = it->second;
ss << key << ": " << value << SRS_HTTP_CRLF;
}
ss << SRS_HTTP_CRLF << req;
std::string data = ss.str();
if ((ret = transport->write((void*)data.c_str(), data.length(), NULL)) != ERROR_SUCCESS) {
// Disconnect the transport when channel error, reconnect for next operation.
disconnect();
srs_error("write http get failed. ret=%d", ret);
return ret;
}
ISrsHttpMessage* msg = NULL;
if ((ret = parser->parse_message(transport, NULL, &msg)) != ERROR_SUCCESS) {
srs_error("parse http post response failed. ret=%d", ret);
return ret;
}
srs_assert(msg);
if (ppmsg) {
*ppmsg = msg;
} else {
srs_freep(msg);
}
srs_info("parse http get response success.");
return ret;
}
void SrsHttpClient::set_recv_timeout(int64_t tm)
{
transport->set_recv_timeout(tm);
}
void SrsHttpClient::kbps_sample(const char* label, int64_t age)
{
kbps->sample();
int sr = kbps->get_send_kbps();
int sr30s = kbps->get_send_kbps_30s();
int sr5m = kbps->get_send_kbps_5m();
int rr = kbps->get_recv_kbps();
int rr30s = kbps->get_recv_kbps_30s();
int rr5m = kbps->get_recv_kbps_5m();
srs_trace("<- %s time=%"PRId64", okbps=%d,%d,%d, ikbps=%d,%d,%d", label, age, sr, sr30s, sr5m, rr, rr30s, rr5m);
}
void SrsHttpClient::disconnect()
{
kbps->set_io(NULL, NULL);
srs_freep(transport);
}
int SrsHttpClient::connect()
{
int ret = ERROR_SUCCESS;
// When transport connected, ignore.
if (transport) {
return ret;
}
transport = new SrsTcpClient(host, port, timeout);
if ((ret = transport->connect()) != ERROR_SUCCESS) {
disconnect();
srs_warn("http client failed, server=%s, port=%d, timeout=%"PRId64", ret=%d", host.c_str(), port, timeout, ret);
return ret;
}
srs_info("connect to server success. server=%s, port=%d", host.c_str(), port);
// Set the recv/send timeout in ms.
transport->set_recv_timeout(timeout);
transport->set_send_timeout(timeout);
kbps->set_io(transport, transport);
return ret;
}

View file

@ -26,84 +26,7 @@
#include <srs_core.hpp>
#include <string>
#include <map>
#include <srs_app_st.hpp>
class SrsHttpUri;
class SrsHttpParser;
class ISrsHttpMessage;
class SrsStSocket;
class SrsKbps;
// the default timeout for http client.
#define SRS_HTTP_CLIENT_TMMS (30*1000)
/**
* The client to GET/POST/PUT/DELETE over HTTP.
* @remark We will reuse the TCP transport until initialize or channel error,
* such as send/recv failed.
* Usage:
* SrsHttpClient hc;
* hc.initialize("127.0.0.1", 80, 9000);
* hc.post("/api/v1/version", "Hello world!", NULL);
*/
class SrsHttpClient
{
private:
// The underlayer TCP transport, set to NULL when disconnect, or never not NULL when connected.
// We will disconnect transport when initialize or channel error, such as send/recv error.
SrsTcpClient* transport;
SrsHttpParser* parser;
std::map<std::string, std::string> headers;
SrsKbps* kbps;
private:
// The timeout in ms.
int64_t timeout;
// The host name or ip.
std::string host;
int port;
public:
SrsHttpClient();
virtual ~SrsHttpClient();
public:
/**
* Initliaze the client, disconnect the transport, renew the HTTP parser.
* @param tm The underlayer TCP transport timeout in ms.
* @remark we will set default values in headers, which can be override by set_header.
*/
virtual int initialize(std::string h, int p, int64_t tm = SRS_HTTP_CLIENT_TMMS);
/**
* Set HTTP request header in header[k]=v.
* @return the HTTP client itself.
*/
virtual SrsHttpClient* set_header(std::string k, std::string v);
public:
/**
* to post data to the uri.
* @param the path to request on.
* @param req the data post to uri. empty string to ignore.
* @param ppmsg output the http message to read the response.
* @remark user must free the ppmsg if not NULL.
*/
virtual int post(std::string path, std::string req, ISrsHttpMessage** ppmsg);
/**
* to get data from the uri.
* @param the path to request on.
* @param req the data post to uri. empty string to ignore.
* @param ppmsg output the http message to read the response.
* @remark user must free the ppmsg if not NULL.
*/
virtual int get(std::string path, std::string req, ISrsHttpMessage** ppmsg);
private:
virtual void set_recv_timeout(int64_t tm);
public:
virtual void kbps_sample(const char* label, int64_t age);
private:
virtual void disconnect();
virtual int connect();
};
#include <srs_service_http_client.hpp>
#endif

File diff suppressed because it is too large Load diff

View file

@ -30,8 +30,7 @@
#include <string>
#include <vector>
#include <srs_app_st.hpp>
#include <srs_http_stack.hpp>
#include <srs_service_http_conn.hpp>
#include <srs_app_reload.hpp>
#include <srs_kernel_file.hpp>
#include <srs_app_thread.hpp>
@ -56,291 +55,6 @@ class SrsHttpMessage;
class SrsHttpStreamServer;
class SrsHttpStaticServer;
// the http chunked header size,
// for writev, there always one chunk to send it.
#define SRS_HTTP_HEADER_CACHE_SIZE 64
/**
* response writer use st socket
*/
class SrsHttpResponseWriter : public ISrsHttpResponseWriter
{
private:
SrsStSocket* skt;
SrsHttpHeader* hdr;
private:
char header_cache[SRS_HTTP_HEADER_CACHE_SIZE];
iovec* iovss_cache;
int nb_iovss_cache;
private:
// reply header has been (logically) written
bool header_wrote;
// status code passed to WriteHeader
int status;
private:
// explicitly-declared Content-Length; or -1
int64_t content_length;
// number of bytes written in body
int64_t written;
private:
// wroteHeader tells whether the header's been written to "the
// wire" (or rather: w.conn.buf). this is unlike
// (*response).wroteHeader, which tells only whether it was
// logically written.
bool header_sent;
public:
SrsHttpResponseWriter(SrsStSocket* io);
virtual ~SrsHttpResponseWriter();
public:
virtual int final_request();
virtual SrsHttpHeader* header();
virtual int write(char* data, int size);
virtual int writev(const iovec* iov, int iovcnt, ssize_t* pnwrite);
virtual void write_header(int code);
virtual int send_header(char* data, int size);
};
/**
* response reader use st socket.
*/
class SrsHttpResponseReader : virtual public ISrsHttpResponseReader
{
private:
ISrsProtocolReaderWriter* skt;
SrsHttpMessage* owner;
SrsFastStream* buffer;
bool is_eof;
// the left bytes in chunk.
int nb_left_chunk;
// the number of bytes of current chunk.
int nb_chunk;
// already read total bytes.
int64_t nb_total_read;
public:
SrsHttpResponseReader(SrsHttpMessage* msg, ISrsProtocolReaderWriter* io);
virtual ~SrsHttpResponseReader();
public:
/**
* initialize the response reader with buffer.
*/
virtual int initialize(SrsFastStream* buffer);
// interface ISrsHttpResponseReader
public:
virtual bool eof();
virtual int read(char* data, int nb_data, int* nb_read);
private:
virtual int read_chunked(char* data, int nb_data, int* nb_read);
virtual int read_specified(char* data, int nb_data, int* nb_read);
};
// for http header.
typedef std::pair<std::string, std::string> SrsHttpHeaderField;
// A Request represents an HTTP request received by a server
// or to be sent by a client.
//
// The field semantics differ slightly between client and server
// usage. In addition to the notes on the fields below, see the
// documentation for Request.Write and RoundTripper.
class SrsHttpMessage : public ISrsHttpMessage
{
private:
/**
* parsed url.
*/
std::string _url;
/**
* the extension of file, for example, .flv
*/
std::string _ext;
/**
* parsed http header.
*/
http_parser _header;
/**
* body object, reader object.
* @remark, user can get body in string by get_body().
*/
SrsHttpResponseReader* _body;
/**
* whether the body is chunked.
*/
bool chunked;
/**
* whether the body is infinite chunked.
*/
bool infinite_chunked;
/**
* whether the request indicates should keep alive
* for the http connection.
*/
bool keep_alive;
/**
* uri parser
*/
SrsHttpUri* _uri;
/**
* use a buffer to read and send ts file.
*/
// TODO: FIXME: remove it.
char* _http_ts_send_buffer;
// http headers
std::vector<SrsHttpHeaderField> _headers;
// the query map
std::map<std::string, std::string> _query;
// the transport connection, can be NULL.
SrsConnection* conn;
// whether request is jsonp.
bool jsonp;
// the method in QueryString will override the HTTP method.
std::string jsonp_method;
public:
SrsHttpMessage(ISrsProtocolReaderWriter* io, SrsConnection* c);
virtual ~SrsHttpMessage();
public:
/**
* set the original messages, then update the message.
*/
virtual int update(std::string url, bool allow_jsonp, http_parser* header, SrsFastStream* body, std::vector<SrsHttpHeaderField>& headers);
public:
virtual SrsConnection* connection();
public:
virtual uint8_t method();
virtual uint16_t status_code();
/**
* method helpers.
*/
virtual std::string method_str();
virtual bool is_http_get();
virtual bool is_http_put();
virtual bool is_http_post();
virtual bool is_http_delete();
virtual bool is_http_options();
/**
* whether body is chunked encoding, for reader only.
*/
virtual bool is_chunked();
/**
* whether body is infinite chunked encoding.
* @remark set by enter_infinite_chunked.
*/
virtual bool is_infinite_chunked();
/**
* whether should keep the connection alive.
*/
virtual bool is_keep_alive();
/**
* the uri contains the host and path.
*/
virtual std::string uri();
/**
* the url maybe the path.
*/
virtual std::string url();
virtual std::string host();
virtual std::string path();
virtual std::string query();
virtual std::string ext();
/**
* get the RESTful matched id.
*/
virtual int parse_rest_id(std::string pattern);
public:
virtual int enter_infinite_chunked();
public:
/**
* read body to string.
* @remark for small http body.
*/
virtual int body_read_all(std::string& body);
/**
* get the body reader, to read one by one.
* @remark when body is very large, or chunked, use this.
*/
virtual ISrsHttpResponseReader* body_reader();
/**
* the content length, -1 for chunked or not set.
*/
virtual int64_t content_length();
/**
* get the param in query string,
* for instance, query is "start=100&end=200",
* then query_get("start") is "100", and query_get("end") is "200"
*/
virtual std::string query_get(std::string key);
/**
* get the headers.
*/
virtual int request_header_count();
virtual std::string request_header_key_at(int index);
virtual std::string request_header_value_at(int index);
virtual std::string get_request_header(std::string name);
public:
/**
* convert the http message to a request.
* @remark user must free the return request.
*/
virtual SrsRequest* to_request(std::string vhost);
public:
virtual bool is_jsonp();
};
/**
* wrapper for http-parser,
* provides HTTP message originted service.
*/
class SrsHttpParser
{
private:
http_parser_settings settings;
http_parser parser;
// the global parse buffer.
SrsFastStream* buffer;
// whether allow jsonp parse.
bool jsonp;
private:
// http parse data, reset before parse message.
bool expect_field_name;
std::string field_name;
std::string field_value;
SrsHttpParseState state;
http_parser header;
std::string url;
std::vector<SrsHttpHeaderField> headers;
int header_parsed;
public:
SrsHttpParser();
virtual ~SrsHttpParser();
public:
/**
* initialize the http parser with specified type,
* one parser can only parse request or response messages.
* @param allow_jsonp whether allow jsonp parser, which indicates the method in query string.
*/
virtual int initialize(enum http_parser_type type, bool allow_jsonp);
/**
* always parse a http message,
* that is, the *ppmsg always NOT-NULL when return success.
* or error and *ppmsg must be NULL.
* @remark, if success, *ppmsg always NOT-NULL, *ppmsg always is_complete().
* @remark user must free the ppmsg if not NULL.
*/
virtual int parse_message(ISrsProtocolReaderWriter* io, SrsConnection* conn, ISrsHttpMessage** ppmsg);
private:
/**
* parse the HTTP message to member field: msg.
*/
virtual int parse_message_imp(ISrsProtocolReaderWriter* io);
private:
static int on_message_begin(http_parser* parser);
static int on_headers_complete(http_parser* parser);
static int on_message_complete(http_parser* parser);
static int on_url(http_parser* parser, const char* at, size_t length);
static int on_header_field(http_parser* parser, const char* at, size_t length);
static int on_header_value(http_parser* parser, const char* at, size_t length);
static int on_body(http_parser* parser, const char* at, size_t length);
};
/**
* The http connection which request the static or stream content.
*/

View file

@ -77,210 +77,23 @@ using namespace std;
// when edge timeout, retry next.
#define SRS_EDGE_TOKEN_TRAVERSE_TMMS (3000)
SrsSimpleRtmpClient::SrsSimpleRtmpClient(string u, int64_t ctm, int64_t stm)
SrsSimpleRtmpClient::SrsSimpleRtmpClient(string u, int64_t ctm, int64_t stm) : SrsBasicRtmpClient(u, ctm, stm)
{
kbps = new SrsKbps();
url = u;
connect_timeout = ctm;
stream_timeout = stm;
req = new SrsRequest();
srs_parse_rtmp_url(url, req->tcUrl, req->stream);
srs_discovery_tc_url(req->tcUrl, req->schema, req->host, req->vhost, req->app, req->port, req->param);
transport = NULL;
client = NULL;
stream_id = 0;
}
SrsSimpleRtmpClient::~SrsSimpleRtmpClient()
{
close();
srs_freep(kbps);
}
int SrsSimpleRtmpClient::connect()
{
int ret = ERROR_SUCCESS;
close();
transport = new SrsTcpClient(req->host, req->port, connect_timeout);
client = new SrsRtmpClient(transport);
kbps->set_io(transport, transport);
if ((ret = transport->connect()) != ERROR_SUCCESS) {
close();
return ret;
}
client->set_recv_timeout(stream_timeout);
client->set_send_timeout(stream_timeout);
// connect to vhost/app
if ((ret = client->handshake()) != ERROR_SUCCESS) {
srs_error("sdk: handshake with server failed. ret=%d", ret);
return ret;
}
if ((ret = connect_app()) != ERROR_SUCCESS) {
srs_error("sdk: connect with server failed. ret=%d", ret);
return ret;
}
if ((ret = client->create_stream(stream_id)) != ERROR_SUCCESS) {
srs_error("sdk: connect with server failed, stream_id=%d. ret=%d", stream_id, ret);
return ret;
}
return ret;
}
void SrsSimpleRtmpClient::close()
{
kbps->set_io(NULL, NULL);
srs_freep(client);
srs_freep(transport);
}
int SrsSimpleRtmpClient::connect_app()
{
int ret = ERROR_SUCCESS;
// args of request takes the srs info.
if (req->args == NULL) {
req->args = SrsAmf0Any::object();
}
// notify server the edge identity,
// @see https://github.com/ossrs/srs/issues/147
SrsAmf0Object* data = req->args;
data->set("srs_sig", SrsAmf0Any::str(RTMP_SIG_SRS_KEY));
data->set("srs_server", SrsAmf0Any::str(RTMP_SIG_SRS_SERVER));
data->set("srs_license", SrsAmf0Any::str(RTMP_SIG_SRS_LICENSE));
data->set("srs_role", SrsAmf0Any::str(RTMP_SIG_SRS_ROLE));
data->set("srs_url", SrsAmf0Any::str(RTMP_SIG_SRS_URL));
data->set("srs_version", SrsAmf0Any::str(RTMP_SIG_SRS_VERSION));
data->set("srs_site", SrsAmf0Any::str(RTMP_SIG_SRS_WEB));
data->set("srs_email", SrsAmf0Any::str(RTMP_SIG_SRS_EMAIL));
data->set("srs_copyright", SrsAmf0Any::str(RTMP_SIG_SRS_COPYRIGHT));
data->set("srs_primary", SrsAmf0Any::str(RTMP_SIG_SRS_PRIMARY));
data->set("srs_authors", SrsAmf0Any::str(RTMP_SIG_SRS_AUTHROS));
// for edge to directly get the id of client.
data->set("srs_pid", SrsAmf0Any::number(getpid()));
data->set("srs_id", SrsAmf0Any::number(_srs_context->get_id()));
// local ip of edge
std::vector<std::string> ips = srs_get_local_ipv4_ips();
assert(_srs_config->get_stats_network() < (int)ips.size());
std::string local_ip = ips[_srs_config->get_stats_network()];
data->set("srs_server_ip", SrsAmf0Any::str(local_ip.c_str()));
// generate the tcUrl
std::string param = "";
std::string target_vhost = req->vhost;
std::string tc_url = srs_generate_tc_url(req->host, req->vhost, req->app, req->port, param);
// replace the tcUrl in request,
// which will replace the tc_url in client.connect_app().
req->tcUrl = tc_url;
// upnode server identity will show in the connect_app of client.
// @see https://github.com/ossrs/srs/issues/160
// the debug_srs_upnode is config in vhost and default to true.
bool debug_srs_upnode = _srs_config->get_debug_srs_upnode(req->vhost);
if ((ret = client->connect_app(req->app, tc_url, req, debug_srs_upnode, NULL)) != ERROR_SUCCESS) {
srs_error("sdk: connect with server failed, tcUrl=%s, dsu=%d. ret=%d",
tc_url.c_str(), debug_srs_upnode, ret);
return ret;
}
return ret;
}
int SrsSimpleRtmpClient::publish()
{
int ret = ERROR_SUCCESS;
// publish.
if ((ret = client->publish(req->stream, stream_id)) != ERROR_SUCCESS) {
srs_error("sdk: publish failed, stream=%s, stream_id=%d. ret=%d",
req->stream.c_str(), stream_id, ret);
return ret;
}
return ret;
}
int SrsSimpleRtmpClient::play()
{
int ret = ERROR_SUCCESS;
if ((ret = client->play(req->stream, stream_id)) != ERROR_SUCCESS) {
srs_error("connect with server failed, stream=%s, stream_id=%d. ret=%d",
req->stream.c_str(), stream_id, ret);
return ret;
}
return ret;
}
void SrsSimpleRtmpClient::kbps_sample(const char* label, int64_t age)
{
kbps->sample();
int sr = kbps->get_send_kbps();
int sr30s = kbps->get_send_kbps_30s();
int sr5m = kbps->get_send_kbps_5m();
int rr = kbps->get_recv_kbps();
int rr30s = kbps->get_recv_kbps_30s();
int rr5m = kbps->get_recv_kbps_5m();
srs_trace("<- %s time=%"PRId64", okbps=%d,%d,%d, ikbps=%d,%d,%d", label, age, sr, sr30s, sr5m, rr, rr30s, rr5m);
}
void SrsSimpleRtmpClient::kbps_sample(const char* label, int64_t age, int msgs)
{
kbps->sample();
int sr = kbps->get_send_kbps();
int sr30s = kbps->get_send_kbps_30s();
int sr5m = kbps->get_send_kbps_5m();
int rr = kbps->get_recv_kbps();
int rr30s = kbps->get_recv_kbps_30s();
int rr5m = kbps->get_recv_kbps_5m();
srs_trace("<- %s time=%"PRId64", msgs=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d", label, age, msgs, sr, sr30s, sr5m, rr, rr30s, rr5m);
}
int SrsSimpleRtmpClient::sid()
{
return stream_id;
}
int SrsSimpleRtmpClient::recv_message(SrsCommonMessage** pmsg)
{
return client->recv_message(pmsg);
}
int SrsSimpleRtmpClient::decode_message(SrsCommonMessage* msg, SrsPacket** ppacket)
{
return client->decode_message(msg, ppacket);
}
int SrsSimpleRtmpClient::send_and_free_messages(SrsSharedPtrMessage** msgs, int nb_msgs)
{
return client->send_and_free_messages(msgs, nb_msgs, stream_id);
}
int SrsSimpleRtmpClient::send_and_free_message(SrsSharedPtrMessage* msg)
{
return client->send_and_free_message(msg, stream_id);
}
void SrsSimpleRtmpClient::set_recv_timeout(int64_t timeout)
{
transport->set_recv_timeout(timeout);
return do_connect_app(local_ip, debug_srs_upnode);
}
SrsClientInfo::SrsClientInfo()

View file

@ -32,6 +32,7 @@
#include <srs_app_conn.hpp>
#include <srs_app_reload.hpp>
#include <srs_rtmp_stack.hpp>
#include <srs_service_rtmp_conn.hpp>
class SrsServer;
class SrsRtmpServer;
@ -58,53 +59,15 @@ class ISrsKafkaCluster;
#endif
/**
* The simple RTMP client, provides friendly APIs.
* @remark Should never use client when closed.
* Usage:
* SrsSimpleRtmpClient client("rtmp://127.0.0.1:1935/live/livestream", 3000, 9000);
* client.connect();
* client.play();
* client.close();
* The simple rtmp client for SRS.
*/
class SrsSimpleRtmpClient
class SrsSimpleRtmpClient : public SrsBasicRtmpClient
{
private:
std::string url;
int64_t connect_timeout;
int64_t stream_timeout;
private:
SrsRequest* req;
SrsTcpClient* transport;
SrsRtmpClient* client;
SrsKbps* kbps;
int stream_id;
public:
// Constructor.
// @param u The RTMP url, for example, rtmp://ip:port/app/stream?domain=vhost
// @param ctm The timeout in ms to connect to server.
// @param stm The timeout in ms to delivery A/V stream.
SrsSimpleRtmpClient(std::string u, int64_t ctm, int64_t stm);
virtual ~SrsSimpleRtmpClient();
public:
// Connect, handshake and connect app to RTMP server.
// @remark We always close the transport.
virtual int connect();
virtual void close();
private:
protected:
virtual int connect_app();
public:
virtual int publish();
virtual int play();
virtual void kbps_sample(const char* label, int64_t age);
virtual void kbps_sample(const char* label, int64_t age, int msgs);
virtual int sid();
public:
virtual int recv_message(SrsCommonMessage** pmsg);
virtual int decode_message(SrsCommonMessage* msg, SrsPacket** ppacket);
virtual int send_and_free_messages(SrsSharedPtrMessage** msgs, int nb_msgs);
virtual int send_and_free_message(SrsSharedPtrMessage* msg);
public:
virtual void set_recv_timeout(int64_t timeout);
};
/**

View file

@ -1311,8 +1311,9 @@ SrsConnection* SrsServer::fd2conn(SrsListenerType type, st_netfd_t stfd)
return conn;
}
void SrsServer::remove(SrsConnection* conn)
void SrsServer::remove(ISrsConnection* c)
{
SrsConnection* conn = dynamic_cast<SrsConnection*>(c);
std::vector<SrsConnection*>::iterator it = std::find(conns.begin(), conns.end(), conn);
// removed by destroy, ignore.

View file

@ -365,7 +365,7 @@ public:
* when connection thread cycle terminated, callback this to delete connection.
* @see SrsConnection.on_thread_stop().
*/
virtual void remove(SrsConnection* conn);
virtual void remove(ISrsConnection* c);
// interface ISrsReloadHandler.
public:
virtual int on_reload_listen();

View file

@ -208,349 +208,3 @@ namespace internal
}
}
SrsStSocket::SrsStSocket()
{
stfd = NULL;
stm = rtm = SRS_CONSTS_NO_TMMS;
rbytes = sbytes = 0;
}
SrsStSocket::~SrsStSocket()
{
}
int SrsStSocket::initialize(st_netfd_t fd)
{
stfd = fd;
return ERROR_SUCCESS;
}
bool SrsStSocket::is_never_timeout(int64_t tm)
{
return tm == SRS_CONSTS_NO_TMMS;
}
void SrsStSocket::set_recv_timeout(int64_t tm)
{
rtm = tm;
}
int64_t SrsStSocket::get_recv_timeout()
{
return rtm;
}
void SrsStSocket::set_send_timeout(int64_t tm)
{
stm = tm;
}
int64_t SrsStSocket::get_send_timeout()
{
return stm;
}
int64_t SrsStSocket::get_recv_bytes()
{
return rbytes;
}
int64_t SrsStSocket::get_send_bytes()
{
return sbytes;
}
int SrsStSocket::read(void* buf, size_t size, ssize_t* nread)
{
int ret = ERROR_SUCCESS;
ssize_t nb_read;
if (rtm == SRS_CONSTS_NO_TMMS) {
nb_read = st_read(stfd, buf, size, ST_UTIME_NO_TIMEOUT);
} else {
nb_read = st_read(stfd, buf, size, rtm * 1000);
}
if (nread) {
*nread = nb_read;
}
// On success a non-negative integer indicating the number of bytes actually read is returned
// (a value of 0 means the network connection is closed or end of file is reached).
// Otherwise, a value of -1 is returned and errno is set to indicate the error.
if (nb_read <= 0) {
// @see https://github.com/ossrs/srs/issues/200
if (nb_read < 0 && errno == ETIME) {
return ERROR_SOCKET_TIMEOUT;
}
if (nb_read == 0) {
errno = ECONNRESET;
}
return ERROR_SOCKET_READ;
}
rbytes += nb_read;
return ret;
}
int SrsStSocket::read_fully(void* buf, size_t size, ssize_t* nread)
{
int ret = ERROR_SUCCESS;
ssize_t nb_read;
if (rtm == SRS_CONSTS_NO_TMMS) {
nb_read = st_read_fully(stfd, buf, size, ST_UTIME_NO_TIMEOUT);
} else {
nb_read = st_read_fully(stfd, buf, size, rtm * 1000);
}
if (nread) {
*nread = nb_read;
}
// On success a non-negative integer indicating the number of bytes actually read is returned
// (a value less than nbyte means the network connection is closed or end of file is reached)
// Otherwise, a value of -1 is returned and errno is set to indicate the error.
if (nb_read != (ssize_t)size) {
// @see https://github.com/ossrs/srs/issues/200
if (nb_read < 0 && errno == ETIME) {
return ERROR_SOCKET_TIMEOUT;
}
if (nb_read >= 0) {
errno = ECONNRESET;
}
return ERROR_SOCKET_READ_FULLY;
}
rbytes += nb_read;
return ret;
}
int SrsStSocket::write(void* buf, size_t size, ssize_t* nwrite)
{
int ret = ERROR_SUCCESS;
ssize_t nb_write;
if (stm == SRS_CONSTS_NO_TMMS) {
nb_write = st_write(stfd, buf, size, ST_UTIME_NO_TIMEOUT);
} else {
nb_write = st_write(stfd, buf, size, stm * 1000);
}
if (nwrite) {
*nwrite = nb_write;
}
// On success a non-negative integer equal to nbyte is returned.
// Otherwise, a value of -1 is returned and errno is set to indicate the error.
if (nb_write <= 0) {
// @see https://github.com/ossrs/srs/issues/200
if (nb_write < 0 && errno == ETIME) {
return ERROR_SOCKET_TIMEOUT;
}
return ERROR_SOCKET_WRITE;
}
sbytes += nb_write;
return ret;
}
int SrsStSocket::writev(const iovec *iov, int iov_size, ssize_t* nwrite)
{
int ret = ERROR_SUCCESS;
ssize_t nb_write;
if (stm == SRS_CONSTS_NO_TMMS) {
nb_write = st_writev(stfd, iov, iov_size, ST_UTIME_NO_TIMEOUT);
} else {
nb_write = st_writev(stfd, iov, iov_size, stm * 1000);
}
if (nwrite) {
*nwrite = nb_write;
}
// On success a non-negative integer equal to nbyte is returned.
// Otherwise, a value of -1 is returned and errno is set to indicate the error.
if (nb_write <= 0) {
// @see https://github.com/ossrs/srs/issues/200
if (nb_write < 0 && errno == ETIME) {
return ERROR_SOCKET_TIMEOUT;
}
return ERROR_SOCKET_WRITE;
}
sbytes += nb_write;
return ret;
}
SrsTcpClient::SrsTcpClient(string h, int p, int64_t tm)
{
stfd = NULL;
io = new SrsStSocket();
host = h;
port = p;
timeout = tm;
}
SrsTcpClient::~SrsTcpClient()
{
close();
srs_freep(io);
}
int SrsTcpClient::connect()
{
int ret = ERROR_SUCCESS;
close();
srs_assert(stfd == NULL);
if ((ret = srs_socket_connect(host, port, timeout, &stfd)) != ERROR_SUCCESS) {
srs_error("connect tcp://%s:%d failed, to=%"PRId64"ms. ret=%d", host.c_str(), port, timeout, ret);
return ret;
}
if ((ret = io->initialize(stfd)) != ERROR_SUCCESS) {
return ret;
}
return ret;
}
void SrsTcpClient::close()
{
// Ignore when already closed.
if (!io) {
return;
}
srs_close_stfd(stfd);
}
bool SrsTcpClient::is_never_timeout(int64_t tm)
{
return io->is_never_timeout(tm);
}
void SrsTcpClient::set_recv_timeout(int64_t tm)
{
io->set_recv_timeout(tm);
}
int64_t SrsTcpClient::get_recv_timeout()
{
return io->get_recv_timeout();
}
void SrsTcpClient::set_send_timeout(int64_t tm)
{
io->set_send_timeout(tm);
}
int64_t SrsTcpClient::get_send_timeout()
{
return io->get_send_timeout();
}
int64_t SrsTcpClient::get_recv_bytes()
{
return io->get_recv_bytes();
}
int64_t SrsTcpClient::get_send_bytes()
{
return io->get_send_bytes();
}
int SrsTcpClient::read(void* buf, size_t size, ssize_t* nread)
{
return io->read(buf, size, nread);
}
int SrsTcpClient::read_fully(void* buf, size_t size, ssize_t* nread)
{
return io->read_fully(buf, size, nread);
}
int SrsTcpClient::write(void* buf, size_t size, ssize_t* nwrite)
{
return io->write(buf, size, nwrite);
}
int SrsTcpClient::writev(const iovec *iov, int iov_size, ssize_t* nwrite)
{
return io->writev(iov, iov_size, nwrite);
}
#ifdef __linux__
#include <sys/epoll.h>
bool srs_st_epoll_is_supported(void)
{
struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.ptr = NULL;
/* Guaranteed to fail */
epoll_ctl(-1, EPOLL_CTL_ADD, -1, &ev);
return (errno != ENOSYS);
}
#endif
int srs_st_init()
{
int ret = ERROR_SUCCESS;
#ifdef __linux__
// check epoll, some old linux donot support epoll.
// @see https://github.com/ossrs/srs/issues/162
if (!srs_st_epoll_is_supported()) {
ret = ERROR_ST_SET_EPOLL;
srs_error("epoll required on Linux. ret=%d", ret);
return ret;
}
#endif
// Select the best event system available on the OS. In Linux this is
// epoll(). On BSD it will be kqueue.
if (st_set_eventsys(ST_EVENTSYS_ALT) == -1) {
ret = ERROR_ST_SET_EPOLL;
srs_error("st_set_eventsys use %s failed. ret=%d", st_get_eventsys_name(), ret);
return ret;
}
srs_info("st_set_eventsys to %s", st_get_eventsys_name());
if(st_init() != 0){
ret = ERROR_ST_INITIALIZE;
srs_error("st_init failed. ret=%d", ret);
return ret;
}
srs_trace("st_init success, use %s", st_get_eventsys_name());
return ret;
}
void srs_close_stfd(st_netfd_t& stfd)
{
if (stfd) {
// we must ensure the close is ok.
int err = st_netfd_close(stfd);
srs_assert(err != -1);
stfd = NULL;
}
}

View file

@ -159,111 +159,5 @@ namespace internal
};
}
/**
* the socket provides TCP socket over st,
* that is, the sync socket mechanism.
*/
class SrsStSocket : public ISrsProtocolReaderWriter
{
private:
// The recv/send timeout in ms.
// @remark Use SRS_CONSTS_NO_TMMS for never timeout in ms.
int64_t rtm;
int64_t stm;
// The recv/send data in bytes
int64_t rbytes;
int64_t sbytes;
// The underlayer st fd.
st_netfd_t stfd;
public:
SrsStSocket();
virtual ~SrsStSocket();
public:
// Initialize the socket with stfd, user must manage it.
virtual int initialize(st_netfd_t fd);
public:
virtual bool is_never_timeout(int64_t tm);
virtual void set_recv_timeout(int64_t tm);
virtual int64_t get_recv_timeout();
virtual void set_send_timeout(int64_t tm);
virtual int64_t get_send_timeout();
virtual int64_t get_recv_bytes();
virtual int64_t get_send_bytes();
public:
/**
* @param nread, the actual read bytes, ignore if NULL.
*/
virtual int read(void* buf, size_t size, ssize_t* nread);
virtual int read_fully(void* buf, size_t size, ssize_t* nread);
/**
* @param nwrite, the actual write bytes, ignore if NULL.
*/
virtual int write(void* buf, size_t size, ssize_t* nwrite);
virtual int writev(const iovec *iov, int iov_size, ssize_t* nwrite);
};
/**
* The client to connect to server over TCP.
* User must never reuse the client when close it.
* Usage:
* SrsTcpClient client("127.0.0.1", 1935,9000);
* client.connect();
* client.write("Hello world!", 12, NULL);
* client.read(buf, 4096, NULL);
* @remark User can directly free the object, which will close the fd.
*/
class SrsTcpClient : public ISrsProtocolReaderWriter
{
private:
st_netfd_t stfd;
SrsStSocket* io;
private:
std::string host;
int port;
// The timeout in ms.
int64_t timeout;
public:
/**
* Constructor.
* @param h the ip or hostname of server.
* @param p the port to connect to.
* @param tm the timeout in ms.
*/
SrsTcpClient(std::string h, int p, int64_t tm);
virtual ~SrsTcpClient();
public:
/**
* Connect to server over TCP.
* @remark We will close the exists connection before do connect.
*/
virtual int connect();
private:
/**
* Close the connection to server.
* @remark User should never use the client when close it.
*/
virtual void close();
// interface ISrsProtocolReaderWriter
public:
virtual bool is_never_timeout(int64_t tm);
virtual void set_recv_timeout(int64_t tm);
virtual int64_t get_recv_timeout();
virtual void set_send_timeout(int64_t tm);
virtual int64_t get_send_timeout();
virtual int64_t get_recv_bytes();
virtual int64_t get_send_bytes();
virtual int read(void* buf, size_t size, ssize_t* nread);
virtual int read_fully(void* buf, size_t size, ssize_t* nread);
virtual int write(void* buf, size_t size, ssize_t* nwrite);
virtual int writev(const iovec *iov, int iov_size, ssize_t* nwrite);
};
// initialize st, requires epoll.
extern int srs_st_init();
// close the netfd, and close the underlayer fd.
// @remark when close, user must ensure io completed.
extern void srs_close_stfd(st_netfd_t& stfd);
#endif

View file

@ -53,63 +53,6 @@ using namespace std;
// the longest time to wait for a process to quit.
#define SRS_PROCESS_QUIT_TIMEOUT_MS 1000
int srs_socket_connect(string server, int port, int64_t tm, st_netfd_t* pstfd)
{
int ret = ERROR_SUCCESS;
st_utime_t timeout = ST_UTIME_NO_TIMEOUT;
if (tm != SRS_CONSTS_NO_TMMS) {
timeout = (st_utime_t)(tm * 1000);
}
*pstfd = NULL;
st_netfd_t stfd = NULL;
sockaddr_in addr;
int sock = socket(AF_INET, SOCK_STREAM, 0);
if(sock == -1){
ret = ERROR_SOCKET_CREATE;
srs_error("create socket error. ret=%d", ret);
return ret;
}
srs_assert(!stfd);
stfd = st_netfd_open_socket(sock);
if(stfd == NULL){
ret = ERROR_ST_OPEN_SOCKET;
srs_error("st_netfd_open_socket failed. ret=%d", ret);
return ret;
}
// connect to server.
std::string ip = srs_dns_resolve(server);
if (ip.empty()) {
ret = ERROR_SYSTEM_IP_INVALID;
srs_error("dns resolve server error, ip empty. ret=%d", ret);
goto failed;
}
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = inet_addr(ip.c_str());
if (st_connect(stfd, (const struct sockaddr*)&addr, sizeof(sockaddr_in), timeout) == -1){
ret = ERROR_ST_CONNECT;
srs_error("connect to server error. ip=%s, port=%d, ret=%d", ip.c_str(), port, ret);
goto failed;
}
srs_info("connect ok. server=%s, ip=%s, port=%d", server.c_str(), ip.c_str(), port);
*pstfd = stfd;
return ret;
failed:
if (stfd) {
srs_close_stfd(stfd);
}
return ret;
}
SrsLogLevel srs_get_log_level(string level)
{
if ("verbose" == level) {
@ -1008,48 +951,6 @@ void srs_update_network_devices()
#endif
}
// we detect all network device as internet or intranet device, by its ip address.
// key is device name, for instance, eth0
// value is whether internet, for instance, true.
static std::map<std::string, bool> _srs_device_ifs;
bool srs_net_device_is_internet(string ifname)
{
srs_info("check ifname=%s", ifname.c_str());
if (_srs_device_ifs.find(ifname) == _srs_device_ifs.end()) {
return false;
}
return _srs_device_ifs[ifname];
}
bool srs_net_device_is_internet(in_addr_t addr)
{
uint32_t addr_h = ntohl(addr);
// lo, 127.0.0.0-127.0.0.1
if (addr_h >= 0x7f000000 && addr_h <= 0x7f000001) {
return false;
}
// Class A 10.0.0.0-10.255.255.255
if (addr_h >= 0x0a000000 && addr_h <= 0x0affffff) {
return false;
}
// Class B 172.16.0.0-172.31.255.255
if (addr_h >= 0xac100000 && addr_h <= 0xac1fffff) {
return false;
}
// Class C 192.168.0.0-192.168.255.255
if (addr_h >= 0xc0a80000 && addr_h <= 0xc0a8ffff) {
return false;
}
return true;
}
SrsNetworkRtmpServer::SrsNetworkRtmpServer()
{
ok = false;
@ -1202,139 +1103,6 @@ void srs_update_rtmp_server(int nb_conn, SrsKbps* kbps)
}
}
vector<string> _srs_system_ipv4_ips;
void retrieve_local_ipv4_ips()
{
vector<string>& ips = _srs_system_ipv4_ips;
ips.clear();
ifaddrs* ifap;
if (getifaddrs(&ifap) == -1) {
srs_warn("retrieve local ips, ini ifaddrs failed.");
return;
}
stringstream ss0;
ss0 << "ips";
stringstream ss1;
ss1 << "devices";
ifaddrs* p = ifap;
while (p != NULL) {
ifaddrs* cur = p;
sockaddr* addr = cur->ifa_addr;
p = p->ifa_next;
// retrieve ipv4 addr
// ignore the tun0 network device,
// which addr is NULL.
// @see: https://github.com/ossrs/srs/issues/141
if (addr && addr->sa_family == AF_INET) {
in_addr* inaddr = &((sockaddr_in*)addr)->sin_addr;
char buf[16];
memset(buf, 0, sizeof(buf));
if ((inet_ntop(addr->sa_family, inaddr, buf, sizeof(buf))) == NULL) {
srs_warn("convert local ip failed");
break;
}
std::string ip = buf;
if (ip != SRS_CONSTS_LOCALHOST) {
ss0 << ", local[" << (int)ips.size() << "] ipv4 " << ip;
ips.push_back(ip);
}
// set the device internet status.
if (!srs_net_device_is_internet(inaddr->s_addr)) {
ss1 << ", intranet ";
_srs_device_ifs[cur->ifa_name] = false;
} else {
ss1 << ", internet ";
_srs_device_ifs[cur->ifa_name] = true;
}
ss1 << cur->ifa_name << " " << ip;
}
}
srs_trace(ss0.str().c_str());
srs_trace(ss1.str().c_str());
freeifaddrs(ifap);
}
vector<string>& srs_get_local_ipv4_ips()
{
if (_srs_system_ipv4_ips.empty()) {
retrieve_local_ipv4_ips();
}
return _srs_system_ipv4_ips;
}
std::string _public_internet_address;
string srs_get_public_internet_address()
{
if (!_public_internet_address.empty()) {
return _public_internet_address;
}
std::vector<std::string>& ips = srs_get_local_ipv4_ips();
// find the best match public address.
for (int i = 0; i < (int)ips.size(); i++) {
std::string ip = ips[i];
in_addr_t addr = inet_addr(ip.c_str());
uint32_t addr_h = ntohl(addr);
// lo, 127.0.0.0-127.0.0.1
if (addr_h >= 0x7f000000 && addr_h <= 0x7f000001) {
srs_trace("ignore private address: %s", ip.c_str());
continue;
}
// Class A 10.0.0.0-10.255.255.255
if (addr_h >= 0x0a000000 && addr_h <= 0x0affffff) {
srs_trace("ignore private address: %s", ip.c_str());
continue;
}
// Class B 172.16.0.0-172.31.255.255
if (addr_h >= 0xac100000 && addr_h <= 0xac1fffff) {
srs_trace("ignore private address: %s", ip.c_str());
continue;
}
// Class C 192.168.0.0-192.168.255.255
if (addr_h >= 0xc0a80000 && addr_h <= 0xc0a8ffff) {
srs_trace("ignore private address: %s", ip.c_str());
continue;
}
srs_warn("use public address as ip: %s", ip.c_str());
_public_internet_address = ip;
return ip;
}
// no public address, use private address.
for (int i = 0; i < (int)ips.size(); i++) {
std::string ip = ips[i];
in_addr_t addr = inet_addr(ip.c_str());
uint32_t addr_h = ntohl(addr);
// lo, 127.0.0.0-127.0.0.1
if (addr_h >= 0x7f000000 && addr_h <= 0x7f000001) {
srs_trace("ignore private address: %s", ip.c_str());
continue;
}
srs_warn("use private address as ip: %s", ip.c_str());
_public_internet_address = ip;
return ip;
}
return "";
}
string srs_get_local_ip(int fd)
{
std::string ip;
@ -1407,16 +1175,6 @@ string srs_get_peer_ip(int fd)
return ip;
}
bool srs_string_is_http(string url)
{
return srs_string_starts_with(url, "http://", "https://");
}
bool srs_string_is_rtmp(string url)
{
return srs_string_starts_with(url, "rtmp://");
}
bool srs_is_digit_number(const string& str)
{
if (str.empty()) {

View file

@ -35,15 +35,12 @@
#include <srs_app_st.hpp>
#include <srs_kernel_log.hpp>
#include <srs_service_utility.hpp>
class SrsKbps;
class SrsBuffer;
class SrsJsonObject;
// client open socket and connect to server.
// @param tm The timeout in ms.
extern int srs_socket_connect(std::string server, int port, int64_t tm, st_netfd_t* pstfd);
/**
* convert level in string to log level in int.
* @return the log level defined in SrsLogLevel.
@ -602,9 +599,6 @@ extern SrsNetworkDevices* srs_get_network_devices();
extern int srs_get_network_devices_count();
// the deamon st-thread will update it.
extern void srs_update_network_devices();
// detect whether specified device is internet public address.
extern bool srs_net_device_is_internet(std::string ifname);
extern bool srs_net_device_is_internet(in_addr_t addr);
// system connections, and srs rtmp network summary
class SrsNetworkRtmpServer
@ -649,12 +643,6 @@ extern SrsNetworkRtmpServer* srs_get_network_rtmp_server();
// the deamon st-thread will update it.
extern void srs_update_rtmp_server(int nb_conn, SrsKbps* kbps);
// get local ip, fill to @param ips
extern std::vector<std::string>& srs_get_local_ipv4_ips();
// get local public ip, empty string if no public internet address found.
extern std::string srs_get_public_internet_address();
// get local or peer ip.
// where local ip is the server ip which client connected.
extern std::string srs_get_local_ip(int fd);
@ -663,10 +651,6 @@ extern int srs_get_local_port(int fd);
// where peer ip is the client public ip which connected to server.
extern std::string srs_get_peer_ip(int fd);
// whether the url is starts with http:// or https://
extern bool srs_string_is_http(std::string url);
extern bool srs_string_is_rtmp(std::string url);
// whether string is digit number
// is_digit("1234567890") === true
// is_digit("0123456789") === false

View file

@ -29,26 +29,23 @@
#include <map>
using namespace std;
#include <srs_core_autofree.hpp>
#include <srs_kernel_error.hpp>
#include <srs_app_server.hpp>
#include <srs_app_config.hpp>
#include <srs_service_log.hpp>
#include <srs_kernel_utility.hpp>
#include <srs_rtmp_stack.hpp>
#include <srs_kernel_stream.hpp>
#include <srs_kernel_buffer.hpp>
#include <srs_kernel_ts.hpp>
#include <srs_app_http_client.hpp>
#include <srs_core_autofree.hpp>
#include <srs_app_st.hpp>
#include <srs_protocol_utility.hpp>
#include <srs_app_st.hpp>
#include <srs_app_utility.hpp>
#include <srs_protocol_amf0.hpp>
#include <srs_raw_avc.hpp>
#include <srs_app_http_conn.hpp>
#include <srs_app_rtmp_conn.hpp>
#include <srs_rtmp_stack.hpp>
#include <srs_protocol_utility.hpp>
#include <srs_service_http_client.hpp>
#include <srs_service_log.hpp>
#include <srs_service_st.hpp>
#include <srs_service_http_conn.hpp>
#include <srs_service_rtmp_conn.hpp>
#include <srs_service_utility.hpp>
// pre-declare
int proxy_hls2rtmp(std::string hls, std::string rtmp);
@ -56,8 +53,6 @@ int proxy_hls2rtmp(std::string hls, std::string rtmp);
// @global log and context.
ISrsLog* _srs_log = new SrsConsoleLog(SrsLogLevelTrace, false);
ISrsThreadContext* _srs_context = new SrsThreadContext();
// @global config object for app module.
SrsConfig* _srs_config = NULL;
/**
* main entrance.
@ -628,7 +623,7 @@ private:
int64_t raw_aac_dts;
private:
SrsRequest* req;
SrsSimpleRtmpClient* sdk;
SrsBasicRtmpClient* sdk;
private:
SrsRawH264Stream* avc;
std::string h264_sps;
@ -1221,7 +1216,7 @@ int SrsIngestSrsOutput::connect()
// connect host.
int64_t cto = SRS_CONSTS_RTMP_TMMS;
int64_t sto = SRS_CONSTS_RTMP_PULSE_TMMS;
sdk = new SrsSimpleRtmpClient(url, cto, sto);
sdk = new SrsBasicRtmpClient(url, cto, sto);
if ((ret = sdk->connect()) != ERROR_SUCCESS) {
close();

View file

@ -24,14 +24,11 @@
#include <srs_core.hpp>
#include <srs_kernel_error.hpp>
#include <srs_app_config.hpp>
#include <srs_app_log.hpp>
#include <srs_service_log.hpp>
// @global log and context.
ISrsLog* _srs_log = new SrsFastLog();
ISrsThreadContext* _srs_context = new ISrsThreadContext();
// @global config object for app module.
SrsConfig* _srs_config = NULL;
ISrsLog* _srs_log = new SrsConsoleLog(SrsLogLevelTrace, false);
ISrsThreadContext* _srs_context = new SrsThreadContext();
/**
* main entrance.

View file

@ -0,0 +1,41 @@
/**
* The MIT License (MIT)
*
* Copyright (c) 2013-2017 OSSRS(winlin)
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#include <srs_service_conn.hpp>
ISrsConnection::ISrsConnection()
{
}
ISrsConnection::~ISrsConnection()
{
}
IConnectionManager::IConnectionManager()
{
}
IConnectionManager::~IConnectionManager()
{
}

View file

@ -0,0 +1,55 @@
/**
* The MIT License (MIT)
*
* Copyright (c) 2013-2017 OSSRS(winlin)
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#ifndef SRS_SERVICE_CONN_HPP
#define SRS_SERVICE_CONN_HPP
#include <srs_core.hpp>
/**
* The connection interface for all HTTP/RTMP/RTSP object.
*/
class ISrsConnection
{
public:
ISrsConnection();
virtual ~ISrsConnection();
};
/**
* the manager for connection.
*/
class IConnectionManager
{
public:
IConnectionManager();
virtual ~IConnectionManager();
public:
/**
* Remove then free the specified connection.
*/
virtual void remove(ISrsConnection* c) = 0;
};
#endif

View file

@ -0,0 +1,246 @@
/**
* The MIT License (MIT)
*
* Copyright (c) 2013-2017 OSSRS(winlin)
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#include <srs_service_http_client.hpp>
#include <arpa/inet.h>
#include <sstream>
using namespace std;
#include <srs_protocol_kbps.hpp>
#include <srs_kernel_utility.hpp>
#include <srs_kernel_consts.hpp>
#include <srs_kernel_error.hpp>
#include <srs_kernel_log.hpp>
#include <srs_core_autofree.hpp>
#include <srs_service_http_conn.hpp>
SrsHttpClient::SrsHttpClient()
{
transport = NULL;
kbps = new SrsKbps();
parser = NULL;
timeout = SRS_CONSTS_NO_TMMS;
port = 0;
}
SrsHttpClient::~SrsHttpClient()
{
disconnect();
srs_freep(kbps);
srs_freep(parser);
}
// TODO: FIXME: use ms for timeout.
int SrsHttpClient::initialize(string h, int p, int64_t tm)
{
int ret = ERROR_SUCCESS;
srs_freep(parser);
parser = new SrsHttpParser();
if ((ret = parser->initialize(HTTP_RESPONSE, false)) != ERROR_SUCCESS) {
srs_error("initialize parser failed. ret=%d", ret);
return ret;
}
// Always disconnect the transport.
host = h;
port = p;
timeout = tm;
disconnect();
// ep used for host in header.
string ep = host;
if (port > 0 && port != SRS_CONSTS_HTTP_DEFAULT_PORT) {
ep += ":" + srs_int2str(port);
}
// Set default value for headers.
headers["Host"] = ep;
headers["Connection"] = "Keep-Alive";
headers["User-Agent"] = RTMP_SIG_SRS_SERVER;
headers["Content-Type"] = "application/json";
return ret;
}
SrsHttpClient* SrsHttpClient::set_header(string k, string v)
{
headers[k] = v;
return this;
}
int SrsHttpClient::post(string path, string req, ISrsHttpMessage** ppmsg)
{
*ppmsg = NULL;
int ret = ERROR_SUCCESS;
// always set the content length.
headers["Content-Length"] = srs_int2str(req.length());
if ((ret = connect()) != ERROR_SUCCESS) {
srs_warn("http connect server failed. ret=%d", ret);
return ret;
}
// send POST request to uri
// POST %s HTTP/1.1\r\nHost: %s\r\nContent-Length: %d\r\n\r\n%s
std::stringstream ss;
ss << "POST " << path << " " << "HTTP/1.1" << SRS_HTTP_CRLF;
for (map<string, string>::iterator it = headers.begin(); it != headers.end(); ++it) {
string key = it->first;
string value = it->second;
ss << key << ": " << value << SRS_HTTP_CRLF;
}
ss << SRS_HTTP_CRLF << req;
std::string data = ss.str();
if ((ret = transport->write((void*)data.c_str(), data.length(), NULL)) != ERROR_SUCCESS) {
// Disconnect the transport when channel error, reconnect for next operation.
disconnect();
srs_error("write http post failed. ret=%d", ret);
return ret;
}
ISrsHttpMessage* msg = NULL;
if ((ret = parser->parse_message(transport, NULL, &msg)) != ERROR_SUCCESS) {
srs_error("parse http post response failed. ret=%d", ret);
return ret;
}
srs_assert(msg);
if (ppmsg) {
*ppmsg = msg;
} else {
srs_freep(msg);
}
srs_info("parse http post response success.");
return ret;
}
int SrsHttpClient::get(string path, string req, ISrsHttpMessage** ppmsg)
{
*ppmsg = NULL;
int ret = ERROR_SUCCESS;
// always set the content length.
headers["Content-Length"] = srs_int2str(req.length());
if ((ret = connect()) != ERROR_SUCCESS) {
srs_warn("http connect server failed. ret=%d", ret);
return ret;
}
// send POST request to uri
// GET %s HTTP/1.1\r\nHost: %s\r\nContent-Length: %d\r\n\r\n%s
std::stringstream ss;
ss << "GET " << path << " " << "HTTP/1.1" << SRS_HTTP_CRLF;
for (map<string, string>::iterator it = headers.begin(); it != headers.end(); ++it) {
string key = it->first;
string value = it->second;
ss << key << ": " << value << SRS_HTTP_CRLF;
}
ss << SRS_HTTP_CRLF << req;
std::string data = ss.str();
if ((ret = transport->write((void*)data.c_str(), data.length(), NULL)) != ERROR_SUCCESS) {
// Disconnect the transport when channel error, reconnect for next operation.
disconnect();
srs_error("write http get failed. ret=%d", ret);
return ret;
}
ISrsHttpMessage* msg = NULL;
if ((ret = parser->parse_message(transport, NULL, &msg)) != ERROR_SUCCESS) {
srs_error("parse http post response failed. ret=%d", ret);
return ret;
}
srs_assert(msg);
if (ppmsg) {
*ppmsg = msg;
} else {
srs_freep(msg);
}
srs_info("parse http get response success.");
return ret;
}
void SrsHttpClient::set_recv_timeout(int64_t tm)
{
transport->set_recv_timeout(tm);
}
void SrsHttpClient::kbps_sample(const char* label, int64_t age)
{
kbps->sample();
int sr = kbps->get_send_kbps();
int sr30s = kbps->get_send_kbps_30s();
int sr5m = kbps->get_send_kbps_5m();
int rr = kbps->get_recv_kbps();
int rr30s = kbps->get_recv_kbps_30s();
int rr5m = kbps->get_recv_kbps_5m();
srs_trace("<- %s time=%"PRId64", okbps=%d,%d,%d, ikbps=%d,%d,%d", label, age, sr, sr30s, sr5m, rr, rr30s, rr5m);
}
void SrsHttpClient::disconnect()
{
kbps->set_io(NULL, NULL);
srs_freep(transport);
}
int SrsHttpClient::connect()
{
int ret = ERROR_SUCCESS;
// When transport connected, ignore.
if (transport) {
return ret;
}
transport = new SrsTcpClient(host, port, timeout);
if ((ret = transport->connect()) != ERROR_SUCCESS) {
disconnect();
srs_warn("http client failed, server=%s, port=%d, timeout=%"PRId64", ret=%d", host.c_str(), port, timeout, ret);
return ret;
}
srs_info("connect to server success. server=%s, port=%d", host.c_str(), port);
// Set the recv/send timeout in ms.
transport->set_recv_timeout(timeout);
transport->set_send_timeout(timeout);
kbps->set_io(transport, transport);
return ret;
}

View file

@ -0,0 +1,111 @@
/**
* The MIT License (MIT)
*
* Copyright (c) 2013-2017 OSSRS(winlin)
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#ifndef SRS_SERVICE_HTTP_CLIENT_HPP
#define SRS_SERVICE_HTTP_CLIENT_HPP
#include <srs_core.hpp>
#include <string>
#include <map>
#include <srs_service_st.hpp>
#include <srs_http_stack.hpp>
class SrsHttpUri;
class SrsHttpParser;
class ISrsHttpMessage;
class SrsStSocket;
class SrsKbps;
class SrsTcpClient;
// the default timeout for http client.
#define SRS_HTTP_CLIENT_TMMS (30*1000)
/**
* The client to GET/POST/PUT/DELETE over HTTP.
* @remark We will reuse the TCP transport until initialize or channel error,
* such as send/recv failed.
* Usage:
* SrsHttpClient hc;
* hc.initialize("127.0.0.1", 80, 9000);
* hc.post("/api/v1/version", "Hello world!", NULL);
*/
class SrsHttpClient
{
private:
// The underlayer TCP transport, set to NULL when disconnect, or never not NULL when connected.
// We will disconnect transport when initialize or channel error, such as send/recv error.
SrsTcpClient* transport;
SrsHttpParser* parser;
std::map<std::string, std::string> headers;
SrsKbps* kbps;
private:
// The timeout in ms.
int64_t timeout;
// The host name or ip.
std::string host;
int port;
public:
SrsHttpClient();
virtual ~SrsHttpClient();
public:
/**
* Initliaze the client, disconnect the transport, renew the HTTP parser.
* @param tm The underlayer TCP transport timeout in ms.
* @remark we will set default values in headers, which can be override by set_header.
*/
virtual int initialize(std::string h, int p, int64_t tm = SRS_HTTP_CLIENT_TMMS);
/**
* Set HTTP request header in header[k]=v.
* @return the HTTP client itself.
*/
virtual SrsHttpClient* set_header(std::string k, std::string v);
public:
/**
* to post data to the uri.
* @param the path to request on.
* @param req the data post to uri. empty string to ignore.
* @param ppmsg output the http message to read the response.
* @remark user must free the ppmsg if not NULL.
*/
virtual int post(std::string path, std::string req, ISrsHttpMessage** ppmsg);
/**
* to get data from the uri.
* @param the path to request on.
* @param req the data post to uri. empty string to ignore.
* @param ppmsg output the http message to read the response.
* @remark user must free the ppmsg if not NULL.
*/
virtual int get(std::string path, std::string req, ISrsHttpMessage** ppmsg);
private:
virtual void set_recv_timeout(int64_t tm);
public:
virtual void kbps_sample(const char* label, int64_t age);
private:
virtual void disconnect();
virtual int connect();
};
#endif

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,326 @@
/**
* The MIT License (MIT)
*
* Copyright (c) 2013-2017 OSSRS(winlin)
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#ifndef SRS_SERVICE_HTTP_CONN_HPP
#define SRS_SERVICE_HTTP_CONN_HPP
#include <srs_core.hpp>
#include <string>
#include <srs_http_stack.hpp>
class ISrsProtocolReaderWriter;
class SrsConnection;
class SrsFastStream;
class SrsRequest;
class SrsHttpResponseReader;
class SrsStSocket;
/**
* wrapper for http-parser,
* provides HTTP message originted service.
*/
class SrsHttpParser
{
private:
http_parser_settings settings;
http_parser parser;
// the global parse buffer.
SrsFastStream* buffer;
// whether allow jsonp parse.
bool jsonp;
private:
// http parse data, reset before parse message.
bool expect_field_name;
std::string field_name;
std::string field_value;
SrsHttpParseState state;
http_parser header;
std::string url;
std::vector<SrsHttpHeaderField> headers;
int header_parsed;
public:
SrsHttpParser();
virtual ~SrsHttpParser();
public:
/**
* initialize the http parser with specified type,
* one parser can only parse request or response messages.
* @param allow_jsonp whether allow jsonp parser, which indicates the method in query string.
*/
virtual int initialize(enum http_parser_type type, bool allow_jsonp);
/**
* always parse a http message,
* that is, the *ppmsg always NOT-NULL when return success.
* or error and *ppmsg must be NULL.
* @remark, if success, *ppmsg always NOT-NULL, *ppmsg always is_complete().
* @remark user must free the ppmsg if not NULL.
*/
virtual int parse_message(ISrsProtocolReaderWriter* io, SrsConnection* conn, ISrsHttpMessage** ppmsg);
private:
/**
* parse the HTTP message to member field: msg.
*/
virtual int parse_message_imp(ISrsProtocolReaderWriter* io);
private:
static int on_message_begin(http_parser* parser);
static int on_headers_complete(http_parser* parser);
static int on_message_complete(http_parser* parser);
static int on_url(http_parser* parser, const char* at, size_t length);
static int on_header_field(http_parser* parser, const char* at, size_t length);
static int on_header_value(http_parser* parser, const char* at, size_t length);
static int on_body(http_parser* parser, const char* at, size_t length);
};
// for http header.
typedef std::pair<std::string, std::string> SrsHttpHeaderField;
// A Request represents an HTTP request received by a server
// or to be sent by a client.
//
// The field semantics differ slightly between client and server
// usage. In addition to the notes on the fields below, see the
// documentation for Request.Write and RoundTripper.
class SrsHttpMessage : public ISrsHttpMessage
{
private:
/**
* parsed url.
*/
std::string _url;
/**
* the extension of file, for example, .flv
*/
std::string _ext;
/**
* parsed http header.
*/
http_parser _header;
/**
* body object, reader object.
* @remark, user can get body in string by get_body().
*/
SrsHttpResponseReader* _body;
/**
* whether the body is chunked.
*/
bool chunked;
/**
* whether the body is infinite chunked.
*/
bool infinite_chunked;
/**
* whether the request indicates should keep alive
* for the http connection.
*/
bool keep_alive;
/**
* uri parser
*/
SrsHttpUri* _uri;
/**
* use a buffer to read and send ts file.
*/
// TODO: FIXME: remove it.
char* _http_ts_send_buffer;
// http headers
std::vector<SrsHttpHeaderField> _headers;
// the query map
std::map<std::string, std::string> _query;
// the transport connection, can be NULL.
SrsConnection* conn;
// whether request is jsonp.
bool jsonp;
// the method in QueryString will override the HTTP method.
std::string jsonp_method;
public:
SrsHttpMessage(ISrsProtocolReaderWriter* io, SrsConnection* c);
virtual ~SrsHttpMessage();
public:
/**
* set the original messages, then update the message.
*/
virtual int update(std::string url, bool allow_jsonp, http_parser* header, SrsFastStream* body, std::vector<SrsHttpHeaderField>& headers);
public:
virtual SrsConnection* connection();
public:
virtual uint8_t method();
virtual uint16_t status_code();
/**
* method helpers.
*/
virtual std::string method_str();
virtual bool is_http_get();
virtual bool is_http_put();
virtual bool is_http_post();
virtual bool is_http_delete();
virtual bool is_http_options();
/**
* whether body is chunked encoding, for reader only.
*/
virtual bool is_chunked();
/**
* whether body is infinite chunked encoding.
* @remark set by enter_infinite_chunked.
*/
virtual bool is_infinite_chunked();
/**
* whether should keep the connection alive.
*/
virtual bool is_keep_alive();
/**
* the uri contains the host and path.
*/
virtual std::string uri();
/**
* the url maybe the path.
*/
virtual std::string url();
virtual std::string host();
virtual std::string path();
virtual std::string query();
virtual std::string ext();
/**
* get the RESTful matched id.
*/
virtual int parse_rest_id(std::string pattern);
public:
virtual int enter_infinite_chunked();
public:
/**
* read body to string.
* @remark for small http body.
*/
virtual int body_read_all(std::string& body);
/**
* get the body reader, to read one by one.
* @remark when body is very large, or chunked, use this.
*/
virtual ISrsHttpResponseReader* body_reader();
/**
* the content length, -1 for chunked or not set.
*/
virtual int64_t content_length();
/**
* get the param in query string,
* for instance, query is "start=100&end=200",
* then query_get("start") is "100", and query_get("end") is "200"
*/
virtual std::string query_get(std::string key);
/**
* get the headers.
*/
virtual int request_header_count();
virtual std::string request_header_key_at(int index);
virtual std::string request_header_value_at(int index);
virtual std::string get_request_header(std::string name);
public:
/**
* convert the http message to a request.
* @remark user must free the return request.
*/
virtual SrsRequest* to_request(std::string vhost);
public:
virtual bool is_jsonp();
};
// the http chunked header size,
// for writev, there always one chunk to send it.
#define SRS_HTTP_HEADER_CACHE_SIZE 64
/**
* response writer use st socket
*/
class SrsHttpResponseWriter : public ISrsHttpResponseWriter
{
private:
SrsStSocket* skt;
SrsHttpHeader* hdr;
private:
char header_cache[SRS_HTTP_HEADER_CACHE_SIZE];
iovec* iovss_cache;
int nb_iovss_cache;
private:
// reply header has been (logically) written
bool header_wrote;
// status code passed to WriteHeader
int status;
private:
// explicitly-declared Content-Length; or -1
int64_t content_length;
// number of bytes written in body
int64_t written;
private:
// wroteHeader tells whether the header's been written to "the
// wire" (or rather: w.conn.buf). this is unlike
// (*response).wroteHeader, which tells only whether it was
// logically written.
bool header_sent;
public:
SrsHttpResponseWriter(SrsStSocket* io);
virtual ~SrsHttpResponseWriter();
public:
virtual int final_request();
virtual SrsHttpHeader* header();
virtual int write(char* data, int size);
virtual int writev(const iovec* iov, int iovcnt, ssize_t* pnwrite);
virtual void write_header(int code);
virtual int send_header(char* data, int size);
};
/**
* response reader use st socket.
*/
class SrsHttpResponseReader : virtual public ISrsHttpResponseReader
{
private:
ISrsProtocolReaderWriter* skt;
SrsHttpMessage* owner;
SrsFastStream* buffer;
bool is_eof;
// the left bytes in chunk.
int nb_left_chunk;
// the number of bytes of current chunk.
int nb_chunk;
// already read total bytes.
int64_t nb_total_read;
public:
SrsHttpResponseReader(SrsHttpMessage* msg, ISrsProtocolReaderWriter* io);
virtual ~SrsHttpResponseReader();
public:
/**
* initialize the response reader with buffer.
*/
virtual int initialize(SrsFastStream* buffer);
// interface ISrsHttpResponseReader
public:
virtual bool eof();
virtual int read(char* data, int nb_data, int* nb_read);
private:
virtual int read_chunked(char* data, int nb_data, int* nb_read);
virtual int read_specified(char* data, int nb_data, int* nb_read);
};
#endif

View file

@ -0,0 +1,241 @@
/**
* The MIT License (MIT)
*
* Copyright (c) 2013-2017 OSSRS(winlin)
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#include <srs_service_rtmp_conn.hpp>
using namespace std;
#include <srs_protocol_kbps.hpp>
#include <srs_rtmp_stack.hpp>
#include <srs_service_st.hpp>
#include <srs_protocol_amf0.hpp>
#include <srs_protocol_utility.hpp>
#include <srs_service_utility.hpp>
SrsBasicRtmpClient::SrsBasicRtmpClient(string u, int64_t ctm, int64_t stm)
{
kbps = new SrsKbps();
url = u;
connect_timeout = ctm;
stream_timeout = stm;
req = new SrsRequest();
srs_parse_rtmp_url(url, req->tcUrl, req->stream);
srs_discovery_tc_url(req->tcUrl, req->schema, req->host, req->vhost, req->app, req->port, req->param);
transport = NULL;
client = NULL;
stream_id = 0;
}
SrsBasicRtmpClient::~SrsBasicRtmpClient()
{
close();
srs_freep(kbps);
}
int SrsBasicRtmpClient::connect()
{
int ret = ERROR_SUCCESS;
close();
transport = new SrsTcpClient(req->host, req->port, connect_timeout);
client = new SrsRtmpClient(transport);
kbps->set_io(transport, transport);
if ((ret = transport->connect()) != ERROR_SUCCESS) {
close();
return ret;
}
client->set_recv_timeout(stream_timeout);
client->set_send_timeout(stream_timeout);
// connect to vhost/app
if ((ret = client->handshake()) != ERROR_SUCCESS) {
srs_error("sdk: handshake with server failed. ret=%d", ret);
return ret;
}
if ((ret = connect_app()) != ERROR_SUCCESS) {
srs_error("sdk: connect with server failed. ret=%d", ret);
return ret;
}
if ((ret = client->create_stream(stream_id)) != ERROR_SUCCESS) {
srs_error("sdk: connect with server failed, stream_id=%d. ret=%d", stream_id, ret);
return ret;
}
return ret;
}
void SrsBasicRtmpClient::close()
{
kbps->set_io(NULL, NULL);
srs_freep(client);
srs_freep(transport);
}
int SrsBasicRtmpClient::connect_app()
{
return do_connect_app(srs_get_public_internet_address(), false);
}
int SrsBasicRtmpClient::do_connect_app(string local_ip, bool debug)
{
int ret = ERROR_SUCCESS;
// args of request takes the srs info.
if (req->args == NULL) {
req->args = SrsAmf0Any::object();
}
// notify server the edge identity,
// @see https://github.com/ossrs/srs/issues/147
SrsAmf0Object* data = req->args;
data->set("srs_sig", SrsAmf0Any::str(RTMP_SIG_SRS_KEY));
data->set("srs_server", SrsAmf0Any::str(RTMP_SIG_SRS_SERVER));
data->set("srs_license", SrsAmf0Any::str(RTMP_SIG_SRS_LICENSE));
data->set("srs_role", SrsAmf0Any::str(RTMP_SIG_SRS_ROLE));
data->set("srs_url", SrsAmf0Any::str(RTMP_SIG_SRS_URL));
data->set("srs_version", SrsAmf0Any::str(RTMP_SIG_SRS_VERSION));
data->set("srs_site", SrsAmf0Any::str(RTMP_SIG_SRS_WEB));
data->set("srs_email", SrsAmf0Any::str(RTMP_SIG_SRS_EMAIL));
data->set("srs_copyright", SrsAmf0Any::str(RTMP_SIG_SRS_COPYRIGHT));
data->set("srs_primary", SrsAmf0Any::str(RTMP_SIG_SRS_PRIMARY));
data->set("srs_authors", SrsAmf0Any::str(RTMP_SIG_SRS_AUTHROS));
// for edge to directly get the id of client.
data->set("srs_pid", SrsAmf0Any::number(getpid()));
data->set("srs_id", SrsAmf0Any::number(_srs_context->get_id()));
// local ip of edge
data->set("srs_server_ip", SrsAmf0Any::str(local_ip.c_str()));
// generate the tcUrl
std::string param = "";
std::string target_vhost = req->vhost;
std::string tc_url = srs_generate_tc_url(req->host, req->vhost, req->app, req->port, param);
// replace the tcUrl in request,
// which will replace the tc_url in client.connect_app().
req->tcUrl = tc_url;
// upnode server identity will show in the connect_app of client.
// @see https://github.com/ossrs/srs/issues/160
// the debug_srs_upnode is config in vhost and default to true.
if ((ret = client->connect_app(req->app, tc_url, req, debug, NULL)) != ERROR_SUCCESS) {
srs_error("sdk: connect with server failed, tcUrl=%s, dsu=%d. ret=%d",
tc_url.c_str(), debug, ret);
return ret;
}
return ret;
}
int SrsBasicRtmpClient::publish()
{
int ret = ERROR_SUCCESS;
// publish.
if ((ret = client->publish(req->stream, stream_id)) != ERROR_SUCCESS) {
srs_error("sdk: publish failed, stream=%s, stream_id=%d. ret=%d",
req->stream.c_str(), stream_id, ret);
return ret;
}
return ret;
}
int SrsBasicRtmpClient::play()
{
int ret = ERROR_SUCCESS;
if ((ret = client->play(req->stream, stream_id)) != ERROR_SUCCESS) {
srs_error("connect with server failed, stream=%s, stream_id=%d. ret=%d",
req->stream.c_str(), stream_id, ret);
return ret;
}
return ret;
}
void SrsBasicRtmpClient::kbps_sample(const char* label, int64_t age)
{
kbps->sample();
int sr = kbps->get_send_kbps();
int sr30s = kbps->get_send_kbps_30s();
int sr5m = kbps->get_send_kbps_5m();
int rr = kbps->get_recv_kbps();
int rr30s = kbps->get_recv_kbps_30s();
int rr5m = kbps->get_recv_kbps_5m();
srs_trace("<- %s time=%"PRId64", okbps=%d,%d,%d, ikbps=%d,%d,%d", label, age, sr, sr30s, sr5m, rr, rr30s, rr5m);
}
void SrsBasicRtmpClient::kbps_sample(const char* label, int64_t age, int msgs)
{
kbps->sample();
int sr = kbps->get_send_kbps();
int sr30s = kbps->get_send_kbps_30s();
int sr5m = kbps->get_send_kbps_5m();
int rr = kbps->get_recv_kbps();
int rr30s = kbps->get_recv_kbps_30s();
int rr5m = kbps->get_recv_kbps_5m();
srs_trace("<- %s time=%"PRId64", msgs=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d", label, age, msgs, sr, sr30s, sr5m, rr, rr30s, rr5m);
}
int SrsBasicRtmpClient::sid()
{
return stream_id;
}
int SrsBasicRtmpClient::recv_message(SrsCommonMessage** pmsg)
{
return client->recv_message(pmsg);
}
int SrsBasicRtmpClient::decode_message(SrsCommonMessage* msg, SrsPacket** ppacket)
{
return client->decode_message(msg, ppacket);
}
int SrsBasicRtmpClient::send_and_free_messages(SrsSharedPtrMessage** msgs, int nb_msgs)
{
return client->send_and_free_messages(msgs, nb_msgs, stream_id);
}
int SrsBasicRtmpClient::send_and_free_message(SrsSharedPtrMessage* msg)
{
return client->send_and_free_message(msg, stream_id);
}
void SrsBasicRtmpClient::set_recv_timeout(int64_t timeout)
{
transport->set_recv_timeout(timeout);
}

View file

@ -0,0 +1,92 @@
/**
* The MIT License (MIT)
*
* Copyright (c) 2013-2017 OSSRS(winlin)
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#ifndef SRS_SERVICE_RTMP_CONN_HPP
#define SRS_SERVICE_RTMP_CONN_HPP
#include <srs_core.hpp>
#include <string>
class SrsRequest;
class SrsTcpClient;
class SrsRtmpClient;
class SrsCommonMessage;
class SrsSharedPtrMessage;
class SrsPacket;
class SrsKbps;
/**
* The simple RTMP client, provides friendly APIs.
* @remark Should never use client when closed.
* Usage:
* SrsBasicRtmpClient client("rtmp://127.0.0.1:1935/live/livestream", 3000, 9000);
* client.connect();
* client.play();
* client.close();
*/
class SrsBasicRtmpClient
{
private:
std::string url;
int64_t connect_timeout;
int64_t stream_timeout;
protected:
SrsRequest* req;
private:
SrsTcpClient* transport;
SrsRtmpClient* client;
SrsKbps* kbps;
int stream_id;
public:
// Constructor.
// @param u The RTMP url, for example, rtmp://ip:port/app/stream?domain=vhost
// @param ctm The timeout in ms to connect to server.
// @param stm The timeout in ms to delivery A/V stream.
SrsBasicRtmpClient(std::string u, int64_t ctm, int64_t stm);
virtual ~SrsBasicRtmpClient();
public:
// Connect, handshake and connect app to RTMP server.
// @remark We always close the transport.
virtual int connect();
virtual void close();
protected:
virtual int connect_app();
virtual int do_connect_app(std::string local_ip, bool debug);
public:
virtual int publish();
virtual int play();
virtual void kbps_sample(const char* label, int64_t age);
virtual void kbps_sample(const char* label, int64_t age, int msgs);
virtual int sid();
public:
virtual int recv_message(SrsCommonMessage** pmsg);
virtual int decode_message(SrsCommonMessage* msg, SrsPacket** ppacket);
virtual int send_and_free_messages(SrsSharedPtrMessage** msgs, int nb_msgs);
virtual int send_and_free_message(SrsSharedPtrMessage* msg);
public:
virtual void set_recv_timeout(int64_t timeout);
};
#endif

View file

@ -23,3 +23,355 @@
#include <srs_service_st.hpp>
using namespace std;
#include <srs_kernel_error.hpp>
#include <srs_kernel_log.hpp>
#include <srs_service_utility.hpp>
#ifdef __linux__
#include <sys/epoll.h>
bool srs_st_epoll_is_supported(void)
{
struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.ptr = NULL;
/* Guaranteed to fail */
epoll_ctl(-1, EPOLL_CTL_ADD, -1, &ev);
return (errno != ENOSYS);
}
#endif
int srs_st_init()
{
int ret = ERROR_SUCCESS;
#ifdef __linux__
// check epoll, some old linux donot support epoll.
// @see https://github.com/ossrs/srs/issues/162
if (!srs_st_epoll_is_supported()) {
ret = ERROR_ST_SET_EPOLL;
srs_error("epoll required on Linux. ret=%d", ret);
return ret;
}
#endif
// Select the best event system available on the OS. In Linux this is
// epoll(). On BSD it will be kqueue.
if (st_set_eventsys(ST_EVENTSYS_ALT) == -1) {
ret = ERROR_ST_SET_EPOLL;
srs_error("st_set_eventsys use %s failed. ret=%d", st_get_eventsys_name(), ret);
return ret;
}
srs_info("st_set_eventsys to %s", st_get_eventsys_name());
if(st_init() != 0){
ret = ERROR_ST_INITIALIZE;
srs_error("st_init failed. ret=%d", ret);
return ret;
}
srs_trace("st_init success, use %s", st_get_eventsys_name());
return ret;
}
void srs_close_stfd(st_netfd_t& stfd)
{
if (stfd) {
// we must ensure the close is ok.
int err = st_netfd_close(stfd);
srs_assert(err != -1);
stfd = NULL;
}
}
SrsStSocket::SrsStSocket()
{
stfd = NULL;
stm = rtm = SRS_CONSTS_NO_TMMS;
rbytes = sbytes = 0;
}
SrsStSocket::~SrsStSocket()
{
}
int SrsStSocket::initialize(st_netfd_t fd)
{
stfd = fd;
return ERROR_SUCCESS;
}
bool SrsStSocket::is_never_timeout(int64_t tm)
{
return tm == SRS_CONSTS_NO_TMMS;
}
void SrsStSocket::set_recv_timeout(int64_t tm)
{
rtm = tm;
}
int64_t SrsStSocket::get_recv_timeout()
{
return rtm;
}
void SrsStSocket::set_send_timeout(int64_t tm)
{
stm = tm;
}
int64_t SrsStSocket::get_send_timeout()
{
return stm;
}
int64_t SrsStSocket::get_recv_bytes()
{
return rbytes;
}
int64_t SrsStSocket::get_send_bytes()
{
return sbytes;
}
int SrsStSocket::read(void* buf, size_t size, ssize_t* nread)
{
int ret = ERROR_SUCCESS;
ssize_t nb_read;
if (rtm == SRS_CONSTS_NO_TMMS) {
nb_read = st_read(stfd, buf, size, ST_UTIME_NO_TIMEOUT);
} else {
nb_read = st_read(stfd, buf, size, rtm * 1000);
}
if (nread) {
*nread = nb_read;
}
// On success a non-negative integer indicating the number of bytes actually read is returned
// (a value of 0 means the network connection is closed or end of file is reached).
// Otherwise, a value of -1 is returned and errno is set to indicate the error.
if (nb_read <= 0) {
// @see https://github.com/ossrs/srs/issues/200
if (nb_read < 0 && errno == ETIME) {
return ERROR_SOCKET_TIMEOUT;
}
if (nb_read == 0) {
errno = ECONNRESET;
}
return ERROR_SOCKET_READ;
}
rbytes += nb_read;
return ret;
}
int SrsStSocket::read_fully(void* buf, size_t size, ssize_t* nread)
{
int ret = ERROR_SUCCESS;
ssize_t nb_read;
if (rtm == SRS_CONSTS_NO_TMMS) {
nb_read = st_read_fully(stfd, buf, size, ST_UTIME_NO_TIMEOUT);
} else {
nb_read = st_read_fully(stfd, buf, size, rtm * 1000);
}
if (nread) {
*nread = nb_read;
}
// On success a non-negative integer indicating the number of bytes actually read is returned
// (a value less than nbyte means the network connection is closed or end of file is reached)
// Otherwise, a value of -1 is returned and errno is set to indicate the error.
if (nb_read != (ssize_t)size) {
// @see https://github.com/ossrs/srs/issues/200
if (nb_read < 0 && errno == ETIME) {
return ERROR_SOCKET_TIMEOUT;
}
if (nb_read >= 0) {
errno = ECONNRESET;
}
return ERROR_SOCKET_READ_FULLY;
}
rbytes += nb_read;
return ret;
}
int SrsStSocket::write(void* buf, size_t size, ssize_t* nwrite)
{
int ret = ERROR_SUCCESS;
ssize_t nb_write;
if (stm == SRS_CONSTS_NO_TMMS) {
nb_write = st_write(stfd, buf, size, ST_UTIME_NO_TIMEOUT);
} else {
nb_write = st_write(stfd, buf, size, stm * 1000);
}
if (nwrite) {
*nwrite = nb_write;
}
// On success a non-negative integer equal to nbyte is returned.
// Otherwise, a value of -1 is returned and errno is set to indicate the error.
if (nb_write <= 0) {
// @see https://github.com/ossrs/srs/issues/200
if (nb_write < 0 && errno == ETIME) {
return ERROR_SOCKET_TIMEOUT;
}
return ERROR_SOCKET_WRITE;
}
sbytes += nb_write;
return ret;
}
int SrsStSocket::writev(const iovec *iov, int iov_size, ssize_t* nwrite)
{
int ret = ERROR_SUCCESS;
ssize_t nb_write;
if (stm == SRS_CONSTS_NO_TMMS) {
nb_write = st_writev(stfd, iov, iov_size, ST_UTIME_NO_TIMEOUT);
} else {
nb_write = st_writev(stfd, iov, iov_size, stm * 1000);
}
if (nwrite) {
*nwrite = nb_write;
}
// On success a non-negative integer equal to nbyte is returned.
// Otherwise, a value of -1 is returned and errno is set to indicate the error.
if (nb_write <= 0) {
// @see https://github.com/ossrs/srs/issues/200
if (nb_write < 0 && errno == ETIME) {
return ERROR_SOCKET_TIMEOUT;
}
return ERROR_SOCKET_WRITE;
}
sbytes += nb_write;
return ret;
}
SrsTcpClient::SrsTcpClient(string h, int p, int64_t tm)
{
stfd = NULL;
io = new SrsStSocket();
host = h;
port = p;
timeout = tm;
}
SrsTcpClient::~SrsTcpClient()
{
close();
srs_freep(io);
}
int SrsTcpClient::connect()
{
int ret = ERROR_SUCCESS;
close();
srs_assert(stfd == NULL);
if ((ret = srs_socket_connect(host, port, timeout, &stfd)) != ERROR_SUCCESS) {
srs_error("connect tcp://%s:%d failed, to=%"PRId64"ms. ret=%d", host.c_str(), port, timeout, ret);
return ret;
}
if ((ret = io->initialize(stfd)) != ERROR_SUCCESS) {
return ret;
}
return ret;
}
void SrsTcpClient::close()
{
// Ignore when already closed.
if (!io) {
return;
}
srs_close_stfd(stfd);
}
bool SrsTcpClient::is_never_timeout(int64_t tm)
{
return io->is_never_timeout(tm);
}
void SrsTcpClient::set_recv_timeout(int64_t tm)
{
io->set_recv_timeout(tm);
}
int64_t SrsTcpClient::get_recv_timeout()
{
return io->get_recv_timeout();
}
void SrsTcpClient::set_send_timeout(int64_t tm)
{
io->set_send_timeout(tm);
}
int64_t SrsTcpClient::get_send_timeout()
{
return io->get_send_timeout();
}
int64_t SrsTcpClient::get_recv_bytes()
{
return io->get_recv_bytes();
}
int64_t SrsTcpClient::get_send_bytes()
{
return io->get_send_bytes();
}
int SrsTcpClient::read(void* buf, size_t size, ssize_t* nread)
{
return io->read(buf, size, nread);
}
int SrsTcpClient::read_fully(void* buf, size_t size, ssize_t* nread)
{
return io->read_fully(buf, size, nread);
}
int SrsTcpClient::write(void* buf, size_t size, ssize_t* nwrite)
{
return io->write(buf, size, nwrite);
}
int SrsTcpClient::writev(const iovec *iov, int iov_size, ssize_t* nwrite)
{
return io->writev(iov, iov_size, nwrite);
}

View file

@ -26,7 +26,116 @@
#include <srs_core.hpp>
#include <string>
#include <st.h>
#include <srs_protocol_io.hpp>
// initialize st, requires epoll.
extern int srs_st_init();
// close the netfd, and close the underlayer fd.
// @remark when close, user must ensure io completed.
extern void srs_close_stfd(st_netfd_t& stfd);
/**
* the socket provides TCP socket over st,
* that is, the sync socket mechanism.
*/
class SrsStSocket : public ISrsProtocolReaderWriter
{
private:
// The recv/send timeout in ms.
// @remark Use SRS_CONSTS_NO_TMMS for never timeout in ms.
int64_t rtm;
int64_t stm;
// The recv/send data in bytes
int64_t rbytes;
int64_t sbytes;
// The underlayer st fd.
st_netfd_t stfd;
public:
SrsStSocket();
virtual ~SrsStSocket();
public:
// Initialize the socket with stfd, user must manage it.
virtual int initialize(st_netfd_t fd);
public:
virtual bool is_never_timeout(int64_t tm);
virtual void set_recv_timeout(int64_t tm);
virtual int64_t get_recv_timeout();
virtual void set_send_timeout(int64_t tm);
virtual int64_t get_send_timeout();
virtual int64_t get_recv_bytes();
virtual int64_t get_send_bytes();
public:
/**
* @param nread, the actual read bytes, ignore if NULL.
*/
virtual int read(void* buf, size_t size, ssize_t* nread);
virtual int read_fully(void* buf, size_t size, ssize_t* nread);
/**
* @param nwrite, the actual write bytes, ignore if NULL.
*/
virtual int write(void* buf, size_t size, ssize_t* nwrite);
virtual int writev(const iovec *iov, int iov_size, ssize_t* nwrite);
};
/**
* The client to connect to server over TCP.
* User must never reuse the client when close it.
* Usage:
* SrsTcpClient client("127.0.0.1", 1935,9000);
* client.connect();
* client.write("Hello world!", 12, NULL);
* client.read(buf, 4096, NULL);
* @remark User can directly free the object, which will close the fd.
*/
class SrsTcpClient : public ISrsProtocolReaderWriter
{
private:
st_netfd_t stfd;
SrsStSocket* io;
private:
std::string host;
int port;
// The timeout in ms.
int64_t timeout;
public:
/**
* Constructor.
* @param h the ip or hostname of server.
* @param p the port to connect to.
* @param tm the timeout in ms.
*/
SrsTcpClient(std::string h, int p, int64_t tm);
virtual ~SrsTcpClient();
public:
/**
* Connect to server over TCP.
* @remark We will close the exists connection before do connect.
*/
virtual int connect();
private:
/**
* Close the connection to server.
* @remark User should never use the client when close it.
*/
virtual void close();
// interface ISrsProtocolReaderWriter
public:
virtual bool is_never_timeout(int64_t tm);
virtual void set_recv_timeout(int64_t tm);
virtual int64_t get_recv_timeout();
virtual void set_send_timeout(int64_t tm);
virtual int64_t get_send_timeout();
virtual int64_t get_recv_bytes();
virtual int64_t get_send_bytes();
virtual int read(void* buf, size_t size, ssize_t* nread);
virtual int read_fully(void* buf, size_t size, ssize_t* nread);
virtual int write(void* buf, size_t size, ssize_t* nwrite);
virtual int writev(const iovec *iov, int iov_size, ssize_t* nwrite);
};
#endif

View file

@ -0,0 +1,280 @@
/**
* The MIT License (MIT)
*
* Copyright (c) 2013-2017 OSSRS(winlin)
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#include <srs_service_utility.hpp>
#include <unistd.h>
#include <arpa/inet.h>
#include <ifaddrs.h>
#include <map>
#include <sstream>
using namespace std;
#include <srs_service_st.hpp>
#include <srs_kernel_error.hpp>
#include <srs_kernel_consts.hpp>
#include <srs_kernel_log.hpp>
#include <srs_kernel_utility.hpp>
int srs_socket_connect(string server, int port, int64_t tm, st_netfd_t* pstfd)
{
int ret = ERROR_SUCCESS;
st_utime_t timeout = ST_UTIME_NO_TIMEOUT;
if (tm != SRS_CONSTS_NO_TMMS) {
timeout = (st_utime_t)(tm * 1000);
}
*pstfd = NULL;
st_netfd_t stfd = NULL;
sockaddr_in addr;
int sock = socket(AF_INET, SOCK_STREAM, 0);
if(sock == -1){
ret = ERROR_SOCKET_CREATE;
srs_error("create socket error. ret=%d", ret);
return ret;
}
srs_assert(!stfd);
stfd = st_netfd_open_socket(sock);
if(stfd == NULL){
ret = ERROR_ST_OPEN_SOCKET;
srs_error("st_netfd_open_socket failed. ret=%d", ret);
return ret;
}
// connect to server.
std::string ip = srs_dns_resolve(server);
if (ip.empty()) {
ret = ERROR_SYSTEM_IP_INVALID;
srs_error("dns resolve server error, ip empty. ret=%d", ret);
goto failed;
}
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = inet_addr(ip.c_str());
if (st_connect(stfd, (const struct sockaddr*)&addr, sizeof(sockaddr_in), timeout) == -1){
ret = ERROR_ST_CONNECT;
srs_error("connect to server error. ip=%s, port=%d, ret=%d", ip.c_str(), port, ret);
goto failed;
}
srs_info("connect ok. server=%s, ip=%s, port=%d", server.c_str(), ip.c_str(), port);
*pstfd = stfd;
return ret;
failed:
if (stfd) {
srs_close_stfd(stfd);
}
return ret;
}
bool srs_string_is_http(string url)
{
return srs_string_starts_with(url, "http://", "https://");
}
bool srs_string_is_rtmp(string url)
{
return srs_string_starts_with(url, "rtmp://");
}
// we detect all network device as internet or intranet device, by its ip address.
// key is device name, for instance, eth0
// value is whether internet, for instance, true.
static std::map<std::string, bool> _srs_device_ifs;
bool srs_net_device_is_internet(string ifname)
{
srs_info("check ifname=%s", ifname.c_str());
if (_srs_device_ifs.find(ifname) == _srs_device_ifs.end()) {
return false;
}
return _srs_device_ifs[ifname];
}
bool srs_net_device_is_internet(in_addr_t addr)
{
uint32_t addr_h = ntohl(addr);
// lo, 127.0.0.0-127.0.0.1
if (addr_h >= 0x7f000000 && addr_h <= 0x7f000001) {
return false;
}
// Class A 10.0.0.0-10.255.255.255
if (addr_h >= 0x0a000000 && addr_h <= 0x0affffff) {
return false;
}
// Class B 172.16.0.0-172.31.255.255
if (addr_h >= 0xac100000 && addr_h <= 0xac1fffff) {
return false;
}
// Class C 192.168.0.0-192.168.255.255
if (addr_h >= 0xc0a80000 && addr_h <= 0xc0a8ffff) {
return false;
}
return true;
}
vector<string> _srs_system_ipv4_ips;
void retrieve_local_ipv4_ips()
{
vector<string>& ips = _srs_system_ipv4_ips;
ips.clear();
ifaddrs* ifap;
if (getifaddrs(&ifap) == -1) {
srs_warn("retrieve local ips, ini ifaddrs failed.");
return;
}
stringstream ss0;
ss0 << "ips";
stringstream ss1;
ss1 << "devices";
ifaddrs* p = ifap;
while (p != NULL) {
ifaddrs* cur = p;
sockaddr* addr = cur->ifa_addr;
p = p->ifa_next;
// retrieve ipv4 addr
// ignore the tun0 network device,
// which addr is NULL.
// @see: https://github.com/ossrs/srs/issues/141
if (addr && addr->sa_family == AF_INET) {
in_addr* inaddr = &((sockaddr_in*)addr)->sin_addr;
char buf[16];
memset(buf, 0, sizeof(buf));
if ((inet_ntop(addr->sa_family, inaddr, buf, sizeof(buf))) == NULL) {
srs_warn("convert local ip failed");
break;
}
std::string ip = buf;
if (ip != SRS_CONSTS_LOCALHOST) {
ss0 << ", local[" << (int)ips.size() << "] ipv4 " << ip;
ips.push_back(ip);
}
// set the device internet status.
if (!srs_net_device_is_internet(inaddr->s_addr)) {
ss1 << ", intranet ";
_srs_device_ifs[cur->ifa_name] = false;
} else {
ss1 << ", internet ";
_srs_device_ifs[cur->ifa_name] = true;
}
ss1 << cur->ifa_name << " " << ip;
}
}
srs_trace(ss0.str().c_str());
srs_trace(ss1.str().c_str());
freeifaddrs(ifap);
}
vector<string>& srs_get_local_ipv4_ips()
{
if (_srs_system_ipv4_ips.empty()) {
retrieve_local_ipv4_ips();
}
return _srs_system_ipv4_ips;
}
std::string _public_internet_address;
string srs_get_public_internet_address()
{
if (!_public_internet_address.empty()) {
return _public_internet_address;
}
std::vector<std::string>& ips = srs_get_local_ipv4_ips();
// find the best match public address.
for (int i = 0; i < (int)ips.size(); i++) {
std::string ip = ips[i];
in_addr_t addr = inet_addr(ip.c_str());
uint32_t addr_h = ntohl(addr);
// lo, 127.0.0.0-127.0.0.1
if (addr_h >= 0x7f000000 && addr_h <= 0x7f000001) {
srs_trace("ignore private address: %s", ip.c_str());
continue;
}
// Class A 10.0.0.0-10.255.255.255
if (addr_h >= 0x0a000000 && addr_h <= 0x0affffff) {
srs_trace("ignore private address: %s", ip.c_str());
continue;
}
// Class B 172.16.0.0-172.31.255.255
if (addr_h >= 0xac100000 && addr_h <= 0xac1fffff) {
srs_trace("ignore private address: %s", ip.c_str());
continue;
}
// Class C 192.168.0.0-192.168.255.255
if (addr_h >= 0xc0a80000 && addr_h <= 0xc0a8ffff) {
srs_trace("ignore private address: %s", ip.c_str());
continue;
}
srs_warn("use public address as ip: %s", ip.c_str());
_public_internet_address = ip;
return ip;
}
// no public address, use private address.
for (int i = 0; i < (int)ips.size(); i++) {
std::string ip = ips[i];
in_addr_t addr = inet_addr(ip.c_str());
uint32_t addr_h = ntohl(addr);
// lo, 127.0.0.0-127.0.0.1
if (addr_h >= 0x7f000000 && addr_h <= 0x7f000001) {
srs_trace("ignore private address: %s", ip.c_str());
continue;
}
srs_warn("use private address as ip: %s", ip.c_str());
_public_internet_address = ip;
return ip;
}
return "";
}

View file

@ -0,0 +1,53 @@
/**
* The MIT License (MIT)
*
* Copyright (c) 2013-2017 OSSRS(winlin)
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#ifndef SRS_SERVICE_UTILITY_HPP
#define SRS_SERVICE_UTILITY_HPP
#include <srs_core.hpp>
#include <string>
#include <vector>
#include <srs_service_st.hpp>
// client open socket and connect to server.
// @param tm The timeout in ms.
extern int srs_socket_connect(std::string server, int port, int64_t tm, st_netfd_t* pstfd);
// whether the url is starts with http:// or https://
extern bool srs_string_is_http(std::string url);
extern bool srs_string_is_rtmp(std::string url);
// get local ip, fill to @param ips
extern std::vector<std::string>& srs_get_local_ipv4_ips();
// get local public ip, empty string if no public internet address found.
extern std::string srs_get_public_internet_address();
// detect whether specified device is internet public address.
extern bool srs_net_device_is_internet(std::string ifname);
extern bool srs_net_device_is_internet(in_addr_t addr);
#endif