1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

for #250, parse pes packet ok, ts message ok.

This commit is contained in:
winlin 2015-01-29 22:58:02 +08:00
parent 486277089d
commit 1685cdd48e
6 changed files with 740 additions and 25 deletions

View file

@ -402,6 +402,90 @@ SrsMpegtsFrame::SrsMpegtsFrame()
key = false;
}
SrsTsChannel::SrsTsChannel()
{
pid = 0;
apply = SrsTsPidApplyReserved;
stream = SrsTsStreamReserved;
msg = NULL;
}
SrsTsChannel::~SrsTsChannel()
{
srs_freep(msg);
}
SrsTsMessage::SrsTsMessage()
{
payload = NULL;
clear();
}
SrsTsMessage::~SrsTsMessage()
{
srs_freep(payload);
}
int SrsTsMessage::dump(SrsStream* stream, int* pnb_bytes)
{
int ret = ERROR_SUCCESS;
if (stream->empty()) {
return ret;
}
// xB
int nb_bytes = stream->size() - stream->pos();
if (PES_packet_length > 0) {
nb_bytes = srs_min(nb_bytes, PES_packet_length - payload->length());
}
if (nb_bytes > 0) {
if (!stream->require(nb_bytes)) {
ret = ERROR_STREAM_CASTER_TS_PSE;
srs_error("ts: dump PSE bytes failed, requires=%dB. ret=%d", nb_bytes, ret);
return ret;
}
payload->append(stream->data() + stream->pos(), nb_bytes);
stream->skip(nb_bytes);
}
*pnb_bytes = nb_bytes;
return ret;
}
bool SrsTsMessage::completed(int8_t payload_unit_start_indicator)
{
if (PES_packet_length == 0) {
return payload_unit_start_indicator;
}
return payload->length() >= PES_packet_length;
}
bool SrsTsMessage::fresh()
{
return payload->length() == 0;
}
void SrsTsMessage::clear()
{
dts = pts = 0;
continuity_counter = 0;
PES_packet_length = 0;
srs_freep(payload);
payload = new SrsSimpleBuffer();
}
ISrsTsHandler::ISrsTsHandler()
{
}
ISrsTsHandler::~ISrsTsHandler()
{
}
SrsTsContext::SrsTsContext()
{
}
@ -416,7 +500,7 @@ SrsTsContext::~SrsTsContext()
pids.clear();
}
int SrsTsContext::decode(SrsStream* stream)
int SrsTsContext::decode(SrsStream* stream, ISrsTsHandler* handler)
{
int ret = ERROR_SUCCESS;
@ -426,10 +510,21 @@ int SrsTsContext::decode(SrsStream* stream)
SrsTsPacket* packet = new SrsTsPacket(this);
SrsAutoFree(SrsTsPacket, packet);
if ((ret = packet->decode(stream)) != ERROR_SUCCESS) {
SrsTsMessage* msg = NULL;
if ((ret = packet->decode(stream, &msg)) != ERROR_SUCCESS) {
srs_error("mpegts: decode ts packet failed. ret=%d", ret);
return ret;
}
if (!msg) {
continue;
}
SrsAutoFree(SrsTsMessage, msg);
if ((ret = handler->on_ts_message(msg)) != ERROR_SUCCESS) {
srs_error("mpegts: handler ts message failed. ret=%d", ret);
return ret;
}
}
return ret;
@ -481,7 +576,7 @@ SrsTsPacket::~SrsTsPacket()
srs_freep(payload);
}
int SrsTsPacket::decode(SrsStream* stream)
int SrsTsPacket::decode(SrsStream* stream, SrsTsMessage** ppmsg)
{
int ret = ERROR_SUCCESS;
@ -555,7 +650,7 @@ int SrsTsPacket::decode(SrsStream* stream)
}
}
if (payload && (ret = payload->decode(stream)) != ERROR_SUCCESS) {
if (payload && (ret = payload->decode(stream, ppmsg)) != ERROR_SUCCESS) {
srs_error("ts: demux payload failed. ret=%d", ret);
return ret;
}
@ -821,7 +916,6 @@ SrsTsPayloadPES::SrsTsPayloadPES(SrsTsPacket* p) : SrsTsPayload(p)
PES_extension_field = NULL;
nb_stuffings = 0;
nb_bytes = 0;
bytes = NULL;
nb_paddings = 0;
}
@ -830,12 +924,463 @@ SrsTsPayloadPES::~SrsTsPayloadPES()
srs_freep(PES_private_data);
srs_freep(pack_field);
srs_freep(PES_extension_field);
srs_freep(bytes);
}
int SrsTsPayloadPES::decode(SrsStream* stream)
int SrsTsPayloadPES::decode(SrsStream* stream, SrsTsMessage** ppmsg)
{
int ret = ERROR_SUCCESS;
// find the channel from chunk.
SrsTsChannel* channel = packet->context->get(packet->pid);
if (!channel) {
ret = ERROR_STREAM_CASTER_TS_PSE;
srs_error("ts: demux PES no channel for pid=%#x. ret=%d", packet->pid, ret);
return ret;
}
// init msg.
SrsTsMessage* msg = channel->msg;
if (!msg) {
msg = new SrsTsMessage();
channel->msg = msg;
}
// check when fresh, the payload_unit_start_indicator
// should be 1 for the fresh msg.
if (msg->fresh() && !packet->payload_unit_start_indicator) {
ret = ERROR_STREAM_CASTER_TS_PSE;
srs_error("ts: PES fresh packet length=%d, unit_start=%d. ret=%d",
msg->PES_packet_length, packet->payload_unit_start_indicator, ret);
return ret;
}
// check when not fresh and PES_packet_length>0,
// the payload_unit_start_indicator should never be 1 when not completed.
if (!msg->fresh() && msg->PES_packet_length > 0
&& packet->payload_unit_start_indicator
&& !msg->completed(packet->payload_unit_start_indicator)
) {
ret = ERROR_STREAM_CASTER_TS_PSE;
srs_error("ts: PES packet length=%d, payload=%d, unit_start=%d. ret=%d",
msg->PES_packet_length, msg->payload->length(),
packet->payload_unit_start_indicator, ret);
// reparse current msg.
stream->skip(stream->pos() * -1);
msg->clear();
return ERROR_SUCCESS;
}
// check the continuity counter
if (!msg->fresh()) {
// late-incoming or duplicated continuity, drop message.
// @remark check overflow, the counter plus 1 should greater when invalid.
if (msg->continuity_counter >= packet->continuity_counter
&& ((msg->continuity_counter + 1) & 0x0f) > packet->continuity_counter
) {
srs_warn("ts: drop PES %dB for duplicated continuity=%#x", msg->continuity_counter);
stream->skip(stream->size() - stream->pos());
return ret;
}
// when got partially message, the continous count must be continuous, or drop it.
if (((msg->continuity_counter + 1) & 0x0f) != packet->continuity_counter) {
ret = ERROR_STREAM_CASTER_TS_PSE;
srs_error("ts: continuity must be continous, msg=%#x, packet=%#x. ret=%d",
msg->continuity_counter, packet->continuity_counter, ret);
// reparse current msg.
stream->skip(stream->pos() * -1);
msg->clear();
return ERROR_SUCCESS;
}
}
msg->continuity_counter = packet->continuity_counter;
// for the PES_packet_length(0), reap when completed.
if (!msg->fresh() && msg->completed(packet->payload_unit_start_indicator)) {
// reap previous PES packet.
*ppmsg = msg;
channel->msg = NULL;
// reparse current msg.
stream->skip(stream->pos() * -1);
return ret;
}
// contious packet, append bytes for unit start is 0
if (!packet->payload_unit_start_indicator) {
if ((ret = msg->dump(stream, &nb_bytes)) != ERROR_SUCCESS) {
return ret;
}
}
// when unit start, parse the fresh msg.
if (packet->payload_unit_start_indicator) {
// 6B fixed header.
if (!stream->require(6)) {
ret = ERROR_STREAM_CASTER_TS_PSE;
srs_error("ts: demux PSE failed. ret=%d", ret);
return ret;
}
// 3B
packet_start_code_prefix = stream->read_3bytes();
// 1B
stream_id = stream->read_1bytes();
// 2B
PES_packet_length = stream->read_2bytes();
// check the packet start prefix.
packet_start_code_prefix &= 0xFFFFFF;
if (packet_start_code_prefix != 0x01) {
ret = ERROR_STREAM_CASTER_TS_PSE;
srs_error("ts: demux PSE start code failed, expect=0x01, actual=%#x. ret=%d", packet_start_code_prefix, ret);
return ret;
}
int pos_packet = stream->pos();
// @remark SrsTsPESStreamIdAudio and SrsTsPESStreamIdVideo is not used here.
SrsTsPESStreamId sid = (SrsTsPESStreamId)stream_id;
if (sid != SrsTsPESStreamIdProgramStreamMap
&& sid != SrsTsPESStreamIdPaddingStream
&& sid != SrsTsPESStreamIdPrivateStream2
&& sid != SrsTsPESStreamIdEcmStream
&& sid != SrsTsPESStreamIdEmmStream
&& sid != SrsTsPESStreamIdProgramStreamDirectory
&& sid != SrsTsPESStreamIdDsmccStream
&& sid != SrsTsPESStreamIdH2221TypeE
) {
// 3B flags.
if (!stream->require(3)) {
ret = ERROR_STREAM_CASTER_TS_PSE;
srs_error("ts: demux PSE flags failed. ret=%d", ret);
return ret;
}
// 1B
original_or_copy = stream->read_1bytes();
// 1B
PES_extension_flag = stream->read_1bytes();
// 1B
PES_header_data_length = stream->read_1bytes();
// position of header start.
int pos_header = stream->pos();
//int8_t const2bits = (original_or_copy >> 6) & 0x03;
PES_scrambling_control = (original_or_copy >> 4) & 0x03;
PES_priority = (original_or_copy >> 3) & 0x01;
data_alignment_indicator = (original_or_copy >> 2) & 0x01;
copyright = (original_or_copy >> 1) & 0x01;
original_or_copy &= 0x01;
PTS_DTS_flags = (PES_extension_flag >> 6) & 0x03;
ESCR_flag = (PES_extension_flag >> 5) & 0x01;
ES_rate_flag = (PES_extension_flag >> 4) & 0x01;
DSM_trick_mode_flag = (PES_extension_flag >> 3) & 0x01;
additional_copy_info_flag = (PES_extension_flag >> 2) & 0x01;
PES_CRC_flag = (PES_extension_flag >> 1) & 0x01;
PES_extension_flag &= 0x01;
// check required together.
int nb_required = 0;
nb_required += (PTS_DTS_flags == 0x2)? 5:0;
nb_required += (PTS_DTS_flags == 0x3)? 10:0;
nb_required += ESCR_flag? 6:0;
nb_required += ES_rate_flag? 3:0;
nb_required += DSM_trick_mode_flag? 1:0;
nb_required += additional_copy_info_flag? 1:0;
nb_required += PES_CRC_flag? 2:0;
nb_required += PES_extension_flag? 1:0;
if (!stream->require(nb_required)) {
ret = ERROR_STREAM_CASTER_TS_PSE;
srs_error("ts: demux PSE payload failed. ret=%d", ret);
return ret;
}
// 5B
if (PTS_DTS_flags == 0x2) {
if ((ret = decode_33bits_dts_pts(stream, &pts)) != ERROR_SUCCESS) {
return ret;
}
dts = pts;
// update the dts and pts of message.
msg->dts = dts;
msg->pts = pts;
}
// 10B
if (PTS_DTS_flags == 0x3) {
if ((ret = decode_33bits_dts_pts(stream, &pts)) != ERROR_SUCCESS) {
return ret;
}
if ((ret = decode_33bits_dts_pts(stream, &dts)) != ERROR_SUCCESS) {
return ret;
}
// check sync, the diff of dts and pts should never greater than 1s.
if (dts - pts > 90000 || pts - dts > 90000) {
srs_warn("ts: sync dts=%"PRId64", pts=%"PRId64, dts, pts);
}
// update the dts and pts of message.
msg->dts = dts;
msg->pts = pts;
}
// 6B
if (ESCR_flag) {
ESCR_extension = 0;
ESCR_base = 0;
stream->skip(6);
srs_warn("ts: demux PES, ignore the escr.");
}
// 3B
if (ES_rate_flag) {
ES_rate = stream->read_3bytes();
ES_rate = ES_rate >> 1;
ES_rate &= 0x3FFFFF;
}
// 1B
if (DSM_trick_mode_flag) {
trick_mode_control = stream->read_1bytes();
trick_mode_value = trick_mode_control & 0x1f;
trick_mode_control = (trick_mode_control >> 5) & 0x03;
}
// 1B
if (additional_copy_info_flag) {
additional_copy_info = stream->read_1bytes();
additional_copy_info &= 0x7f;
}
// 2B
if (PES_CRC_flag) {
previous_PES_packet_CRC = stream->read_2bytes();
}
// 1B
if (PES_extension_flag) {
PES_extension_flag_2 = stream->read_1bytes();
PES_private_data_flag = (PES_extension_flag_2 >> 7) & 0x01;
pack_header_field_flag = (PES_extension_flag_2 >> 6) & 0x01;
program_packet_sequence_counter_flag = (PES_extension_flag_2 >> 5) & 0x01;
P_STD_buffer_flag = (PES_extension_flag_2 >> 4) & 0x01;
PES_extension_flag_2 &= PES_extension_flag_2 & 0x01;
nb_required = 0;
nb_required += PES_private_data_flag? 16:0;
nb_required += pack_header_field_flag? 1:0; // 1+x bytes.
nb_required += program_packet_sequence_counter_flag? 2:0;
nb_required += P_STD_buffer_flag? 2:0;
nb_required += PES_extension_flag_2? 1:0; // 1+x bytes.
if (!stream->require(nb_required)) {
ret = ERROR_STREAM_CASTER_TS_PSE;
srs_error("ts: demux PSE ext payload failed. ret=%d", ret);
return ret;
}
// 16B
if (PES_private_data_flag) {
srs_freep(PES_private_data);
PES_private_data = new char[16];
stream->read_bytes(PES_private_data, 16);
}
// (1+x)B
if (pack_header_field_flag) {
pack_field_length = stream->read_1bytes();
if (pack_field_length > 0) {
// the adjust required bytes.
nb_required = nb_required - 16 - 1 + pack_field_length;
if (!stream->require(nb_required)) {
ret = ERROR_STREAM_CASTER_TS_PSE;
srs_error("ts: demux PSE ext pack failed. ret=%d", ret);
return ret;
}
srs_freep(pack_field);
pack_field = new char[pack_field_length];
stream->read_bytes(pack_field, pack_field_length);
}
}
// 2B
if (program_packet_sequence_counter_flag) {
program_packet_sequence_counter = stream->read_1bytes();
program_packet_sequence_counter &= 0x7f;
original_stuff_length = stream->read_1bytes();
MPEG1_MPEG2_identifier = (original_stuff_length >> 6) & 0x01;
original_stuff_length &= 0x3f;
}
// 2B
if (P_STD_buffer_flag) {
P_STD_buffer_size = stream->read_2bytes();
// '01'
//int8_t const2bits = (P_STD_buffer_scale >>14) & 0x03;
P_STD_buffer_scale = (P_STD_buffer_scale >>13) & 0x01;
P_STD_buffer_size &= 0x1FFF;
}
// (1+x)B
if (PES_extension_flag_2) {
PES_extension_field_length = stream->read_1bytes();
PES_extension_field_length &= 0x07;
if (PES_extension_field_length > 0) {
if (!stream->require(PES_extension_field_length)) {
ret = ERROR_STREAM_CASTER_TS_PSE;
srs_error("ts: demux PSE ext field failed. ret=%d", ret);
return ret;
}
srs_freep(PES_extension_field);
PES_extension_field = new char[PES_extension_field_length];
stream->read_bytes(PES_extension_field, PES_extension_field_length);
}
}
}
// stuffing_byte
nb_stuffings = PES_header_data_length - (stream->pos() - pos_header);
if (nb_stuffings > 0) {
if (!stream->require(nb_stuffings)) {
ret = ERROR_STREAM_CASTER_TS_PSE;
srs_error("ts: demux PSE stuffings failed. ret=%d", ret);
return ret;
}
stream->skip(nb_stuffings);
}
// PES_packet_data_byte, page58.
// the packet size contains the header size.
// The number of PES_packet_data_bytes, N, is specified by the
// PES_packet_length field. N shall be equal to the value
// indicated in the PES_packet_length minus the number of bytes
// between the last byte of the PES_packet_length field and the
// first PES_packet_data_byte.
/**
* when actual packet length > 0xffff(65535),
* which exceed the max u_int16_t packet length,
* use 0 packet length, the next unit start indicates the end of packet.
*/
if (PES_packet_length > 0) {
int nb_packet = PES_packet_length - (stream->pos() - pos_packet);
msg->PES_packet_length = srs_max(0, nb_packet);
}
// xB
if ((ret = msg->dump(stream, &nb_bytes)) != ERROR_SUCCESS) {
return ret;
}
} else if (sid == SrsTsPESStreamIdProgramStreamMap
|| sid == SrsTsPESStreamIdPrivateStream2
|| sid == SrsTsPESStreamIdEcmStream
|| sid == SrsTsPESStreamIdEmmStream
|| sid == SrsTsPESStreamIdProgramStreamDirectory
|| sid == SrsTsPESStreamIdDsmccStream
|| sid == SrsTsPESStreamIdH2221TypeE
) {
// for (i = 0; i < PES_packet_length; i++) {
// PES_packet_data_byte
// }
// xB
if ((ret = msg->dump(stream, &nb_bytes)) != ERROR_SUCCESS) {
return ret;
}
} else if (sid == SrsTsPESStreamIdPaddingStream) {
// for (i = 0; i < PES_packet_length; i++) {
// padding_byte
// }
nb_paddings = stream->size() - stream->pos();
stream->skip(nb_paddings);
srs_info("ts: drop %dB padding bytes", nb_paddings);
} else {
int nb_drop = stream->size() - stream->pos();
stream->skip(nb_drop);
srs_warn("ts: drop the pes packet %dB for stream_id=%#x", nb_drop, stream_id);
}
}
// check msg, reap when completed.
if (msg->completed(packet->payload_unit_start_indicator)) {
*ppmsg = msg;
channel->msg = NULL;
srs_info("ts: reap msg for completed.");
}
return ret;
}
int SrsTsPayloadPES::decode_33bits_dts_pts(SrsStream* stream, int64_t* pv)
{
int ret = ERROR_SUCCESS;
if (!stream->require(5)) {
ret = ERROR_STREAM_CASTER_TS_PSE;
srs_error("ts: demux PSE dts/pts failed. ret=%d", ret);
return ret;
}
// decode the 33bits schema.
// ===========1B
// 4bits const maybe '0001', '0010' or '0011'.
// 3bits DTS/PTS [32..30]
// 1bit const '1'
int64_t dts_pts_30_32 = stream->read_1bytes();
if ((dts_pts_30_32 & 0x01) != 0x01) {
ret = ERROR_STREAM_CASTER_TS_PSE;
srs_error("ts: demux PSE dts/pts 30-32 failed. ret=%d", ret);
return ret;
}
// @remark, we donot check the high 4bits, maybe '0001', '0010' or '0011'.
// so we just ensure the high 4bits is not 0x00.
if (((dts_pts_30_32 >> 4) & 0x0f) == 0x00) {
ret = ERROR_STREAM_CASTER_TS_PSE;
srs_error("ts: demux PSE dts/pts 30-32 failed. ret=%d", ret);
return ret;
}
dts_pts_30_32 = (dts_pts_30_32 >> 1) & 0x07;
// ===========2B
// 15bits DTS/PTS [29..15]
// 1bit const '1'
int64_t dts_pts_15_29 = stream->read_2bytes();
if ((dts_pts_15_29 & 0x01) != 0x01) {
ret = ERROR_STREAM_CASTER_TS_PSE;
srs_error("ts: demux PSE dts/pts 15-29 failed. ret=%d", ret);
return ret;
}
dts_pts_15_29 = (dts_pts_15_29 >> 1) & 0x7fff;
// ===========2B
// 15bits DTS/PTS [14..0]
// 1bit const '1'
int64_t dts_pts_0_14 = stream->read_2bytes();
if ((dts_pts_0_14 & 0x01) != 0x01) {
ret = ERROR_STREAM_CASTER_TS_PSE;
srs_error("ts: demux PSE dts/pts 0-14 failed. ret=%d", ret);
return ret;
}
dts_pts_0_14 = (dts_pts_0_14 >> 1) & 0x7fff;
int64_t v = 0x00;
v |= (dts_pts_30_32 << 30) & 0x1c0000000LL;
v |= (dts_pts_15_29 << 15) & 0x3fff8000LL;
v |= dts_pts_0_14 & 0x7fff;
*pv = v;
return ret;
}
@ -849,7 +1394,7 @@ SrsTsPayloadPSI::~SrsTsPayloadPSI()
{
}
int SrsTsPayloadPSI::decode(SrsStream* stream)
int SrsTsPayloadPSI::decode(SrsStream* stream, SrsTsMessage** /*ppmsg*/)
{
int ret = ERROR_SUCCESS;