1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

support dead-loop detect for forwarder and transcoder.

This commit is contained in:
winlin 2013-12-01 17:32:06 +08:00
parent e4ea965a3a
commit 6af0794bab
17 changed files with 246 additions and 142 deletions

View file

@ -24,6 +24,10 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_core.hpp>
#include <sys/time.h>
#include <netdb.h>
#include <arpa/inet.h>
#include <srs_core_log.hpp>
static int64_t _srs_system_time_us_cache = 0;
@ -60,3 +64,49 @@ std::string srs_replace(std::string str, std::string old_str, std::string new_st
return ret;
}
std::string srs_dns_resolve(std::string host)
{
if (inet_addr(host.c_str()) != INADDR_NONE) {
return host;
}
hostent* answer = gethostbyname(host.c_str());
if (answer == NULL) {
srs_error("dns resolve host %s error.", host.c_str());
return "";
}
char ipv4[16];
memset(ipv4, 0, sizeof(ipv4));
for (int i = 0; i < answer->h_length; i++) {
inet_ntop(AF_INET, answer->h_addr_list[i], ipv4, sizeof(ipv4));
srs_info("dns resolve host %s to %s.", host.c_str(), ipv4);
break;
}
return ipv4;
}
void srs_vhost_resolve(std::string& vhost, std::string& app)
{
app = srs_replace(app, "...", "?");
if ((pos = app.find("?")) == std::string::npos) {
return;
}
std::string query = app.substr(pos + 1);
app = app.substr(0, pos);
if ((pos = query.find("vhost?")) != std::string::npos
|| (pos = query.find("vhost=")) != std::string::npos
|| (pos = query.find("Vhost?")) != std::string::npos
|| (pos = query.find("Vhost=")) != std::string::npos
) {
query = query.substr(pos + 6);
if (!query.empty()) {
vhost = query;
}
}
}

View file

@ -94,5 +94,12 @@ extern void srs_update_system_time_ms();
#include <string>
// replace utility
extern std::string srs_replace(std::string str, std::string old_str, std::string new_str);
// dns resolve utility, return the resolved ip address.
extern std::string srs_dns_resolve(std::string host);
// resolve the vhost in query string
// @param app, may contains the vhost in query string format:
// app?vhost=request_vhost
// app...vhost...request_vhost
extern void srs_vhost_resolve(std::string& vhost, std::string& app);
#endif

View file

@ -343,7 +343,7 @@ int SrsClient::publish(SrsSource* source, bool is_fmle)
SrsPithyPrint pithy_print(SRS_STAGE_PUBLISH_USER);
// notify the hls to prepare when publish start.
if ((ret = source->on_publish(req->vhost, req->port, req->app, req->stream)) != ERROR_SUCCESS) {
if ((ret = source->on_publish(req)) != ERROR_SUCCESS) {
srs_error("hls on_publish failed. ret=%d", ret);
return ret;
}

View file

@ -37,6 +37,10 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
// default vhost for rtmp
#define RTMP_VHOST_DEFAULT "__defaultVhost__"
#define SRS_LOCALHOST "127.0.0.1"
#define RTMP_DEFAULT_PORT 1935
#define RTMP_DEFAULT_PORTS "1935"
#define SRS_CONF_DEFAULT_HLS_PATH "./objs/nginx/html"
#define SRS_CONF_DEFAULT_HLS_FRAGMENT 10
#define SRS_CONF_DEFAULT_HLS_WINDOW 60

View file

@ -26,15 +26,23 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <stdlib.h>
#include <unistd.h>
#include <algorithm>
#include <srs_core_error.hpp>
#include <srs_core_log.hpp>
#include <srs_core_config.hpp>
#include <srs_core_rtmp.hpp>
#ifdef SRS_FFMPEG
#define SRS_ENCODER_SLEEP_MS 2000
#define SRS_ENCODER_VCODEC "libx264"
#define SRS_ENCODER_ACODEC "libaacplus"
// for encoder to detect the dead loop
static std::vector<std::string> _transcoded_url;
SrsFFMPEG::SrsFFMPEG(std::string ffmpeg_bin)
{
started = false;
@ -56,7 +64,7 @@ SrsFFMPEG::~SrsFFMPEG()
stop();
}
int SrsFFMPEG::initialize(std::string vhost, std::string port, std::string app, std::string stream, SrsConfDirective* engine)
int SrsFFMPEG::initialize(SrsRequest* req, SrsConfDirective* engine)
{
int ret = ERROR_SUCCESS;
@ -84,39 +92,32 @@ int SrsFFMPEG::initialize(std::string vhost, std::string port, std::string app,
// input stream, from local.
// ie. rtmp://127.0.0.1:1935/live/livestream
input = "rtmp://127.0.0.1:";
input += port;
input += req->port;
input += "/";
input += app;
input += req->app;
input += "?vhost=";
input += req->vhost;
input += "/";
input += stream;
input += req->stream;
// output stream, to other/self server
// ie. rtmp://127.0.0.1:1935/live/livestream_sd
if (vhost == RTMP_VHOST_DEFAULT) {
output = srs_replace(output, "[vhost]", "127.0.0.1");
} else {
output = srs_replace(output, "[vhost]", vhost);
}
output = srs_replace(output, "[port]", port);
output = srs_replace(output, "[app]", app);
output = srs_replace(output, "[stream]", stream);
output = srs_replace(output, "[vhost]", req->vhost);
output = srs_replace(output, "[port]", req->port);
output = srs_replace(output, "[app]", req->app);
output = srs_replace(output, "[stream]", req->stream);
output = srs_replace(output, "[engine]", engine->arg0());
// important: loop check, donot transcode again.
// we think the following is loop circle:
// input: rtmp://127.0.0.1:1935/live/livestream_sd
// output: rtmp://127.0.0.1:1935/live/livestream_sd_sd
std::string tail = ""; // tail="_sd"
if (output.length() > input.length()) {
tail = output.substr(input.length());
}
// TODO: better dead loop check.
// if input also endwiths the tail, loop detected.
if (!tail.empty() && input.rfind(tail) == input.length() - tail.length()) {
std::vector<std::string>::iterator it;
it = std::find(_transcoded_url.begin(), _transcoded_url.end(), input);
if (it != _transcoded_url.end()) {
ret = ERROR_ENCODER_LOOP;
srs_info("detect a loop cycle, input=%s, output=%s, ignore it. ret=%d",
input.c_str(), output.c_str(), ret);
return ret;
}
_transcoded_url.push_back(output);
if (vcodec != SRS_ENCODER_VCODEC) {
ret = ERROR_ENCODER_VCODEC;
@ -303,6 +304,20 @@ int SrsFFMPEG::start()
params.push_back("-y");
params.push_back(output);
if (true) {
int pparam_size = 8 * 1024;
char* pparam = new char[pparam_size];
char* p = pparam;
char* last = pparam + pparam_size;
for (int i = 0; i < (int)params.size(); i++) {
std::string ffp = params[i];
snprintf(p, last - p, "%s ", ffp.c_str());
p += ffp.length() + 1;
}
srs_trace("start transcoder: %s", pparam);
srs_freepa(pparam);
}
// TODO: fork or vfork?
if ((pid = fork()) < 0) {
@ -346,6 +361,12 @@ void SrsFFMPEG::stop()
if (!started) {
return;
}
std::vector<std::string>::iterator it;
it = std::find(_transcoded_url.begin(), _transcoded_url.end(), output);
if (it != _transcoded_url.end()) {
_transcoded_url.erase(it);
}
}
SrsEncoder::SrsEncoder()
@ -359,7 +380,7 @@ SrsEncoder::~SrsEncoder()
on_unpublish();
}
int SrsEncoder::parse_scope_engines()
int SrsEncoder::parse_scope_engines(SrsRequest* req)
{
int ret = ERROR_SUCCESS;
@ -368,17 +389,17 @@ int SrsEncoder::parse_scope_engines()
// parse vhost scope engines
std::string scope = "";
if ((conf = config->get_transcode(vhost, "")) != NULL) {
if ((ret = parse_transcode(conf)) != ERROR_SUCCESS) {
if ((conf = config->get_transcode(req->vhost, scope)) != NULL) {
if ((ret = parse_transcode(req, conf)) != ERROR_SUCCESS) {
srs_error("parse vhost scope=%s transcode engines failed. "
"ret=%d", scope.c_str(), ret);
return ret;
}
}
// parse app scope engines
scope = app;
if ((conf = config->get_transcode(vhost, app)) != NULL) {
if ((ret = parse_transcode(conf)) != ERROR_SUCCESS) {
scope = req->app;
if ((conf = config->get_transcode(req->vhost, scope)) != NULL) {
if ((ret = parse_transcode(req, conf)) != ERROR_SUCCESS) {
srs_error("parse app scope=%s transcode engines failed. "
"ret=%d", scope.c_str(), ret);
return ret;
@ -386,9 +407,9 @@ int SrsEncoder::parse_scope_engines()
}
// parse stream scope engines
scope += "/";
scope += stream;
if ((conf = config->get_transcode(vhost, app + "/" + stream)) != NULL) {
if ((ret = parse_transcode(conf)) != ERROR_SUCCESS) {
scope += req->stream;
if ((conf = config->get_transcode(req->vhost, scope)) != NULL) {
if ((ret = parse_transcode(req, conf)) != ERROR_SUCCESS) {
srs_error("parse stream scope=%s transcode engines failed. "
"ret=%d", scope.c_str(), ret);
return ret;
@ -398,16 +419,11 @@ int SrsEncoder::parse_scope_engines()
return ret;
}
int SrsEncoder::on_publish(std::string _vhost, std::string _port, std::string _app, std::string _stream)
int SrsEncoder::on_publish(SrsRequest* req)
{
int ret = ERROR_SUCCESS;
vhost = _vhost;
port = _port;
app = _app;
stream = _stream;
ret = parse_scope_engines();
ret = parse_scope_engines(req);
// ignore the loop encoder
if (ret == ERROR_ENCODER_LOOP) {
@ -457,7 +473,7 @@ SrsFFMPEG* SrsEncoder::at(int index)
return ffmpegs[index];
}
int SrsEncoder::parse_transcode(SrsConfDirective* conf)
int SrsEncoder::parse_transcode(SrsRequest* req, SrsConfDirective* conf)
{
int ret = ERROR_SUCCESS;
@ -498,7 +514,7 @@ int SrsEncoder::parse_transcode(SrsConfDirective* conf)
SrsFFMPEG* ffmpeg = new SrsFFMPEG(ffmpeg_bin);
if ((ret = ffmpeg->initialize(vhost, port, app, stream, engine)) != ERROR_SUCCESS) {
if ((ret = ffmpeg->initialize(req, engine)) != ERROR_SUCCESS) {
srs_freep(ffmpeg);
// if got a loop, donot transcode the whole stream.
@ -577,3 +593,5 @@ void* SrsEncoder::encoder_thread(void* arg)
return NULL;
}
#endif

View file

@ -35,6 +35,9 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <st.h>
class SrsConfDirective;
class SrsRequest;
#ifdef SRS_FFMPEG
/**
* a transcode engine: ffmepg,
@ -68,7 +71,7 @@ public:
SrsFFMPEG(std::string ffmpeg_bin);
virtual ~SrsFFMPEG();
public:
virtual int initialize(std::string vhost, std::string port, std::string app, std::string stream, SrsConfDirective* engine);
virtual int initialize(SrsRequest* req, SrsConfDirective* engine);
virtual int start();
virtual void stop();
};
@ -79,11 +82,6 @@ public:
*/
class SrsEncoder
{
private:
std::string vhost;
std::string port;
std::string app;
std::string stream;
private:
std::vector<SrsFFMPEG*> ffmpegs;
private:
@ -93,16 +91,18 @@ public:
SrsEncoder();
virtual ~SrsEncoder();
public:
virtual int on_publish(std::string vhost, std::string port, std::string app, std::string stream);
virtual int on_publish(SrsRequest* req);
virtual void on_unpublish();
private:
virtual int parse_scope_engines();
virtual int parse_scope_engines(SrsRequest* req);
virtual void clear_engines();
virtual SrsFFMPEG* at(int index);
virtual int parse_transcode(SrsConfDirective* conf);
virtual int parse_transcode(SrsRequest* req, SrsConfDirective* conf);
virtual int cycle();
virtual void encoder_cycle();
static void* encoder_thread(void* arg);
};
#endif
#endif

View file

@ -85,6 +85,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#define ERROR_SYSTEM_STREAM_BUSY 410
#define ERROR_SYSTEM_IP_INVALID 411
#define ERROR_SYSTEM_CONFIG_TOO_LARGE 412
#define ERROR_SYSTEM_FORWARD_LOOP 413
// see librtmp.
// failed when open ssl create the dh

View file

@ -27,13 +27,14 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <srs_core_error.hpp>
#include <srs_core_rtmp.hpp>
#include <srs_core_log.hpp>
#include <srs_core_protocol.hpp>
#include <srs_core_pithy_print.hpp>
#include <srs_core_rtmp.hpp>
#include <srs_core_config.hpp>
#define SRS_PULSE_TIMEOUT_MS 100
#define SRS_FORWARDER_SLEEP_MS 2000
@ -62,29 +63,54 @@ SrsForwarder::~SrsForwarder()
msgs.clear();
}
int SrsForwarder::on_publish(std::string vhost, std::string _app, std::string stream, std::string forward_server)
int SrsForwarder::on_publish(SrsRequest* req, std::string forward_server)
{
int ret = ERROR_SUCCESS;
app = _app;
// forward app
app = req->app;
tc_url = "rtmp://";
tc_url += vhost;
tc_url += "/";
tc_url += app;
stream_name = stream;
stream_name = req->stream;
server = forward_server;
port = 1935;
// TODO: dead loop check.
std::string s_port = RTMP_DEFAULT_PORTS;
port = RTMP_DEFAULT_PORT;
size_t pos = forward_server.find(":");
if (pos != std::string::npos) {
port = ::atoi(forward_server.substr(pos + 1).c_str());
s_port = forward_server.substr(pos + 1);
server = forward_server.substr(0, pos);
}
// discovery vhost
std::string vhost = req->vhost;
srs_vhost_resolve(vhost, s_port);
port = ::atoi(s_port.c_str());
// generate tcUrl
tc_url = "rtmp://";
tc_url += vhost;
tc_url += "/";
tc_url += req->app;
// dead loop check
std::string source_ep = req->vhost;
source_ep += ":";
source_ep += req->port;
std::string dest_ep = vhost;
dest_ep += ":";
dest_ep += s_port;
if (source_ep == dest_ep) {
ret = ERROR_SYSTEM_FORWARD_LOOP;
srs_warn("farder loop detected. src=%s, dest=%s, ret=%d",
source_ep.c_str(), dest_ep.c_str(), ret);
return ret;
}
srs_trace("start forward %s to %s, stream: %s/%s",
source_ep.c_str(), dest_ep.c_str(), tc_url.c_str(),
stream_name.c_str());
// start forward
if ((ret = open_socket()) != ERROR_SUCCESS) {
return ret;
}
@ -179,7 +205,7 @@ int SrsForwarder::connect_server()
{
int ret = ERROR_SUCCESS;
std::string ip = parse_server(server);
std::string ip = srs_dns_resolve(server);
if (ip.empty()) {
ret = ERROR_SYSTEM_IP_INVALID;
srs_error("dns resolve server error, ip empty. ret=%d", ret);
@ -201,29 +227,6 @@ int SrsForwarder::connect_server()
return ret;
}
std::string SrsForwarder::parse_server(std::string host)
{
if (inet_addr(host.c_str()) != INADDR_NONE) {
return host;
}
hostent* answer = gethostbyname(host.c_str());
if (answer == NULL) {
srs_error("dns resolve host %s error.", host.c_str());
return "";
}
char ipv4[16];
memset(ipv4, 0, sizeof(ipv4));
for (int i = 0; i < answer->h_length; i++) {
inet_ntop(AF_INET, answer->h_addr_list[i], ipv4, sizeof(ipv4));
srs_info("dns resolve host %s to %s.", host.c_str(), ipv4);
break;
}
return ipv4;
}
int SrsForwarder::cycle()
{
int ret = ERROR_SUCCESS;

View file

@ -37,6 +37,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
class SrsSharedPtrMessage;
class SrsOnMetaDataPacket;
class SrsRtmpClient;
class SrsRequest;
/**
* forward the stream to other servers.
@ -61,7 +62,7 @@ public:
SrsForwarder();
virtual ~SrsForwarder();
public:
virtual int on_publish(std::string vhost, std::string app, std::string stream, std::string forward_server);
virtual int on_publish(SrsRequest* req, std::string forward_server);
virtual void on_unpublish();
virtual int on_meta_data(SrsSharedPtrMessage* metadata);
virtual int on_audio(SrsSharedPtrMessage* msg);
@ -69,7 +70,6 @@ public:
private:
virtual int open_socket();
virtual int connect_server();
std::string parse_server(std::string host);
private:
virtual int cycle();
virtual int forward();

View file

@ -38,6 +38,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_core_config.hpp>
#include <srs_core_source.hpp>
#include <srs_core_autofree.hpp>
#include <srs_core_rtmp.hpp>
// @see: NGX_RTMP_HLS_DELAY,
// 63000: 700ms, ts_tbn=90000
@ -466,13 +467,13 @@ SrsHls::~SrsHls()
srs_freep(video_frame);
}
int SrsHls::on_publish(std::string _vhost, std::string _app, std::string _stream)
int SrsHls::on_publish(SrsRequest* req)
{
int ret = ERROR_SUCCESS;
vhost = _vhost;
stream = _stream;
app = _app;
vhost = req->vhost;
stream = req->stream;
app = req->app;
// TODO: subscribe the reload event.

View file

@ -42,6 +42,7 @@ class SrsMpegtsFrame;
class SrsRtmpJitter;
class SrsTSMuxer;
class SrsCodec;
class SrsRequest;
/**
* 3.3.2. EXTINF
@ -142,7 +143,7 @@ public:
SrsHls();
virtual ~SrsHls();
public:
virtual int on_publish(std::string _vhost, std::string _app, std::string _stream);
virtual int on_publish(SrsRequest* req);
virtual void on_unpublish();
virtual int on_meta_data(SrsOnMetaDataPacket* metadata);
virtual int on_audio(SrsSharedPtrMessage* audio);

View file

@ -30,6 +30,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_core_autofree.hpp>
#include <srs_core_amf0.hpp>
#include <srs_core_handshake.hpp>
#include <srs_core_config.hpp>
/**
* the signature for packets to client.
@ -93,7 +94,7 @@ int SrsRequest::discovery_app()
srs_verbose("discovery vhost=%s", vhost.c_str());
}
port = "1935";
port = RTMP_DEFAULT_PORTS;
if ((pos = vhost.find(":")) != std::string::npos) {
port = vhost.substr(pos + 1);
vhost = vhost.substr(0, pos);
@ -101,6 +102,14 @@ int SrsRequest::discovery_app()
}
app = url;
srs_vhost_resolve(vhost, app);
// resolve the vhost from config
SrsConfDirective* parsed_vhost = config->get_vhost(vhost);
if (parsed_vhost) {
vhost = parsed_vhost->arg0();
}
srs_info("discovery app success. schema=%s, vhost=%s, port=%s, app=%s",
schema.c_str(), vhost.c_str(), port.c_str(), app.c_str());

View file

@ -48,6 +48,12 @@ class SrsOnMetaDataPacket;
*/
struct SrsRequest
{
/**
* tcUrl: rtmp://request_vhost:port/app/stream
* support pass vhost in query string, such as:
* rtmp://ip:port/app?vhost=request_vhost/stream
* rtmp://ip:port/app...vhost...request_vhost/stream
*/
std::string tcUrl;
std::string pageUrl;
std::string swfUrl;

View file

@ -34,6 +34,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_core_forward.hpp>
#include <srs_core_config.hpp>
#include <srs_core_encoder.hpp>
#include <srs_core_rtmp.hpp>
#define CONST_MAX_JITTER_MS 500
#define DEFAULT_FRAME_TIME_MS 10
@ -612,56 +613,48 @@ int SrsSource::on_video(SrsCommonMessage* video)
return ret;
}
int SrsSource::on_publish(std::string vhost, std::string port, std::string app, std::string stream)
int SrsSource::on_publish(SrsRequest* req)
{
int ret = ERROR_SUCCESS;
_can_publish = false;
#ifdef SRS_HLS
if ((ret = hls->on_publish(vhost, app, stream)) != ERROR_SUCCESS) {
return ret;
}
#endif
#ifdef SRS_FFMPEG
if ((ret = encoder->on_publish(vhost, port, app, stream)) != ERROR_SUCCESS) {
return ret;
}
#endif
// TODO: support reload.
// create forwarders
SrsConfDirective* conf = config->get_forward(vhost);
SrsConfDirective* conf = config->get_forward(req->vhost);
for (int i = 0; conf && i < (int)conf->args.size(); i++) {
std::string forward_server = conf->args.at(i);
SrsForwarder* forwarder = new SrsForwarder();
forwarders.push_back(forwarder);
if ((ret = forwarder->on_publish(vhost, app, stream, forward_server)) != ERROR_SUCCESS) {
if ((ret = forwarder->on_publish(req, forward_server)) != ERROR_SUCCESS) {
srs_error("start forwarder failed. "
"vhost=%s, app=%s, stream=%s, forward-to=%s",
vhost.c_str(), app.c_str(), stream.c_str(),
req->vhost.c_str(), req->app.c_str(), req->stream.c_str(),
forward_server.c_str());
return ret;
}
}
#ifdef SRS_FFMPEG
if ((ret = encoder->on_publish(req)) != ERROR_SUCCESS) {
return ret;
}
#endif
#ifdef SRS_HLS
if ((ret = hls->on_publish(req)) != ERROR_SUCCESS) {
return ret;
}
#endif
return ret;
}
void SrsSource::on_unpublish()
{
#ifdef SRS_HLS
hls->on_unpublish();
#endif
#ifdef SRS_FFMPEG
encoder->on_unpublish();
#endif
// close all forwarders
std::vector<SrsForwarder*>::iterator it;
for (it = forwarders.begin(); it != forwarders.end(); ++it) {
@ -671,6 +664,14 @@ void SrsSource::on_unpublish()
}
forwarders.clear();
#ifdef SRS_FFMPEG
encoder->on_unpublish();
#endif
#ifdef SRS_HLS
hls->on_unpublish();
#endif
gop_cache->clear();
srs_freep(cache_metadata);

View file

@ -39,6 +39,7 @@ class SrsCommonMessage;
class SrsOnMetaDataPacket;
class SrsSharedPtrMessage;
class SrsForwarder;
class SrsRequest;
#ifdef SRS_HLS
class SrsHls;
#endif
@ -210,7 +211,7 @@ public:
virtual int on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata);
virtual int on_audio(SrsCommonMessage* audio);
virtual int on_video(SrsCommonMessage* video);
virtual int on_publish(std::string vhost, std::string port, std::string app, std::string stream);
virtual int on_publish(SrsRequest* req);
virtual void on_unpublish();
public:
virtual int create_consumer(SrsConsumer*& consumer);