From a28eec89b0a530965c1ff6e9ea665f2e8d93f6ae Mon Sep 17 00:00:00 2001 From: winlin Date: Tue, 21 Apr 2015 12:53:45 +0800 Subject: [PATCH] support ingest hls with m3u8 in m3u8. --- trunk/src/kernel/srs_kernel_error.hpp | 1 + trunk/src/kernel/srs_kernel_utility.cpp | 21 ++ trunk/src/kernel/srs_kernel_utility.hpp | 4 + trunk/src/main/srs_main_ingest_hls.cpp | 359 ++++++++++++++++++------ trunk/src/protocol/srs_raw_avc.cpp | 9 +- 5 files changed, 311 insertions(+), 83 deletions(-) diff --git a/trunk/src/kernel/srs_kernel_error.hpp b/trunk/src/kernel/srs_kernel_error.hpp index 1b142dae7..65d4c56c0 100644 --- a/trunk/src/kernel/srs_kernel_error.hpp +++ b/trunk/src/kernel/srs_kernel_error.hpp @@ -254,6 +254,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #define ERROR_HTTP_RESPONSE_EOF 4025 #define ERROR_HTTP_INVALID_CHUNK_HEADER 4026 #define ERROR_AVC_NALU_UEV 4027 +#define ERROR_AAC_BYTES_INVALID 4028 /////////////////////////////////////////////////////// // user-define error. diff --git a/trunk/src/kernel/srs_kernel_utility.cpp b/trunk/src/kernel/srs_kernel_utility.cpp index d0daaef50..bfbaf3a3f 100644 --- a/trunk/src/kernel/srs_kernel_utility.cpp +++ b/trunk/src/kernel/srs_kernel_utility.cpp @@ -278,6 +278,11 @@ bool srs_string_starts_with(string str, string flag) return str.find(flag) == 0; } +bool srs_string_contains(string str, string flag) +{ + return str.find(flag) != string::npos; +} + int srs_do_create_dir_recursively(string dir) { int ret = ERROR_SUCCESS; @@ -356,6 +361,22 @@ string srs_path_dirname(string path) return dirname; } +string srs_path_basename(string path) +{ + std::string dirname = path; + size_t pos = string::npos; + + if ((pos = dirname.rfind("/")) != string::npos) { + // the basename("/") is "/" + if (dirname.length() == 1) { + return dirname; + } + dirname = dirname.substr(pos + 1); + } + + return dirname; +} + bool srs_avc_startswith_annexb(SrsStream* stream, int* pnb_start_code) { char* bytes = stream->data() + stream->pos(); diff --git a/trunk/src/kernel/srs_kernel_utility.hpp b/trunk/src/kernel/srs_kernel_utility.hpp index 9f333ef94..afae06141 100644 --- a/trunk/src/kernel/srs_kernel_utility.hpp +++ b/trunk/src/kernel/srs_kernel_utility.hpp @@ -67,6 +67,8 @@ extern std::string srs_string_remove(std::string str, std::string remove_chars); extern bool srs_string_ends_with(std::string str, std::string flag); // whether string starts with extern bool srs_string_starts_with(std::string str, std::string flag); +// whether string contains with +extern bool srs_string_contains(std::string str, std::string flag); // create dir recursively extern int srs_create_dir_recursively(std::string dir); @@ -75,6 +77,8 @@ extern int srs_create_dir_recursively(std::string dir); extern bool srs_path_exists(std::string path); // get the dirname of path extern std::string srs_path_dirname(std::string path); +// get the basename of path +extern std::string srs_path_basename(std::string path); /** * whether stream starts with the avc NALU in "AnnexB" diff --git a/trunk/src/main/srs_main_ingest_hls.cpp b/trunk/src/main/srs_main_ingest_hls.cpp index ad12411c2..2d2fd414e 100644 --- a/trunk/src/main/srs_main_ingest_hls.cpp +++ b/trunk/src/main/srs_main_ingest_hls.cpp @@ -128,6 +128,16 @@ int main(int argc, char** argv) return proxy_hls2rtmp(in_hls_url, out_rtmp_url); } +class ISrsAacHandler +{ +public: + /** + * handle the aac frame, which in ADTS format(starts with FFFx). + * @param duration the duration in seconds of frames. +*/ +virtual int on_aac_frame(char* frame, int frame_size, double duration) = 0; +}; + // the context to ingest hls stream. class SrsIngestSrsInput { @@ -185,8 +195,17 @@ public: /** * parse the ts and use hanler to process the message. */ - virtual int parse(ISrsTsHandler* handler); + virtual int parse(ISrsTsHandler* ts, ISrsAacHandler* aac); private: + /** + * parse the ts pieces body. + */ + virtual int parseAac(ISrsAacHandler* handler, char* body, int nb_body, double duration); + virtual int parseTs(ISrsTsHandler* handler, char* body, int nb_body); + /** + * parse the m3u8 specified by url. + */ + virtual int parseM3u8(SrsHttpUri* url, double& td, double& duration); /** * find the ts piece by its url. */ @@ -215,17 +234,164 @@ int SrsIngestSrsInput::connect() st_usleep((next_connect_time - now) * 1000); } - SrsHttpClient client; - srs_trace("parse input hls %s", in_hls->get_url()); + // set all ts to dirty. + dirty_all_ts(); - if ((ret = client.initialize(in_hls->get_host(), in_hls->get_port())) != ERROR_SUCCESS) { + bool fresh_m3u8 = pieces.empty(); + double td = 0.0; + double duration = 0.0; + if ((ret = parseM3u8(in_hls, td, duration)) != ERROR_SUCCESS) { + return ret; + } + + // fetch all ts. + fetch_all_ts(fresh_m3u8); + + // remove all dirty ts. + remove_dirty(); + + srs_trace("fetch m3u8 ok, td=%.2f, duration=%.2f, pieces=%d", td, duration, pieces.size()); + + return ret; +} + +int SrsIngestSrsInput::parse(ISrsTsHandler* ts, ISrsAacHandler* aac) +{ + int ret = ERROR_SUCCESS; + + for (int i = 0; i < (int)pieces.size(); i++) { + SrsTsPiece* tp = pieces.at(i); + + // sent only once. + if (tp->sent) { + continue; + } + tp->sent = true; + + if (tp->body.empty()) { + continue; + } + + srs_trace("proxy the ts to rtmp, ts=%s, duration=%.2f", tp->url.c_str(), tp->duration); + + if (srs_string_ends_with(tp->url, ".ts")) { + if ((ret = parseTs(ts, (char*)tp->body.data(), (int)tp->body.length())) != ERROR_SUCCESS) { + return ret; + } + } else if (srs_string_ends_with(tp->url, ".aac")) { + if ((ret = parseAac(aac, (char*)tp->body.data(), (int)tp->body.length(), tp->duration)) != ERROR_SUCCESS) { + return ret; + } + } else { + srs_warn("ignore unkown piece %s", tp->url.c_str()); + } + } + + return ret; +} + +int SrsIngestSrsInput::parseTs(ISrsTsHandler* handler, char* body, int nb_body) +{ + int ret = ERROR_SUCCESS; + + // use stream to parse ts packet. + int nb_packet = (int)nb_body / SRS_TS_PACKET_SIZE; + for (int i = 0; i < nb_packet; i++) { + char* p = (char*)body + (i * SRS_TS_PACKET_SIZE); + if ((ret = stream->initialize(p, SRS_TS_PACKET_SIZE)) != ERROR_SUCCESS) { + return ret; + } + + // process each ts packet + if ((ret = context->decode(stream, handler)) != ERROR_SUCCESS) { + // when peer closed, must interrupt parse and reconnect. + if (srs_is_client_gracefully_close(ret)) { + srs_warn("interrupt parse for peer closed. ret=%d", ret); + return ret; + } + + srs_warn("mpegts: ignore parse ts packet failed. ret=%d", ret); + continue; + } + srs_info("mpegts: parse ts packet completed"); + } + srs_info("mpegts: parse udp packet completed"); + + return ret; +} + +int SrsIngestSrsInput::parseAac(ISrsAacHandler* handler, char* body, int nb_body, double duration) +{ + int ret = ERROR_SUCCESS; + + if ((ret = stream->initialize(body, nb_body)) != ERROR_SUCCESS) { + return ret; + } + + // atleast 2bytes. + if (!stream->require(3)) { + ret = ERROR_AAC_BYTES_INVALID; + srs_error("invalid aac, atleast 3bytes. ret=%d", ret); + return ret; + } + + u_int8_t id0 = (u_int8_t)body[0]; + u_int8_t id1 = (u_int8_t)body[1]; + u_int8_t id2 = (u_int8_t)body[2]; + + // skip ID3. + if (id0 == 0x49 && id1 == 0x44 && id2 == 0x33) { + /*char id3[] = { + (char)0x49, (char)0x44, (char)0x33, // ID3 + (char)0x03, (char)0x00, // version + (char)0x00, // flags + (char)0x00, (char)0x00, (char)0x00, (char)0x0a, // size + + (char)0x00, (char)0x00, (char)0x00, (char)0x00, // FrameID + (char)0x00, (char)0x00, (char)0x00, (char)0x00, // FrameSize + (char)0x00, (char)0x00 // Flags + };*/ + // atleast 10 bytes. + if (!stream->require(10)) { + ret = ERROR_AAC_BYTES_INVALID; + srs_error("invalid aac ID3, atleast 10bytes. ret=%d", ret); + return ret; + } + + // ignore ID3 + version + flag. + stream->skip(6); + // read the size of ID3. + u_int32_t nb_id3 = stream->read_4bytes(); + + // read body of ID3 + if (!stream->require(nb_id3)) { + ret = ERROR_AAC_BYTES_INVALID; + srs_error("invalid aac ID3 body, required %dbytes. ret=%d", nb_id3, ret); + return ret; + } + stream->skip(nb_id3); + } + + char* frame = body + stream->pos(); + int frame_size = nb_body - stream->pos(); + return handler->on_aac_frame(frame, frame_size, duration); +} + +int SrsIngestSrsInput::parseM3u8(SrsHttpUri* url, double& td, double& duration) +{ + int ret = ERROR_SUCCESS; + + SrsHttpClient client; + srs_trace("parse input hls %s", url->get_url()); + + if ((ret = client.initialize(url->get_host(), url->get_port())) != ERROR_SUCCESS) { srs_error("connect to server failed. ret=%d", ret); return ret; } SrsHttpMessage* msg = NULL; - if ((ret = client.get(in_hls->get_path(), "", &msg)) != ERROR_SUCCESS) { - srs_error("HTTP GET %s failed. ret=%d", in_hls->get_url(), ret); + if ((ret = client.get(url->get_path(), "", &msg)) != ERROR_SUCCESS) { + srs_error("HTTP GET %s failed. ret=%d", url->get_url(), ret); return ret; } @@ -243,13 +409,7 @@ int SrsIngestSrsInput::connect() return ret; } - // set all ts to dirty. - dirty_all_ts(); - std::string ptl; - double td = 0.0; - double duration = 0.0; - bool fresh_m3u8 = pieces.empty(); while (!body.empty()) { size_t pos = string::npos; @@ -293,6 +453,28 @@ int SrsIngestSrsInput::connect() break; } + // #EXT-X-STREAM-INF:PROGRAM-ID=1,BANDWIDTH=73207,CODECS="mp4a.40.2" + if (srs_string_starts_with(line, "#EXT-X-STREAM-INF:")) { + if ((pos = body.find("\n")) == string::npos) { + srs_warn("m3u8 entry unexpected eof, inf=%s", line.c_str()); + break; + } + + std::string m3u8_url = body.substr(0, pos); + body = body.substr(pos + 1); + + if (!srs_string_starts_with(m3u8_url, "http://")) { + m3u8_url = srs_path_dirname(url->get_url()) + "/" + m3u8_url; + } + srs_trace("parse sub m3u8, url=%s", m3u8_url.c_str()); + + if ((ret = url->initialize(m3u8_url)) != ERROR_SUCCESS) { + return ret; + } + + return parseM3u8(url, td, duration); + } + // #EXTINF:11.401, // livestream-5.ts // parse each ts entry, expect current line is inf. @@ -330,60 +512,6 @@ int SrsIngestSrsInput::connect() } } - // fetch all ts. - fetch_all_ts(fresh_m3u8); - - // remove all dirty ts. - remove_dirty(); - - srs_trace("fetch m3u8 ok, td=%.2f, duration=%.2f, pieces=%d", td, duration, pieces.size()); - - return ret; -} - -int SrsIngestSrsInput::parse(ISrsTsHandler* handler) -{ - int ret = ERROR_SUCCESS; - - for (int i = 0; i < (int)pieces.size(); i++) { - SrsTsPiece* tp = pieces.at(i); - - // sent only once. - if (tp->sent) { - continue; - } - tp->sent = true; - - if (tp->body.empty()) { - continue; - } - - srs_trace("proxy the ts to rtmp, ts=%s, duration=%.2f", tp->url.c_str(), tp->duration); - - // use stream to parse ts packet. - int nb_packet = (int)tp->body.length() / SRS_TS_PACKET_SIZE; - for (int i = 0; i < nb_packet; i++) { - char* p = (char*)tp->body.data() + (i * SRS_TS_PACKET_SIZE); - if ((ret = stream->initialize(p, SRS_TS_PACKET_SIZE)) != ERROR_SUCCESS) { - return ret; - } - - // process each ts packet - if ((ret = context->decode(stream, handler)) != ERROR_SUCCESS) { - // when peer closed, must interrupt parse and reconnect. - if (srs_is_client_gracefully_close(ret)) { - srs_warn("interrupt parse for peer closed. ret=%d", ret); - return ret; - } - - srs_warn("mpegts: ignore parse ts packet failed. ret=%d", ret); - continue; - } - srs_info("mpegts: parse ts packet completed"); - } - srs_info("mpegts: parse udp packet completed"); - } - return ret; } @@ -464,17 +592,11 @@ int SrsIngestSrsInput::SrsTsPiece::fetch(string m3u8) return ret; } - size_t pos = string::npos; - SrsHttpClient client; std::string ts_url = url; if (!srs_string_starts_with(ts_url, "http://")) { - std::string baseurl = m3u8; - if ((pos = m3u8.rfind("/")) != string::npos) { - baseurl = m3u8.substr(0, pos); - } - ts_url = baseurl + "/" + url; + ts_url = srs_path_dirname(m3u8) + "/" + url; } SrsHttpUri uri; @@ -507,13 +629,14 @@ int SrsIngestSrsInput::SrsTsPiece::fetch(string m3u8) } // the context to output to rtmp server -class SrsIngestSrsOutput : public ISrsTsHandler +class SrsIngestSrsOutput : virtual public ISrsTsHandler, virtual public ISrsAacHandler { private: SrsHttpUri* out_rtmp; private: bool disconnected; std::multimap queue; + int64_t raw_aac_dts; private: SrsRequest* req; st_netfd_t stfd; @@ -534,6 +657,7 @@ public: SrsIngestSrsOutput(SrsHttpUri* rtmp) { out_rtmp = rtmp; disconnected = false; + raw_aac_dts = 0; req = NULL; io = NULL; @@ -563,7 +687,11 @@ public: // interface ISrsTsHandler public: virtual int on_ts_message(SrsTsMessage* msg); +// interface IAacHandler +public: + virtual int on_aac_frame(char* frame, int frame_size, double duration); private: + virtual int do_on_aac_frame(SrsStream* avs, double duration); virtual int parse_message_queue(); virtual int on_ts_video(SrsTsMessage* msg, SrsStream* avs); virtual int write_h264_sps_pps(u_int32_t dts, u_int32_t pts); @@ -661,6 +789,76 @@ int SrsIngestSrsOutput::on_ts_message(SrsTsMessage* msg) return ret; } +int SrsIngestSrsOutput::on_aac_frame(char* frame, int frame_size, double duration) +{ + int ret = ERROR_SUCCESS; + + srs_trace("handle aac frames, size=%dB, duration=%.2f, dts=%"PRId64, frame_size, duration, raw_aac_dts); + + SrsStream stream; + if ((ret = stream.initialize(frame, frame_size)) != ERROR_SUCCESS) { + return ret; + } + + return do_on_aac_frame(&stream, duration); +} + +int SrsIngestSrsOutput::do_on_aac_frame(SrsStream* avs, double duration) +{ + int ret = ERROR_SUCCESS; + + // ts tbn to flv tbn. + u_int32_t dts = (u_int32_t)raw_aac_dts; + raw_aac_dts += (int64_t)(duration * 1000); + + // got the next msg to calc the delta duration for each audio. + u_int32_t max_dts = dts + (u_int32_t)(duration * 1000); + + // send each frame. + while (!avs->empty()) { + char* frame = NULL; + int frame_size = 0; + SrsRawAacStreamCodec codec; + if ((ret = aac->adts_demux(avs, &frame, &frame_size, codec)) != ERROR_SUCCESS) { + return ret; + } + + // ignore invalid frame, + // * atleast 1bytes for aac to decode the data. + if (frame_size <= 0) { + continue; + } + srs_info("mpegts: demux aac frame size=%d, dts=%d", frame_size, dts); + + // generate sh. + if (aac_specific_config.empty()) { + std::string sh; + if ((ret = aac->mux_sequence_header(&codec, sh)) != ERROR_SUCCESS) { + return ret; + } + aac_specific_config = sh; + + codec.aac_packet_type = 0; + + if ((ret = write_audio_raw_frame((char*)sh.data(), (int)sh.length(), &codec, dts)) != ERROR_SUCCESS) { + return ret; + } + } + + // audio raw data. + codec.aac_packet_type = 1; + if ((ret = write_audio_raw_frame(frame, frame_size, &codec, dts)) != ERROR_SUCCESS) { + return ret; + } + + // calc the delta of dts, when previous frame output. + u_int32_t delta = (duration * 1000) / (avs->size() / frame_size); + dts = (u_int32_t)(srs_min(max_dts, dts + delta)); + } + + return ret; +} + int SrsIngestSrsOutput::parse_message_queue() { int ret = ERROR_SUCCESS; @@ -914,7 +1112,7 @@ int SrsIngestSrsOutput::on_ts_audio(SrsTsMessage* msg, SrsStream* avs) // ts tbn to flv tbn. u_int32_t dts = (u_int32_t)(msg->dts / 90); - // got the next video to calc the delta duration for each audio. + // got the next msg to calc the delta duration for each audio. u_int32_t duration = 0; if (!queue.empty()) { SrsTsMessage* nm = queue.begin()->second; @@ -992,6 +1190,8 @@ int SrsIngestSrsOutput::rtmp_write_packet(char type, u_int32_t timestamp, char* } srs_assert(msg); + srs_info("RTMP type=%d, dts=%d, size=%d", type, timestamp, size); + // send out encoded msg. if ((ret = client->send_and_free_message(msg, stream_id)) != ERROR_SUCCESS) { return ret; @@ -1016,13 +1216,12 @@ int SrsIngestSrsOutput::connect() if (!req) { req = new SrsRequest(); - size_t pos = string::npos; string uri = req->tcUrl = out_rtmp->get_url(); // tcUrl, stream - if ((pos = uri.rfind("/")) != string::npos) { - req->stream = uri.substr(pos + 1); - req->tcUrl = uri = uri.substr(0, pos); + if (srs_string_contains(uri, "/")) { + req->stream = srs_path_basename(uri); + req->tcUrl = uri = srs_path_dirname(uri); } srs_discovery_tc_url(req->tcUrl, @@ -1155,7 +1354,7 @@ public: return ret; } - if ((ret = ic->parse(oc)) != ERROR_SUCCESS) { + if ((ret = ic->parse(oc, oc)) != ERROR_SUCCESS) { srs_warn("proxy ts to rtmp failed. ret=%d", ret); return ret; } diff --git a/trunk/src/protocol/srs_raw_avc.cpp b/trunk/src/protocol/srs_raw_avc.cpp index 4e95db40f..79f17255f 100644 --- a/trunk/src/protocol/srs_raw_avc.cpp +++ b/trunk/src/protocol/srs_raw_avc.cpp @@ -362,9 +362,12 @@ int SrsRawAacStream::adts_demux(SrsStream* stream, char** pframe, int* pnb_frame * and set to ‘0’ if the audio data are MPEG-4. See also ISO/IEC 11172-3, subclause 2.4.2.3. */ if (id != 0x01) { - ret = ERROR_ADTS_ID_NOT_AAC; - srs_warn("adts: id must be 1(aac), actual 0(mp4a). ret=%d", ret); - return ret; + srs_info("adts: id must be 1(aac), actual 0(mp4a). ret=%d", ret); + + // well, some system always use 0, but actually is aac format. + // for example, houjian vod ts always set the aac id to 0, actually 1. + // we just ignore it, and alwyas use 1(aac) to demux. + id = 0x01; } int16_t sfiv = stream->read_2bytes();