1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

implements ingest, change to 0.9.53

This commit is contained in:
winlin 2014-04-07 14:20:03 +08:00
parent 2742679354
commit e89ab84e4e
13 changed files with 357 additions and 147 deletions

View file

@ -104,7 +104,8 @@ vhost ingest.srs.com {
# can be file/stream/device, that is, # can be file/stream/device, that is,
# file: ingest file specifies by url. # file: ingest file specifies by url.
# stream: ingest stream specifeis by url. # stream: ingest stream specifeis by url.
# devide: not support yet. # device: not support yet.
# default: file
type file; type file;
# the url of file/stream. # the url of file/stream.
url ./doc/source.200kbps.768x320.flv; url ./doc/source.200kbps.768x320.flv;
@ -810,6 +811,9 @@ pithy_print {
# shared print interval for all encoders, in milliseconds. # shared print interval for all encoders, in milliseconds.
# if not specified, set to 2000. # if not specified, set to 2000.
encoder 3000; encoder 3000;
# shared print interval for all ingesters, in milliseconds.
# if not specified, set to 2000.
ingester 3000;
# shared print interval for all hls, in milliseconds. # shared print interval for all hls, in milliseconds.
# if not specified, set to 2000. # if not specified, set to 2000.
hls 3000; hls 3000;

View file

@ -847,12 +847,12 @@ int SrsConfig::get_pithy_print_forwarder()
int SrsConfig::get_pithy_print_encoder() int SrsConfig::get_pithy_print_encoder()
{ {
SrsConfDirective* pithy = root->get("encoder"); SrsConfDirective* pithy = root->get("pithy_print");
if (!pithy) { if (!pithy) {
return SRS_STAGE_ENCODER_INTERVAL_MS; return SRS_STAGE_ENCODER_INTERVAL_MS;
} }
pithy = pithy->get("forwarder"); pithy = pithy->get("encoder");
if (!pithy) { if (!pithy) {
return SRS_STAGE_ENCODER_INTERVAL_MS; return SRS_STAGE_ENCODER_INTERVAL_MS;
} }
@ -860,6 +860,21 @@ int SrsConfig::get_pithy_print_encoder()
return ::atoi(pithy->arg0().c_str()); return ::atoi(pithy->arg0().c_str());
} }
int SrsConfig::get_pithy_print_ingester()
{
SrsConfDirective* pithy = root->get("pithy_print");
if (!pithy) {
return SRS_STAGE_INGESTER_INTERVAL_MS;
}
pithy = pithy->get("ingester");
if (!pithy) {
return SRS_STAGE_INGESTER_INTERVAL_MS;
}
return ::atoi(pithy->arg0().c_str());
}
int SrsConfig::get_pithy_print_hls() int SrsConfig::get_pithy_print_hls()
{ {
SrsConfDirective* pithy = root->get("pithy_print"); SrsConfDirective* pithy = root->get("pithy_print");
@ -1645,10 +1660,33 @@ string SrsConfig::get_ingest_ffmpeg(SrsConfDirective* ingest)
return conf->arg0(); return conf->arg0();
} }
string SrsConfig::get_ingest_input(SrsConfDirective* ingest) string SrsConfig::get_ingest_input_type(SrsConfDirective* ingest)
{ {
SrsConfDirective* conf = ingest->get("input"); SrsConfDirective* conf = ingest->get("input");
if (!conf) {
return SRS_INGEST_TYPE_FILE;
}
conf = conf->get("type");
if (!conf) {
return SRS_INGEST_TYPE_FILE;
}
return conf->arg0();
}
string SrsConfig::get_ingest_input_url(SrsConfDirective* ingest)
{
SrsConfDirective* conf = ingest->get("input");
if (!conf) {
return "";
}
conf = conf->get("url");
if (!conf) { if (!conf) {
return ""; return "";
} }

View file

@ -66,8 +66,11 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#define SRS_STAGE_PUBLISH_USER_INTERVAL_MS 1100 #define SRS_STAGE_PUBLISH_USER_INTERVAL_MS 1100
#define SRS_STAGE_FORWARDER_INTERVAL_MS 2000 #define SRS_STAGE_FORWARDER_INTERVAL_MS 2000
#define SRS_STAGE_ENCODER_INTERVAL_MS 2000 #define SRS_STAGE_ENCODER_INTERVAL_MS 2000
#define SRS_STAGE_INGESTER_INTERVAL_MS 2000
#define SRS_STAGE_HLS_INTERVAL_MS 2000 #define SRS_STAGE_HLS_INTERVAL_MS 2000
#define SRS_INGEST_TYPE_FILE "file"
class SrsFileBuffer; class SrsFileBuffer;
class SrsConfDirective class SrsConfDirective
@ -134,6 +137,7 @@ public:
virtual int get_pithy_print_publish(); virtual int get_pithy_print_publish();
virtual int get_pithy_print_forwarder(); virtual int get_pithy_print_forwarder();
virtual int get_pithy_print_encoder(); virtual int get_pithy_print_encoder();
virtual int get_pithy_print_ingester();
virtual int get_pithy_print_hls(); virtual int get_pithy_print_hls();
virtual int get_pithy_print_play(); virtual int get_pithy_print_play();
// vhost section // vhost section
@ -190,7 +194,8 @@ public:
virtual void get_ingesters(std::string vhost, std::vector<SrsConfDirective*>& ingeters); virtual void get_ingesters(std::string vhost, std::vector<SrsConfDirective*>& ingeters);
virtual bool get_ingest_enabled(SrsConfDirective* ingest); virtual bool get_ingest_enabled(SrsConfDirective* ingest);
virtual std::string get_ingest_ffmpeg(SrsConfDirective* ingest); virtual std::string get_ingest_ffmpeg(SrsConfDirective* ingest);
virtual std::string get_ingest_input(SrsConfDirective* ingest); virtual std::string get_ingest_input_type(SrsConfDirective* ingest);
virtual std::string get_ingest_input_url(SrsConfDirective* ingest);
// log section // log section
public: public:
virtual bool get_srs_log_tank_file(); virtual bool get_srs_log_tank_file();

View file

@ -99,13 +99,13 @@ int SrsEncoder::cycle()
// start all ffmpegs. // start all ffmpegs.
if ((ret = ffmpeg->start()) != ERROR_SUCCESS) { if ((ret = ffmpeg->start()) != ERROR_SUCCESS) {
srs_error("ffmpeg start failed. ret=%d", ret); srs_error("transcode ffmpeg start failed. ret=%d", ret);
return ret; return ret;
} }
// check ffmpeg status. // check ffmpeg status.
if ((ret = ffmpeg->cycle()) != ERROR_SUCCESS) { if ((ret = ffmpeg->cycle()) != ERROR_SUCCESS) {
srs_error("ffmpeg cycle failed. ret=%d", ret); srs_error("transcode ffmpeg cycle failed. ret=%d", ret);
return ret; return ret;
} }
} }

View file

@ -73,6 +73,11 @@ SrsFFMPEG::~SrsFFMPEG()
stop(); stop();
} }
void SrsFFMPEG::set_iparams(string iparams)
{
_iparams = iparams;
}
string SrsFFMPEG::output() string SrsFFMPEG::output()
{ {
return _output; return _output;
@ -232,6 +237,11 @@ int SrsFFMPEG::start()
// the filename associated with the file being executed. // the filename associated with the file being executed.
params.push_back(ffmpeg); params.push_back(ffmpeg);
// input params
if (!_iparams.empty()) {
params.push_back(_iparams);
}
// input. // input.
params.push_back("-f"); params.push_back("-f");
params.push_back("flv"); params.push_back("flv");

View file

@ -51,6 +51,7 @@ private:
int log_fd; int log_fd;
private: private:
std::string ffmpeg; std::string ffmpeg;
std::string _iparams;
std::vector<std::string> vfilter; std::vector<std::string> vfilter;
std::string vcodec; std::string vcodec;
int vbitrate; int vbitrate;
@ -72,6 +73,7 @@ public:
SrsFFMPEG(std::string ffmpeg_bin); SrsFFMPEG(std::string ffmpeg_bin);
virtual ~SrsFFMPEG(); virtual ~SrsFFMPEG();
public: public:
virtual void set_iparams(std::string iparams);
virtual std::string output(); virtual std::string output();
public: public:
virtual int initialize(std::string in, std::string out, std::string log); virtual int initialize(std::string in, std::string out, std::string log);

View file

@ -29,6 +29,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_app_config.hpp> #include <srs_app_config.hpp>
#include <srs_kernel_log.hpp> #include <srs_kernel_log.hpp>
#include <srs_app_ffmpeg.hpp> #include <srs_app_ffmpeg.hpp>
#include <srs_app_pithy_print.hpp>
// when error, ingester sleep for a while and retry. // when error, ingester sleep for a while and retry.
#define SRS_INGESTER_SLEEP_US (int64_t)(3*1000*1000LL) #define SRS_INGESTER_SLEEP_US (int64_t)(3*1000*1000LL)
@ -37,6 +38,7 @@ SrsIngester::SrsIngester()
{ {
// TODO: FIXME: support reload. // TODO: FIXME: support reload.
pthread = new SrsThread(this, SRS_INGESTER_SLEEP_US); pthread = new SrsThread(this, SRS_INGESTER_SLEEP_US);
pithy_print = new SrsPithyPrint(SRS_STAGE_INGESTER);
} }
SrsIngester::~SrsIngester() SrsIngester::~SrsIngester()
@ -55,6 +57,17 @@ int SrsIngester::start()
return ret; return ret;
} }
// return for error or no engine.
if (ffmpegs.empty()) {
return ret;
}
// start thread to run all encoding engines.
if ((ret = pthread->start()) != ERROR_SUCCESS) {
srs_error("st_thread_create failed. ret=%d", ret);
return ret;
}
return ret; return ret;
} }
@ -96,7 +109,7 @@ int SrsIngester::parse_engines(SrsConfDirective* vhost, SrsConfDirective* ingest
_srs_config->get_transcode_engines(ingest, engines); _srs_config->get_transcode_engines(ingest, engines);
if (engines.empty()) { if (engines.empty()) {
SrsFFMPEG* ffmpeg = new SrsFFMPEG(ffmpeg_bin); SrsFFMPEG* ffmpeg = new SrsFFMPEG(ffmpeg_bin);
if ((ret = initialize_ffmpeg(ffmpeg, ingest, NULL)) != ERROR_SUCCESS) { if ((ret = initialize_ffmpeg(ffmpeg, vhost, ingest, NULL)) != ERROR_SUCCESS) {
srs_freep(ffmpeg); srs_freep(ffmpeg);
if (ret != ERROR_ENCODER_LOOP) { if (ret != ERROR_ENCODER_LOOP) {
srs_error("invalid ingest engine. ret=%d", ret); srs_error("invalid ingest engine. ret=%d", ret);
@ -112,7 +125,7 @@ int SrsIngester::parse_engines(SrsConfDirective* vhost, SrsConfDirective* ingest
for (int i = 0; i < (int)engines.size(); i++) { for (int i = 0; i < (int)engines.size(); i++) {
SrsConfDirective* engine = engines[i]; SrsConfDirective* engine = engines[i];
SrsFFMPEG* ffmpeg = new SrsFFMPEG(ffmpeg_bin); SrsFFMPEG* ffmpeg = new SrsFFMPEG(ffmpeg_bin);
if ((ret = initialize_ffmpeg(ffmpeg, ingest, engine)) != ERROR_SUCCESS) { if ((ret = initialize_ffmpeg(ffmpeg, vhost, ingest, engine)) != ERROR_SUCCESS) {
srs_freep(ffmpeg); srs_freep(ffmpeg);
if (ret != ERROR_ENCODER_LOOP) { if (ret != ERROR_ENCODER_LOOP) {
srs_error("invalid ingest engine: %s %s", ingest->arg0().c_str(), engine->arg0().c_str()); srs_error("invalid ingest engine: %s %s", ingest->arg0().c_str(), engine->arg0().c_str());
@ -128,11 +141,35 @@ int SrsIngester::parse_engines(SrsConfDirective* vhost, SrsConfDirective* ingest
void SrsIngester::stop() void SrsIngester::stop()
{ {
pthread->stop();
clear_engines();
} }
int SrsIngester::cycle() int SrsIngester::cycle()
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
std::vector<SrsFFMPEG*>::iterator it;
for (it = ffmpegs.begin(); it != ffmpegs.end(); ++it) {
SrsFFMPEG* ffmpeg = *it;
// start all ffmpegs.
if ((ret = ffmpeg->start()) != ERROR_SUCCESS) {
srs_error("ingest ffmpeg start failed. ret=%d", ret);
return ret;
}
// check ffmpeg status.
if ((ret = ffmpeg->cycle()) != ERROR_SUCCESS) {
srs_error("ingest ffmpeg cycle failed. ret=%d", ret);
return ret;
}
}
// pithy print
ingester();
pithy_print->elapse(SRS_INGESTER_SLEEP_US / 1000);
return ret; return ret;
} }
@ -170,21 +207,109 @@ int SrsIngester::parse()
return ret; return ret;
} }
int SrsIngester::initialize_ffmpeg(SrsFFMPEG* ffmpeg, SrsConfDirective* ingest, SrsConfDirective* engine) int SrsIngester::initialize_ffmpeg(SrsFFMPEG* ffmpeg, SrsConfDirective* vhost, SrsConfDirective* ingest, SrsConfDirective* engine)
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
std::string input = _srs_config->get_ingest_input(ingest); SrsConfDirective* listen = _srs_config->get_listen();
if (input.empty()) { srs_assert(listen->args.size() > 0);
ret = ERROR_ENCODER_NO_INPUT; std::string port = listen->arg0();
srs_trace("empty ingest intput. ret=%d", ret);
std::string output = _srs_config->get_engine_output(engine);
// output stream, to other/self server
// ie. rtmp://127.0.0.1:1935/live/livestream_sd
output = srs_string_replace(output, "[vhost]", vhost->arg0());
output = srs_string_replace(output, "[port]", port);
if (output.empty()) {
ret = ERROR_ENCODER_NO_OUTPUT;
srs_trace("empty ingest output url. ret=%d", ret);
return ret; return ret;
} }
// find the app and stream in rtmp url
std::string url = output;
std::string app, stream;
size_t pos = std::string::npos;
if ((pos = url.rfind("/")) != std::string::npos) {
stream = url.substr(pos + 1);
url = url.substr(0, pos);
}
if ((pos = url.rfind("/")) != std::string::npos) {
app = url.substr(pos + 1);
url = url.substr(0, pos);
}
if ((pos = app.rfind("?")) != std::string::npos) {
app = app.substr(0, pos);
}
std::string log_file;
// write ffmpeg info to log file.
log_file = _srs_config->get_ffmpeg_log_dir();
log_file += "/";
log_file += "ingest";
log_file += "-";
log_file += vhost->arg0();
log_file += "-";
log_file += app;
log_file += "-";
log_file += stream;
log_file += ".log";
// stream name: vhost/app/stream for print
input_stream_name = vhost->arg0();
input_stream_name += "/";
input_stream_name += app;
input_stream_name += "/";
input_stream_name += stream;
// input
std::string input_type = _srs_config->get_ingest_input_type(ingest);
if (input_type.empty()) {
ret = ERROR_ENCODER_NO_INPUT;
srs_trace("empty ingest intput type. ret=%d", ret);
return ret;
}
if (input_type == SRS_INGEST_TYPE_FILE) {
std::string input_url = _srs_config->get_ingest_input_url(ingest);
if (input_url.empty()) {
ret = ERROR_ENCODER_NO_INPUT;
srs_trace("empty ingest intput url. ret=%d", ret);
return ret;
}
// for file, set re.
ffmpeg->set_iparams("-re");
if ((ret = ffmpeg->initialize(input_url, output, log_file)) != ERROR_SUCCESS) {
return ret;
}
} else {
ret = ERROR_ENCODER_INPUT_TYPE;
srs_error("invalid ingest type=%s, ret=%d", input_type.c_str(), ret);
}
if (!engine || !_srs_config->get_engine_enabled(engine)) { if (!engine || !_srs_config->get_engine_enabled(engine)) {
if ((ret = ffmpeg->initialize_copy()) != ERROR_SUCCESS) {
return ret;
}
} else {
if ((ret = ffmpeg->initialize_transcode(engine)) != ERROR_SUCCESS) {
return ret;
}
} }
return ret; return ret;
} }
void SrsIngester::ingester()
{
// reportable
if (pithy_print->can_print()) {
// TODO: FIXME: show more info.
srs_trace("-> time=%"PRId64", ingesters=%d, input=%s",
pithy_print->get_age(), (int)ffmpegs.size(), input_stream_name.c_str());
}
}
#endif #endif

View file

@ -37,6 +37,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
class SrsFFMPEG; class SrsFFMPEG;
class SrsConfDirective; class SrsConfDirective;
class SrsPithyPrint;
/** /**
* ingest file/stream/device, * ingest file/stream/device,
@ -46,9 +47,11 @@ class SrsConfDirective;
class SrsIngester : public ISrsThreadHandler class SrsIngester : public ISrsThreadHandler
{ {
private: private:
std::string input_stream_name;
std::vector<SrsFFMPEG*> ffmpegs; std::vector<SrsFFMPEG*> ffmpegs;
private: private:
SrsThread* pthread; SrsThread* pthread;
SrsPithyPrint* pithy_print;
public: public:
SrsIngester(); SrsIngester();
virtual ~SrsIngester(); virtual ~SrsIngester();
@ -64,7 +67,8 @@ private:
virtual int parse(); virtual int parse();
virtual int parse_ingesters(SrsConfDirective* vhost); virtual int parse_ingesters(SrsConfDirective* vhost);
virtual int parse_engines(SrsConfDirective* vhost, SrsConfDirective* ingest); virtual int parse_engines(SrsConfDirective* vhost, SrsConfDirective* ingest);
virtual int initialize_ffmpeg(SrsFFMPEG* ffmpeg, SrsConfDirective* ingest, SrsConfDirective* engine); virtual int initialize_ffmpeg(SrsFFMPEG* ffmpeg, SrsConfDirective* vhost, SrsConfDirective* ingest, SrsConfDirective* engine);
virtual void ingester();
}; };
#endif #endif

View file

@ -71,6 +71,10 @@ struct SrsStageInfo : public ISrsReloadHandler
pithy_print_time_ms = _srs_config->get_pithy_print_encoder(); pithy_print_time_ms = _srs_config->get_pithy_print_encoder();
break; break;
} }
case SRS_STAGE_INGESTER: {
pithy_print_time_ms = _srs_config->get_pithy_print_ingester();
break;
}
case SRS_STAGE_HLS: { case SRS_STAGE_HLS: {
pithy_print_time_ms = _srs_config->get_pithy_print_hls(); pithy_print_time_ms = _srs_config->get_pithy_print_hls();
break; break;
@ -108,7 +112,8 @@ int SrsPithyPrint::enter_stage()
std::map<int, SrsStageInfo*>::iterator it = _srs_stages.find(stage_id); std::map<int, SrsStageInfo*>::iterator it = _srs_stages.find(stage_id);
if (it == _srs_stages.end()) { if (it == _srs_stages.end()) {
stage = _srs_stages[stage_id] = new SrsStageInfo(stage_id); stage = new SrsStageInfo(stage_id);
_srs_stages[stage_id] = stage;
} else { } else {
stage = it->second; stage = it->second;
} }

View file

@ -40,6 +40,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#define SRS_STAGE_ENCODER 4 #define SRS_STAGE_ENCODER 4
// the pithy stage for all hls. // the pithy stage for all hls.
#define SRS_STAGE_HLS 5 #define SRS_STAGE_HLS 5
// the pithy stage for all ingesters.
#define SRS_STAGE_INGESTER 6
/** /**
* the stage is used for a collection of object to do print, * the stage is used for a collection of object to do print,

View file

@ -164,13 +164,13 @@ SrsServer::SrsServer()
_srs_config->subscribe(this); _srs_config->subscribe(this);
#ifdef SRS_HTTP_API #ifdef SRS_HTTP_API
http_api_handler = SrsHttpHandler::create_http_api(); http_api_handler = NULL;
#endif #endif
#ifdef SRS_HTTP_SERVER #ifdef SRS_HTTP_SERVER
http_stream_handler = SrsHttpHandler::create_http_stream(); http_stream_handler = NULL;
#endif #endif
#ifdef SRS_INGEST #ifdef SRS_INGEST
ingester = new SrsIngester(); ingester = NULL;
#endif #endif
} }
@ -205,6 +205,19 @@ int SrsServer::initialize()
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
#ifdef SRS_HTTP_API
srs_assert(!http_api_handler);
http_api_handler = SrsHttpHandler::create_http_api();
#endif
#ifdef SRS_HTTP_SERVER
srs_assert(!http_stream_handler);
http_stream_handler = SrsHttpHandler::create_http_stream();
#endif
#ifdef SRS_INGEST
srs_assert(!ingester);
ingester = new SrsIngester();
#endif
#ifdef SRS_HTTP_API #ifdef SRS_HTTP_API
if ((ret = http_api_handler->initialize()) != ERROR_SUCCESS) { if ((ret = http_api_handler->initialize()) != ERROR_SUCCESS) {
return ret; return ret;

2
trunk/src/core/srs_core.hpp Normal file → Executable file
View file

@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
// current release version // current release version
#define VERSION_MAJOR "0" #define VERSION_MAJOR "0"
#define VERSION_MINOR "9" #define VERSION_MINOR "9"
#define VERSION_REVISION "52" #define VERSION_REVISION "53"
#define RTMP_SIG_SRS_VERSION VERSION_MAJOR"."VERSION_MINOR"."VERSION_REVISION #define RTMP_SIG_SRS_VERSION VERSION_MAJOR"."VERSION_MINOR"."VERSION_REVISION
// server info. // server info.
#define RTMP_SIG_SRS_KEY "srs" #define RTMP_SIG_SRS_KEY "srs"

View file

@ -155,6 +155,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#define ERROR_ENCODER_DUP2 716 #define ERROR_ENCODER_DUP2 716
#define ERROR_ENCODER_PARSE 717 #define ERROR_ENCODER_PARSE 717
#define ERROR_ENCODER_NO_INPUT 718 #define ERROR_ENCODER_NO_INPUT 718
#define ERROR_ENCODER_NO_OUTPUT 719
#define ERROR_ENCODER_INPUT_TYPE 720
#define ERROR_HTTP_PARSE_URI 800 #define ERROR_HTTP_PARSE_URI 800
#define ERROR_HTTP_DATA_INVLIAD 801 #define ERROR_HTTP_DATA_INVLIAD 801