1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

add srt mpegts to rtmp

This commit is contained in:
runner365 2020-01-21 19:20:09 +08:00
parent 7b9e3ecdc4
commit 5c5fd805c5
17 changed files with 682 additions and 172 deletions

View file

@ -5,38 +5,56 @@
#include <srs_app_rtmp_conn.hpp>
#include <srs_app_config.hpp>
#include <srs_kernel_stream.hpp>
#include "stringex.hpp"
srt2rtmp::srt2rtmp():_run_flag(false) {
std::shared_ptr<srt2rtmp> srt2rtmp::s_srt2rtmp_ptr;
std::shared_ptr<srt2rtmp> srt2rtmp::get_instance() {
if (!s_srt2rtmp_ptr) {
s_srt2rtmp_ptr = std::make_shared<srt2rtmp>();
}
return s_srt2rtmp_ptr;
}
srt2rtmp::srt2rtmp() {
}
srt2rtmp::~srt2rtmp() {
release();
}
void srt2rtmp::start() {
_run_flag = true;
srs_error_t srt2rtmp::init() {
srs_error_t err = srs_success;
_thread_ptr = std::make_shared<std::thread>(&srt2rtmp::on_work, this);
if (_trd_ptr.get() != nullptr) {
return srs_error_wrap(err, "don't start thread again");
}
return;
_trd_ptr = std::make_shared<SrsSTCoroutine>("srt2rtmp", this);
if ((err = _trd_ptr->start()) != srs_success) {
return srs_error_wrap(err, "start thread");
}
srs_trace("srt2rtmp start coroutine...");
return err;
}
void srt2rtmp::stop() {
_run_flag = false;
_thread_ptr->join();
return;
void srt2rtmp::release() {
if (!_trd_ptr) {
return;
}
_trd_ptr->stop();
_trd_ptr = nullptr;
}
void srt2rtmp::insert_data_message(unsigned char* data_p, unsigned int len, const std::string& key_path) {
std::unique_lock<std::mutex> locker(_mutex);
if (!_run_flag) {
return;
}
SRT_DATA_MSG_PTR msg_ptr = std::make_shared<SRT_DATA_MSG>(data_p, len, key_path);
_msg_queue.push(msg_ptr);
_notify_cond.notify_one();
//_notify_cond.notify_one();
return;
}
@ -44,23 +62,35 @@ SRT_DATA_MSG_PTR srt2rtmp::get_data_message() {
std::unique_lock<std::mutex> locker(_mutex);
SRT_DATA_MSG_PTR msg_ptr;
while (_msg_queue.empty() && _run_flag) {
_notify_cond.wait(locker);
if (_msg_queue.empty())
{
return msg_ptr;
}
//while (_msg_queue.empty()) {
// _notify_cond.wait(locker);
//}
msg_ptr = _msg_queue.front();
_msg_queue.pop();
return msg_ptr;
}
void srt2rtmp::on_work() {
while(_run_flag) {
//the cycle is running in srs coroutine
srs_error_t srt2rtmp::cycle() {
srs_error_t err = srs_success;
while(true) {
SRT_DATA_MSG_PTR msg_ptr = get_data_message();
if (!msg_ptr) {
continue;
srs_usleep((30 * SRS_UTIME_MILLISECONDS));
} else {
handle_ts_data(msg_ptr);
}
if ((err = _trd_ptr->pull()) != srs_success) {
return srs_error_wrap(err, "forwarder");
}
handle_ts_data(msg_ptr);
}
}
@ -80,13 +110,27 @@ void srt2rtmp::handle_ts_data(SRT_DATA_MSG_PTR data_ptr) {
return;
}
rtmp_client::rtmp_client(std::string key_path):_key_path(key_path) {
_ts_ctx_ptr = std::make_shared<SrsTsContext>();
rtmp_client::rtmp_client(std::string key_path):_key_path(key_path)
, _connect_flag(false) {
_ts_demux_ptr = std::make_shared<ts_demux>();
_avc_ptr = std::make_shared<SrsRawH264Stream>();
_aac_ptr = std::make_shared<SrsRawAacStream>();
std::vector<std::string> ret_vec;
string_split(key_path, "/", ret_vec);
if (ret_vec.size() >= 3) {
_vhost = ret_vec[0];
_appname = ret_vec[1];
_streamname = ret_vec[2];
} else {
_vhost = "DEFAULT_VHOST";
_appname = ret_vec[0];
_streamname = ret_vec[1];
}
char url_sz[128];
sprintf(url_sz, "rtmp://127.0.0.1/%s", key_path.c_str());
sprintf(url_sz, "rtmp://127.0.0.1/%s?vhost=%s/%s",
_appname.c_str(), _vhost.c_str(), _streamname.c_str());
_url = url_sz;
_h264_sps_changed = false;
@ -100,11 +144,13 @@ rtmp_client::~rtmp_client() {
}
void rtmp_client::close() {
_connect_flag = false;
if (!_rtmp_conn_ptr) {
return;
}
_rtmp_conn_ptr->close();
_rtmp_conn_ptr = nullptr;
}
srs_error_t rtmp_client::connect() {
@ -112,6 +158,10 @@ srs_error_t rtmp_client::connect() {
srs_utime_t cto = SRS_CONSTS_RTMP_TIMEOUT;
srs_utime_t sto = SRS_CONSTS_RTMP_PULSE;
if (_connect_flag) {
return srs_success;
}
if (_rtmp_conn_ptr.get() != nullptr) {
return srs_error_wrap(err, "repeated connect %s failed, cto=%dms, sto=%dms.",
_url.c_str(), srsu2msi(cto), srsu2msi(sto));
@ -129,19 +179,12 @@ srs_error_t rtmp_client::connect() {
close();
return srs_error_wrap(err, "publish error, url:%s", _url.c_str());
}
_connect_flag = true;
return err;
}
void rtmp_client::receive_ts_data(SRT_DATA_MSG_PTR data_ptr) {
SrsBuffer* buffer_p = new SrsBuffer((char*)data_ptr->get_data(), data_ptr->data_len());
FILE* file_p = fopen("1.ts", "ab+");
if (file_p) {
fwrite(data_ptr->get_data(), data_ptr->data_len(), 1, file_p);
fclose(file_p);
}
//srs_trace_data((char*)data_ptr->get_data(), data_ptr->data_len(), "receive ts data");
_ts_ctx_ptr->decode(buffer_p, this);//on_ts_message is the decode callback
void rtmp_client::receive_ts_data(SRT_DATA_MSG_PTR data_ptr) {
_ts_demux_ptr->decode(data_ptr, shared_from_this());//on_data_callback is the decode callback
return;
}
@ -236,11 +279,6 @@ srs_error_t rtmp_client::write_audio_raw_frame(char* frame, int frame_size, SrsR
srs_error_t rtmp_client::rtmp_write_packet(char type, uint32_t timestamp, char* data, int size) {
srs_error_t err = srs_success;
if ((err = connect()) != srs_success) {
return srs_error_wrap(err, "connect");
}
SrsSharedPtrMessage* msg = NULL;
if ((err = srs_rtmp_create_msg(type, timestamp, data, size, _rtmp_conn_ptr->sid(), &msg)) != srs_success) {
@ -257,26 +295,23 @@ srs_error_t rtmp_client::rtmp_write_packet(char type, uint32_t timestamp, char*
return err;
}
srs_error_t rtmp_client::on_ts_video(SrsTsMessage* msg, SrsBuffer* avs) {
srs_error_t rtmp_client::on_ts_video(std::shared_ptr<SrsBuffer> avs_ptr, uint64_t dts, uint64_t pts) {
srs_error_t err = srs_success;
// ensure rtmp connected.
if ((err = connect()) != srs_success) {
return srs_error_wrap(err, "connect");
}
// ts tbn to flv tbn.
uint32_t dts = (uint32_t)(msg->dts / 90);
uint32_t pts = (uint32_t)(msg->dts / 90);
// send each frame.
while (!avs->empty()) {
while (!avs_ptr->empty()) {
char* frame = NULL;
int frame_size = 0;
if ((err = _avc_ptr->annexb_demux(avs, &frame, &frame_size)) != srs_success) {
if ((err = _avc_ptr->annexb_demux(avs_ptr.get(), &frame, &frame_size)) != srs_success) {
return srs_error_wrap(err, "demux annexb");
}
//srs_trace_data(frame, frame_size, "video annexb demux:");
// 5bits, 7.3.1 NAL unit syntax,
// ISO_IEC_14496-10-AVC-2003.pdf, page 44.
// 7: SPS, 8: PPS, 5: I Frame, 1: P Frame
@ -336,32 +371,31 @@ srs_error_t rtmp_client::on_ts_video(SrsTsMessage* msg, SrsBuffer* avs) {
return err;
}
srs_error_t rtmp_client::on_ts_audio(SrsTsMessage* msg, SrsBuffer* avs) {
srs_error_t rtmp_client::on_ts_audio(std::shared_ptr<SrsBuffer> avs_ptr, uint64_t dts, uint64_t pts) {
srs_error_t err = srs_success;
// ensure rtmp connected.
if ((err = connect()) != srs_success) {
return srs_error_wrap(err, "connect");
}
// ts tbn to flv tbn.
uint32_t dts = (uint32_t)(msg->dts / 90);
// send each frame.
while (!avs->empty()) {
while (!avs_ptr->empty()) {
char* frame = NULL;
int frame_size = 0;
SrsRawAacStreamCodec codec;
if ((err = _aac_ptr->adts_demux(avs, &frame, &frame_size, codec)) != srs_success) {
if ((err = _aac_ptr->adts_demux(avs_ptr.get(), &frame, &frame_size, codec)) != srs_success) {
return srs_error_wrap(err, "demux adts");
}
//srs_trace("audio annexb demux sampling_frequency_index:%d, aac_packet_type:%d, sound_rate:%d, sound_size:%d",
// codec.sampling_frequency_index, codec.aac_packet_type, codec.sound_rate,
// codec.sound_size);
//srs_trace_data(frame, frame_size, "audio annexb demux:");
// ignore invalid frame,
// * atleast 1bytes for aac to decode the data.
if (frame_size <= 0) {
continue;
}
srs_info("mpegts: demux aac frame size=%d, dts=%d", frame_size, dts);
// generate sh.
if (_aac_specific_config.empty()) {
@ -388,40 +422,25 @@ srs_error_t rtmp_client::on_ts_audio(SrsTsMessage* msg, SrsBuffer* avs) {
return err;
}
srs_error_t rtmp_client::on_ts_message(SrsTsMessage* msg) {
srs_error_t err = srs_success;
void rtmp_client::on_data_callback(SRT_DATA_MSG_PTR data_ptr, unsigned int media_type,
uint64_t dts, uint64_t pts)
{
if (!data_ptr || (data_ptr->get_data() == nullptr) || (data_ptr->data_len() == 0)) {
assert(0);
return;
}
srs_trace("ts demux len:%d", msg->payload->length());
// When the audio SID is private stream 1, we use common audio.
// @see https://github.com/ossrs/srs/issues/740
if (msg->channel->apply == SrsTsPidApplyAudio && msg->sid == SrsTsPESStreamIdPrivateStream1) {
msg->sid = SrsTsPESStreamIdAudioCommon;
auto avs_ptr = std::make_shared<SrsBuffer>((char*)data_ptr->get_data(), data_ptr->data_len());
dts = dts / 90;
pts = pts / 90;
if (media_type == STREAM_TYPE_VIDEO_H264) {
on_ts_video(avs_ptr, dts, pts);
} else if (media_type == STREAM_TYPE_AUDIO_AAC) {
on_ts_audio(avs_ptr, dts, pts);
} else {
srs_error("mpegts demux unkown stream type:0x%02x", media_type);
assert(0);
}
// when not audio/video, or not adts/annexb format, donot support.
if (msg->stream_number() != 0) {
return srs_error_new(ERROR_STREAM_CASTER_TS_ES, "ts: unsupported stream format, sid=%#x(%s-%d)",
msg->sid, msg->is_audio()? "A":msg->is_video()? "V":"N", msg->stream_number());
}
// check supported codec
if (msg->channel->stream != SrsTsStreamVideoH264 && msg->channel->stream != SrsTsStreamAudioAAC) {
return srs_error_new(ERROR_STREAM_CASTER_TS_CODEC, "ts: unsupported stream codec=%d", msg->channel->stream);
}
// parse the stream.
SrsBuffer avs(msg->payload->bytes(), msg->payload->length());
// publish audio or video.
if (msg->channel->stream == SrsTsStreamVideoH264) {
if ((err = on_ts_video(msg, &avs)) != srs_success) {
return srs_error_wrap(err, "ts: consume video");
}
}
if (msg->channel->stream == SrsTsStreamAudioAAC) {
if ((err = on_ts_audio(msg, &avs)) != srs_success) {
return srs_error_wrap(err, "ts: consume audio");
}
}
return err;
}
return;
}