diff --git a/trunk/research/hls/ts_info.cc b/trunk/research/hls/ts_info.cc index 9c5dbc710..0915001b2 100644 --- a/trunk/research/hls/ts_info.cc +++ b/trunk/research/hls/ts_info.cc @@ -1539,7 +1539,8 @@ int TSPayloadPES::demux(TSContext* ctx, TSPacket* pkt, u_int8_t* start, u_int8_t // for (i = 0; i < PES_packet_length; i++) { // PES_packet_data_byte // } - } else if (stream_id != PES_padding_stream) { + // TODO: FIXME: implements it. + } else if (stream_id == PES_padding_stream) { // for (i = 0; i < PES_packet_length; i++) { // padding_byte // } diff --git a/trunk/src/app/srs_app_mpegts_udp.cpp b/trunk/src/app/srs_app_mpegts_udp.cpp index fc9a9e83b..32a5d2454 100644 --- a/trunk/src/app/srs_app_mpegts_udp.cpp +++ b/trunk/src/app/srs_app_mpegts_udp.cpp @@ -73,7 +73,8 @@ int SrsMpegtsOverUdp::on_udp_packet(sockaddr_in* from, char* buf, int nb_buf) // process each ts packet if ((ret = on_ts_packet(stream)) != ERROR_SUCCESS) { - break; + srs_warn("mpegts: ignore parse ts packet failed. ret=%d", ret); + continue; } srs_info("mpegts: parse ts packet completed"); } @@ -86,7 +87,7 @@ int SrsMpegtsOverUdp::on_ts_packet(SrsStream* stream) { int ret = ERROR_SUCCESS; - if ((ret = context->decode(stream)) != ERROR_SUCCESS) { + if ((ret = context->decode(stream, this)) != ERROR_SUCCESS) { srs_error("mpegts: decode ts packet failed. ret=%d", ret); return ret; } @@ -94,4 +95,11 @@ int SrsMpegtsOverUdp::on_ts_packet(SrsStream* stream) return ret; } +int SrsMpegtsOverUdp::on_ts_message(SrsTsMessage* msg) +{ + int ret = ERROR_SUCCESS; + // TODO: FIXME: implements it. + return ret; +} + #endif diff --git a/trunk/src/app/srs_app_mpegts_udp.hpp b/trunk/src/app/srs_app_mpegts_udp.hpp index 115e913d3..b30c2886a 100644 --- a/trunk/src/app/srs_app_mpegts_udp.hpp +++ b/trunk/src/app/srs_app_mpegts_udp.hpp @@ -39,10 +39,12 @@ class SrsConfDirective; #ifdef SRS_AUTO_STREAM_CASTER +#include + /** * the mpegts over udp stream caster. */ -class SrsMpegtsOverUdp +class SrsMpegtsOverUdp : public ISrsTsHandler { private: SrsStream* stream; @@ -67,6 +69,9 @@ private: * the stream contains the ts packet to parse. */ virtual int on_ts_packet(SrsStream* stream); +// interface ISrsTsHandler +public: + virtual int on_ts_message(SrsTsMessage* msg); }; #endif diff --git a/trunk/src/kernel/srs_kernel_error.hpp b/trunk/src/kernel/srs_kernel_error.hpp index 0be810ad7..3cfa87d98 100644 --- a/trunk/src/kernel/srs_kernel_error.hpp +++ b/trunk/src/kernel/srs_kernel_error.hpp @@ -226,6 +226,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #define ERROR_STREAM_CASTER_TS_PSI 4016 #define ERROR_STREAM_CASTER_TS_PAT 4017 #define ERROR_STREAM_CASTER_TS_PMT 4018 +#define ERROR_STREAM_CASTER_TS_PSE 4019 /** * whether the error code is an system control error. diff --git a/trunk/src/kernel/srs_kernel_ts.cpp b/trunk/src/kernel/srs_kernel_ts.cpp index 93b3b426d..701d01e3b 100644 --- a/trunk/src/kernel/srs_kernel_ts.cpp +++ b/trunk/src/kernel/srs_kernel_ts.cpp @@ -402,6 +402,90 @@ SrsMpegtsFrame::SrsMpegtsFrame() key = false; } +SrsTsChannel::SrsTsChannel() +{ + pid = 0; + apply = SrsTsPidApplyReserved; + stream = SrsTsStreamReserved; + msg = NULL; +} + +SrsTsChannel::~SrsTsChannel() +{ + srs_freep(msg); +} + +SrsTsMessage::SrsTsMessage() +{ + payload = NULL; + clear(); +} + +SrsTsMessage::~SrsTsMessage() +{ + srs_freep(payload); +} + +int SrsTsMessage::dump(SrsStream* stream, int* pnb_bytes) +{ + int ret = ERROR_SUCCESS; + + if (stream->empty()) { + return ret; + } + + // xB + int nb_bytes = stream->size() - stream->pos(); + if (PES_packet_length > 0) { + nb_bytes = srs_min(nb_bytes, PES_packet_length - payload->length()); + } + + if (nb_bytes > 0) { + if (!stream->require(nb_bytes)) { + ret = ERROR_STREAM_CASTER_TS_PSE; + srs_error("ts: dump PSE bytes failed, requires=%dB. ret=%d", nb_bytes, ret); + return ret; + } + + payload->append(stream->data() + stream->pos(), nb_bytes); + stream->skip(nb_bytes); + } + + *pnb_bytes = nb_bytes; + + return ret; +} + +bool SrsTsMessage::completed(int8_t payload_unit_start_indicator) +{ + if (PES_packet_length == 0) { + return payload_unit_start_indicator; + } + return payload->length() >= PES_packet_length; +} + +bool SrsTsMessage::fresh() +{ + return payload->length() == 0; +} + +void SrsTsMessage::clear() +{ + dts = pts = 0; + continuity_counter = 0; + PES_packet_length = 0; + srs_freep(payload); + payload = new SrsSimpleBuffer(); +} + +ISrsTsHandler::ISrsTsHandler() +{ +} + +ISrsTsHandler::~ISrsTsHandler() +{ +} + SrsTsContext::SrsTsContext() { } @@ -416,7 +500,7 @@ SrsTsContext::~SrsTsContext() pids.clear(); } -int SrsTsContext::decode(SrsStream* stream) +int SrsTsContext::decode(SrsStream* stream, ISrsTsHandler* handler) { int ret = ERROR_SUCCESS; @@ -426,10 +510,21 @@ int SrsTsContext::decode(SrsStream* stream) SrsTsPacket* packet = new SrsTsPacket(this); SrsAutoFree(SrsTsPacket, packet); - if ((ret = packet->decode(stream)) != ERROR_SUCCESS) { + SrsTsMessage* msg = NULL; + if ((ret = packet->decode(stream, &msg)) != ERROR_SUCCESS) { srs_error("mpegts: decode ts packet failed. ret=%d", ret); return ret; } + + if (!msg) { + continue; + } + SrsAutoFree(SrsTsMessage, msg); + + if ((ret = handler->on_ts_message(msg)) != ERROR_SUCCESS) { + srs_error("mpegts: handler ts message failed. ret=%d", ret); + return ret; + } } return ret; @@ -481,7 +576,7 @@ SrsTsPacket::~SrsTsPacket() srs_freep(payload); } -int SrsTsPacket::decode(SrsStream* stream) +int SrsTsPacket::decode(SrsStream* stream, SrsTsMessage** ppmsg) { int ret = ERROR_SUCCESS; @@ -555,7 +650,7 @@ int SrsTsPacket::decode(SrsStream* stream) } } - if (payload && (ret = payload->decode(stream)) != ERROR_SUCCESS) { + if (payload && (ret = payload->decode(stream, ppmsg)) != ERROR_SUCCESS) { srs_error("ts: demux payload failed. ret=%d", ret); return ret; } @@ -821,7 +916,6 @@ SrsTsPayloadPES::SrsTsPayloadPES(SrsTsPacket* p) : SrsTsPayload(p) PES_extension_field = NULL; nb_stuffings = 0; nb_bytes = 0; - bytes = NULL; nb_paddings = 0; } @@ -830,12 +924,463 @@ SrsTsPayloadPES::~SrsTsPayloadPES() srs_freep(PES_private_data); srs_freep(pack_field); srs_freep(PES_extension_field); - srs_freep(bytes); } -int SrsTsPayloadPES::decode(SrsStream* stream) +int SrsTsPayloadPES::decode(SrsStream* stream, SrsTsMessage** ppmsg) { int ret = ERROR_SUCCESS; + + // find the channel from chunk. + SrsTsChannel* channel = packet->context->get(packet->pid); + if (!channel) { + ret = ERROR_STREAM_CASTER_TS_PSE; + srs_error("ts: demux PES no channel for pid=%#x. ret=%d", packet->pid, ret); + return ret; + } + + // init msg. + SrsTsMessage* msg = channel->msg; + if (!msg) { + msg = new SrsTsMessage(); + channel->msg = msg; + } + + // check when fresh, the payload_unit_start_indicator + // should be 1 for the fresh msg. + if (msg->fresh() && !packet->payload_unit_start_indicator) { + ret = ERROR_STREAM_CASTER_TS_PSE; + srs_error("ts: PES fresh packet length=%d, unit_start=%d. ret=%d", + msg->PES_packet_length, packet->payload_unit_start_indicator, ret); + return ret; + } + + // check when not fresh and PES_packet_length>0, + // the payload_unit_start_indicator should never be 1 when not completed. + if (!msg->fresh() && msg->PES_packet_length > 0 + && packet->payload_unit_start_indicator + && !msg->completed(packet->payload_unit_start_indicator) + ) { + ret = ERROR_STREAM_CASTER_TS_PSE; + srs_error("ts: PES packet length=%d, payload=%d, unit_start=%d. ret=%d", + msg->PES_packet_length, msg->payload->length(), + packet->payload_unit_start_indicator, ret); + + // reparse current msg. + stream->skip(stream->pos() * -1); + msg->clear(); + return ERROR_SUCCESS; + } + + // check the continuity counter + if (!msg->fresh()) { + // late-incoming or duplicated continuity, drop message. + // @remark check overflow, the counter plus 1 should greater when invalid. + if (msg->continuity_counter >= packet->continuity_counter + && ((msg->continuity_counter + 1) & 0x0f) > packet->continuity_counter + ) { + srs_warn("ts: drop PES %dB for duplicated continuity=%#x", msg->continuity_counter); + stream->skip(stream->size() - stream->pos()); + return ret; + } + + // when got partially message, the continous count must be continuous, or drop it. + if (((msg->continuity_counter + 1) & 0x0f) != packet->continuity_counter) { + ret = ERROR_STREAM_CASTER_TS_PSE; + srs_error("ts: continuity must be continous, msg=%#x, packet=%#x. ret=%d", + msg->continuity_counter, packet->continuity_counter, ret); + + // reparse current msg. + stream->skip(stream->pos() * -1); + msg->clear(); + return ERROR_SUCCESS; + } + } + msg->continuity_counter = packet->continuity_counter; + + // for the PES_packet_length(0), reap when completed. + if (!msg->fresh() && msg->completed(packet->payload_unit_start_indicator)) { + // reap previous PES packet. + *ppmsg = msg; + channel->msg = NULL; + + // reparse current msg. + stream->skip(stream->pos() * -1); + + return ret; + } + + // contious packet, append bytes for unit start is 0 + if (!packet->payload_unit_start_indicator) { + if ((ret = msg->dump(stream, &nb_bytes)) != ERROR_SUCCESS) { + return ret; + } + } + + // when unit start, parse the fresh msg. + if (packet->payload_unit_start_indicator) { + // 6B fixed header. + if (!stream->require(6)) { + ret = ERROR_STREAM_CASTER_TS_PSE; + srs_error("ts: demux PSE failed. ret=%d", ret); + return ret; + } + // 3B + packet_start_code_prefix = stream->read_3bytes(); + // 1B + stream_id = stream->read_1bytes(); + // 2B + PES_packet_length = stream->read_2bytes(); + + // check the packet start prefix. + packet_start_code_prefix &= 0xFFFFFF; + if (packet_start_code_prefix != 0x01) { + ret = ERROR_STREAM_CASTER_TS_PSE; + srs_error("ts: demux PSE start code failed, expect=0x01, actual=%#x. ret=%d", packet_start_code_prefix, ret); + return ret; + } + int pos_packet = stream->pos(); + + // @remark SrsTsPESStreamIdAudio and SrsTsPESStreamIdVideo is not used here. + SrsTsPESStreamId sid = (SrsTsPESStreamId)stream_id; + + if (sid != SrsTsPESStreamIdProgramStreamMap + && sid != SrsTsPESStreamIdPaddingStream + && sid != SrsTsPESStreamIdPrivateStream2 + && sid != SrsTsPESStreamIdEcmStream + && sid != SrsTsPESStreamIdEmmStream + && sid != SrsTsPESStreamIdProgramStreamDirectory + && sid != SrsTsPESStreamIdDsmccStream + && sid != SrsTsPESStreamIdH2221TypeE + ) { + // 3B flags. + if (!stream->require(3)) { + ret = ERROR_STREAM_CASTER_TS_PSE; + srs_error("ts: demux PSE flags failed. ret=%d", ret); + return ret; + } + // 1B + original_or_copy = stream->read_1bytes(); + // 1B + PES_extension_flag = stream->read_1bytes(); + // 1B + PES_header_data_length = stream->read_1bytes(); + // position of header start. + int pos_header = stream->pos(); + + //int8_t const2bits = (original_or_copy >> 6) & 0x03; + PES_scrambling_control = (original_or_copy >> 4) & 0x03; + PES_priority = (original_or_copy >> 3) & 0x01; + data_alignment_indicator = (original_or_copy >> 2) & 0x01; + copyright = (original_or_copy >> 1) & 0x01; + original_or_copy &= 0x01; + + PTS_DTS_flags = (PES_extension_flag >> 6) & 0x03; + ESCR_flag = (PES_extension_flag >> 5) & 0x01; + ES_rate_flag = (PES_extension_flag >> 4) & 0x01; + DSM_trick_mode_flag = (PES_extension_flag >> 3) & 0x01; + additional_copy_info_flag = (PES_extension_flag >> 2) & 0x01; + PES_CRC_flag = (PES_extension_flag >> 1) & 0x01; + PES_extension_flag &= 0x01; + + // check required together. + int nb_required = 0; + nb_required += (PTS_DTS_flags == 0x2)? 5:0; + nb_required += (PTS_DTS_flags == 0x3)? 10:0; + nb_required += ESCR_flag? 6:0; + nb_required += ES_rate_flag? 3:0; + nb_required += DSM_trick_mode_flag? 1:0; + nb_required += additional_copy_info_flag? 1:0; + nb_required += PES_CRC_flag? 2:0; + nb_required += PES_extension_flag? 1:0; + if (!stream->require(nb_required)) { + ret = ERROR_STREAM_CASTER_TS_PSE; + srs_error("ts: demux PSE payload failed. ret=%d", ret); + return ret; + } + + // 5B + if (PTS_DTS_flags == 0x2) { + if ((ret = decode_33bits_dts_pts(stream, &pts)) != ERROR_SUCCESS) { + return ret; + } + dts = pts; + + // update the dts and pts of message. + msg->dts = dts; + msg->pts = pts; + } + + // 10B + if (PTS_DTS_flags == 0x3) { + if ((ret = decode_33bits_dts_pts(stream, &pts)) != ERROR_SUCCESS) { + return ret; + } + if ((ret = decode_33bits_dts_pts(stream, &dts)) != ERROR_SUCCESS) { + return ret; + } + + // check sync, the diff of dts and pts should never greater than 1s. + if (dts - pts > 90000 || pts - dts > 90000) { + srs_warn("ts: sync dts=%"PRId64", pts=%"PRId64, dts, pts); + } + + // update the dts and pts of message. + msg->dts = dts; + msg->pts = pts; + } + + // 6B + if (ESCR_flag) { + ESCR_extension = 0; + ESCR_base = 0; + + stream->skip(6); + srs_warn("ts: demux PES, ignore the escr."); + } + + // 3B + if (ES_rate_flag) { + ES_rate = stream->read_3bytes(); + + ES_rate = ES_rate >> 1; + ES_rate &= 0x3FFFFF; + } + + // 1B + if (DSM_trick_mode_flag) { + trick_mode_control = stream->read_1bytes(); + + trick_mode_value = trick_mode_control & 0x1f; + trick_mode_control = (trick_mode_control >> 5) & 0x03; + } + + // 1B + if (additional_copy_info_flag) { + additional_copy_info = stream->read_1bytes(); + + additional_copy_info &= 0x7f; + } + + // 2B + if (PES_CRC_flag) { + previous_PES_packet_CRC = stream->read_2bytes(); + } + + // 1B + if (PES_extension_flag) { + PES_extension_flag_2 = stream->read_1bytes(); + + PES_private_data_flag = (PES_extension_flag_2 >> 7) & 0x01; + pack_header_field_flag = (PES_extension_flag_2 >> 6) & 0x01; + program_packet_sequence_counter_flag = (PES_extension_flag_2 >> 5) & 0x01; + P_STD_buffer_flag = (PES_extension_flag_2 >> 4) & 0x01; + PES_extension_flag_2 &= PES_extension_flag_2 & 0x01; + + nb_required = 0; + nb_required += PES_private_data_flag? 16:0; + nb_required += pack_header_field_flag? 1:0; // 1+x bytes. + nb_required += program_packet_sequence_counter_flag? 2:0; + nb_required += P_STD_buffer_flag? 2:0; + nb_required += PES_extension_flag_2? 1:0; // 1+x bytes. + if (!stream->require(nb_required)) { + ret = ERROR_STREAM_CASTER_TS_PSE; + srs_error("ts: demux PSE ext payload failed. ret=%d", ret); + return ret; + } + + // 16B + if (PES_private_data_flag) { + srs_freep(PES_private_data); + PES_private_data = new char[16]; + stream->read_bytes(PES_private_data, 16); + } + + // (1+x)B + if (pack_header_field_flag) { + pack_field_length = stream->read_1bytes(); + if (pack_field_length > 0) { + // the adjust required bytes. + nb_required = nb_required - 16 - 1 + pack_field_length; + if (!stream->require(nb_required)) { + ret = ERROR_STREAM_CASTER_TS_PSE; + srs_error("ts: demux PSE ext pack failed. ret=%d", ret); + return ret; + } + srs_freep(pack_field); + pack_field = new char[pack_field_length]; + stream->read_bytes(pack_field, pack_field_length); + } + } + + // 2B + if (program_packet_sequence_counter_flag) { + program_packet_sequence_counter = stream->read_1bytes(); + program_packet_sequence_counter &= 0x7f; + + original_stuff_length = stream->read_1bytes(); + MPEG1_MPEG2_identifier = (original_stuff_length >> 6) & 0x01; + original_stuff_length &= 0x3f; + } + + // 2B + if (P_STD_buffer_flag) { + P_STD_buffer_size = stream->read_2bytes(); + + // '01' + //int8_t const2bits = (P_STD_buffer_scale >>14) & 0x03; + + P_STD_buffer_scale = (P_STD_buffer_scale >>13) & 0x01; + P_STD_buffer_size &= 0x1FFF; + } + + // (1+x)B + if (PES_extension_flag_2) { + PES_extension_field_length = stream->read_1bytes(); + PES_extension_field_length &= 0x07; + + if (PES_extension_field_length > 0) { + if (!stream->require(PES_extension_field_length)) { + ret = ERROR_STREAM_CASTER_TS_PSE; + srs_error("ts: demux PSE ext field failed. ret=%d", ret); + return ret; + } + srs_freep(PES_extension_field); + PES_extension_field = new char[PES_extension_field_length]; + stream->read_bytes(PES_extension_field, PES_extension_field_length); + } + } + } + + // stuffing_byte + nb_stuffings = PES_header_data_length - (stream->pos() - pos_header); + if (nb_stuffings > 0) { + if (!stream->require(nb_stuffings)) { + ret = ERROR_STREAM_CASTER_TS_PSE; + srs_error("ts: demux PSE stuffings failed. ret=%d", ret); + return ret; + } + stream->skip(nb_stuffings); + } + + // PES_packet_data_byte, page58. + // the packet size contains the header size. + // The number of PES_packet_data_bytes, N, is specified by the + // PES_packet_length field. N shall be equal to the value + // indicated in the PES_packet_length minus the number of bytes + // between the last byte of the PES_packet_length field and the + // first PES_packet_data_byte. + /** + * when actual packet length > 0xffff(65535), + * which exceed the max u_int16_t packet length, + * use 0 packet length, the next unit start indicates the end of packet. + */ + if (PES_packet_length > 0) { + int nb_packet = PES_packet_length - (stream->pos() - pos_packet); + msg->PES_packet_length = srs_max(0, nb_packet); + } + + // xB + if ((ret = msg->dump(stream, &nb_bytes)) != ERROR_SUCCESS) { + return ret; + } + } else if (sid == SrsTsPESStreamIdProgramStreamMap + || sid == SrsTsPESStreamIdPrivateStream2 + || sid == SrsTsPESStreamIdEcmStream + || sid == SrsTsPESStreamIdEmmStream + || sid == SrsTsPESStreamIdProgramStreamDirectory + || sid == SrsTsPESStreamIdDsmccStream + || sid == SrsTsPESStreamIdH2221TypeE + ) { + // for (i = 0; i < PES_packet_length; i++) { + // PES_packet_data_byte + // } + + // xB + if ((ret = msg->dump(stream, &nb_bytes)) != ERROR_SUCCESS) { + return ret; + } + } else if (sid == SrsTsPESStreamIdPaddingStream) { + // for (i = 0; i < PES_packet_length; i++) { + // padding_byte + // } + nb_paddings = stream->size() - stream->pos(); + stream->skip(nb_paddings); + srs_info("ts: drop %dB padding bytes", nb_paddings); + } else { + int nb_drop = stream->size() - stream->pos(); + stream->skip(nb_drop); + srs_warn("ts: drop the pes packet %dB for stream_id=%#x", nb_drop, stream_id); + } + } + + // check msg, reap when completed. + if (msg->completed(packet->payload_unit_start_indicator)) { + *ppmsg = msg; + channel->msg = NULL; + srs_info("ts: reap msg for completed."); + } + + return ret; +} + +int SrsTsPayloadPES::decode_33bits_dts_pts(SrsStream* stream, int64_t* pv) +{ + int ret = ERROR_SUCCESS; + + if (!stream->require(5)) { + ret = ERROR_STREAM_CASTER_TS_PSE; + srs_error("ts: demux PSE dts/pts failed. ret=%d", ret); + return ret; + } + + // decode the 33bits schema. + // ===========1B + // 4bits const maybe '0001', '0010' or '0011'. + // 3bits DTS/PTS [32..30] + // 1bit const '1' + int64_t dts_pts_30_32 = stream->read_1bytes(); + if ((dts_pts_30_32 & 0x01) != 0x01) { + ret = ERROR_STREAM_CASTER_TS_PSE; + srs_error("ts: demux PSE dts/pts 30-32 failed. ret=%d", ret); + return ret; + } + // @remark, we donot check the high 4bits, maybe '0001', '0010' or '0011'. + // so we just ensure the high 4bits is not 0x00. + if (((dts_pts_30_32 >> 4) & 0x0f) == 0x00) { + ret = ERROR_STREAM_CASTER_TS_PSE; + srs_error("ts: demux PSE dts/pts 30-32 failed. ret=%d", ret); + return ret; + } + dts_pts_30_32 = (dts_pts_30_32 >> 1) & 0x07; + + // ===========2B + // 15bits DTS/PTS [29..15] + // 1bit const '1' + int64_t dts_pts_15_29 = stream->read_2bytes(); + if ((dts_pts_15_29 & 0x01) != 0x01) { + ret = ERROR_STREAM_CASTER_TS_PSE; + srs_error("ts: demux PSE dts/pts 15-29 failed. ret=%d", ret); + return ret; + } + dts_pts_15_29 = (dts_pts_15_29 >> 1) & 0x7fff; + + // ===========2B + // 15bits DTS/PTS [14..0] + // 1bit const '1' + int64_t dts_pts_0_14 = stream->read_2bytes(); + if ((dts_pts_0_14 & 0x01) != 0x01) { + ret = ERROR_STREAM_CASTER_TS_PSE; + srs_error("ts: demux PSE dts/pts 0-14 failed. ret=%d", ret); + return ret; + } + dts_pts_0_14 = (dts_pts_0_14 >> 1) & 0x7fff; + + int64_t v = 0x00; + v |= (dts_pts_30_32 << 30) & 0x1c0000000LL; + v |= (dts_pts_15_29 << 15) & 0x3fff8000LL; + v |= dts_pts_0_14 & 0x7fff; + *pv = v; + return ret; } @@ -849,7 +1394,7 @@ SrsTsPayloadPSI::~SrsTsPayloadPSI() { } -int SrsTsPayloadPSI::decode(SrsStream* stream) +int SrsTsPayloadPSI::decode(SrsStream* stream, SrsTsMessage** /*ppmsg*/) { int ret = ERROR_SUCCESS; diff --git a/trunk/src/kernel/srs_kernel_ts.hpp b/trunk/src/kernel/srs_kernel_ts.hpp index 0ff626b05..2bc1e4dd5 100644 --- a/trunk/src/kernel/srs_kernel_ts.hpp +++ b/trunk/src/kernel/srs_kernel_ts.hpp @@ -45,6 +45,7 @@ class SrsCodecSample; class SrsSimpleBuffer; class SrsTsAdaptationField; class SrsTsPayload; +class SrsTsMessage; // Transport Stream packets are 188 bytes in length. #define SRS_TS_PACKET_SIZE 188 @@ -180,6 +181,65 @@ struct SrsTsChannel int pid; SrsTsPidApply apply; SrsTsStream stream; + SrsTsMessage* msg; + + SrsTsChannel(); + virtual ~SrsTsChannel(); +}; + +/** +* the media audio/video message parsed from PES packet. +*/ +class SrsTsMessage +{ +public: + int64_t dts; + int64_t pts; + u_int16_t PES_packet_length; + u_int8_t continuity_counter; + SrsSimpleBuffer* payload; +public: + SrsTsMessage(); + virtual ~SrsTsMessage(); +public: + /** + * dumps all bytes in stream to ts message. + */ + virtual int dump(SrsStream* stream, int* pnb_bytes); + /** + * whether ts message is completed to reap. + * @param payload_unit_start_indicator whether new ts message start. + * PES_packet_length is 0, the payload_unit_start_indicator=1 to reap ts message. + * PES_packet_length > 0, the payload.length() == PES_packet_length to reap ts message. + * @remark when PES_packet_length>0, the payload_unit_start_indicator should never be 1 when not completed. + * @remark when fresh, the payload_unit_start_indicator should be 1. + */ + virtual bool completed(int8_t payload_unit_start_indicator); + /** + * whether the message is fresh. + */ + virtual bool fresh(); + /** + * clear current message. + */ + virtual void clear(); +}; + +/** +* the ts message handler. +*/ +class ISrsTsHandler +{ +public: + ISrsTsHandler(); + virtual ~ISrsTsHandler(); +public: + /** + * when ts context got message, use handler to process it. + * @param msg the ts msg, user should never free it. + * @return an int error code. + */ + virtual int on_ts_message(SrsTsMessage* msg) = 0; }; /** @@ -195,9 +255,10 @@ public: public: /** * the stream contains only one ts packet. + * @param handler the ts message handler to process the msg. * @remark we will consume all bytes in stream. */ - virtual int decode(SrsStream* stream); + virtual int decode(SrsStream* stream, ISrsTsHandler* handler); public: /** * get the pid apply, the parsed pid. @@ -237,7 +298,7 @@ public: * Transport Stream packets that carry PES packets (refer to 2.4.3.6) or PSI data (refer to 2.4.4). * * When the payload of the Transport Stream packet contains PES packet data, the payload_unit_start_indicator has the - * following significance: a '1' indicates that the payload of this Transport Stream packet will commence with the first byte + * following significance: a '1' indicates that the payload of this Transport Stream packet will commence(start) with the first byte * of a PES packet and a '0' indicates no PES packet shall start in this Transport Stream packet. If the * payload_unit_start_indicator is set to '1', then one and only one PES packet starts in this Transport Stream packet. This * also applies to private streams of stream_type 6 (refer to Table 2-29). @@ -293,7 +354,7 @@ public: * * In Transport Streams, duplicate packets may be sent as two, and only two, consecutive Transport Stream packets of the * same PID. The duplicate packets shall have the same continuity_counter value as the original packet and the - * adaptation_field_control field shall be equal to '01' or '11'. In duplicate packets each byte of the original packet shall be + * adaptation_field_control field shall be equal to '01'(payload only) or '11'(both). In duplicate packets each byte of the original packet shall be * duplicated, with the exception that in the program clock reference fields, if present, a valid value shall be encoded. * * The continuity_counter in a particular Transport Stream packet is continuous when it differs by a positive value of one @@ -312,7 +373,7 @@ public: SrsTsPacket(SrsTsContext* c); virtual ~SrsTsPacket(); public: - virtual int decode(SrsStream* stream); + virtual int decode(SrsStream* stream, SrsTsMessage** ppmsg); }; /** @@ -658,7 +719,64 @@ public: SrsTsPayload(SrsTsPacket* p); virtual ~SrsTsPayload(); public: - virtual int decode(SrsStream* stream) = 0; + virtual int decode(SrsStream* stream, SrsTsMessage** ppmsg) = 0; +}; + +/** +* the stream_id of PES payload of ts packet. +* Table 2-18 – Stream_id assignments, hls-mpeg-ts-iso13818-1.pdf, page 52. +*/ +enum SrsTsPESStreamId +{ + // program_stream_map + SrsTsPESStreamIdProgramStreamMap = 0xbc, // 0b10111100 + // private_stream_1 + SrsTsPESStreamIdPrivateStream1 = 0xbd, // 0b10111101 + // padding_stream + SrsTsPESStreamIdPaddingStream = 0xbe, // 0b10111110 + // private_stream_2 + SrsTsPESStreamIdPrivateStream2 = 0xbf, // 0b10111111 + + // 110x xxxx + // ISO/IEC 13818-3 or ISO/IEC 11172-3 or ISO/IEC 13818-7 or ISO/IEC + // 14496-3 audio stream number x xxxx + // (stream_id>>5)&0x07 == SrsTsPESStreamIdAudio + SrsTsPESStreamIdAudio = 0x06, // 0b110 + + // 1110 xxxx + // ITU-T Rec. H.262 | ISO/IEC 13818-2 or ISO/IEC 11172-2 or ISO/IEC + // 14496-2 video stream number xxxx + // (stream_id>>4)&0x0f == SrsTsPESStreamIdVideo + SrsTsPESStreamIdVideo = 0x0e, // 0b1110 + + // ECM_stream + SrsTsPESStreamIdEcmStream = 0xf0, // 0b11110000 + // EMM_stream + SrsTsPESStreamIdEmmStream = 0xf1, // 0b11110001 + // DSMCC_stream + SrsTsPESStreamIdDsmccStream = 0xf2, // 0b11110010 + // 13522_stream + SrsTsPESStreamId13522Stream = 0xf3, // 0b11110011 + // H_222_1_type_A + SrsTsPESStreamIdH2221TypeA = 0xf4, // 0b11110100 + // H_222_1_type_B + SrsTsPESStreamIdH2221TypeB = 0xf5, // 0b11110101 + // H_222_1_type_C + SrsTsPESStreamIdH2221TypeC = 0xf6, // 0b11110110 + // H_222_1_type_D + SrsTsPESStreamIdH2221TypeD = 0xf7, // 0b11110111 + // H_222_1_type_E + SrsTsPESStreamIdH2221TypeE = 0xf8, // 0b11111000 + // ancillary_stream + SrsTsPESStreamIdAncillaryStream = 0xf9, // 0b11111001 + // SL_packetized_stream + SrsTsPESStreamIdSlPacketizedStream = 0xfa, // 0b11111010 + // FlexMux_stream + SrsTsPESStreamIdFlexMuxStream = 0xfb, // 0b11111011 + // reserved data stream + // 1111 1100 … 1111 1110 + // program_stream_directory + SrsTsPESStreamIdProgramStreamDirectory = 0xff, // 0b11111111 }; /** @@ -774,7 +892,7 @@ public: */ u_int8_t PES_header_data_length; //8bits - // 8B + // 5B /** * Presentation times shall be related to decoding times as follows: The PTS is a 33-bit * number coded in three separate fields. It indicates the time of presentation, tp n (k), in the system target decoder of a @@ -782,15 +900,37 @@ public: * frequency divided by 300 (yielding 90 kHz). The presentation time is derived from the PTS according to equation 2-11 * below. Refer to 2.7.4 for constraints on the frequency of coding presentation timestamps. */ + // ===========1B + // 4bits const + // 3bits PTS [32..30] + // 1bit const '1' + // ===========2B + // 15bits PTS [29..15] + // 1bit const '1' + // ===========2B + // 15bits PTS [14..0] + // 1bit const '1' int64_t pts; // 33bits + + // 5B /** * The DTS is a 33-bit number coded in three separate fields. It indicates the decoding time, * td n (j), in the system target decoder of an access unit j of elementary stream n. The value of DTS is specified in units of * the period of the system clock frequency divided by 300 (yielding 90 kHz). */ + // ===========1B + // 4bits const + // 3bits DTS [32..30] + // 1bit const '1' + // ===========2B + // 15bits DTS [29..15] + // 1bit const '1' + // ===========2B + // 15bits DTS [14..0] + // 1bit const '1' int64_t dts; // 33bits - // 8B + // 6B /** * The elementary stream clock reference is a 42-bit field coded in two parts. The first * part, ESCR_base, is a 33-bit field whose value is given by ESCR_base(i), as given in equation 2-14. The second part, @@ -798,8 +938,19 @@ public: * intended time of arrival of the byte containing the last bit of the ESCR_base at the input of the PES-STD for PES streams * (refer to 2.5.2.4). */ - int16_t ESCR_extension; //9bits + // 2bits reserved + // 3bits ESCR_base[32..30] + // 1bit const '1' + // 15bits ESCR_base[29..15] + // 1bit const '1' + // 15bits ESCR_base[14..0] + // 1bit const '1' + // 9bits ESCR_extension + // 1bit const '1' int64_t ESCR_base; //33bits + int16_t ESCR_extension; //9bits + + // 3B /** * The ES_rate field is a 22-bit unsigned integer specifying the rate at which the * system target decoder receives bytes of the PES packet in the case of a PES stream. The ES_rate is valid in the PES @@ -808,6 +959,9 @@ public: * ES_rate is used to define the time of arrival of bytes at the input of a P-STD for PES streams defined in 2.5.2.4. The * value encoded in the ES_rate field may vary from PES_packet to PES_packet. */ + // 1bit const '1' + // 22bits ES_rate + // 1bit const '1' int32_t ES_rate; //22bits // 1B @@ -877,7 +1031,7 @@ public: /** * This is an 8-bit field which indicates the length, in bytes, of the pack_header_field(). */ - int8_t pack_field_length; //8bits + u_int8_t pack_field_length; //8bits char* pack_field; //[pack_field_length] bytes // 2B @@ -928,7 +1082,7 @@ public: * This is a 7-bit field which specifies the length, in bytes, of the data following this field in * the PES extension field up to and including any reserved bytes. */ - int8_t PES_extension_field_length; //7bits + u_int8_t PES_extension_field_length; //7bits char* PES_extension_field; //[PES_extension_field_length] bytes // NB @@ -953,7 +1107,6 @@ public: * PES_packet_data_byte field are user definable and will not be specified by ITU-T | ISO/IEC in the future. */ int nb_bytes; - char* bytes; // NB /** @@ -964,7 +1117,9 @@ public: SrsTsPayloadPES(SrsTsPacket* p); virtual ~SrsTsPayloadPES(); public: - virtual int decode(SrsStream* stream); + virtual int decode(SrsStream* stream, SrsTsMessage** ppmsg); +private: + virtual int decode_33bits_dts_pts(SrsStream* stream, int64_t* pv); }; /** @@ -1020,7 +1175,7 @@ public: SrsTsPayloadPSI(SrsTsPacket* p); virtual ~SrsTsPayloadPSI(); public: - virtual int decode(SrsStream* stream); + virtual int decode(SrsStream* stream, SrsTsMessage** ppmsg); protected: virtual int psi_decode(SrsStream* stream) = 0; };