1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

parse video/audio message

This commit is contained in:
winlin 2013-11-19 18:38:33 +08:00
parent 6b75390476
commit 1ae03a2ba0

View file

@ -17,11 +17,13 @@ g++ -o ts_info ts_info.cpp -g -O0 -ansi
#include <assert.h> #include <assert.h>
#include <vector> #include <vector>
#include <map>
#define trace(msg, ...) printf(msg"\n", ##__VA_ARGS__); #define trace(msg, ...) printf(msg"\n", ##__VA_ARGS__);
#define srs_freep(p) delete p; p = NULL #define srs_freep(p) delete p; p = NULL
#define srs_freepa(p) delete[] p; p = NULL #define srs_freepa(p) delete[] p; p = NULL
#define srs_assert(p) assert(p) #define srs_assert(p) assert(p)
#define srs_min(a, b) ((a)<(b)? (a):(b))
#endif #endif
/** /**
@ -111,6 +113,7 @@ class TSPayloadPAT;
class TSPayloadPMT; class TSPayloadPMT;
class TSPayloadPES; class TSPayloadPES;
class TSContext; class TSContext;
class TSMessage;
// TSPacket declares. // TSPacket declares.
class TSPacket class TSPacket
@ -122,7 +125,7 @@ public:
TSPacket(); TSPacket();
virtual ~TSPacket(); virtual ~TSPacket();
int demux(TSContext* ctx, u_int8_t* start, u_int8_t* last, u_int8_t*& p); int demux(TSContext* ctx, u_int8_t* start, u_int8_t* last, u_int8_t*& p, TSMessage*& pmsg);
int finish(); int finish();
}; };
@ -145,7 +148,7 @@ public:
TSHeader(); TSHeader();
virtual ~TSHeader(); virtual ~TSHeader();
int get_size(); int get_size();
int demux(TSContext* ctx, TSPacket* pkt, u_int8_t* start, u_int8_t* last, u_int8_t*& p); int demux(TSContext* ctx, TSPacket* pkt, u_int8_t* start, u_int8_t* last, u_int8_t*& p, TSMessage*& pmsg);
}; };
// variant ts packet adation field. page 40. // variant ts packet adation field. page 40.
@ -213,7 +216,7 @@ public:
TSAdaptionField(); TSAdaptionField();
virtual ~TSAdaptionField(); virtual ~TSAdaptionField();
int get_size(); int get_size();
int demux(TSContext* ctx, TSPacket* pkt, u_int8_t* start, u_int8_t* last, u_int8_t*& p); int demux(TSContext* ctx, TSPacket* pkt, u_int8_t* start, u_int8_t* last, u_int8_t*& p, TSMessage*& pmsg);
}; };
// variant ts packet payload. // variant ts packet payload.
@ -247,7 +250,7 @@ public:
TSPayload(); TSPayload();
virtual ~TSPayload();; virtual ~TSPayload();;
void read_pointer_field(TSPacket* pkt, u_int8_t*& p); void read_pointer_field(TSPacket* pkt, u_int8_t*& p);
int demux(TSContext* ctx, TSPacket* pkt, u_int8_t* start, u_int8_t* last, u_int8_t*& p); int demux(TSContext* ctx, TSPacket* pkt, u_int8_t* start, u_int8_t* last, u_int8_t*& p, TSMessage*& pmsg);
}; };
@ -294,7 +297,7 @@ public:
TSPayloadPAT(); TSPayloadPAT();
virtual ~TSPayloadPAT(); virtual ~TSPayloadPAT();
int demux(TSContext* ctx, TSPacket* pkt, u_int8_t* start, u_int8_t* last, u_int8_t*& p); int demux(TSContext* ctx, TSPacket* pkt, u_int8_t* start, u_int8_t* last, u_int8_t*& p, TSMessage*& pmsg);
}; };
class TSPMTESInfo class TSPMTESInfo
@ -364,7 +367,7 @@ public:
TSPayloadPMT(); TSPayloadPMT();
virtual ~TSPayloadPMT(); virtual ~TSPayloadPMT();
int demux(TSContext* ctx, TSPacket* pkt, u_int8_t* start, u_int8_t* last, u_int8_t*& p); int demux(TSContext* ctx, TSPacket* pkt, u_int8_t* start, u_int8_t* last, u_int8_t*& p, TSMessage*& pmsg);
}; };
/** /**
@ -478,14 +481,12 @@ public:
int stuffing_size; int stuffing_size;
char* stuffing_byte; char* stuffing_byte;
char* PES_packet_data_byte; //[PES_packet_length] bytes
TSPayloadPES(); TSPayloadPES();
virtual ~TSPayloadPES(); virtual ~TSPayloadPES();
int64_t decode_33bits_int(u_int8_t*& p, int64_t& temp); int64_t decode_33bits_int(u_int8_t*& p, int64_t& temp);
int64_t decode_33bits_int(int64_t& temp); int64_t decode_33bits_int(int64_t& temp);
int demux(TSContext* ctx, TSPacket* pkt, u_int8_t* start, u_int8_t* last, u_int8_t*& p); int demux(TSContext* ctx, TSPacket* pkt, u_int8_t* start, u_int8_t* last, u_int8_t*& p, TSMessage*& pmsg);
}; };
class TSPayloadReserved class TSPayloadReserved
@ -496,7 +497,7 @@ public:
TSPayloadReserved(); TSPayloadReserved();
virtual ~TSPayloadReserved(); virtual ~TSPayloadReserved();
int demux(TSContext* ctx, TSPacket* pkt, u_int8_t* start, u_int8_t* last, u_int8_t*& p); int demux(TSContext* ctx, TSPacket* pkt, u_int8_t* start, u_int8_t* last, u_int8_t*& p, TSMessage*& pmsg);
}; };
struct TSPid struct TSPid
@ -505,6 +506,48 @@ struct TSPid
int16_t pid; int16_t pid;
}; };
/**
* logic audio/video message
*/
class TSMessage
{
public:
// 2.4.3.2 Transport Stream packet layer. page 36
// the pid of PES packet.
int16_t pid;
// the type of pid.
TSPidType type;
// 2.4.3.7 Semantic definition of fields in PES packet. page 49
// PES packet header size plus data size.
u_int16_t PES_packet_length; //16bits
// the stream id.
u_int8_t stream_id;
// 2.4.3.7 Semantic definition of fields in PES packet. page 49.
int32_t packet_start_code_prefix;
// header size.
int packet_header_size;
// the parsed packet size.
int parsed_packet_size;
// total packet size.
int packet_data_size;
char* packet_data;
TSMessage();
virtual ~TSMessage();
void append(u_int8_t*& p, int size);
void detach(TSContext* ctx, TSMessage*& pmsg);
bool is_video();
};
// ts context // ts context
class TSContext class TSContext
{ {
@ -514,12 +557,16 @@ public:
*/ */
int pid_size; int pid_size;
TSPid* pids; TSPid* pids;
std::map<int16_t, TSMessage*> msgs;
TSContext(); TSContext();
virtual ~TSContext(); virtual ~TSContext();
bool exists(int16_t pid); bool exists(int16_t pid);
TSPid* get(int16_t pid); TSPid* get(int16_t pid);
void push(TSPidType type, int16_t pid); void push(TSPidType type, int16_t pid);
TSMessage* get_msg(int16_t pid);
void detach(TSMessage* msg);
}; };
TSContext::TSContext() TSContext::TSContext()
@ -531,6 +578,13 @@ TSContext::TSContext()
TSContext::~TSContext() TSContext::~TSContext()
{ {
srs_freepa(pids); srs_freepa(pids);
std::map<int16_t, TSMessage*>::iterator it;
for (it = msgs.begin(); it != msgs.end(); ++it) {
TSMessage* msg = it->second;
srs_freep(msg);
}
msgs.clear();
} }
bool TSContext::exists(int16_t pid) bool TSContext::exists(int16_t pid)
@ -571,6 +625,62 @@ void TSContext::push(TSPidType type, int16_t pid)
pids = p; pids = p;
} }
TSMessage* TSContext::get_msg(int16_t pid)
{
if (msgs[pid] == NULL) {
TSMessage* msg = new TSMessage();
msg->pid = pid;
msgs[pid] = msg;
}
return msgs[pid];
}
void TSContext::detach(TSMessage* msg)
{
msgs[msg->pid] = NULL;
}
TSMessage::TSMessage()
{
pid = 0;
type = TSPidTypeReserved;
stream_id = 0;
packet_start_code_prefix = 0;
PES_packet_length = 0;
packet_header_size = 0;
parsed_packet_size = 0;
packet_data_size = 0;
packet_data = NULL;
}
TSMessage::~TSMessage()
{
srs_freepa(packet_data);
}
void TSMessage::append(u_int8_t*& p, int size)
{
if (size > 0) {
memcpy(packet_data + parsed_packet_size, p, size);
p += size;
parsed_packet_size += size;
}
}
void TSMessage::detach(TSContext* ctx, TSMessage*& pmsg)
{
if (parsed_packet_size >= packet_data_size) {
ctx->detach(this);
pmsg = this;
}
}
bool TSMessage::is_video()
{
return type == TSPidTypeVideo;
}
TSAdaptionField::TSAdaptionField() TSAdaptionField::TSAdaptionField()
{ {
adaption_field_length = 0; adaption_field_length = 0;
@ -620,7 +730,7 @@ int TSAdaptionField::get_size()
return __user_size; return __user_size;
} }
int TSAdaptionField::demux(TSContext* ctx, TSPacket* pkt, u_int8_t* start, u_int8_t* last, u_int8_t*& p) int TSAdaptionField::demux(TSContext* ctx, TSPacket* pkt, u_int8_t* start, u_int8_t* last, u_int8_t*& p, TSMessage*& pmsg)
{ {
int ret = 0; int ret = 0;
@ -765,7 +875,7 @@ TSPayloadReserved::~TSPayloadReserved()
srs_freepa(bytes); srs_freepa(bytes);
} }
int TSPayloadReserved::demux(TSContext* ctx, TSPacket* pkt, u_int8_t* start, u_int8_t* last, u_int8_t*& p) int TSPayloadReserved::demux(TSContext* ctx, TSPacket* pkt, u_int8_t* start, u_int8_t* last, u_int8_t*& p, TSMessage*& pmsg)
{ {
int ret = 0; int ret = 0;
@ -802,7 +912,7 @@ TSPayloadPAT::~TSPayloadPAT()
srs_freepa(programs); srs_freepa(programs);
} }
int TSPayloadPAT::demux(TSContext* ctx, TSPacket* pkt, u_int8_t* start, u_int8_t* last, u_int8_t*& p) int TSPayloadPAT::demux(TSContext* ctx, TSPacket* pkt, u_int8_t* start, u_int8_t* last, u_int8_t*& p, TSMessage*& pmsg)
{ {
int ret = 0; int ret = 0;
@ -895,7 +1005,7 @@ TSPayloadPMT::~TSPayloadPMT()
ES_info.clear(); ES_info.clear();
} }
int TSPayloadPMT::demux(TSContext* ctx, TSPacket* pkt, u_int8_t* start, u_int8_t* last, u_int8_t*& p) int TSPayloadPMT::demux(TSContext* ctx, TSPacket* pkt, u_int8_t* start, u_int8_t* last, u_int8_t*& p, TSMessage*& pmsg)
{ {
int ret = 0; int ret = 0;
@ -1034,7 +1144,6 @@ TSPayloadPES::TSPayloadPES()
PES_extension_field = NULL; PES_extension_field = NULL;
stuffing_size = 0; stuffing_size = 0;
stuffing_byte = NULL; stuffing_byte = NULL;
PES_packet_data_byte = NULL;
} }
TSPayloadPES::~TSPayloadPES() TSPayloadPES::~TSPayloadPES()
@ -1043,7 +1152,6 @@ TSPayloadPES::~TSPayloadPES()
srs_freepa(pack_field); srs_freepa(pack_field);
srs_freepa(PES_extension_field); srs_freepa(PES_extension_field);
srs_freepa(stuffing_byte); srs_freepa(stuffing_byte);
srs_freepa(PES_packet_data_byte);
} }
int64_t TSPayloadPES::decode_33bits_int(u_int8_t*& p, int64_t& temp) int64_t TSPayloadPES::decode_33bits_int(u_int8_t*& p, int64_t& temp)
@ -1080,7 +1188,7 @@ int64_t TSPayloadPES::decode_33bits_int(int64_t& temp)
return ret; return ret;
} }
int TSPayloadPES::demux(TSContext* ctx, TSPacket* pkt, u_int8_t* start, u_int8_t* last, u_int8_t*& p) int TSPayloadPES::demux(TSContext* ctx, TSPacket* pkt, u_int8_t* start, u_int8_t* last, u_int8_t*& p, TSMessage*& pmsg)
{ {
int ret = 0; int ret = 0;
@ -1096,6 +1204,7 @@ int TSPayloadPES::demux(TSContext* ctx, TSPacket* pkt, u_int8_t* start, u_int8_t
pp = (char*)&PES_packet_length; pp = (char*)&PES_packet_length;
pp[1] = *p++; pp[1] = *p++;
pp[0] = *p++; pp[0] = *p++;
u_int8_t* pos_packet = p;
if (stream_id != PES_program_stream_map if (stream_id != PES_program_stream_map
&& stream_id != PES_padding_stream && stream_id != PES_padding_stream
@ -1126,7 +1235,7 @@ int TSPayloadPES::demux(TSContext* ctx, TSPacket* pkt, u_int8_t* start, u_int8_t
PES_extension_flag &= 0x01; PES_extension_flag &= 0x01;
PES_header_data_length = *p++; PES_header_data_length = *p++;
u_int8_t* pos = p; u_int8_t* pos_header = p;
int64_t temp = 0; int64_t temp = 0;
if (PTS_DTS_flags == 0x2) { if (PTS_DTS_flags == 0x2) {
@ -1252,15 +1361,49 @@ int TSPayloadPES::demux(TSContext* ctx, TSPacket* pkt, u_int8_t* start, u_int8_t
} }
// stuffing_byte // stuffing_byte
int stuffing_size = PES_header_data_length - (p - pos); int stuffing_size = PES_header_data_length - (p - pos_header);
if (stuffing_size > 0) { if (stuffing_size > 0) {
stuffing_byte = new char[stuffing_size]; stuffing_byte = new char[stuffing_size];
memcpy(stuffing_byte, p, stuffing_size); memcpy(stuffing_byte, p, stuffing_size);
p += stuffing_size; p += stuffing_size;
} }
// get the pid.
TSPid* pid = ctx->get(pkt->header->pid);
if (!pid) {
trace("ts+pes pid: %d type is invalid.", pkt->header->pid);
}
// get the message to build from the chunks(PES packets).
TSMessage* msg = ctx->get_msg(pid->pid);
msg->type = pid->type;
msg->stream_id = stream_id;
msg->packet_start_code_prefix = packet_start_code_prefix;
// PES_packet_data_byte, page58.
// the packet size contains the header size.
// The number of PES_packet_data_bytes, N, is specified by the
// PES_packet_length field. N shall be equal to the value
// indicated in the PES_packet_length minus the number of bytes
// between the last byte of the PES_packet_length field and the
// first PES_packet_data_byte.
msg->PES_packet_length = PES_packet_length;
msg->packet_header_size = p - pos_packet;
msg->packet_data_size = PES_packet_length - msg->packet_header_size;
if (msg->packet_data_size > 0) {
msg->packet_data = new char[msg->packet_data_size];
}
// PES_packet_data_byte // PES_packet_data_byte
PES_packet_data_byte = new char[PES_packet_length]; int size = srs_min(msg->packet_data_size, last - p);
msg->append(p, size);
msg->detach(ctx, pmsg);
trace("ts+pes stream_id: %d size: %d pts: %"PRId64" dts: %"PRId64" packet_size: %d parsed_size: %d",
stream_id, PES_packet_length, pts, dts, msg->packet_data_size, msg->parsed_packet_size);
} else if (stream_id == PES_program_stream_map } else if (stream_id == PES_program_stream_map
|| stream_id == PES_private_stream_2 || stream_id == PES_private_stream_2
|| stream_id == PES_ECM_stream || stream_id == PES_ECM_stream
@ -1277,9 +1420,6 @@ int TSPayloadPES::demux(TSContext* ctx, TSPacket* pkt, u_int8_t* start, u_int8_t
// padding_byte // padding_byte
// } // }
} }
trace("ts+pes stream_id: %d size: %d pts: %"PRId64" dts: %"PRId64"",
stream_id, PES_packet_length, pts, dts);
return ret; return ret;
} }
@ -1317,7 +1457,7 @@ void TSPayload::read_pointer_field(TSPacket* pkt, u_int8_t*& p)
} }
} }
int TSPayload::demux(TSContext* ctx, TSPacket* pkt, u_int8_t* start, u_int8_t* last, u_int8_t*& p) int TSPayload::demux(TSContext* ctx, TSPacket* pkt, u_int8_t* start, u_int8_t* last, u_int8_t*& p, TSMessage*& pmsg)
{ {
int ret = 0; int ret = 0;
@ -1326,7 +1466,7 @@ int TSPayload::demux(TSContext* ctx, TSPacket* pkt, u_int8_t* start, u_int8_t* l
type = TSPidTypePAT; type = TSPidTypePAT;
pat = new TSPayloadPAT(); pat = new TSPayloadPAT();
return pat->demux(ctx, pkt, start, last, p); return pat->demux(ctx, pkt, start, last, p, pmsg);
} }
TSPid* pid = ctx->get(pkt->header->pid); TSPid* pid = ctx->get(pkt->header->pid);
@ -1335,18 +1475,28 @@ int TSPayload::demux(TSContext* ctx, TSPacket* pkt, u_int8_t* start, u_int8_t* l
type = pid->type; type = pid->type;
pmt = new TSPayloadPMT(); pmt = new TSPayloadPMT();
return pmt->demux(ctx, pkt, start, last, p); return pmt->demux(ctx, pkt, start, last, p, pmsg);
} }
if (pid && (pid->type == TSPidTypeVideo || pid->type == TSPidTypeAudio)) { if (pid && (pid->type == TSPidTypeVideo || pid->type == TSPidTypeAudio)) {
if (!pkt->header->payload_unit_start_indicator) {
TSMessage* msg = ctx->get_msg(pkt->header->pid);
if (msg->packet_start_code_prefix != 0x01) {
trace("ts+pes decode continous packet error, msg is empty.");
return -1;
}
msg->append(p, last - p);
msg->detach(ctx, pmsg);
return ret;
}
type = pid->type; type = pid->type;
pes = new TSPayloadPES(); pes = new TSPayloadPES();
return pes->demux(ctx, pkt, start, last, p); return pes->demux(ctx, pkt, start, last, p, pmsg);
} }
// not parsed bytes. // not parsed bytes.
type = TSPidTypeReserved; type = TSPidTypeReserved;
reserved = new TSPayloadReserved(); reserved = new TSPayloadReserved();
if ((ret = reserved->demux(ctx, pkt, start, last, p)) != 0) { if ((ret = reserved->demux(ctx, pkt, start, last, p, pmsg)) != 0) {
return ret; return ret;
} }
@ -1367,16 +1517,16 @@ TSPacket::~TSPacket()
srs_freep(payload); srs_freep(payload);
} }
int TSPacket::demux(TSContext* ctx, u_int8_t* start, u_int8_t* last, u_int8_t*& p) int TSPacket::demux(TSContext* ctx, u_int8_t* start, u_int8_t* last, u_int8_t*& p, TSMessage*& pmsg)
{ {
int ret = 0; int ret = 0;
if ((ret = header->demux(ctx, this, start, last, p)) != 0) { if ((ret = header->demux(ctx, this, start, last, p, pmsg)) != 0) {
return ret; return ret;
} }
if (header->adaption_field_control == AFC_ADAPTION_ONLY || header->adaption_field_control == AFC_BOTH) { if (header->adaption_field_control == AFC_ADAPTION_ONLY || header->adaption_field_control == AFC_BOTH) {
if ((ret = adaption_field->demux(ctx, this, start, last, p)) != 0) { if ((ret = adaption_field->demux(ctx, this, start, last, p, pmsg)) != 0) {
trace("ts+header af(adaption field) decode error. ret=%d", ret); trace("ts+header af(adaption field) decode error. ret=%d", ret);
return ret; return ret;
} }
@ -1387,7 +1537,7 @@ int TSPacket::demux(TSContext* ctx, u_int8_t* start, u_int8_t* last, u_int8_t*&
payload->size = TS_PACKET_SIZE - header->get_size() - adaption_field->get_size(); payload->size = TS_PACKET_SIZE - header->get_size() - adaption_field->get_size();
if (header->adaption_field_control == AFC_PAYLOAD_ONLY || header->adaption_field_control == AFC_BOTH) { if (header->adaption_field_control == AFC_PAYLOAD_ONLY || header->adaption_field_control == AFC_BOTH) {
if ((ret = payload->demux(ctx, this, start, last, p)) != 0) { if ((ret = payload->demux(ctx, this, start, last, p, pmsg)) != 0) {
trace("ts+header payload decode error. ret=%d", ret); trace("ts+header payload decode error. ret=%d", ret);
return ret; return ret;
} }
@ -1427,7 +1577,7 @@ int TSHeader::get_size()
return 4; return 4;
} }
int TSHeader::demux(TSContext* ctx, TSPacket* pkt, u_int8_t* start, u_int8_t* last, u_int8_t*& p) int TSHeader::demux(TSContext* ctx, TSPacket* pkt, u_int8_t* start, u_int8_t* last, u_int8_t*& p, TSMessage*& pmsg)
{ {
int ret = 0; int ret = 0;
@ -1494,11 +1644,15 @@ int main(int /*argc*/, char** /*argv*/)
u_int8_t* last = ts_packet + TS_PACKET_SIZE; u_int8_t* last = ts_packet + TS_PACKET_SIZE;
TSPacket pkt; TSPacket pkt;
if ((ret = pkt.demux(&ctx, start, last, p)) != 0) { TSMessage* msg = NULL;
if ((ret = pkt.demux(&ctx, start, last, p, msg)) != 0) {
trace("demuxer+read decode ts packet error. ret=%d", ret); trace("demuxer+read decode ts packet error. ret=%d", ret);
return ret; return ret;
} }
// TODO: process it.
srs_freep(msg);
offset += nread; offset += nread;
} }