1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

For #913, APP support complex error.

This commit is contained in:
winlin 2018-01-01 22:32:54 +08:00
parent e2c1f58674
commit db08f1586c
14 changed files with 623 additions and 591 deletions

View file

@ -156,18 +156,14 @@ private:
std::vector<SrsTsPiece*> pieces;
int64_t next_connect_time;
private:
SrsBuffer* stream;
SrsTsContext* context;
public:
SrsIngestHlsInput(SrsHttpUri* hls) {
in_hls = hls;
next_connect_time = 0;
stream = new SrsBuffer();
context = new SrsTsContext();
}
virtual ~SrsIngestHlsInput() {
srs_freep(stream);
srs_freep(context);
std::vector<SrsTsPiece*>::iterator it;
@ -291,9 +287,8 @@ int SrsIngestHlsInput::parseTs(ISrsTsHandler* handler, char* body, int nb_body)
int nb_packet = (int)nb_body / SRS_TS_PACKET_SIZE;
for (int i = 0; i < nb_packet; i++) {
char* p = (char*)body + (i * SRS_TS_PACKET_SIZE);
if ((ret = stream->initialize(p, SRS_TS_PACKET_SIZE)) != ERROR_SUCCESS) {
return ret;
}
SrsBuffer* stream = new SrsBuffer(p, SRS_TS_PACKET_SIZE);
SrsAutoFree(SrsBuffer, stream);
// process each ts packet
if ((err = context->decode(stream, handler)) != srs_success) {
@ -314,9 +309,8 @@ int SrsIngestHlsInput::parseAac(ISrsAacHandler* handler, char* body, int nb_body
{
int ret = ERROR_SUCCESS;
if ((ret = stream->initialize(body, nb_body)) != ERROR_SUCCESS) {
return ret;
}
SrsBuffer* stream = new SrsBuffer(body, nb_body);
SrsAutoFree(SrsBuffer, stream);
// atleast 2bytes.
if (!stream->require(3)) {
@ -396,7 +390,10 @@ int SrsIngestHlsInput::parseM3u8(SrsHttpUri* url, double& td, double& duration)
SrsAutoFree(ISrsHttpMessage, msg);
std::string body;
if ((ret = msg->body_read_all(body)) != ERROR_SUCCESS) {
if ((err = msg->body_read_all(body)) != srs_success) {
// TODO: FIXME: Use error
ret = srs_error_code(err);
srs_freep(err);
srs_error("read m3u8 failed. ret=%d", ret);
return ret;
}
@ -465,7 +462,10 @@ int SrsIngestHlsInput::parseM3u8(SrsHttpUri* url, double& td, double& duration)
}
srs_trace("parse sub m3u8, url=%s", m3u8_url.c_str());
if ((ret = url->initialize(m3u8_url)) != ERROR_SUCCESS) {
if ((err = url->initialize(m3u8_url)) != srs_success) {
// TODO: FIXME: Use error
ret = srs_error_code(err);
srs_freep(err);
return ret;
}
@ -600,7 +600,10 @@ int SrsIngestHlsInput::SrsTsPiece::fetch(string m3u8)
}
SrsHttpUri uri;
if ((ret = uri.initialize(ts_url)) != ERROR_SUCCESS) {
if ((err = uri.initialize(ts_url)) != srs_success) {
// TODO: FIXME: Use error
ret = srs_error_code(err);
srs_freep(err);
return ret;
}
@ -621,7 +624,10 @@ int SrsIngestHlsInput::SrsTsPiece::fetch(string m3u8)
srs_assert(msg);
SrsAutoFree(ISrsHttpMessage, msg);
if ((ret = msg->body_read_all(body)) != ERROR_SUCCESS) {
if ((err = msg->body_read_all(body)) != srs_success) {
// TODO: FIXME: Use error
ret = srs_error_code(err);
srs_freep(err);
srs_error("read ts failed. ret=%d", ret);
return ret;
}
@ -792,21 +798,16 @@ srs_error_t SrsIngestHlsOutput::on_ts_message(SrsTsMessage* msg)
int SrsIngestHlsOutput::on_aac_frame(char* frame, int frame_size, double duration)
{
int ret = ERROR_SUCCESS;
srs_trace("handle aac frames, size=%dB, duration=%.2f, dts=%" PRId64, frame_size, duration, raw_aac_dts);
SrsBuffer stream;
if ((ret = stream.initialize(frame, frame_size)) != ERROR_SUCCESS) {
return ret;
}
SrsBuffer stream(frame, frame_size);
return do_on_aac_frame(&stream, duration);
}
int SrsIngestHlsOutput::do_on_aac_frame(SrsBuffer* avs, double duration)
{
int ret = ERROR_SUCCESS;
srs_error_t err = srs_success;
uint32_t duration_ms = (uint32_t)(duration * 1000);
@ -822,7 +823,10 @@ int SrsIngestHlsOutput::do_on_aac_frame(SrsBuffer* avs, double duration)
char* frame = NULL;
int frame_size = 0;
SrsRawAacStreamCodec codec;
if ((ret = aac->adts_demux(avs, &frame, &frame_size, codec)) != ERROR_SUCCESS) {
if ((err = aac->adts_demux(avs, &frame, &frame_size, codec)) != srs_success) {
// TODO: FIXME: Use error
ret = srs_error_code(err);
srs_freep(err);
return ret;
}
@ -836,7 +840,10 @@ int SrsIngestHlsOutput::do_on_aac_frame(SrsBuffer* avs, double duration)
// generate sh.
if (aac_specific_config.empty()) {
std::string sh;
if ((ret = aac->mux_sequence_header(&codec, sh)) != ERROR_SUCCESS) {
if ((err = aac->mux_sequence_header(&codec, sh)) != srs_success) {
// TODO: FIXME: Use error
ret = srs_error_code(err);
srs_freep(err);
return ret;
}
aac_specific_config = sh;
@ -905,11 +912,7 @@ int SrsIngestHlsOutput::parse_message_queue()
queue.erase(it);
// parse the stream.
SrsBuffer avs;
if ((ret = avs.initialize(msg->payload->bytes(), msg->payload->length())) != ERROR_SUCCESS) {
srs_error("mpegts: initialize av stream failed. ret=%d", ret);
return ret;
}
SrsBuffer avs(msg->payload->bytes(), msg->payload->length());
// publish audio or video.
if (msg->channel->stream == SrsTsStreamVideoH264) {
@ -939,11 +942,7 @@ int SrsIngestHlsOutput::flush_message_queue()
queue.erase(it);
// parse the stream.
SrsBuffer avs;
if ((ret = avs.initialize(msg->payload->bytes(), msg->payload->length())) != ERROR_SUCCESS) {
srs_error("mpegts: initialize av stream failed. ret=%d", ret);
return ret;
}
SrsBuffer avs(msg->payload->bytes(), msg->payload->length());
// publish audio or video.
if (msg->channel->stream == SrsTsStreamVideoH264) {
@ -964,6 +963,7 @@ int SrsIngestHlsOutput::flush_message_queue()
int SrsIngestHlsOutput::on_ts_video(SrsTsMessage* msg, SrsBuffer* avs)
{
int ret = ERROR_SUCCESS;
srs_error_t err = srs_success;
// ts tbn to flv tbn.
uint32_t dts = (uint32_t)(msg->dts / 90);
@ -976,7 +976,10 @@ int SrsIngestHlsOutput::on_ts_video(SrsTsMessage* msg, SrsBuffer* avs)
while (!avs->empty()) {
char* frame = NULL;
int frame_size = 0;
if ((ret = avc->annexb_demux(avs, &frame, &frame_size)) != ERROR_SUCCESS) {
if ((err = avc->annexb_demux(avs, &frame, &frame_size)) != srs_success) {
// TODO: FIXME: Use error
ret = srs_error_code(err);
srs_freep(err);
return ret;
}
@ -998,7 +1001,10 @@ int SrsIngestHlsOutput::on_ts_video(SrsTsMessage* msg, SrsBuffer* avs)
// for sps
if (avc->is_sps(frame, frame_size)) {
std::string sps;
if ((ret = avc->sps_demux(frame, frame_size, sps)) != ERROR_SUCCESS) {
if ((err = avc->sps_demux(frame, frame_size, sps)) != srs_success) {
// TODO: FIXME: Use error
ret = srs_error_code(err);
srs_freep(err);
return ret;
}
@ -1013,7 +1019,10 @@ int SrsIngestHlsOutput::on_ts_video(SrsTsMessage* msg, SrsBuffer* avs)
// for pps
if (avc->is_pps(frame, frame_size)) {
std::string pps;
if ((ret = avc->pps_demux(frame, frame_size, pps)) != ERROR_SUCCESS) {
if ((err = avc->pps_demux(frame, frame_size, pps)) != srs_success) {
// TODO: FIXME: Use error
ret = srs_error_code(err);
srs_freep(err);
return ret;
}
@ -1027,7 +1036,10 @@ int SrsIngestHlsOutput::on_ts_video(SrsTsMessage* msg, SrsBuffer* avs)
// ibp frame.
std::string ibp;
if ((ret = avc->mux_ipb_frame(frame, frame_size, ibp)) != ERROR_SUCCESS) {
if ((err = avc->mux_ipb_frame(frame, frame_size, ibp)) != srs_success) {
// TODO: FIXME: Use error
ret = srs_error_code(err);
srs_freep(err);
return ret;
}
ibps.append(ibp);
@ -1051,6 +1063,7 @@ int SrsIngestHlsOutput::on_ts_video(SrsTsMessage* msg, SrsBuffer* avs)
int SrsIngestHlsOutput::write_h264_sps_pps(uint32_t dts, uint32_t pts)
{
int ret = ERROR_SUCCESS;
srs_error_t err = srs_success;
// when sps or pps changed, update the sequence header,
// for the pps maybe not changed while sps changed.
@ -1066,7 +1079,10 @@ int SrsIngestHlsOutput::write_h264_sps_pps(uint32_t dts, uint32_t pts)
// h264 raw to h264 packet.
std::string sh;
if ((ret = avc->mux_sequence_header(h264_sps, h264_pps, dts, pts, sh)) != ERROR_SUCCESS) {
if ((err = avc->mux_sequence_header(h264_sps, h264_pps, dts, pts, sh)) != srs_success) {
// TODO: FIXME: Use error
ret = srs_error_code(err);
srs_freep(err);
return ret;
}
@ -1075,7 +1091,10 @@ int SrsIngestHlsOutput::write_h264_sps_pps(uint32_t dts, uint32_t pts)
int8_t avc_packet_type = SrsVideoAvcFrameTraitSequenceHeader;
char* flv = NULL;
int nb_flv = 0;
if ((ret = avc->mux_avc2flv(sh, frame_type, avc_packet_type, dts, pts, &flv, &nb_flv)) != ERROR_SUCCESS) {
if ((err = avc->mux_avc2flv(sh, frame_type, avc_packet_type, dts, pts, &flv, &nb_flv)) != srs_success) {
// TODO: FIXME: Use error
ret = srs_error_code(err);
srs_freep(err);
return ret;
}
@ -1097,6 +1116,7 @@ int SrsIngestHlsOutput::write_h264_sps_pps(uint32_t dts, uint32_t pts)
int SrsIngestHlsOutput::write_h264_ipb_frame(string ibps, SrsVideoAvcFrameType frame_type, uint32_t dts, uint32_t pts)
{
int ret = ERROR_SUCCESS;
srs_error_t err = srs_success;
// when sps or pps not sent, ignore the packet.
// @see https://github.com/ossrs/srs/issues/203
@ -1107,7 +1127,10 @@ int SrsIngestHlsOutput::write_h264_ipb_frame(string ibps, SrsVideoAvcFrameType f
int8_t avc_packet_type = SrsVideoAvcFrameTraitNALU;
char* flv = NULL;
int nb_flv = 0;
if ((ret = avc->mux_avc2flv(ibps, frame_type, avc_packet_type, dts, pts, &flv, &nb_flv)) != ERROR_SUCCESS) {
if ((err = avc->mux_avc2flv(ibps, frame_type, avc_packet_type, dts, pts, &flv, &nb_flv)) != srs_success) {
// TODO: FIXME: Use error
ret = srs_error_code(err);
srs_freep(err);
return ret;
}
@ -1119,6 +1142,7 @@ int SrsIngestHlsOutput::write_h264_ipb_frame(string ibps, SrsVideoAvcFrameType f
int SrsIngestHlsOutput::on_ts_audio(SrsTsMessage* msg, SrsBuffer* avs)
{
int ret = ERROR_SUCCESS;
srs_error_t err = srs_success;
// ts tbn to flv tbn.
uint32_t dts = (uint32_t)(msg->dts / 90);
@ -1136,7 +1160,10 @@ int SrsIngestHlsOutput::on_ts_audio(SrsTsMessage* msg, SrsBuffer* avs)
char* frame = NULL;
int frame_size = 0;
SrsRawAacStreamCodec codec;
if ((ret = aac->adts_demux(avs, &frame, &frame_size, codec)) != ERROR_SUCCESS) {
if ((err = aac->adts_demux(avs, &frame, &frame_size, codec)) != srs_success) {
// TODO: FIXME: Use error
ret = srs_error_code(err);
srs_freep(err);
return ret;
}
@ -1150,7 +1177,10 @@ int SrsIngestHlsOutput::on_ts_audio(SrsTsMessage* msg, SrsBuffer* avs)
// generate sh.
if (aac_specific_config.empty()) {
std::string sh;
if ((ret = aac->mux_sequence_header(&codec, sh)) != ERROR_SUCCESS) {
if ((err = aac->mux_sequence_header(&codec, sh)) != srs_success) {
// TODO: FIXME: Use error
ret = srs_error_code(err);
srs_freep(err);
return ret;
}
aac_specific_config = sh;
@ -1179,10 +1209,14 @@ int SrsIngestHlsOutput::on_ts_audio(SrsTsMessage* msg, SrsBuffer* avs)
int SrsIngestHlsOutput::write_audio_raw_frame(char* frame, int frame_size, SrsRawAacStreamCodec* codec, uint32_t dts)
{
int ret = ERROR_SUCCESS;
srs_error_t err = srs_success;
char* data = NULL;
int size = 0;
if ((ret = aac->mux_aac2flv(frame, frame_size, codec, dts, &data, &size)) != ERROR_SUCCESS) {
if ((err = aac->mux_aac2flv(frame, frame_size, codec, dts, &data, &size)) != srs_success) {
// TODO: FIXME: Use error
ret = srs_error_code(err);
srs_freep(err);
return ret;
}
@ -1192,6 +1226,7 @@ int SrsIngestHlsOutput::write_audio_raw_frame(char* frame, int frame_size, SrsRa
int SrsIngestHlsOutput::rtmp_write_packet(char type, uint32_t timestamp, char* data, int size)
{
int ret = ERROR_SUCCESS;
srs_error_t err = srs_success;
if ((ret = connect()) != ERROR_SUCCESS) {
return ret;
@ -1199,7 +1234,10 @@ int SrsIngestHlsOutput::rtmp_write_packet(char type, uint32_t timestamp, char* d
SrsSharedPtrMessage* msg = NULL;
if ((ret = srs_rtmp_create_msg(type, timestamp, data, size, sdk->sid(), &msg)) != ERROR_SUCCESS) {
if ((err = srs_rtmp_create_msg(type, timestamp, data, size, sdk->sid(), &msg)) != srs_success) {
// TODO: FIXME: Use error
ret = srs_error_code(err);
srs_freep(err);
srs_error("mpegts: create shared ptr msg failed. ret=%d", ret);
return ret;
}
@ -1208,7 +1246,10 @@ int SrsIngestHlsOutput::rtmp_write_packet(char type, uint32_t timestamp, char* d
srs_info("RTMP type=%d, dts=%d, size=%d", type, timestamp, size);
// send out encoded msg.
if ((ret = sdk->send_and_free_message(msg)) != ERROR_SUCCESS) {
if ((err = sdk->send_and_free_message(msg)) != srs_success) {
// TODO: FIXME: Use error
ret = srs_error_code(err);
srs_freep(err);
close();
srs_error("send RTMP type=%d, dts=%d, size=%d failed. ret=%d", type, timestamp, size, ret);
return ret;
@ -1220,6 +1261,7 @@ int SrsIngestHlsOutput::rtmp_write_packet(char type, uint32_t timestamp, char* d
int SrsIngestHlsOutput::connect()
{
int ret = ERROR_SUCCESS;
srs_error_t err = srs_success;
// Ignore when connected.
if (sdk) {
@ -1234,14 +1276,20 @@ int SrsIngestHlsOutput::connect()
int64_t sto = SRS_CONSTS_RTMP_PULSE_TMMS;
sdk = new SrsBasicRtmpClient(url, cto, sto);
if ((ret = sdk->connect()) != ERROR_SUCCESS) {
if ((err = sdk->connect()) != srs_success) {
// TODO: FIXME: Use error
ret = srs_error_code(err);
srs_freep(err);
close();
srs_error("mpegts: connect %s failed, cto=%" PRId64 ", sto=%" PRId64 ". ret=%d", url.c_str(), cto, sto, ret);
return ret;
}
// publish.
if ((ret = sdk->publish()) != ERROR_SUCCESS) {
if ((err = sdk->publish()) != srs_success) {
// TODO: FIXME: Use error
ret = srs_error_code(err);
srs_freep(err);
close();
srs_error("mpegts: publish %s failed. ret=%d", url.c_str(), ret);
return ret;
@ -1301,7 +1349,6 @@ public:
srs_error_t proxy_hls2rtmp(string hls, string rtmp)
{
int ret = ERROR_SUCCESS;
srs_error_t err = srs_success;
// init st.
@ -1310,15 +1357,16 @@ srs_error_t proxy_hls2rtmp(string hls, string rtmp)
}
SrsHttpUri hls_uri, rtmp_uri;
if ((ret = hls_uri.initialize(hls)) != ERROR_SUCCESS) {
return srs_error_new(ret, "hls parse uri=%s", hls.c_str());
if ((err = hls_uri.initialize(hls)) != srs_success) {
return srs_error_wrap(err, "hls parse uri=%s", hls.c_str());
}
if ((ret = rtmp_uri.initialize(rtmp)) != ERROR_SUCCESS) {
return srs_error_new(ret, "rtmp parse uri=%s", rtmp.c_str());
if ((err = rtmp_uri.initialize(rtmp)) != srs_success) {
return srs_error_wrap(err, "rtmp parse uri=%s", rtmp.c_str());
}
SrsIngestHlsContext context(&hls_uri, &rtmp_uri);
for (;;) {
int ret = ERROR_SUCCESS;
if ((ret = context.proxy()) != ERROR_SUCCESS) {
return srs_error_new(ret, "proxy hls to rtmp");
}