diff --git a/trunk/conf/srs.conf b/trunk/conf/srs.conf index 90cc389c4..da062849a 100644 --- a/trunk/conf/srs.conf +++ b/trunk/conf/srs.conf @@ -5,8 +5,6 @@ listen 1935; max_connections 1000; srs_log_tank file; srs_log_file ./objs/srs.log; -daemon off; - http_api { enabled on; listen 1985; @@ -16,16 +14,9 @@ http_server { listen 8080; dir ./objs/nginx/html; } - -srt_server { - enabled on; - listen 10080; -} - -vhost sina.mobile.com.cn { -} - stats { network 0; disk sda sdb xvda xvdb; } +vhost __defaultVhost__ { +} diff --git a/trunk/conf/srt.conf b/trunk/conf/srt.conf new file mode 100644 index 000000000..ec37b0367 --- /dev/null +++ b/trunk/conf/srt.conf @@ -0,0 +1,34 @@ +# main config for srs. +# @see full.conf for detail config. + +listen 1935; +max_connections 1000; +srs_log_tank file; +srs_log_file ./objs/srs.log; +daemon off; + +http_api { + enabled on; + listen 1985; +} +http_server { + enabled on; + listen 8080; + dir ./objs/nginx/html; +} + +srt_server { + enabled on; + listen 10080; +} + +vhost __defaultVhost__ { +} + +vhost srs.srt.com.cn { +} + +stats { + network 0; + disk sda sdb xvda xvdb; +} diff --git a/trunk/configure b/trunk/configure index 058b1f992..ad517dc93 100755 --- a/trunk/configure +++ b/trunk/configure @@ -211,7 +211,7 @@ PROTOCOL_OBJS="${MODULE_OBJS[@]}" MODULE_ID="SRT" MODULE_DEPENDS=("CORE" "KERNEL" "PROTOCOL" "SERVICE" "APP") ModuleLibIncs=(${SRS_OBJS_DIR}) -MODULE_FILES=("srt_server" "srt_handle" "srt_conn" "srt_to_rtmp") +MODULE_FILES=("srt_server" "srt_handle" "srt_conn" "srt_to_rtmp" "ts_demux" "srt_data") SRT_INCS="src/srt"; MODULE_DIR=${SRT_INCS} . auto/modules.sh SRT_OBJS="${MODULE_OBJS[@]}" # diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index e443a3318..2987b4891 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -3640,7 +3640,7 @@ srs_error_t SrsConfig::check_normal_config() get_vhosts(vhosts); for (int n = 0; n < (int)vhosts.size(); n++) { SrsConfDirective* vhost = vhosts[n]; - printf("virtural host name:%s, arg:%s\r\n", vhost->name.c_str(), vhost->args[0].c_str()); + for (int i = 0; vhost && i < (int)vhost->directives.size(); i++) { SrsConfDirective* conf = vhost->at(i); string n = conf->name; diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index 226f0dc1e..62d97fd97 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -1014,6 +1014,12 @@ srs_error_t SrsServer::listen_srt() { srs_trace("srt server is enabled..."); unsigned short srt_port = _srs_config->get_srt_listen_port(); srs_trace("srt server listen port:%d", srt_port); + err = srt2rtmp::get_instance()->init(); + if (err != srs_success) { + srs_error_wrap(err, "srt start srt2rtmp error"); + return err; + } + srt_ptr = std::make_shared(srt_port); if (!srt_ptr) { srs_error_wrap(err, "srt listen %d", srt_port); diff --git a/trunk/src/app/srs_app_server.hpp b/trunk/src/app/srs_app_server.hpp index 625c161e7..c03fc996a 100644 --- a/trunk/src/app/srs_app_server.hpp +++ b/trunk/src/app/srs_app_server.hpp @@ -37,6 +37,7 @@ #include #include #include "../srt/srt_server.hpp" +#include "../srt/srt_to_rtmp.hpp" class SrsServer; class SrsConnection; diff --git a/trunk/src/srt/srt_conn.hpp b/trunk/src/srt/srt_conn.hpp index 168ca0769..6801a1c84 100644 --- a/trunk/src/srt/srt_conn.hpp +++ b/trunk/src/srt/srt_conn.hpp @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include @@ -14,6 +15,21 @@ #define PULL_SRT_MODE 0x01 #define PUSH_SRT_MODE 0x02 +inline bool is_streamid_valid(const std::string& streamid) { + bool ret = false; + if (streamid.empty()) { + return ret; + } + std::vector ret_vec; + string_split(streamid, "/", ret_vec); + + if (ret_vec.size() < 2) { + return ret; + } + ret = true; + return ret; +} + inline void get_streamid_info(const std::string& streamid, int& mode, std::string& url_subpash) { std::string real_streamip; diff --git a/trunk/src/srt/srt_data.cpp b/trunk/src/srt/srt_data.cpp index 8cfec1864..075064b14 100644 --- a/trunk/src/srt/srt_data.cpp +++ b/trunk/src/srt/srt_data.cpp @@ -1,4 +1,11 @@ #include "srt_data.hpp" +#include + +SRT_DATA_MSG::SRT_DATA_MSG(unsigned int len, const std::string& path):_len(len) + ,_key_path(path) { + _data_p = new unsigned char[len]; + memset(_data_p, 0, len); +} SRT_DATA_MSG::SRT_DATA_MSG(unsigned char* data_p, unsigned int len, const std::string& path):_len(len) ,_key_path(path) diff --git a/trunk/src/srt/srt_data.hpp b/trunk/src/srt/srt_data.hpp index a1a225b90..ab9cf81ea 100644 --- a/trunk/src/srt/srt_data.hpp +++ b/trunk/src/srt/srt_data.hpp @@ -5,6 +5,7 @@ class SRT_DATA_MSG { public: + SRT_DATA_MSG(unsigned int len, const std::string& path); SRT_DATA_MSG(unsigned char* data_p, unsigned int len, const std::string& path); ~SRT_DATA_MSG(); diff --git a/trunk/src/srt/srt_handle.cpp b/trunk/src/srt/srt_handle.cpp index 60914e2a4..226013923 100644 --- a/trunk/src/srt/srt_handle.cpp +++ b/trunk/src/srt/srt_handle.cpp @@ -25,7 +25,6 @@ long long srt_now_ms = 0; srt_handle::srt_handle():_run_flag(false) ,_last_timestamp(0) ,_last_check_alive_ts(0) { - _srt2rtmp_ptr = std::make_shared(); } srt_handle::~srt_handle() { @@ -43,14 +42,12 @@ int srt_handle::start() { srs_trace("srt handle is starting..."); _work_thread_ptr = std::make_shared(&srt_handle::onwork, this); - _srt2rtmp_ptr->start(); return 0; } void srt_handle::stop() { _run_flag = false; _work_thread_ptr->join(); - _srt2rtmp_ptr->stop(); return; } @@ -295,7 +292,7 @@ void srt_handle::handle_push_data(SRT_SOCKSTATUS status, const std::string& subp } srt_conn_ptr->update_timestamp(srt_now_ms); - _srt2rtmp_ptr->insert_data_message(data, ret, subpath); + srt2rtmp::get_instance()->insert_data_message(data, ret, subpath); //send data to subscriber(players) //streamid, play map diff --git a/trunk/src/srt/srt_handle.hpp b/trunk/src/srt/srt_handle.hpp index dd9e0f926..da555b714 100644 --- a/trunk/src/srt/srt_handle.hpp +++ b/trunk/src/srt/srt_handle.hpp @@ -75,8 +75,6 @@ private: long long _last_timestamp; long long _last_check_alive_ts; - - SRT2RTMP_PTR _srt2rtmp_ptr; }; #endif //SRT_HANDLE_H \ No newline at end of file diff --git a/trunk/src/srt/srt_server.cpp b/trunk/src/srt/srt_server.cpp index 691abaa49..b938f3678 100644 --- a/trunk/src/srt/srt_server.cpp +++ b/trunk/src/srt/srt_server.cpp @@ -118,7 +118,11 @@ void srt_server::srt_handle_connection(SRT_SOCKSTATUS status, SRTSOCKET input_fd } //add new srt connect into srt handle std::string streamid = UDT::getstreamid(conn_fd); - + if (!is_streamid_valid(streamid)) { + srs_trace("srt streamid(%s) error, fd:%d", streamid.c_str(), conn_fd); + srt_close(conn_fd); + return; + } SRT_CONN_PTR srt_conn_ptr = std::make_shared(conn_fd, streamid); std::string vhost_str = srt_conn_ptr->get_vhost(); @@ -179,7 +183,6 @@ void srt_server::on_work() int ret = srt_epoll_wait(_pollid, read_fds, &rfd_num, write_fds, &wfd_num, -1, nullptr, nullptr, nullptr, nullptr); if (ret < 0) { - srs_error("listen srt epoll is timeout, port=%d", listen_port); continue; } srs_trace("srt server epoll get: ret=%d, rfd_num=%d, wfd_num=%d", diff --git a/trunk/src/srt/srt_to_rtmp.cpp b/trunk/src/srt/srt_to_rtmp.cpp index 647931738..ecb348b20 100644 --- a/trunk/src/srt/srt_to_rtmp.cpp +++ b/trunk/src/srt/srt_to_rtmp.cpp @@ -5,38 +5,56 @@ #include #include #include +#include "stringex.hpp" -srt2rtmp::srt2rtmp():_run_flag(false) { +std::shared_ptr srt2rtmp::s_srt2rtmp_ptr; + +std::shared_ptr srt2rtmp::get_instance() { + if (!s_srt2rtmp_ptr) { + s_srt2rtmp_ptr = std::make_shared(); + } + return s_srt2rtmp_ptr; +} + +srt2rtmp::srt2rtmp() { } srt2rtmp::~srt2rtmp() { - + release(); } -void srt2rtmp::start() { - _run_flag = true; +srs_error_t srt2rtmp::init() { + srs_error_t err = srs_success; - _thread_ptr = std::make_shared(&srt2rtmp::on_work, this); + if (_trd_ptr.get() != nullptr) { + return srs_error_wrap(err, "don't start thread again"); + } - return; + _trd_ptr = std::make_shared("srt2rtmp", this); + + if ((err = _trd_ptr->start()) != srs_success) { + return srs_error_wrap(err, "start thread"); + } + srs_trace("srt2rtmp start coroutine..."); + + return err; } -void srt2rtmp::stop() { - _run_flag = false; - - _thread_ptr->join(); - return; +void srt2rtmp::release() { + if (!_trd_ptr) { + return; + } + _trd_ptr->stop(); + _trd_ptr = nullptr; } void srt2rtmp::insert_data_message(unsigned char* data_p, unsigned int len, const std::string& key_path) { std::unique_lock locker(_mutex); - if (!_run_flag) { - return; - } + SRT_DATA_MSG_PTR msg_ptr = std::make_shared(data_p, len, key_path); _msg_queue.push(msg_ptr); - _notify_cond.notify_one(); + //_notify_cond.notify_one(); return; } @@ -44,23 +62,35 @@ SRT_DATA_MSG_PTR srt2rtmp::get_data_message() { std::unique_lock locker(_mutex); SRT_DATA_MSG_PTR msg_ptr; - while (_msg_queue.empty() && _run_flag) { - _notify_cond.wait(locker); + if (_msg_queue.empty()) + { + return msg_ptr; } + //while (_msg_queue.empty()) { + // _notify_cond.wait(locker); + //} msg_ptr = _msg_queue.front(); _msg_queue.pop(); return msg_ptr; } -void srt2rtmp::on_work() { - while(_run_flag) { +//the cycle is running in srs coroutine +srs_error_t srt2rtmp::cycle() { + srs_error_t err = srs_success; + + while(true) { SRT_DATA_MSG_PTR msg_ptr = get_data_message(); if (!msg_ptr) { - continue; + srs_usleep((30 * SRS_UTIME_MILLISECONDS)); + } else { + handle_ts_data(msg_ptr); + } + + if ((err = _trd_ptr->pull()) != srs_success) { + return srs_error_wrap(err, "forwarder"); } - handle_ts_data(msg_ptr); } } @@ -80,13 +110,27 @@ void srt2rtmp::handle_ts_data(SRT_DATA_MSG_PTR data_ptr) { return; } -rtmp_client::rtmp_client(std::string key_path):_key_path(key_path) { - _ts_ctx_ptr = std::make_shared(); +rtmp_client::rtmp_client(std::string key_path):_key_path(key_path) + , _connect_flag(false) { + _ts_demux_ptr = std::make_shared(); _avc_ptr = std::make_shared(); _aac_ptr = std::make_shared(); + std::vector ret_vec; + string_split(key_path, "/", ret_vec); + + if (ret_vec.size() >= 3) { + _vhost = ret_vec[0]; + _appname = ret_vec[1]; + _streamname = ret_vec[2]; + } else { + _vhost = "DEFAULT_VHOST"; + _appname = ret_vec[0]; + _streamname = ret_vec[1]; + } char url_sz[128]; - sprintf(url_sz, "rtmp://127.0.0.1/%s", key_path.c_str()); + sprintf(url_sz, "rtmp://127.0.0.1/%s?vhost=%s/%s", + _appname.c_str(), _vhost.c_str(), _streamname.c_str()); _url = url_sz; _h264_sps_changed = false; @@ -100,11 +144,13 @@ rtmp_client::~rtmp_client() { } void rtmp_client::close() { + _connect_flag = false; if (!_rtmp_conn_ptr) { return; } _rtmp_conn_ptr->close(); _rtmp_conn_ptr = nullptr; + } srs_error_t rtmp_client::connect() { @@ -112,6 +158,10 @@ srs_error_t rtmp_client::connect() { srs_utime_t cto = SRS_CONSTS_RTMP_TIMEOUT; srs_utime_t sto = SRS_CONSTS_RTMP_PULSE; + if (_connect_flag) { + return srs_success; + } + if (_rtmp_conn_ptr.get() != nullptr) { return srs_error_wrap(err, "repeated connect %s failed, cto=%dms, sto=%dms.", _url.c_str(), srsu2msi(cto), srsu2msi(sto)); @@ -129,19 +179,12 @@ srs_error_t rtmp_client::connect() { close(); return srs_error_wrap(err, "publish error, url:%s", _url.c_str()); } - + _connect_flag = true; return err; } -void rtmp_client::receive_ts_data(SRT_DATA_MSG_PTR data_ptr) { - SrsBuffer* buffer_p = new SrsBuffer((char*)data_ptr->get_data(), data_ptr->data_len()); - FILE* file_p = fopen("1.ts", "ab+"); - if (file_p) { - fwrite(data_ptr->get_data(), data_ptr->data_len(), 1, file_p); - fclose(file_p); - } - //srs_trace_data((char*)data_ptr->get_data(), data_ptr->data_len(), "receive ts data"); - _ts_ctx_ptr->decode(buffer_p, this);//on_ts_message is the decode callback +void rtmp_client::receive_ts_data(SRT_DATA_MSG_PTR data_ptr) { + _ts_demux_ptr->decode(data_ptr, shared_from_this());//on_data_callback is the decode callback return; } @@ -236,11 +279,6 @@ srs_error_t rtmp_client::write_audio_raw_frame(char* frame, int frame_size, SrsR srs_error_t rtmp_client::rtmp_write_packet(char type, uint32_t timestamp, char* data, int size) { srs_error_t err = srs_success; - - if ((err = connect()) != srs_success) { - return srs_error_wrap(err, "connect"); - } - SrsSharedPtrMessage* msg = NULL; if ((err = srs_rtmp_create_msg(type, timestamp, data, size, _rtmp_conn_ptr->sid(), &msg)) != srs_success) { @@ -257,26 +295,23 @@ srs_error_t rtmp_client::rtmp_write_packet(char type, uint32_t timestamp, char* return err; } -srs_error_t rtmp_client::on_ts_video(SrsTsMessage* msg, SrsBuffer* avs) { +srs_error_t rtmp_client::on_ts_video(std::shared_ptr avs_ptr, uint64_t dts, uint64_t pts) { srs_error_t err = srs_success; - + // ensure rtmp connected. if ((err = connect()) != srs_success) { return srs_error_wrap(err, "connect"); } - // ts tbn to flv tbn. - uint32_t dts = (uint32_t)(msg->dts / 90); - uint32_t pts = (uint32_t)(msg->dts / 90); - // send each frame. - while (!avs->empty()) { + while (!avs_ptr->empty()) { char* frame = NULL; int frame_size = 0; - if ((err = _avc_ptr->annexb_demux(avs, &frame, &frame_size)) != srs_success) { + if ((err = _avc_ptr->annexb_demux(avs_ptr.get(), &frame, &frame_size)) != srs_success) { return srs_error_wrap(err, "demux annexb"); } + //srs_trace_data(frame, frame_size, "video annexb demux:"); // 5bits, 7.3.1 NAL unit syntax, // ISO_IEC_14496-10-AVC-2003.pdf, page 44. // 7: SPS, 8: PPS, 5: I Frame, 1: P Frame @@ -336,32 +371,31 @@ srs_error_t rtmp_client::on_ts_video(SrsTsMessage* msg, SrsBuffer* avs) { return err; } -srs_error_t rtmp_client::on_ts_audio(SrsTsMessage* msg, SrsBuffer* avs) { +srs_error_t rtmp_client::on_ts_audio(std::shared_ptr avs_ptr, uint64_t dts, uint64_t pts) { srs_error_t err = srs_success; - + // ensure rtmp connected. if ((err = connect()) != srs_success) { return srs_error_wrap(err, "connect"); } - // ts tbn to flv tbn. - uint32_t dts = (uint32_t)(msg->dts / 90); - // send each frame. - while (!avs->empty()) { + while (!avs_ptr->empty()) { char* frame = NULL; int frame_size = 0; SrsRawAacStreamCodec codec; - if ((err = _aac_ptr->adts_demux(avs, &frame, &frame_size, codec)) != srs_success) { + if ((err = _aac_ptr->adts_demux(avs_ptr.get(), &frame, &frame_size, codec)) != srs_success) { return srs_error_wrap(err, "demux adts"); } - + //srs_trace("audio annexb demux sampling_frequency_index:%d, aac_packet_type:%d, sound_rate:%d, sound_size:%d", + // codec.sampling_frequency_index, codec.aac_packet_type, codec.sound_rate, + // codec.sound_size); + //srs_trace_data(frame, frame_size, "audio annexb demux:"); // ignore invalid frame, // * atleast 1bytes for aac to decode the data. if (frame_size <= 0) { continue; } - srs_info("mpegts: demux aac frame size=%d, dts=%d", frame_size, dts); // generate sh. if (_aac_specific_config.empty()) { @@ -388,40 +422,25 @@ srs_error_t rtmp_client::on_ts_audio(SrsTsMessage* msg, SrsBuffer* avs) { return err; } -srs_error_t rtmp_client::on_ts_message(SrsTsMessage* msg) { - srs_error_t err = srs_success; +void rtmp_client::on_data_callback(SRT_DATA_MSG_PTR data_ptr, unsigned int media_type, + uint64_t dts, uint64_t pts) +{ + if (!data_ptr || (data_ptr->get_data() == nullptr) || (data_ptr->data_len() == 0)) { + assert(0); + return; + } - srs_trace("ts demux len:%d", msg->payload->length()); - // When the audio SID is private stream 1, we use common audio. - // @see https://github.com/ossrs/srs/issues/740 - if (msg->channel->apply == SrsTsPidApplyAudio && msg->sid == SrsTsPESStreamIdPrivateStream1) { - msg->sid = SrsTsPESStreamIdAudioCommon; + auto avs_ptr = std::make_shared((char*)data_ptr->get_data(), data_ptr->data_len()); + dts = dts / 90; + pts = pts / 90; + + if (media_type == STREAM_TYPE_VIDEO_H264) { + on_ts_video(avs_ptr, dts, pts); + } else if (media_type == STREAM_TYPE_AUDIO_AAC) { + on_ts_audio(avs_ptr, dts, pts); + } else { + srs_error("mpegts demux unkown stream type:0x%02x", media_type); + assert(0); } - - // when not audio/video, or not adts/annexb format, donot support. - if (msg->stream_number() != 0) { - return srs_error_new(ERROR_STREAM_CASTER_TS_ES, "ts: unsupported stream format, sid=%#x(%s-%d)", - msg->sid, msg->is_audio()? "A":msg->is_video()? "V":"N", msg->stream_number()); - } - - // check supported codec - if (msg->channel->stream != SrsTsStreamVideoH264 && msg->channel->stream != SrsTsStreamAudioAAC) { - return srs_error_new(ERROR_STREAM_CASTER_TS_CODEC, "ts: unsupported stream codec=%d", msg->channel->stream); - } - - // parse the stream. - SrsBuffer avs(msg->payload->bytes(), msg->payload->length()); - - // publish audio or video. - if (msg->channel->stream == SrsTsStreamVideoH264) { - if ((err = on_ts_video(msg, &avs)) != srs_success) { - return srs_error_wrap(err, "ts: consume video"); - } - } - if (msg->channel->stream == SrsTsStreamAudioAAC) { - if ((err = on_ts_audio(msg, &avs)) != srs_success) { - return srs_error_wrap(err, "ts: consume audio"); - } - } - return err; -} \ No newline at end of file + return; +} diff --git a/trunk/src/srt/srt_to_rtmp.hpp b/trunk/src/srt/srt_to_rtmp.hpp index 55a9c2438..040b832b7 100644 --- a/trunk/src/srt/srt_to_rtmp.hpp +++ b/trunk/src/srt/srt_to_rtmp.hpp @@ -13,6 +13,7 @@ #include #include "srt_data.hpp" +#include "ts_demux.hpp" #define SRT_VIDEO_MSG_TYPE 0x01 #define SRT_AUDIO_MSG_TYPE 0x02 @@ -21,7 +22,9 @@ typedef std::shared_ptr RTMP_CONN_PTR; typedef std::shared_ptr AVC_PTR; typedef std::shared_ptr AAC_PTR; -class rtmp_client : public ISrsTsHandler { +#define DEFAULT_VHOST "__default_host__" + +class rtmp_client : public ts_media_data_callback_I, public std::enable_shared_from_this { public: rtmp_client(std::string key_path); ~rtmp_client(); @@ -29,13 +32,14 @@ public: void receive_ts_data(SRT_DATA_MSG_PTR data_ptr); private: - virtual srs_error_t on_ts_message(SrsTsMessage* msg); + virtual void on_data_callback(SRT_DATA_MSG_PTR data_ptr, unsigned int media_type, uint64_t dts, uint64_t pts); + srs_error_t connect(); void close(); private: - srs_error_t on_ts_video(SrsTsMessage* msg, SrsBuffer* avs); - srs_error_t on_ts_audio(SrsTsMessage* msg, SrsBuffer* avs); + srs_error_t on_ts_video(std::shared_ptr avs_ptr, uint64_t dts, uint64_t pts); + srs_error_t on_ts_audio(std::shared_ptr avs_ptr, uint64_t dts, uint64_t pts); virtual srs_error_t write_h264_sps_pps(uint32_t dts, uint32_t pts); virtual srs_error_t write_h264_ipb_frame(char* frame, int frame_size, uint32_t dts, uint32_t pts); virtual srs_error_t write_audio_raw_frame(char* frame, int frame_size, SrsRawAacStreamCodec* codec, uint32_t dts); @@ -46,7 +50,10 @@ private: private: std::string _key_path; std::string _url; - std::shared_ptr _ts_ctx_ptr; + std::string _vhost; + std::string _appname; + std::string _streamname; + TS_DEMUX_PTR _ts_demux_ptr; private: AVC_PTR _avc_ptr; @@ -60,34 +67,35 @@ private: AAC_PTR _aac_ptr; private: RTMP_CONN_PTR _rtmp_conn_ptr; + bool _connect_flag; }; typedef std::shared_ptr RTMP_CLIENT_PTR; -class srt2rtmp { +class srt2rtmp : public ISrsCoroutineHandler { public: + static std::shared_ptr get_instance(); srt2rtmp(); virtual ~srt2rtmp(); - void start(); - void stop(); + srs_error_t init(); + void release(); + void insert_data_message(unsigned char* data_p, unsigned int len, const std::string& key_path); private: SRT_DATA_MSG_PTR get_data_message(); - void on_work(); + virtual srs_error_t cycle(); void handle_ts_data(SRT_DATA_MSG_PTR data_ptr); private: - std::shared_ptr _thread_ptr; + static std::shared_ptr s_srt2rtmp_ptr; + std::shared_ptr _trd_ptr; std::mutex _mutex; - std::condition_variable_any _notify_cond; + //std::condition_variable_any _notify_cond; std::queue _msg_queue; std::unordered_map _rtmp_client_map; - bool _run_flag; }; -typedef std::shared_ptr SRT2RTMP_PTR; - #endif \ No newline at end of file diff --git a/trunk/src/srt/ts_demux.cpp b/trunk/src/srt/ts_demux.cpp index ba6dbb080..4c10f871b 100644 --- a/trunk/src/srt/ts_demux.cpp +++ b/trunk/src/srt/ts_demux.cpp @@ -1,7 +1,12 @@ #include "ts_demux.hpp" #include +#include -ts_demux::ts_demux() { +ts_demux::ts_demux():_data_total(0) + ,_last_pid(0) + ,_last_dts(0) + ,_last_pts(0) +{ } @@ -9,14 +14,11 @@ ts_demux::~ts_demux() { } -int ts_demux::decode(SRT_DATA_MSG_PTR data_ptr, TS_DATA_CALLBACK_PTR callback) { - - return 0; -} - -int decode_ts_header(unsigned char* data_p, ts_header& ts_header_info) { +int ts_demux::decode_unit(unsigned char* data_p, std::string key_path, TS_DATA_CALLBACK_PTR callback) +{ int pos = 0; int npos = 0; + ts_header ts_header_info; ts_header_info._sync_byte = data_p[pos]; pos++; @@ -33,7 +35,16 @@ int decode_ts_header(unsigned char* data_p, ts_header& ts_header_info) { pos++; npos = pos; + //printf("ts header(0x%02x) payload_unit_start_indicator:%d, pid:%d, adaptation_field_control:%d, pos:%d\r\n", + // ts_header_info._sync_byte, + // ts_header_info._payload_unit_start_indicator, ts_header_info._PID, + // ts_header_info._adaptation_field_control, pos); + adaptation_field* field_p = &(ts_header_info._adaptation_field_info); + // adaptation field + // 0x01 No adaptation_field, payload only + // 0x02 Adaptation_field only, no payload + // 0x03 Adaptation_field followed by payload if( ts_header_info._adaptation_field_control == 2 || ts_header_info._adaptation_field_control == 3 ){ // adaptation_field() @@ -107,10 +118,7 @@ int decode_ts_header(unsigned char* data_p, ts_header& ts_header_info) { } } } - - npos += sizeof(field_p->_adaptation_field_length) + field_p->_adaptation_field_length; - assert(pos == npos); } if(ts_header_info._adaptation_field_control == 1 @@ -137,11 +145,11 @@ int decode_ts_header(unsigned char* data_p, ts_header& ts_header_info) { _pat._section_number = data_p[pos]; pos++; _pat._last_section_number = data_p[pos]; - assert(table_id == 0x00); - assert(188-npos>section_length+3); // PAT = section_length + 3 + assert(_pat._table_id == 0x00); + assert((188 - npos) > (_pat._section_length+3)); // PAT = section_length + 3 pos++; _pat._pid_vec.clear(); - for (;pos+4 <= section_length-5-4+9;) { // 4:CRC, 5:follow section_length item rpos + 4(following unit length) section_length + 9(above field and unit_start_first_byte ) + for (;pos+4 <= _pat._section_length-5-4+9 + npos;) { // 4:CRC, 5:follow section_length item rpos + 4(following unit length) section_length + 9(above field and unit_start_first_byte ) PID_INFO pid_info; //program_number 16 uimsbf pid_info._program_number = data_p[pos]<<8|data_p[pos+1]; @@ -151,11 +159,13 @@ int decode_ts_header(unsigned char* data_p, ts_header& ts_header_info) { if (pid_info._program_number == 0) { // // network_PID 13 uimsbf pid_info._network_id = (data_p[pos]<<8|data_p[pos+1])&0x1FFF; + //printf("#### network id:%d.\r\n", pid_info._network_id); pos += 2; } else { // // program_map_PID 13 uimsbf pid_info._pid = (data_p[pos]<<8|data_p[pos+1])&0x1FFF; + //printf("#### pmt id:%d.\r\n", pid_info._pid); pos += 2; } _pat._pid_vec.push_back(pid_info); @@ -174,16 +184,15 @@ int decode_ts_header(unsigned char* data_p, ts_header& ts_header_info) { }else if(ts_header_info._PID == 0x11){ // SDT // https://en.wikipedia.org/wiki/Service_Description_Table / https://en.wikipedia.org/wiki/MPEG_transport_stream }else if(is_pmt(ts_header_info._PID)) { - int rpos = 0; if(ts_header_info._payload_unit_start_indicator) - rpos++; - _pmt._table_id = data_p[rpos]; + pos++; + _pmt._table_id = data_p[pos]; pos++; - _pmt._section_syntax_indicator = (data_p[rpos]>>7)&0x01; + _pmt._section_syntax_indicator = (data_p[pos]>>7)&0x01; // skip 3 bits of 1 zero and 2 reserved - _pmt._section_length = ((data_p[rpos]<<8)|data_p[pos+1])&0x0FFF; + _pmt._section_length = ((data_p[pos]<<8)|data_p[pos+1])&0x0FFF; pos += 2; - _pmt._program_number = (data_p[rpos]<<8)|data_p[rpos+1]; + _pmt._program_number = (data_p[pos]<<8)|data_p[pos+1]; pos += 2; // reserved 2 bits _pmt._version_number = (data_p[pos]&0x3E)>>1; @@ -207,7 +216,9 @@ int decode_ts_header(unsigned char* data_p, ts_header& ts_header_info) { // } pos += _pmt._program_info_length; _pmt._stream_pid_vec.clear(); - for (; rpos + 5 <= _pmt._section_number + 4 - 4; ) { // rpos(above field length) i+5(following unit length) section_length +3(PMT begin three bytes)+1(payload_unit_start_indicator) -4(crc32) + _pmt._pid2steamtype.clear(); + + for (; pos + 5 <= _pmt._section_length + 4 - 4 + npos; ) { // pos(above field length) i+5(following unit length) section_length +3(PMT begin three bytes)+1(payload_unit_start_indicator) -4(crc32) STREAM_PID_INFO pid_info; pid_info._stream_type = data_p[pos];//stream_type 8 uimsbf 0x1B AVC video stream as defined in ITU-T Rec. H.264 | ISO/IEC 14496-10 Video pos++; @@ -217,12 +228,13 @@ int decode_ts_header(unsigned char* data_p, ts_header& ts_header_info) { //reserved 4 bslbf pid_info._ES_info_length = ((data_p[pos]<<8)|data_p[pos+1])&0x0FFF; //ES_info_length 12 uimsbf pos += 2; - if( rpos + pid_info._ES_info_length > _pmt._section_length + 4 - 4 ) + if( pos + pid_info._ES_info_length > _pmt._section_length + 4 - 4 + npos ) break; - int absES_info_length = rpos + pid_info._ES_info_length; - for (; rpos 0)) { + insert_into_databuf(ret_data_p, ret_size, key_path, ts_header_info._PID); + } }else{ - // rpos = pes_parse(p, rpos); - printf("PES payload_unit_start_indicator is zero; rpos=%d npos=%d remain=%d hex=%s\n", rpos, npos, 188-(npos+rpos), hexdump(p, 8)); - fwrite(p, 1, 188-(npos+rpos), pes_info[i].fd); + //fwrite(p, 1, 188-(npos+pos), pes_info[i].fd); + insert_into_databuf(data_p + npos, 188-npos, key_path, ts_header_info._PID); } } } - if(!isFound){ - printf("unknown PID = %X \n", PID); - } + //if(!isFound){ + // printf("unknown PID = %X \n", ts_header_info._PID); + //} } } return 0; } +int ts_demux::decode(SRT_DATA_MSG_PTR data_ptr, TS_DATA_CALLBACK_PTR callback) +{ + int ret = -1; + std::string path; + + if (!data_ptr || (data_ptr->data_len() < 188) || (data_ptr->data_len()%188 != 0)) + { + return -1; + } + + + unsigned int count = data_ptr->data_len()/188; + path = data_ptr->get_path(); + for (unsigned int index = 0; index < count; index++) + { + ret = decode_unit(data_ptr->get_data() + 188*index, path, callback); + if (ret < 0) + { + break; + } + } + return ret; +} + +void ts_demux::insert_into_databuf(unsigned char* data_p, size_t data_size, std::string key_path, unsigned short pid) { + _last_pid = pid; + _data_total += data_size; + _data_buffer_vec.push_back(std::make_shared(data_p, data_size, key_path)); + return; +} + +void ts_demux::on_callback(TS_DATA_CALLBACK_PTR callback, unsigned short pid, std::string key_path, + uint64_t dts, uint64_t pts) { + if ((_data_total <=0 ) || (_data_buffer_vec.empty())) { + return; + } + + auto iter = _pmt._pid2steamtype.find(pid); + if (iter == _pmt._pid2steamtype.end()) { + return; + } + unsigned char stream_type = iter->second; + auto total_data_ptr = std::make_shared(_data_total, key_path); + size_t pos = 0; + + for (size_t index = 0; index < _data_buffer_vec.size(); index++) { + memcpy(total_data_ptr->get_data() + pos, + _data_buffer_vec[index]->get_data(), + _data_buffer_vec[index]->data_len()); + pos += _data_buffer_vec[index]->data_len(); + } + _data_buffer_vec.clear(); + _data_total = 0; + + callback->on_data_callback(total_data_ptr, stream_type, dts, pts); + return; +} bool ts_demux::is_pmt(unsigned short pid) { - for (int index = 0; index < _pat._pid_vec.size(); index++) { + for (size_t index = 0; index < _pat._pid_vec.size(); index++) { if (_pat._pid_vec[index]._program_number != 0) { if (_pat._pid_vec[index]._pid == pid) { return true; @@ -272,3 +349,229 @@ bool ts_demux::is_pmt(unsigned short pid) { } return false; } + + +int ts_demux::pes_parse(unsigned char* p, size_t npos, + unsigned char** ret_pp, size_t& ret_size, + uint64_t& dts, uint64_t& pts) { + int pos = 0; + int packet_start_code_prefix = (p[pos]<<16)|(p[pos+1]<<8)|p[pos+2]; //packet_start_code_prefix 24 bslbf + pos += 3; + int stream_id = p[pos]; //stream_id 8 uimsbf + pos++; + //printf("pes parse %02x %02x.\r\n", p[pos], p[pos+1]); + int PES_packet_length = ((unsigned int)p[pos]<<8)|p[pos+1]; //PES_packet_length 16 uimsbf + (void)PES_packet_length; + pos += 2; + //printf("pes parse packet_start_code_prefix:%d, npos:%lu, PES_packet_length:%d, stream_id:%d.\r\n", + // packet_start_code_prefix, npos, PES_packet_length, stream_id); + assert(0x00000001 == packet_start_code_prefix); + if (stream_id != 188//program_stream_map 1011 1100 + && stream_id != 190//padding_stream 1011 1110 + && stream_id != 191//private_stream_2 1011 1111 + && stream_id != 240//ECM 1111 0000 + && stream_id != 241//EMM 1111 0001 + && stream_id != 255//program_stream_directory 1111 1111 + && stream_id != 242//DSMCC_stream 1111 0010 + && stream_id != 248//ITU-T Rec. H.222.1 type E stream 1111 1000 + ) + { + assert(0x80 == p[pos]); + //skip 2bits//'10' 2 bslbf + int PES_scrambling_control = (p[pos]&30)>>4; //PES_scrambling_control 2 bslbf + (void)PES_scrambling_control; + int PES_priority = (p[pos]&0x08)>>3; //PES_priority 1 bslbf + (void)PES_priority; + int data_alignment_indicator = (p[pos]&0x04)>>2;//data_alignment_indicator 1 bslbf + (void)data_alignment_indicator; + int copyright = (p[pos]&0x02)>>1; //copyright 1 bslbf + (void)copyright; + int original_or_copy = (p[pos]&0x01);//original_or_copy 1 bslbf + (void)original_or_copy; + pos++; + int PTS_DTS_flags = (p[pos]&0xC0)>>6; //PTS_DTS_flags 2 bslbf + int ESCR_flag = (p[pos]&0x20)>>5; // ESCR_flag 1 bslbf + int ES_rate_flag = (p[pos]&0x10)>>4;//ES_rate_flag 1 bslbf + int DSM_trick_mode_flag = (p[pos]&0x08)>>3;//DSM_trick_mode_flag 1 bslbf + int additional_copy_info_flag = (p[pos]&0x04)>>2; //additional_copy_info_flag 1 bslbf + int PES_CRC_flag = (p[pos]&0x02)>>1; //PES_CRC_flag 1 bslbf + int PES_extension_flag = (p[pos]&0x01);//PES_extension_flag 1 bslbf + pos++; + int PES_header_data_length = p[pos]; //PES_header_data_length 8 uimsbf + (void)PES_header_data_length; + pos++; + + if (PTS_DTS_flags == 2) { + // skip 4 bits '0010' 4 bslbf + // PTS [32..30] 3 bslbf + // marker_bit 1 bslbf + // PTS [29..15] 15 bslbf + // marker_bit 1 bslbf + // PTS [14..0] 15 bslbf + // marker_bit 1 bslbf + pts = (((p[pos]>>1)&0x07) << 30) | (p[pos+1]<<22) | (((p[pos+2]>>1)&0x7F)<<15) | (p[pos+3]<<7) | ((p[pos+4]>>1)&0x7F); + pos += 5; + } + if (PTS_DTS_flags == 3) { + // '0011' 4 bslbf + // PTS [32..30] 3 bslbf + // marker_bit 1 bslbf + //PTS [29..15] 15 bslbf + //marker_bit 1 bslbf + // PTS [14..0] 15 bslbf + // marker_bit 1 bslbf + pts = (((p[pos]>>1)&0x07) << 30) | (p[pos+1]<<22) | (((p[pos+2]>>1)&0x7F)<<15) | (p[pos+3]<<7) | ((p[pos+4]>>1)&0x7F); + pos += 5; + // '0001' 4 bslbf + // DTS [32..30] 3 bslbf + // marker_bit 1 bslbf + // DTS [29..15] 15 bslbf + // marker_bit 1 bslbf + // DTS [14..0] 15 bslbf + // marker_bit 1 bslbf + dts = (((p[pos]>>1)&0x07) << 30) | (p[pos+1]<<22) | (((p[pos+2]>>1)&0x7F)<<15) | (p[pos+3]<<7) | ((p[pos+4]>>1)&0x7F); + pos += 5; + } + if (ESCR_flag == 1) { + // reserved 2 bslbf + // ESCR_base[32..30] 3 bslbf + // marker_bit 1 bslbf + // ESCR_base[29..15] 15 bslbf + // marker_bit 1 bslbf + // ESCR_base[14..0] 15 bslbf + // marker_bit 1 bslbf + // ESCR_extension 9 uimsbf + // marker_bit 1 bslbf + uint64_t ESCR_base = ((((uint64_t)p[pos] >> 3) & 0x07) << 30) | (((uint64_t)p[pos] & 0x03) << 28) | ((uint64_t)p[pos + 1] << 20) | ((((uint64_t)p[pos + 2] >> 3) & 0x1F) << 15) | (((uint64_t)p[pos + 2] & 0x3) << 13) | ((uint64_t)p[pos + 3] << 5) | ((p[pos + 4] >> 3) & 0x1F); + int ESCR_extension = ((p[pos + 4] & 0x03) << 7) | ((p[pos + 5] >> 1) & 0x7F); + (void)ESCR_base; + (void)ESCR_extension; + pos += 6; + } + if (ES_rate_flag == 1) { + // marker_bit 1 bslbf + // ES_rate 22 uimsbf + // marker_bit 1 bslbf + int ES_rate = (p[pos]&0x7F)<<15 | (p[pos+1])<<7 | (p[pos+2]&0x7F)>>1; + (void)ES_rate; + pos += 3; + } + if (DSM_trick_mode_flag == 1) { // ignore + int trick_mode_control = (p[pos]&0xE0)>>5;//trick_mode_control 3 uimsbf + if ( trick_mode_control == 0/*fast_forward*/ ) { + // field_id 2 bslbf + // intra_slice_refresh 1 bslbf + // frequency_truncation 2 bslbf + } + else if ( trick_mode_control == 1/*slow_motion*/ ) { + //rep_cntrl 5 uimsbf + } + else if ( trick_mode_control == 2/*freeze_frame*/ ) { + // field_id 2 uimsbf + // reserved 3 bslbf + } + else if ( trick_mode_control == 3/*fast_reverse*/ ) { + // field_id 2 bslbf + // intra_slice_refresh 1 bslbf + // frequency_truncation 2 bslbf + }else if ( trick_mode_control == 4/*slow_reverse*/ ) { + // rep_cntrl 5 uimsbf + } + else{ + //reserved 5 bslbf + } + pos++; + } + if ( additional_copy_info_flag == 1) { // ignore + // marker_bit 1 bslbf + // additional_copy_info 7 bslbf + pos++; + } + if ( PES_CRC_flag == 1) { // ignore + // previous_PES_packet_CRC 16 bslbf + pos += 2; + } + if ( PES_extension_flag == 1) { // ignore + int PES_private_data_flag = (p[pos]&0x80)>>7;// PES_private_data_flag 1 bslbf + int pack_header_field_flag = (p[pos]&0x40)>>6;// pack_header_field_flag 1 bslbf + int program_packet_sequence_counter_flag = (p[pos]&0x20)>>5;// program_packet_sequence_counter_flag 1 bslbf + int P_STD_buffer_flag = (p[pos]&0x10)>>4; // P-STD_buffer_flag 1 bslbf + // reserved 3 bslbf + int PES_extension_flag_2 = (p[pos]&0x01);// PES_extension_flag_2 1 bslbf + pos++; + + if ( PES_private_data_flag == 1) { + // PES_private_data 128 bslbf + pos += 16; + } + if (pack_header_field_flag == 1) { + // pack_field_length 8 uimsbf + // pack_header() + } + if (program_packet_sequence_counter_flag == 1) { + // marker_bit 1 bslbf + // program_packet_sequence_counter 7 uimsbf + // marker_bit 1 bslbf + // MPEG1_MPEG2_identifier 1 bslbf + // original_stuff_length 6 uimsbf + pos += 2; + } + if ( P_STD_buffer_flag == 1) { + // '01' 2 bslbf + // P-STD_buffer_scale 1 bslbf + // P-STD_buffer_size 13 uimsbf + pos += 2; + } + if ( PES_extension_flag_2 == 1) { + // marker_bit 1 bslbf + int PES_extension_field_length = (p[pos]&0x7F);// PES_extension_field_length 7 uimsbf + pos++; + for (int i = 0; i < PES_extension_field_length; i++) { + // reserved 8 bslbf + pos++; + } + } + } + +// for (int i = 0; i < N1; i++) { + //stuffing_byte 8 bslbf +// rpos++; +// } +// for (int i = 0; i < N2; i++) { + //PES_packet_data_byte 8 bslbf +// rpos++; +// } + *ret_pp = p+pos; + ret_size = 188-(npos+pos); + //printf("pes parse body size:%lu, data:0x%02x 0x%02x 0x%02x 0x%02x 0x%02x 0x%02x, dts:%lu(%lu), pts:%lu(%lu)\r\n", + // ret_size, p[pos], p[pos+1], p[pos+2], p[pos+3], p[pos+4], p[pos+5], + // dts, dts/90, pts, pts/90); + } + else if ( stream_id == 188//program_stream_map 1011 1100 BC + || stream_id == 191//private_stream_2 1011 1111 BF + || stream_id == 240//ECM 1111 0000 F0 + || stream_id == 241//EMM 1111 0001 F1 + || stream_id == 255//program_stream_directory 1111 1111 FF + || stream_id == 242//DSMCC_stream 1111 0010 F2 + || stream_id == 248//ITU-T Rec. H.222.1 type E stream 1111 1000 F8 + ) { +// for (i = 0; i < PES_packet_length; i++) { + //PES_packet_data_byte 8 bslbf +// rpos++; +// } + *ret_pp = p+pos; + ret_size = 188-(npos+pos); + //fwrite(p, 1, 188-(npos+rpos), fd); + } + else if ( stream_id == 190//padding_stream 1011 1110 + ) { +// for (i = 0; i < PES_packet_length; i++) { + // padding_byte 8 bslbf +// rpos++; + *ret_pp = p+pos; + ret_size = 188-(npos+pos); +// } + } + + return pos; +} \ No newline at end of file diff --git a/trunk/src/srt/ts_demux.hpp b/trunk/src/srt/ts_demux.hpp index 4a20f1177..4d7c9fd5e 100644 --- a/trunk/src/srt/ts_demux.hpp +++ b/trunk/src/srt/ts_demux.hpp @@ -4,10 +4,68 @@ #include #include #include +#include + +/* mpegts stream type in ts pmt +Value Description +0x00 ITU-T | ISO/IEC Reserved +0x01 ISO/IEC 11172-2 Video (mpeg video v1) +0x02 ITU-T Rec. H.262 | ISO/IEC 13818-2 Video(mpeg video v2)or ISO/IEC 11172-2 constrained parameter video stream +0x03 ISO/IEC 11172-3 Audio (MPEG 1 Audio codec Layer I, Layer II and Layer III audio specifications) +0x04 ISO/IEC 13818-3 Audio (BC Audio Codec) +0x05 ITU-T Rec. H.222.0 | ISO/IEC 13818-1 private_sections +0x06 ITU-T Rec. H.222.0 | ISO/IEC 13818-1 PES packets containing private data +0x07 ISO/IEC 13522 MHEG +0x08 ITU-T Rec. H.222.0 | ISO/IEC 13818-1 Annex A DSM-CC +0x09 ITU-T Rec. H.222.1 +0x0A ISO/IEC 13818-6 type A +0x0B ISO/IEC 13818-6 type B +0x0C ISO/IEC 13818-6 type C +0x0D ISO/IEC 13818-6 type D +0x0E ITU-T Rec. H.222.0 | ISO/IEC 13818-1 auxiliary +0x0F ISO/IEC 13818-7 Audio with ADTS transport syntax +0x10 ISO/IEC 14496-2 Visual +0x11 ISO/IEC 14496-3 Audio with the LATM transport syntax as defined in ISO/IEC 14496-3/Amd.1 +0x12 ISO/IEC 14496-1 SL-packetized stream or FlexMux stream carried in PES packets +0x13 ISO/IEC 14496-1 SL-packetized stream or FlexMux stream carried in ISO/IEC 14496_sections +0x14 ISO/IEC 13818-6 Synchronized Download Protocol +0x15 Metadata carried in PES packets +0x16 Metadata carried in metadata_sections +0x17 Metadata carried in ISO/IEC 13818-6 Data Carousel +0x18 Metadata carried in ISO/IEC 13818-6 Object Carousel +0x19 Metadata carried in ISO/IEC 13818-6 Synchronized Download Protocol +0x1A IPMP stream (defined in ISO/IEC 13818-11, MPEG-2 IPMP) +0x1B AVC video stream as defined in ITU-T Rec. H.264 | ISO/IEC 14496-10 Video (h.264) +0x1C ISO/IEC 14496-3 Audio, without using any additional transport syntax, such as DST, ALS and SLS +0x1D ISO/IEC 14496-17 Text +0x1E Auxiliary video stream as defined in ISO/IEC 23002-3 (AVS) +0x1F-0x7E ITU-T Rec. H.222.0 | ISO/IEC 13818-1 Reserved +0x7F IPMP stream 0x80-0xFF User Private +*/ +#define STREAM_TYPE_VIDEO_MPEG1 0x01 +#define STREAM_TYPE_VIDEO_MPEG2 0x02 +#define STREAM_TYPE_AUDIO_MPEG1 0x03 +#define STREAM_TYPE_AUDIO_MPEG2 0x04 +#define STREAM_TYPE_PRIVATE_SECTION 0x05 +#define STREAM_TYPE_PRIVATE_DATA 0x06 +#define STREAM_TYPE_AUDIO_AAC 0x0f +#define STREAM_TYPE_AUDIO_AAC_LATM 0x11 +#define STREAM_TYPE_VIDEO_MPEG4 0x10 +#define STREAM_TYPE_METADATA 0x15 +#define STREAM_TYPE_VIDEO_H264 0x1b +#define STREAM_TYPE_VIDEO_HEVC 0x24 +#define STREAM_TYPE_VIDEO_CAVS 0x42 +#define STREAM_TYPE_VIDEO_VC1 0xea +#define STREAM_TYPE_VIDEO_DIRAC 0xd1 + +#define STREAM_TYPE_AUDIO_AC3 0x81 +#define STREAM_TYPE_AUDIO_DTS 0x82 +#define STREAM_TYPE_AUDIO_TRUEHD 0x83 +#define STREAM_TYPE_AUDIO_EAC3 0x87 class ts_media_data_callback_I { public: - virtual void on_data_callback(SRT_DATA_MSG_PTR data_ptr); + virtual void on_data_callback(SRT_DATA_MSG_PTR data_ptr, unsigned int media_type, uint64_t dts, uint64_t pts) = 0; }; typedef std::shared_ptr TS_DATA_CALLBACK_PTR; @@ -88,8 +146,8 @@ typedef struct { class pat_info { public: - pat_info(); - ~pat_info(); + pat_info(){}; + ~pat_info(){}; public: unsigned char _table_id; @@ -99,7 +157,7 @@ public: unsigned short _reserved1:2; unsigned short _section_length:12; - unsigned short transport_stream_id; + unsigned short _transport_stream_id; unsigned char _reserved3:2; unsigned char _version_number:5; @@ -124,7 +182,7 @@ class pmt_info { public: pmt_info(){}; ~pmt_info(){}; -private: +public: unsigned char _table_id; unsigned short _section_syntax_indicator:1; unsigned short _reserved1:1; @@ -142,6 +200,7 @@ private: unsigned short _program_info_length:12; unsigned char _dscr[4096]; + std::unordered_map _pid2steamtype; std::vector _stream_pid_vec; }; @@ -153,14 +212,26 @@ public: int decode(SRT_DATA_MSG_PTR data_ptr, TS_DATA_CALLBACK_PTR callback); private: - int decode_ts_header(unsigned char* data_p, ts_header& ts_header_info); + int decode_unit(unsigned char* data_p, std::string key_path, TS_DATA_CALLBACK_PTR callback); bool is_pmt(unsigned short pmt_id); + int pes_parse(unsigned char* p, size_t npos, unsigned char** ret_pp, size_t& ret_size, + uint64_t& dts, uint64_t& pts); + void insert_into_databuf(unsigned char* data_p, size_t data_size, std::string key_path, unsigned short pid); + void on_callback(TS_DATA_CALLBACK_PTR callback, unsigned short pid, + std::string key_path, uint64_t dts, uint64_t pts); private: std::string _key_path;//only for srt pat_info _pat; pmt_info _pmt; + std::vector _data_buffer_vec; + size_t _data_total; + unsigned short _last_pid; + uint64_t _last_dts; + uint64_t _last_pts; }; +typedef std::shared_ptr TS_DEMUX_PTR; + #endif \ No newline at end of file diff --git a/trunk/src/srt/ts_demux_test.cpp b/trunk/src/srt/ts_demux_test.cpp new file mode 100644 index 000000000..e68d06218 --- /dev/null +++ b/trunk/src/srt/ts_demux_test.cpp @@ -0,0 +1,55 @@ +#include "ts_demux.hpp" +#include +#include + +#define TS_MAX 188 + +class media_data_get : public ts_media_data_callback_I { +public: + media_data_get() {}; + virtual ~media_data_get() {}; + +public: + virtual void on_data_callback(SRT_DATA_MSG_PTR data_ptr, unsigned int media_type + , uint64_t dts, uint64_t pts) { + printf("media type:%d, data len:%d, key_path:%s, dts:%lu(%lu), pts:%lu(%lu)\r\n", + media_type, data_ptr->data_len(), data_ptr->get_path().c_str(), dts, dts/90, pts, pts/90); + FILE* file_p; + char filename[80]; + + sprintf(filename, "%u.media", media_type); + file_p = fopen(filename, "ab+"); + if (file_p) { + fwrite(data_ptr->get_data(), data_ptr->data_len(), 1, file_p); + fclose(file_p); + } + return; + } +}; + +int main(int argn, char** argv) { + unsigned char data[TS_MAX]; + ts_demux demux_obj; + auto callback_ptr = std::make_shared(); + FILE* file_p; + if (argn < 2) { + printf("please input ts name.\r\n"); + return 0; + } + + const char* file_name = argv[1]; + printf("input ts name:%s.\r\n", file_name); + + file_p = fopen(file_name, "r"); + fseek(file_p, 0L, SEEK_END); /* 定位到文件末尾 */ + size_t flen = ftell(file_p); /* 得到文件大小 */ + fseek(file_p, 0L, SEEK_SET); /* 定位到文件开头 */ + + do { + fread(data, TS_MAX, 1, file_p); + auto input_ptr = std::make_shared((unsigned char*)data, (unsigned int)TS_MAX, std::string("live/shiwei")); + demux_obj.decode(input_ptr, callback_ptr); + flen -= TS_MAX; + } while(flen > 0); + return 1; +} \ No newline at end of file