diff --git a/trunk/research/ts_info.cpp b/trunk/research/ts_info.cpp index a4536b377..5614a339a 100755 --- a/trunk/research/ts_info.cpp +++ b/trunk/research/ts_info.cpp @@ -17,11 +17,13 @@ g++ -o ts_info ts_info.cpp -g -O0 -ansi #include #include +#include #define trace(msg, ...) printf(msg"\n", ##__VA_ARGS__); #define srs_freep(p) delete p; p = NULL #define srs_freepa(p) delete[] p; p = NULL #define srs_assert(p) assert(p) +#define srs_min(a, b) ((a)<(b)? (a):(b)) #endif /** @@ -111,6 +113,7 @@ class TSPayloadPAT; class TSPayloadPMT; class TSPayloadPES; class TSContext; +class TSMessage; // TSPacket declares. class TSPacket @@ -122,7 +125,7 @@ public: TSPacket(); virtual ~TSPacket(); - int demux(TSContext* ctx, u_int8_t* start, u_int8_t* last, u_int8_t*& p); + int demux(TSContext* ctx, u_int8_t* start, u_int8_t* last, u_int8_t*& p, TSMessage*& pmsg); int finish(); }; @@ -145,7 +148,7 @@ public: TSHeader(); virtual ~TSHeader(); int get_size(); - int demux(TSContext* ctx, TSPacket* pkt, u_int8_t* start, u_int8_t* last, u_int8_t*& p); + int demux(TSContext* ctx, TSPacket* pkt, u_int8_t* start, u_int8_t* last, u_int8_t*& p, TSMessage*& pmsg); }; // variant ts packet adation field. page 40. @@ -213,7 +216,7 @@ public: TSAdaptionField(); virtual ~TSAdaptionField(); int get_size(); - int demux(TSContext* ctx, TSPacket* pkt, u_int8_t* start, u_int8_t* last, u_int8_t*& p); + int demux(TSContext* ctx, TSPacket* pkt, u_int8_t* start, u_int8_t* last, u_int8_t*& p, TSMessage*& pmsg); }; // variant ts packet payload. @@ -247,7 +250,7 @@ public: TSPayload(); virtual ~TSPayload();; void read_pointer_field(TSPacket* pkt, u_int8_t*& p); - int demux(TSContext* ctx, TSPacket* pkt, u_int8_t* start, u_int8_t* last, u_int8_t*& p); + int demux(TSContext* ctx, TSPacket* pkt, u_int8_t* start, u_int8_t* last, u_int8_t*& p, TSMessage*& pmsg); }; @@ -294,7 +297,7 @@ public: TSPayloadPAT(); virtual ~TSPayloadPAT(); - int demux(TSContext* ctx, TSPacket* pkt, u_int8_t* start, u_int8_t* last, u_int8_t*& p); + int demux(TSContext* ctx, TSPacket* pkt, u_int8_t* start, u_int8_t* last, u_int8_t*& p, TSMessage*& pmsg); }; class TSPMTESInfo @@ -364,7 +367,7 @@ public: TSPayloadPMT(); virtual ~TSPayloadPMT(); - int demux(TSContext* ctx, TSPacket* pkt, u_int8_t* start, u_int8_t* last, u_int8_t*& p); + int demux(TSContext* ctx, TSPacket* pkt, u_int8_t* start, u_int8_t* last, u_int8_t*& p, TSMessage*& pmsg); }; /** @@ -478,14 +481,12 @@ public: int stuffing_size; char* stuffing_byte; - - char* PES_packet_data_byte; //[PES_packet_length] bytes TSPayloadPES(); virtual ~TSPayloadPES(); int64_t decode_33bits_int(u_int8_t*& p, int64_t& temp); int64_t decode_33bits_int(int64_t& temp); - int demux(TSContext* ctx, TSPacket* pkt, u_int8_t* start, u_int8_t* last, u_int8_t*& p); + int demux(TSContext* ctx, TSPacket* pkt, u_int8_t* start, u_int8_t* last, u_int8_t*& p, TSMessage*& pmsg); }; class TSPayloadReserved @@ -496,7 +497,7 @@ public: TSPayloadReserved(); virtual ~TSPayloadReserved(); - int demux(TSContext* ctx, TSPacket* pkt, u_int8_t* start, u_int8_t* last, u_int8_t*& p); + int demux(TSContext* ctx, TSPacket* pkt, u_int8_t* start, u_int8_t* last, u_int8_t*& p, TSMessage*& pmsg); }; struct TSPid @@ -505,6 +506,48 @@ struct TSPid int16_t pid; }; +/** +* logic audio/video message +*/ +class TSMessage +{ +public: + // 2.4.3.2 Transport Stream packet layer. page 36 + // the pid of PES packet. + int16_t pid; + + // the type of pid. + TSPidType type; + + // 2.4.3.7 Semantic definition of fields in PES packet. page 49 + // PES packet header size plus data size. + u_int16_t PES_packet_length; //16bits + + // the stream id. + u_int8_t stream_id; + + // 2.4.3.7 Semantic definition of fields in PES packet. page 49. + int32_t packet_start_code_prefix; + + // header size. + int packet_header_size; + + // the parsed packet size. + int parsed_packet_size; + + // total packet size. + int packet_data_size; + char* packet_data; + + TSMessage(); + virtual ~TSMessage(); + + void append(u_int8_t*& p, int size); + void detach(TSContext* ctx, TSMessage*& pmsg); + + bool is_video(); +}; + // ts context class TSContext { @@ -514,12 +557,16 @@ public: */ int pid_size; TSPid* pids; + std::map msgs; TSContext(); virtual ~TSContext(); bool exists(int16_t pid); TSPid* get(int16_t pid); void push(TSPidType type, int16_t pid); + + TSMessage* get_msg(int16_t pid); + void detach(TSMessage* msg); }; TSContext::TSContext() @@ -531,6 +578,13 @@ TSContext::TSContext() TSContext::~TSContext() { srs_freepa(pids); + + std::map::iterator it; + for (it = msgs.begin(); it != msgs.end(); ++it) { + TSMessage* msg = it->second; + srs_freep(msg); + } + msgs.clear(); } bool TSContext::exists(int16_t pid) @@ -571,6 +625,62 @@ void TSContext::push(TSPidType type, int16_t pid) pids = p; } +TSMessage* TSContext::get_msg(int16_t pid) +{ + if (msgs[pid] == NULL) { + TSMessage* msg = new TSMessage(); + msg->pid = pid; + msgs[pid] = msg; + } + + return msgs[pid]; +} + +void TSContext::detach(TSMessage* msg) +{ + msgs[msg->pid] = NULL; +} + +TSMessage::TSMessage() +{ + pid = 0; + type = TSPidTypeReserved; + stream_id = 0; + packet_start_code_prefix = 0; + PES_packet_length = 0; + packet_header_size = 0; + parsed_packet_size = 0; + packet_data_size = 0; + packet_data = NULL; +} + +TSMessage::~TSMessage() +{ + srs_freepa(packet_data); +} + +void TSMessage::append(u_int8_t*& p, int size) +{ + if (size > 0) { + memcpy(packet_data + parsed_packet_size, p, size); + p += size; + parsed_packet_size += size; + } +} + +void TSMessage::detach(TSContext* ctx, TSMessage*& pmsg) +{ + if (parsed_packet_size >= packet_data_size) { + ctx->detach(this); + pmsg = this; + } +} + +bool TSMessage::is_video() +{ + return type == TSPidTypeVideo; +} + TSAdaptionField::TSAdaptionField() { adaption_field_length = 0; @@ -620,7 +730,7 @@ int TSAdaptionField::get_size() return __user_size; } -int TSAdaptionField::demux(TSContext* ctx, TSPacket* pkt, u_int8_t* start, u_int8_t* last, u_int8_t*& p) +int TSAdaptionField::demux(TSContext* ctx, TSPacket* pkt, u_int8_t* start, u_int8_t* last, u_int8_t*& p, TSMessage*& pmsg) { int ret = 0; @@ -765,7 +875,7 @@ TSPayloadReserved::~TSPayloadReserved() srs_freepa(bytes); } -int TSPayloadReserved::demux(TSContext* ctx, TSPacket* pkt, u_int8_t* start, u_int8_t* last, u_int8_t*& p) +int TSPayloadReserved::demux(TSContext* ctx, TSPacket* pkt, u_int8_t* start, u_int8_t* last, u_int8_t*& p, TSMessage*& pmsg) { int ret = 0; @@ -802,7 +912,7 @@ TSPayloadPAT::~TSPayloadPAT() srs_freepa(programs); } -int TSPayloadPAT::demux(TSContext* ctx, TSPacket* pkt, u_int8_t* start, u_int8_t* last, u_int8_t*& p) +int TSPayloadPAT::demux(TSContext* ctx, TSPacket* pkt, u_int8_t* start, u_int8_t* last, u_int8_t*& p, TSMessage*& pmsg) { int ret = 0; @@ -895,7 +1005,7 @@ TSPayloadPMT::~TSPayloadPMT() ES_info.clear(); } -int TSPayloadPMT::demux(TSContext* ctx, TSPacket* pkt, u_int8_t* start, u_int8_t* last, u_int8_t*& p) +int TSPayloadPMT::demux(TSContext* ctx, TSPacket* pkt, u_int8_t* start, u_int8_t* last, u_int8_t*& p, TSMessage*& pmsg) { int ret = 0; @@ -1034,7 +1144,6 @@ TSPayloadPES::TSPayloadPES() PES_extension_field = NULL; stuffing_size = 0; stuffing_byte = NULL; - PES_packet_data_byte = NULL; } TSPayloadPES::~TSPayloadPES() @@ -1043,7 +1152,6 @@ TSPayloadPES::~TSPayloadPES() srs_freepa(pack_field); srs_freepa(PES_extension_field); srs_freepa(stuffing_byte); - srs_freepa(PES_packet_data_byte); } int64_t TSPayloadPES::decode_33bits_int(u_int8_t*& p, int64_t& temp) @@ -1080,7 +1188,7 @@ int64_t TSPayloadPES::decode_33bits_int(int64_t& temp) return ret; } -int TSPayloadPES::demux(TSContext* ctx, TSPacket* pkt, u_int8_t* start, u_int8_t* last, u_int8_t*& p) +int TSPayloadPES::demux(TSContext* ctx, TSPacket* pkt, u_int8_t* start, u_int8_t* last, u_int8_t*& p, TSMessage*& pmsg) { int ret = 0; @@ -1096,6 +1204,7 @@ int TSPayloadPES::demux(TSContext* ctx, TSPacket* pkt, u_int8_t* start, u_int8_t pp = (char*)&PES_packet_length; pp[1] = *p++; pp[0] = *p++; + u_int8_t* pos_packet = p; if (stream_id != PES_program_stream_map && stream_id != PES_padding_stream @@ -1126,7 +1235,7 @@ int TSPayloadPES::demux(TSContext* ctx, TSPacket* pkt, u_int8_t* start, u_int8_t PES_extension_flag &= 0x01; PES_header_data_length = *p++; - u_int8_t* pos = p; + u_int8_t* pos_header = p; int64_t temp = 0; if (PTS_DTS_flags == 0x2) { @@ -1252,15 +1361,49 @@ int TSPayloadPES::demux(TSContext* ctx, TSPacket* pkt, u_int8_t* start, u_int8_t } // stuffing_byte - int stuffing_size = PES_header_data_length - (p - pos); + int stuffing_size = PES_header_data_length - (p - pos_header); if (stuffing_size > 0) { stuffing_byte = new char[stuffing_size]; memcpy(stuffing_byte, p, stuffing_size); p += stuffing_size; } + // get the pid. + TSPid* pid = ctx->get(pkt->header->pid); + if (!pid) { + trace("ts+pes pid: %d type is invalid.", pkt->header->pid); + } + + // get the message to build from the chunks(PES packets). + TSMessage* msg = ctx->get_msg(pid->pid); + + msg->type = pid->type; + msg->stream_id = stream_id; + msg->packet_start_code_prefix = packet_start_code_prefix; + + // PES_packet_data_byte, page58. + // the packet size contains the header size. + // The number of PES_packet_data_bytes, N, is specified by the + // PES_packet_length field. N shall be equal to the value + // indicated in the PES_packet_length minus the number of bytes + // between the last byte of the PES_packet_length field and the + // first PES_packet_data_byte. + msg->PES_packet_length = PES_packet_length; + msg->packet_header_size = p - pos_packet; + msg->packet_data_size = PES_packet_length - msg->packet_header_size; + + if (msg->packet_data_size > 0) { + msg->packet_data = new char[msg->packet_data_size]; + } + // PES_packet_data_byte - PES_packet_data_byte = new char[PES_packet_length]; + int size = srs_min(msg->packet_data_size, last - p); + msg->append(p, size); + + msg->detach(ctx, pmsg); + + trace("ts+pes stream_id: %d size: %d pts: %"PRId64" dts: %"PRId64" packet_size: %d parsed_size: %d", + stream_id, PES_packet_length, pts, dts, msg->packet_data_size, msg->parsed_packet_size); } else if (stream_id == PES_program_stream_map || stream_id == PES_private_stream_2 || stream_id == PES_ECM_stream @@ -1277,9 +1420,6 @@ int TSPayloadPES::demux(TSContext* ctx, TSPacket* pkt, u_int8_t* start, u_int8_t // padding_byte // } } - - trace("ts+pes stream_id: %d size: %d pts: %"PRId64" dts: %"PRId64"", - stream_id, PES_packet_length, pts, dts); return ret; } @@ -1317,7 +1457,7 @@ void TSPayload::read_pointer_field(TSPacket* pkt, u_int8_t*& p) } } -int TSPayload::demux(TSContext* ctx, TSPacket* pkt, u_int8_t* start, u_int8_t* last, u_int8_t*& p) +int TSPayload::demux(TSContext* ctx, TSPacket* pkt, u_int8_t* start, u_int8_t* last, u_int8_t*& p, TSMessage*& pmsg) { int ret = 0; @@ -1326,7 +1466,7 @@ int TSPayload::demux(TSContext* ctx, TSPacket* pkt, u_int8_t* start, u_int8_t* l type = TSPidTypePAT; pat = new TSPayloadPAT(); - return pat->demux(ctx, pkt, start, last, p); + return pat->demux(ctx, pkt, start, last, p, pmsg); } TSPid* pid = ctx->get(pkt->header->pid); @@ -1335,18 +1475,28 @@ int TSPayload::demux(TSContext* ctx, TSPacket* pkt, u_int8_t* start, u_int8_t* l type = pid->type; pmt = new TSPayloadPMT(); - return pmt->demux(ctx, pkt, start, last, p); + return pmt->demux(ctx, pkt, start, last, p, pmsg); } if (pid && (pid->type == TSPidTypeVideo || pid->type == TSPidTypeAudio)) { + if (!pkt->header->payload_unit_start_indicator) { + TSMessage* msg = ctx->get_msg(pkt->header->pid); + if (msg->packet_start_code_prefix != 0x01) { + trace("ts+pes decode continous packet error, msg is empty."); + return -1; + } + msg->append(p, last - p); + msg->detach(ctx, pmsg); + return ret; + } type = pid->type; pes = new TSPayloadPES(); - return pes->demux(ctx, pkt, start, last, p); + return pes->demux(ctx, pkt, start, last, p, pmsg); } // not parsed bytes. type = TSPidTypeReserved; reserved = new TSPayloadReserved(); - if ((ret = reserved->demux(ctx, pkt, start, last, p)) != 0) { + if ((ret = reserved->demux(ctx, pkt, start, last, p, pmsg)) != 0) { return ret; } @@ -1367,16 +1517,16 @@ TSPacket::~TSPacket() srs_freep(payload); } -int TSPacket::demux(TSContext* ctx, u_int8_t* start, u_int8_t* last, u_int8_t*& p) +int TSPacket::demux(TSContext* ctx, u_int8_t* start, u_int8_t* last, u_int8_t*& p, TSMessage*& pmsg) { int ret = 0; - if ((ret = header->demux(ctx, this, start, last, p)) != 0) { + if ((ret = header->demux(ctx, this, start, last, p, pmsg)) != 0) { return ret; } if (header->adaption_field_control == AFC_ADAPTION_ONLY || header->adaption_field_control == AFC_BOTH) { - if ((ret = adaption_field->demux(ctx, this, start, last, p)) != 0) { + if ((ret = adaption_field->demux(ctx, this, start, last, p, pmsg)) != 0) { trace("ts+header af(adaption field) decode error. ret=%d", ret); return ret; } @@ -1387,7 +1537,7 @@ int TSPacket::demux(TSContext* ctx, u_int8_t* start, u_int8_t* last, u_int8_t*& payload->size = TS_PACKET_SIZE - header->get_size() - adaption_field->get_size(); if (header->adaption_field_control == AFC_PAYLOAD_ONLY || header->adaption_field_control == AFC_BOTH) { - if ((ret = payload->demux(ctx, this, start, last, p)) != 0) { + if ((ret = payload->demux(ctx, this, start, last, p, pmsg)) != 0) { trace("ts+header payload decode error. ret=%d", ret); return ret; } @@ -1427,7 +1577,7 @@ int TSHeader::get_size() return 4; } -int TSHeader::demux(TSContext* ctx, TSPacket* pkt, u_int8_t* start, u_int8_t* last, u_int8_t*& p) +int TSHeader::demux(TSContext* ctx, TSPacket* pkt, u_int8_t* start, u_int8_t* last, u_int8_t*& p, TSMessage*& pmsg) { int ret = 0; @@ -1494,11 +1644,15 @@ int main(int /*argc*/, char** /*argv*/) u_int8_t* last = ts_packet + TS_PACKET_SIZE; TSPacket pkt; - if ((ret = pkt.demux(&ctx, start, last, p)) != 0) { + TSMessage* msg = NULL; + if ((ret = pkt.demux(&ctx, start, last, p, msg)) != 0) { trace("demuxer+read decode ts packet error. ret=%d", ret); return ret; } + // TODO: process it. + srs_freep(msg); + offset += nread; }