diff --git a/trunk/src/kernel/srs_kernel_ts.cpp b/trunk/src/kernel/srs_kernel_ts.cpp index bc71c80d7..7c4254d55 100644 --- a/trunk/src/kernel/srs_kernel_ts.cpp +++ b/trunk/src/kernel/srs_kernel_ts.cpp @@ -169,6 +169,23 @@ int SrsTsMessage::stream_number() return -1; } +SrsTsMessage* SrsTsMessage::detach() +{ + // @remark the packet cannot be used, but channel is ok. + SrsTsMessage* cp = new SrsTsMessage(channel, NULL); + cp->start_pts = start_pts; + cp->write_pcr = write_pcr; + cp->is_discontinuity = is_discontinuity; + cp->dts = dts; + cp->pts = pts; + cp->sid = sid; + cp->PES_packet_length = PES_packet_length; + cp->continuity_counter = continuity_counter; + cp->payload = payload; + payload = NULL; + return cp; +} + ISrsTsHandler::ISrsTsHandler() { } diff --git a/trunk/src/kernel/srs_kernel_ts.hpp b/trunk/src/kernel/srs_kernel_ts.hpp index 2d5b14887..6b8c9d1a8 100644 --- a/trunk/src/kernel/srs_kernel_ts.hpp +++ b/trunk/src/kernel/srs_kernel_ts.hpp @@ -309,6 +309,13 @@ public: * @return the stream number for audio/video; otherwise, -1. */ virtual int stream_number(); +public: + /** + * detach the ts message, + * for user maybe need to parse the message by queue. + * @remark we always use the payload of original message. + */ + virtual SrsTsMessage* detach(); }; /** diff --git a/trunk/src/main/srs_main_ingest_hls.cpp b/trunk/src/main/srs_main_ingest_hls.cpp index 9a1876db1..ad12411c2 100644 --- a/trunk/src/main/srs_main_ingest_hls.cpp +++ b/trunk/src/main/srs_main_ingest_hls.cpp @@ -23,8 +23,10 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include +#include #include #include +#include using namespace std; #include @@ -148,7 +150,7 @@ private: dirty = false; } - int fetch(std::string m3u8, SrsHttpClient* client); + int fetch(std::string m3u8); }; private: SrsHttpUri* in_hls; @@ -196,7 +198,7 @@ private: /** * fetch all ts body. */ - virtual void fetch_all_ts(bool fresh_m3u8, SrsHttpClient* client); + virtual void fetch_all_ts(bool fresh_m3u8); /** * remove all ts which is dirty. */ @@ -209,6 +211,7 @@ int SrsIngestSrsInput::connect() int64_t now = srs_update_system_time_ms(); if (now < next_connect_time) { + srs_trace("input hls wait for %dms", next_connect_time - now); st_usleep((next_connect_time - now) * 1000); } @@ -328,7 +331,7 @@ int SrsIngestSrsInput::connect() } // fetch all ts. - fetch_all_ts(fresh_m3u8, &client); + fetch_all_ts(fresh_m3u8); // remove all dirty ts. remove_dirty(); @@ -344,12 +347,19 @@ int SrsIngestSrsInput::parse(ISrsTsHandler* handler) for (int i = 0; i < (int)pieces.size(); i++) { SrsTsPiece* tp = pieces.at(i); + + // sent only once. + if (tp->sent) { + continue; + } tp->sent = true; if (tp->body.empty()) { continue; } + srs_trace("proxy the ts to rtmp, ts=%s, duration=%.2f", tp->url.c_str(), tp->duration); + // use stream to parse ts packet. int nb_packet = (int)tp->body.length() / SRS_TS_PACKET_SIZE; for (int i = 0; i < nb_packet; i++) { @@ -360,6 +370,12 @@ int SrsIngestSrsInput::parse(ISrsTsHandler* handler) // process each ts packet if ((ret = context->decode(stream, handler)) != ERROR_SUCCESS) { + // when peer closed, must interrupt parse and reconnect. + if (srs_is_client_gracefully_close(ret)) { + srs_warn("interrupt parse for peer closed. ret=%d", ret); + return ret; + } + srs_warn("mpegts: ignore parse ts packet failed. ret=%d", ret); continue; } @@ -392,7 +408,7 @@ void SrsIngestSrsInput::dirty_all_ts() } } -void SrsIngestSrsInput::fetch_all_ts(bool fresh_m3u8, SrsHttpClient* client) +void SrsIngestSrsInput::fetch_all_ts(bool fresh_m3u8) { int ret = ERROR_SUCCESS; @@ -410,17 +426,16 @@ void SrsIngestSrsInput::fetch_all_ts(bool fresh_m3u8, SrsHttpClient* client) continue; } - if ((ret = tp->fetch(in_hls->get_url(), client)) != ERROR_SUCCESS) { + if ((ret = tp->fetch(in_hls->get_url())) != ERROR_SUCCESS) { srs_warn("ignore ts %s for error. ret=%d", tp->url.c_str(), ret); tp->skip = true; continue; } - // set the next connect time. - if (next_connect_time <= 0) { - next_connect_time = srs_update_system_time_ms(); + // only wait for a duration of last piece. + if (i == pieces.size() - 1) { + next_connect_time = srs_update_system_time_ms() + (int)tp->duration * 1000; } - next_connect_time += (int)tp->duration * 1000; } } @@ -432,6 +447,7 @@ void SrsIngestSrsInput::remove_dirty() SrsTsPiece* tp = *it; if (tp->dirty) { + srs_trace("erase dirty ts, url=%s, duration=%.2f", tp->url.c_str(), tp->duration); srs_freep(tp); it = pieces.erase(it); } else { @@ -440,7 +456,7 @@ void SrsIngestSrsInput::remove_dirty() } } -int SrsIngestSrsInput::SrsTsPiece::fetch(string m3u8, SrsHttpClient* client) +int SrsIngestSrsInput::SrsTsPiece::fetch(string m3u8) { int ret = ERROR_SUCCESS; @@ -450,8 +466,7 @@ int SrsIngestSrsInput::SrsTsPiece::fetch(string m3u8, SrsHttpClient* client) size_t pos = string::npos; - bool use_abs_client = false; - SrsHttpClient abs_client; + SrsHttpClient client; std::string ts_url = url; if (!srs_string_starts_with(ts_url, "http://")) { @@ -460,10 +475,6 @@ int SrsIngestSrsInput::SrsTsPiece::fetch(string m3u8, SrsHttpClient* client) baseurl = m3u8.substr(0, pos); } ts_url = baseurl + "/" + url; - - // use fresh client for absolute url. - client = &abs_client; - use_abs_client = true; } SrsHttpUri uri; @@ -472,12 +483,12 @@ int SrsIngestSrsInput::SrsTsPiece::fetch(string m3u8, SrsHttpClient* client) } // initialize the fresh http client. - if (use_abs_client && (ret = client->initialize(uri.get_host(), uri.get_port()) != ERROR_SUCCESS)) { + if ((ret = client.initialize(uri.get_host(), uri.get_port()) != ERROR_SUCCESS)) { return ret; } SrsHttpMessage* msg = NULL; - if ((ret = client->get(uri.get_path(), "", &msg)) != ERROR_SUCCESS) { + if ((ret = client.get(uri.get_path(), "", &msg)) != ERROR_SUCCESS) { srs_error("HTTP GET %s failed. ret=%d", uri.get_url(), ret); return ret; } @@ -500,6 +511,9 @@ class SrsIngestSrsOutput : public ISrsTsHandler { private: SrsHttpUri* out_rtmp; +private: + bool disconnected; + std::multimap queue; private: SrsRequest* req; st_netfd_t stfd; @@ -519,6 +533,7 @@ private: public: SrsIngestSrsOutput(SrsHttpUri* rtmp) { out_rtmp = rtmp; + disconnected = false; req = NULL; io = NULL; @@ -537,14 +552,22 @@ public: srs_freep(avc); srs_freep(aac); + + std::multimap::iterator it; + for (it = queue.begin(); it != queue.end(); ++it) { + SrsTsMessage* msg = it->second; + srs_freep(msg); + } + queue.clear(); } // interface ISrsTsHandler public: virtual int on_ts_message(SrsTsMessage* msg); private: + virtual int parse_message_queue(); virtual int on_ts_video(SrsTsMessage* msg, SrsStream* avs); virtual int write_h264_sps_pps(u_int32_t dts, u_int32_t pts); - virtual int write_h264_ipb_frame(char* frame, int frame_size, u_int32_t dts, u_int32_t pts); + virtual int write_h264_ipb_frame(std::string ibps, SrsCodecVideoAVCFrame frame_type, u_int32_t dts, u_int32_t pts); virtual int on_ts_audio(SrsTsMessage* msg, SrsStream* avs); virtual int write_audio_raw_frame(char* frame, int frame_size, SrsRawAacStreamCodec* codec, u_int32_t dts); private: @@ -554,6 +577,10 @@ public: * connect to output rtmp server. */ virtual int connect(); + /** + * flush the message queue when all ts parsed. + */ + virtual int flush_message_queue(); private: virtual int connect_app(std::string ep_server, std::string ep_port); // close the connected io and rtmp to ready to be re-connect. @@ -601,7 +628,7 @@ int SrsIngestSrsOutput::on_ts_message(SrsTsMessage* msg) // 14496-2 video stream number xxxx // ((stream_id >> 4) & 0x0f) == SrsTsPESStreamIdVideo - srs_trace("<- "SRS_CONSTS_LOG_STREAM_CASTER" mpegts: got %s stream=%s, dts=%"PRId64", pts=%"PRId64", size=%d, us=%d, cc=%d, sid=%#x(%s-%d)", + srs_info("<- "SRS_CONSTS_LOG_STREAM_CASTER" mpegts: got %s stream=%s, dts=%"PRId64", pts=%"PRId64", size=%d, us=%d, cc=%d, sid=%#x(%s-%d)", (msg->channel->apply == SrsTsPidApplyVideo)? "Video":"Audio", srs_ts_stream2string(msg->channel->stream).c_str(), msg->dts, msg->pts, msg->payload->length(), msg->packet->payload_unit_start_indicator, msg->continuity_counter, msg->sid, msg->is_audio()? "A":msg->is_video()? "V":"N", msg->stream_number()); @@ -621,22 +648,107 @@ int SrsIngestSrsOutput::on_ts_message(SrsTsMessage* msg) return ret; } - // parse the stream. - SrsStream avs; - if ((ret = avs.initialize(msg->payload->bytes(), msg->payload->length())) != ERROR_SUCCESS) { - srs_error("mpegts: initialize av stream failed. ret=%d", ret); + // we must use queue to cache the msg, then parse it if possible. + queue.insert(std::make_pair(msg->dts, msg->detach())); + if ((ret = parse_message_queue()) != ERROR_SUCCESS) { + // when peer closed, close the output and reconnect. + if (srs_is_client_gracefully_close(ret)) { + close(); + } return ret; } - // publish audio or video. - if (msg->channel->stream == SrsTsStreamVideoH264) { - return on_ts_video(msg, &avs); - } - if (msg->channel->stream == SrsTsStreamAudioAAC) { - return on_ts_audio(msg, &avs); + return ret; +} + +int SrsIngestSrsOutput::parse_message_queue() +{ + int ret = ERROR_SUCCESS; + + int nb_videos = 0; + int nb_audios = 0; + std::multimap::iterator it; + for (it = queue.begin(); it != queue.end(); ++it) { + SrsTsMessage* msg = it->second; + + // publish audio or video. + if (msg->channel->stream == SrsTsStreamVideoH264) { + nb_videos++; + } else { + nb_audios++; + } + } + + // always wait 2+ videos, to left one video in the queue. + // TODO: FIXME: support pure audio hls. + if (nb_videos <= 1) { + return ret; + } + + // parse messages util the last video. + while (nb_videos > 1 && queue.size() > 0) { + std::multimap::iterator it = queue.begin(); + + SrsTsMessage* msg = it->second; + if (msg->channel->stream == SrsTsStreamVideoH264) { + nb_videos--; + } + queue.erase(it); + + // parse the stream. + SrsStream avs; + if ((ret = avs.initialize(msg->payload->bytes(), msg->payload->length())) != ERROR_SUCCESS) { + srs_error("mpegts: initialize av stream failed. ret=%d", ret); + return ret; + } + + // publish audio or video. + if (msg->channel->stream == SrsTsStreamVideoH264) { + if ((ret = on_ts_video(msg, &avs)) != ERROR_SUCCESS) { + return ret; + } + } + if (msg->channel->stream == SrsTsStreamAudioAAC) { + if ((ret = on_ts_audio(msg, &avs)) != ERROR_SUCCESS) { + return ret; + } + } + } + + return ret; +} + +int SrsIngestSrsOutput::flush_message_queue() +{ + int ret = ERROR_SUCCESS; + + // parse messages util the last video. + while (!queue.empty()) { + std::multimap::iterator it = queue.begin(); + + SrsTsMessage* msg = it->second; + queue.erase(it); + + // parse the stream. + SrsStream avs; + if ((ret = avs.initialize(msg->payload->bytes(), msg->payload->length())) != ERROR_SUCCESS) { + srs_error("mpegts: initialize av stream failed. ret=%d", ret); + return ret; + } + + // publish audio or video. + if (msg->channel->stream == SrsTsStreamVideoH264) { + if ((ret = on_ts_video(msg, &avs)) != ERROR_SUCCESS) { + return ret; + } + } + if (msg->channel->stream == SrsTsStreamAudioAAC) { + if ((ret = on_ts_audio(msg, &avs)) != ERROR_SUCCESS) { + return ret; + } + } } - // TODO: FIXME: implements it. return ret; } @@ -644,18 +756,12 @@ int SrsIngestSrsOutput::on_ts_video(SrsTsMessage* msg, SrsStream* avs) { int ret = ERROR_SUCCESS; - // ensure rtmp connected. - if ((ret = connect()) != ERROR_SUCCESS) { - return ret; - } - // ts tbn to flv tbn. u_int32_t dts = (u_int32_t)(msg->dts / 90); u_int32_t pts = (u_int32_t)(msg->dts / 90); - // the whole ts pes video packet must be a flv frame packet. - char* ibpframe = avs->data() + avs->pos(); - int ibpframe_size = avs->size() - avs->pos(); + std::string ibps; + SrsCodecVideoAVCFrame frame_type = SrsCodecVideoAVCFrameInterFrame; // send each frame. while (!avs->empty()) { @@ -665,10 +771,18 @@ int SrsIngestSrsOutput::on_ts_video(SrsTsMessage* msg, SrsStream* avs) return ret; } - // ignore invalid frame, - // * atleast 1bytes for SPS to decode the type - // * ignore the auth bytes '09f0' - if (frame_size <= 2) { + // 5bits, 7.3.1 NAL unit syntax, + // H.264-AVC-ISO_IEC_14496-10.pdf, page 44. + // 7: SPS, 8: PPS, 5: I Frame, 1: P Frame + SrsAvcNaluType nal_unit_type = (SrsAvcNaluType)(frame[0] & 0x1f); + + // for IDR frame, the frame is keyframe. + if (nal_unit_type == SrsAvcNaluTypeIDR) { + frame_type = SrsCodecVideoAVCFrameKeyFrame; + } + + // ignore the nalu type aud(9) + if (nal_unit_type == SrsAvcNaluTypeAccessUnitDelimiter) { continue; } @@ -684,10 +798,6 @@ int SrsIngestSrsOutput::on_ts_video(SrsTsMessage* msg, SrsStream* avs) } h264_sps_changed = true; h264_sps = sps; - - if ((ret = write_h264_sps_pps(dts, pts)) != ERROR_SUCCESS) { - return ret; - } continue; } @@ -703,27 +813,45 @@ int SrsIngestSrsOutput::on_ts_video(SrsTsMessage* msg, SrsStream* avs) } h264_pps_changed = true; h264_pps = pps; - - if ((ret = write_h264_sps_pps(dts, pts)) != ERROR_SUCCESS) { - return ret; - } continue; } - break; + // ibp frame. + std::string ibp; + if ((ret = avc->mux_ipb_frame(frame, frame_size, ibp)) != ERROR_SUCCESS) { + return ret; + } + ibps.append(ibp); } - // ibp frame. - srs_info("mpegts: demux avc ibp frame size=%d, dts=%d", ibpframe_size, dts); - return write_h264_ipb_frame(ibpframe, ibpframe_size, dts, pts); + if ((ret = write_h264_sps_pps(dts, pts)) != ERROR_SUCCESS) { + return ret; + } + + if ((ret = write_h264_ipb_frame(ibps, frame_type, dts, pts)) != ERROR_SUCCESS) { + // drop the ts message. + if (ret == ERROR_H264_DROP_BEFORE_SPS_PPS) { + return ERROR_SUCCESS; + } + return ret; + } + + return ret; } int SrsIngestSrsOutput::write_h264_sps_pps(u_int32_t dts, u_int32_t pts) { int ret = ERROR_SUCCESS; - // only send when both sps and pps changed. - if (!h264_sps_changed || !h264_pps_changed) { + // when sps or pps changed, update the sequence header, + // for the pps maybe not changed while sps changed. + // so, we must check when each video ts message frame parsed. + if (h264_sps_pps_sent && !h264_sps_changed && !h264_pps_changed) { + return ret; + } + + // when not got sps/pps, wait. + if (h264_pps.empty() || h264_sps.empty()) { return ret; } @@ -752,11 +880,12 @@ int SrsIngestSrsOutput::write_h264_sps_pps(u_int32_t dts, u_int32_t pts) h264_sps_changed = false; h264_pps_changed = false; h264_sps_pps_sent = true; + srs_trace("hls: h264 sps/pps sent, sps=%dB, pps=%dB", h264_sps.length(), h264_pps.length()); return ret; } -int SrsIngestSrsOutput::write_h264_ipb_frame(char* frame, int frame_size, u_int32_t dts, u_int32_t pts) +int SrsIngestSrsOutput::write_h264_ipb_frame(string ibps, SrsCodecVideoAVCFrame frame_type, u_int32_t dts, u_int32_t pts) { int ret = ERROR_SUCCESS; @@ -766,26 +895,10 @@ int SrsIngestSrsOutput::write_h264_ipb_frame(char* frame, int frame_size, u_int3 return ERROR_H264_DROP_BEFORE_SPS_PPS; } - // 5bits, 7.3.1 NAL unit syntax, - // H.264-AVC-ISO_IEC_14496-10.pdf, page 44. - // 7: SPS, 8: PPS, 5: I Frame, 1: P Frame - SrsAvcNaluType nal_unit_type = (SrsAvcNaluType)(frame[0] & 0x1f); - - // for IDR frame, the frame is keyframe. - SrsCodecVideoAVCFrame frame_type = SrsCodecVideoAVCFrameInterFrame; - if (nal_unit_type == SrsAvcNaluTypeIDR) { - frame_type = SrsCodecVideoAVCFrameKeyFrame; - } - - std::string ibp; - if ((ret = avc->mux_ipb_frame(frame, frame_size, ibp)) != ERROR_SUCCESS) { - return ret; - } - int8_t avc_packet_type = SrsCodecVideoAVCTypeNALU; char* flv = NULL; int nb_flv = 0; - if ((ret = avc->mux_avc2flv(ibp, frame_type, avc_packet_type, dts, pts, &flv, &nb_flv)) != ERROR_SUCCESS) { + if ((ret = avc->mux_avc2flv(ibps, frame_type, avc_packet_type, dts, pts, &flv, &nb_flv)) != ERROR_SUCCESS) { return ret; } @@ -798,14 +911,17 @@ int SrsIngestSrsOutput::on_ts_audio(SrsTsMessage* msg, SrsStream* avs) { int ret = ERROR_SUCCESS; - // ensure rtmp connected. - if ((ret = connect()) != ERROR_SUCCESS) { - return ret; - } - // ts tbn to flv tbn. u_int32_t dts = (u_int32_t)(msg->dts / 90); + // got the next video to calc the delta duration for each audio. + u_int32_t duration = 0; + if (!queue.empty()) { + SrsTsMessage* nm = queue.begin()->second; + duration = (u_int32_t)(srs_max(0, nm->dts - msg->dts) / 90); + } + u_int32_t max_dts = dts + duration; + // send each frame. while (!avs->empty()) { char* frame = NULL; @@ -842,6 +958,10 @@ int SrsIngestSrsOutput::on_ts_audio(SrsTsMessage* msg, SrsStream* avs) if ((ret = write_audio_raw_frame(frame, frame_size, &codec, dts)) != ERROR_SUCCESS) { return ret; } + + // calc the delta of dts, when previous frame output. + u_int32_t delta = duration / (msg->payload->length() / frame_size); + dts = (u_int32_t)(srs_min(max_dts, dts + delta)); } return ret; @@ -890,6 +1010,8 @@ int SrsIngestSrsOutput::connect() return ret; } + srs_trace("connect output=%s", out_rtmp->get_url()); + // parse uri if (!req) { req = new SrsRequest(); @@ -996,6 +1118,9 @@ int SrsIngestSrsOutput::connect_app(string ep_server, string ep_port) void SrsIngestSrsOutput::close() { + srs_trace("close output=%s", out_rtmp->get_url()); + h264_sps_pps_sent = false; + srs_freep(client); srs_freep(io); srs_freep(req); @@ -1035,6 +1160,11 @@ public: return ret; } + if ((ret = oc->flush_message_queue()) != ERROR_SUCCESS) { + srs_warn("flush oc message failed. ret=%d", ret); + return ret; + } + return ret; } };