/** * The MIT License (MIT) * * Copyright (c) 2013-2020 Winlin * * Permission is hereby granted, free of charge, to any person obtaining a copy of * this software and associated documentation files (the "Software"), to deal in * the Software without restriction, including without limitation the rights to * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of * the Software, and to permit persons to whom the Software is furnished to do so, * subject to the following conditions: * * The above copyright notice and this permission notice shall be included in all * copies or substantial portions of the Software. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ #include #include using namespace std; #include #include #include #include #include #include #include #include // for encoder to detect the dead loop static std::vector _transcoded_url; SrsEncoder::SrsEncoder() { trd = new SrsDummyCoroutine(); pprint = SrsPithyPrint::create_encoder(); } SrsEncoder::~SrsEncoder() { on_unpublish(); srs_freep(trd); srs_freep(pprint); } srs_error_t SrsEncoder::on_publish(SrsRequest* req) { srs_error_t err = srs_success; // parse the transcode engines for vhost and app and stream. err = parse_scope_engines(req); // ignore the loop encoder // if got a loop, donot transcode the whole stream. if (srs_error_code(err) == ERROR_ENCODER_LOOP) { clear_engines(); srs_error_reset(err); } // return for error or no engine. if (err != srs_success || ffmpegs.empty()) { return err; } // start thread to run all encoding engines. srs_freep(trd); trd = new SrsSTCoroutine("encoder", this, _srs_context->get_id()); if ((err = trd->start()) != srs_success) { return srs_error_wrap(err, "start encoder"); } return err; } void SrsEncoder::on_unpublish() { trd->stop(); clear_engines(); } // when error, encoder sleep for a while and retry. #define SRS_RTMP_ENCODER_CIMS (3 * SRS_UTIME_SECONDS) srs_error_t SrsEncoder::cycle() { srs_error_t err = srs_success; while (true) { // We always check status first. // @see https://github.com/ossrs/srs/issues/1634#issuecomment-597571561 if ((err = trd->pull()) != srs_success) { err = srs_error_wrap(err, "encoder"); break; } if ((err = do_cycle()) != srs_success) { srs_warn("Encoder: Ignore error, %s", srs_error_desc(err).c_str()); srs_error_reset(err); } srs_usleep(SRS_RTMP_ENCODER_CIMS); } // kill ffmpeg when finished and it alive std::vector::iterator it; for (it = ffmpegs.begin(); it != ffmpegs.end(); ++it) { SrsFFMPEG* ffmpeg = *it; ffmpeg->stop(); } return err; } srs_error_t SrsEncoder::do_cycle() { srs_error_t err = srs_success; std::vector::iterator it; for (it = ffmpegs.begin(); it != ffmpegs.end(); ++it) { SrsFFMPEG* ffmpeg = *it; // start all ffmpegs. if ((err = ffmpeg->start()) != srs_success) { return srs_error_wrap(err, "ffmpeg start"); } // check ffmpeg status. if ((err = ffmpeg->cycle()) != srs_success) { return srs_error_wrap(err, "ffmpeg cycle"); } } // pithy print show_encode_log_message(); return err; } void SrsEncoder::clear_engines() { std::vector::iterator it; for (it = ffmpegs.begin(); it != ffmpegs.end(); ++it) { SrsFFMPEG* ffmpeg = *it; std::string output = ffmpeg->output(); std::vector::iterator tu_it; tu_it = std::find(_transcoded_url.begin(), _transcoded_url.end(), output); if (tu_it != _transcoded_url.end()) { _transcoded_url.erase(tu_it); } srs_freep(ffmpeg); } ffmpegs.clear(); } SrsFFMPEG* SrsEncoder::at(int index) { return ffmpegs[index]; } srs_error_t SrsEncoder::parse_scope_engines(SrsRequest* req) { srs_error_t err = srs_success; // parse all transcode engines. SrsConfDirective* conf = NULL; // parse vhost scope engines std::string scope = ""; if ((conf = _srs_config->get_transcode(req->vhost, scope)) != NULL) { if ((err = parse_ffmpeg(req, conf)) != srs_success) { return srs_error_wrap(err, "parse ffmpeg"); } } // parse app scope engines scope = req->app; if ((conf = _srs_config->get_transcode(req->vhost, scope)) != NULL) { if ((err = parse_ffmpeg(req, conf)) != srs_success) { return srs_error_wrap(err, "parse ffmpeg"); } } // parse stream scope engines scope += "/"; scope += req->stream; if ((conf = _srs_config->get_transcode(req->vhost, scope)) != NULL) { if ((err = parse_ffmpeg(req, conf)) != srs_success) { return srs_error_wrap(err, "parse ffmpeg"); } } return err; } srs_error_t SrsEncoder::parse_ffmpeg(SrsRequest* req, SrsConfDirective* conf) { srs_error_t err = srs_success; srs_assert(conf); // enabled if (!_srs_config->get_transcode_enabled(conf)) { srs_trace("ignore the disabled transcode: %s", conf->arg0().c_str()); return err; } // ffmpeg std::string ffmpeg_bin = _srs_config->get_transcode_ffmpeg(conf); if (ffmpeg_bin.empty()) { srs_trace("ignore the empty ffmpeg transcode: %s", conf->arg0().c_str()); return err; } // get all engines. std::vector engines = _srs_config->get_transcode_engines(conf); if (engines.empty()) { srs_trace("ignore the empty transcode engine: %s", conf->arg0().c_str()); return err; } // create engine for (int i = 0; i < (int)engines.size(); i++) { SrsConfDirective* engine = engines[i]; if (!_srs_config->get_engine_enabled(engine)) { srs_trace("ignore the diabled transcode engine: %s %s", conf->arg0().c_str(), engine->arg0().c_str()); continue; } SrsFFMPEG* ffmpeg = new SrsFFMPEG(ffmpeg_bin); if ((err = initialize_ffmpeg(ffmpeg, req, engine)) != srs_success) { srs_freep(ffmpeg); return srs_error_wrap(err, "init ffmpeg"); } ffmpegs.push_back(ffmpeg); } return err; } srs_error_t SrsEncoder::initialize_ffmpeg(SrsFFMPEG* ffmpeg, SrsRequest* req, SrsConfDirective* engine) { srs_error_t err = srs_success; std::string input; // input stream, from local. // ie. rtmp://localhost:1935/live/livestream input = "rtmp://"; input += SRS_CONSTS_LOCALHOST; input += ":"; input += srs_int2str(req->port); input += "/"; input += req->app; input += "?vhost="; input += req->vhost; input += "/"; input += req->stream; // stream name: vhost/app/stream for print input_stream_name = req->vhost; input_stream_name += "/"; input_stream_name += req->app; input_stream_name += "/"; input_stream_name += req->stream; std::string output = _srs_config->get_engine_output(engine); // output stream, to other/self server // ie. rtmp://localhost:1935/live/livestream_sd output = srs_string_replace(output, "[vhost]", req->vhost); output = srs_string_replace(output, "[port]", srs_int2str(req->port)); output = srs_string_replace(output, "[app]", req->app); output = srs_string_replace(output, "[stream]", req->stream); output = srs_string_replace(output, "[param]", req->param); output = srs_string_replace(output, "[engine]", engine->arg0()); output = srs_path_build_timestamp(output); std::string log_file = SRS_CONSTS_NULL_FILE; // disabled // write ffmpeg info to log file. if (_srs_config->get_ff_log_enabled()) { log_file = _srs_config->get_ff_log_dir(); log_file += "/"; log_file += "ffmpeg-encoder"; log_file += "-"; log_file += req->vhost; log_file += "-"; log_file += req->app; log_file += "-"; log_file += req->stream; if (!engine->args.empty()) { log_file += "-"; log_file += engine->arg0(); } log_file += ".log"; } // important: loop check, donot transcode again. std::vector::iterator it; it = std::find(_transcoded_url.begin(), _transcoded_url.end(), input); if (it != _transcoded_url.end()) { return srs_error_new(ERROR_ENCODER_LOOP, "detect a loop cycle, input=%s, output=%s", input.c_str(), output.c_str()); } _transcoded_url.push_back(output); if ((err = ffmpeg->initialize(input, output, log_file)) != srs_success) { return srs_error_wrap(err, "init ffmpeg"); } if ((err = ffmpeg->initialize_transcode(engine)) != srs_success) { return srs_error_wrap(err, "init transcode"); } return err; } void SrsEncoder::show_encode_log_message() { pprint->elapse(); // reportable if (pprint->can_print()) { // TODO: FIXME: show more info. srs_trace("-> " SRS_CONSTS_LOG_ENCODER " time=%" PRId64 ", encoders=%d, input=%s", pprint->age(), (int)ffmpegs.size(), input_stream_name.c_str()); } }