diff --git a/README.md b/README.md index e2e0b14f9..2a0e67ad8 100755 --- a/README.md +++ b/README.md @@ -159,6 +159,7 @@ For previous versions, please read: ## V4 changes +* v4.0, 2020-06-13, GB28181 with JitterBuffer support. 4.0.30 * v4.0, 2020-06-03, Support enable C++11. 4.0.29 * v4.0, 2020-05-31, Remove [srs-librtmp](https://github.com/ossrs/srs/issues/1535#issuecomment-633907655). 4.0.28 * v4.0, 2020-05-21, For [#307][bug #307], disable GSO and sendmmsg. 4.0.27 diff --git a/trunk/conf/full.conf b/trunk/conf/full.conf index 846d355ed..fb3dc6f4a 100644 --- a/trunk/conf/full.conf +++ b/trunk/conf/full.conf @@ -319,7 +319,8 @@ stream_caster { rtp_port_min 58200; rtp_port_max 58300; # Whether wait for keyframe then forward to RTMP. - wait_keyframe off; + # default: on + wait_keyframe on; # Max timeout in seconds for RTP stream, if timeout, RTCP bye and close stream. # default: 30 rtp_idle_timeout 30; diff --git a/trunk/conf/push.gb28181.conf b/trunk/conf/push.gb28181.conf index 27b7996ef..78826147e 100644 --- a/trunk/conf/push.gb28181.conf +++ b/trunk/conf/push.gb28181.conf @@ -36,7 +36,7 @@ stream_caster { # 是否等待关键帧之后,再转发, # off:不需等待,直接转发 # on:等第一个关键帧后,再转发 - wait_keyframe off; + wait_keyframe on; # rtp包空闲等待时间,如果指定时间没有收到任何包 # rtp监听连接自动停止,发送BYE命令 @@ -54,6 +54,10 @@ stream_caster { # 因为flash,只支持11025 22050 44100 audio_enable off; + # 是否开启rtp缓冲 + # 开启之后能有效解决rtp乱序等问题 + jitterbuffer_enable on; + # 服务器主机号,可以域名或ip地址 # 也就是设备端将媒体发送的地址,如果是服务器是内外网 # 需要写外网地址, diff --git a/trunk/configure b/trunk/configure index 68f661783..a66a8cdf7 100755 --- a/trunk/configure +++ b/trunk/configure @@ -264,7 +264,7 @@ if [[ $SRS_RTC == YES ]]; then "srs_app_rtc_queue" "srs_app_rtc_server" "srs_app_rtc_source" "srs_app_rtc_api") fi if [[ $SRS_GB28181 == YES ]]; then - MODULE_FILES+=("srs_app_gb28181" "srs_app_gb28181_sip") + MODULE_FILES+=("srs_app_gb28181" "srs_app_gb28181_sip" "srs_app_gb28181_jitbuffer") fi DEFINES="" # add each modules for app diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index 5f92d22fe..ebd55c380 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -2189,6 +2189,8 @@ srs_error_t SrsConfig::global_to_json(SrsJsonObject* obj) sobj->set(sdir->name, sdir->dumps_arg0_to_integer()); } else if (sdir->name == "audio_enable") { sobj->set(sdir->name, sdir->dumps_arg0_to_boolean()); + } else if (sdir->name == "jitterbuffer_enable") { + sobj->set(sdir->name, sdir->dumps_arg0_to_boolean()); } else if (sdir->name == "host") { sobj->set(sdir->name, sdir->dumps_arg0_to_str()); } else if (sdir->name == "wait_keyframe") { @@ -3749,7 +3751,7 @@ srs_error_t SrsConfig::check_normal_config() if (n != "enabled" && n != "caster" && n != "output" && n != "listen" && n != "rtp_port_min" && n != "rtp_port_max" && n != "rtp_idle_timeout" && n != "sip" - && n != "audio_enable" && n != "wait_keyframe" + && n != "audio_enable" && n != "wait_keyframe" && n != "jitterbuffer_enable" && n != "host" && n != "auto_create_channel") { return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal stream_caster.%s", n.c_str()); } @@ -4541,9 +4543,25 @@ bool SrsConfig::get_stream_caster_gb28181_audio_enable(SrsConfDirective* conf) return SRS_CONF_PERFER_FALSE(conf->arg0()); } +bool SrsConfig::get_stream_caster_gb28181_jitterbuffer_enable(SrsConfDirective* conf) +{ + static bool DEFAULT = true; + + if (!conf) { + return DEFAULT; + } + + conf = conf->get("jitterbuffer_enable"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + + return SRS_CONF_PERFER_FALSE(conf->arg0()); +} + bool SrsConfig::get_stream_caster_gb28181_wait_keyframe(SrsConfDirective* conf) { - static bool DEFAULT = false; + static bool DEFAULT = true; if (!conf) { return DEFAULT; diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index 35aa1d8fa..32f0c381d 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -508,6 +508,7 @@ public: virtual srs_utime_t get_stream_caster_gb28181_ack_timeout(SrsConfDirective* conf); virtual srs_utime_t get_stream_caster_gb28181_keepalive_timeout(SrsConfDirective* conf); virtual bool get_stream_caster_gb28181_audio_enable(SrsConfDirective* conf); + virtual bool get_stream_caster_gb28181_jitterbuffer_enable(SrsConfDirective* conf); virtual std::string get_stream_caster_gb28181_host(SrsConfDirective* conf); virtual std::string get_stream_caster_gb28181_serial(SrsConfDirective* conf); virtual std::string get_stream_caster_gb28181_realm(SrsConfDirective* conf); diff --git a/trunk/src/app/srs_app_gb28181.cpp b/trunk/src/app/srs_app_gb28181.cpp index 99631237f..778183224 100644 --- a/trunk/src/app/srs_app_gb28181.cpp +++ b/trunk/src/app/srs_app_gb28181.cpp @@ -24,6 +24,8 @@ #include #include #include +#include +#include using namespace std; @@ -57,6 +59,7 @@ using namespace std; SrsPsRtpPacket::SrsPsRtpPacket() { + isFirstPacket = false; } SrsPsRtpPacket::~SrsPsRtpPacket() @@ -191,6 +194,15 @@ void SrsGb28181PsRtpProcessor::clear_pre_packet() } srs_error_t SrsGb28181PsRtpProcessor::on_udp_packet(const sockaddr* from, const int fromlen, char* buf, int nb_buf) +{ + if (config->jitterbuffer_enable){ + return on_rtp_packet_jitter(from, fromlen, buf, nb_buf); + }else{ + return on_rtp_packet(from, fromlen, buf, nb_buf); + } +} + +srs_error_t SrsGb28181PsRtpProcessor::on_rtp_packet(const sockaddr* from, const int fromlen, char* buf, int nb_buf) { srs_error_t err = srs_success; bool completed = false; @@ -217,7 +229,6 @@ srs_error_t SrsGb28181PsRtpProcessor::on_udp_packet(const sockaddr* from, const } //TODO: fixme: the same device uses the same SSRC to send with different local ports - std::stringstream ss; ss << pkt.ssrc << ":" << pkt.timestamp << ":" << port_string; std::string pkt_key = ss.str(); @@ -241,7 +252,7 @@ srs_error_t SrsGb28181PsRtpProcessor::on_udp_packet(const sockaddr* from, const //TODO: check sequence number out of order //it may be out of order, or multiple streaming ssrc are the same - if (pre_sequence_number + 1 != pkt.sequence_number && + if (((pre_sequence_number + 1) % 65536) != pkt.sequence_number && pre_sequence_number != pkt.sequence_number){ srs_warn("gb28181: ps sequence_number out of order, ssrc=%#x, pre=%u, cur=%u, peer(%s, %s)", pkt.ssrc, pre_sequence_number, pkt.sequence_number, address_string, port_string); @@ -283,7 +294,6 @@ srs_error_t SrsGb28181PsRtpProcessor::on_udp_packet(const sockaddr* from, const if (!completed){ return err; } - //process completed frame data //clear processed one ps frame //on completed frame data rtp packet in muxer enqueue @@ -291,7 +301,6 @@ srs_error_t SrsGb28181PsRtpProcessor::on_udp_packet(const sockaddr* from, const if(key != cache_ps_rtp_packet.end()) { SrsGb28181RtmpMuxer* muxer = NULL; - //First, search according to the channel_id. Otherwise, search according to the SSRC. //Some channel_id are created by RTP pool, which are different ports. //No channel_id are created by multiplexing ports, which are the same port @@ -346,6 +355,123 @@ srs_error_t SrsGb28181PsRtpProcessor::on_udp_packet(const sockaddr* from, const return err; } +SrsGb28181RtmpMuxer* SrsGb28181PsRtpProcessor::create_rtmpmuxer(std::string channel_id, uint32_t ssrc) +{ + if(true){ + SrsGb28181RtmpMuxer* muxer = NULL; + //First, search according to the channel_id. Otherwise, search according to the SSRC. + //Some channel_id are created by RTP pool, which are different ports. + //No channel_id are created by multiplexing ports, which are the same port + if (!channel_id.empty()){ + muxer = _srs_gb28181->fetch_rtmpmuxer(channel_id); + }else { + muxer = _srs_gb28181->fetch_rtmpmuxer_by_ssrc(ssrc); + } + + //auto crate channel + if (!muxer && config->auto_create_channel){ + //auto create channel generated id + std::stringstream ss, ss1; + ss << "chid" << ssrc; + std::string tmp_id = ss.str(); + + SrsGb28181StreamChannel channel; + channel.set_channel_id(tmp_id); + channel.set_port_mode(RTP_PORT_MODE_FIXED); + channel.set_ssrc(ssrc); + + srs_error_t err2 = srs_success; + if ((err2 = _srs_gb28181->create_stream_channel(&channel)) != srs_success){ + srs_warn("gb28181: RtpProcessor create stream channel error %s", srs_error_desc(err2).c_str()); + srs_error_reset(err2); + }; + + muxer = _srs_gb28181->fetch_rtmpmuxer(tmp_id); + } + + return muxer; + }//end if FoundFrame +} + +srs_error_t SrsGb28181PsRtpProcessor::rtmpmuxer_enqueue_data(SrsGb28181RtmpMuxer *muxer, uint32_t ssrc, + int peer_port, std::string address_string, SrsPsRtpPacket *pkt) +{ + srs_error_t err = srs_success; + + if (!muxer) + return err; + + if (muxer){ + //TODO: fixme: the same device uses the same SSRC to send with different local ports + //record the first peer port + muxer->set_channel_peer_port(peer_port); + muxer->set_channel_peer_ip(address_string); + //not the first peer port's non processing + if (muxer->channel_peer_port() != peer_port){ + srs_warn("<- " SRS_CONSTS_LOG_GB28181_CASTER " gb28181: client_id %s, ssrc=%#x, first peer_port=%d cur peer_port=%d", + muxer->get_channel_id().c_str(), ssrc, muxer->channel_peer_port(), peer_port); + }else { + //muxer->ps_packet_enqueue(pkt); + muxer->insert_jitterbuffer(pkt); + }//end if (muxer->channel_peer_port() != peer_port) + }//end if (muxer) + + return err; +} + +srs_error_t SrsGb28181PsRtpProcessor::on_rtp_packet_jitter(const sockaddr* from, const int fromlen, char* buf, int nb_buf) +{ + srs_error_t err = srs_success; + bool completed = false; + + pprint->elapse(); + + char address_string[64]; + char port_string[16]; + if (getnameinfo(from, fromlen, + (char*)&address_string, sizeof(address_string), + (char*)&port_string, sizeof(port_string), + NI_NUMERICHOST|NI_NUMERICSERV)){ + return srs_error_new(ERROR_SYSTEM_IP_INVALID, "bad address"); + } + + int peer_port = atoi(port_string); + + if (true) { + SrsBuffer stream(buf, nb_buf); + SrsPsRtpPacket *pkt = new SrsPsRtpPacket();; + + if ((err = pkt->decode(&stream)) != srs_success) { + srs_freep(pkt); + return srs_error_wrap(err, "ps rtp decode error"); + } + + std::stringstream ss3; + ss3 << pkt->ssrc << ":" << port_string; + std::string jitter_key = ss3.str(); + + pkt->completed = pkt->marker; + + + if (pprint->can_print()) { + srs_trace("<- " SRS_CONSTS_LOG_GB28181_CASTER " gb28181: client_id %s, peer(%s, %d) ps rtp packet %dB, age=%d, vt=%d/%u, sts=%u/%u/%#x, paylod=%dB", + channel_id.c_str(), address_string, peer_port, nb_buf, pprint->age(), pkt->version, + pkt->payload_type, pkt->sequence_number, pkt->timestamp, pkt->ssrc, + pkt->payload->length() + ); + } + + SrsGb28181RtmpMuxer *muxer = create_rtmpmuxer(channel_id, pkt->ssrc); + if (muxer){ + rtmpmuxer_enqueue_data(muxer, pkt->ssrc, peer_port, address_string, pkt); + } + + SrsAutoFree(SrsPsRtpPacket, pkt); + } + + return err; +} + //ISrsPsStreamHander ps stream raw video/audio hander interface ISrsPsStreamHander::ISrsPsStreamHander() { @@ -594,7 +720,6 @@ srs_error_t SrsPsStreamDemixer::on_ps_stream(char* ps_data, int ps_size, uint32_ //ts=1000 seq=4 mark=true payload= audio incomplete_len = ps_size - complete_len; complete_len = complete_len + incomplete_len; - } first_keyframe_flag = false; @@ -644,6 +769,7 @@ SrsGb28181Config::SrsGb28181Config(SrsConfDirective* c) wait_keyframe = _srs_config->get_stream_caster_gb28181_wait_keyframe(c); audio_enable = _srs_config->get_stream_caster_gb28181_audio_enable(c); auto_create_channel = _srs_config->get_stream_caster_gb28181_auto_create_channel(c); + jitterbuffer_enable = _srs_config->get_stream_caster_gb28181_jitterbuffer_enable(c); //sip config sip_enable = _srs_config->get_stream_caster_gb28181_sip_enable(c); @@ -692,14 +818,24 @@ SrsGb28181RtmpMuxer::SrsGb28181RtmpMuxer(SrsGb28181Manger* c, std::string id, bo h264_pps = ""; aac_specific_config = ""; + req = NULL; + server = NULL; + source = NULL; + source_publish = true; + + jitter_buffer = new SrsPsJitterBuffer(id); + ps_buffer = new char[1024*200]; } SrsGb28181RtmpMuxer::~SrsGb28181RtmpMuxer() { - close(); - destroy(); - srs_cond_destroy(wait_ps_queue); + close(); + + srs_cond_destroy(wait_ps_queue); + + srs_freep(jitter_buffer); + srs_freepa(ps_buffer); srs_freep(channel); srs_freep(ps_demixer); srs_freep(trd); @@ -707,6 +843,8 @@ SrsGb28181RtmpMuxer::~SrsGb28181RtmpMuxer() srs_freep(vjitter); srs_freep(ajitter); srs_freep(pprint); + + destroy(); } srs_error_t SrsGb28181RtmpMuxer::serve() @@ -791,12 +929,48 @@ void SrsGb28181RtmpMuxer::destroy() } } +srs_error_t SrsGb28181RtmpMuxer::initialize(SrsServer *s, SrsRequest* r) +{ + srs_error_t err = srs_success; + + if (!jitter_buffer) { + jitter_buffer = new SrsPsJitterBuffer(channel_id); + } + + jitter_buffer->SetDecodeErrorMode(kSelectiveErrors); + jitter_buffer->SetNackMode(kNack, -1, -1); + jitter_buffer->SetNackSettings(250, 450, 0); + + if (!source_publish) return err; + + req = r; + server = s; + + if ((err = _srs_sources->fetch_or_create(req, (ISrsSourceHandler*)server, &source)) != srs_success) { + return srs_error_wrap(err, "create source"); + } + + //TODO: ??? + // if (!source->can_publish(false)) { + // return srs_error_new(ERROR_GB28181_SESSION_IS_EXIST, "stream %s busy", req->get_stream_url().c_str()); + // } + + if ((err = source->on_publish()) != srs_success) { + return srs_error_wrap(err, "on publish"); + } + + return err; +} + + srs_error_t SrsGb28181RtmpMuxer::do_cycle() { srs_error_t err = srs_success; recv_rtp_stream_time = srs_get_system_time(); send_rtmp_stream_time = srs_get_system_time(); - + uint32_t cur_timestamp = 0; + int buffer_size = 0; + //consume ps stream, and check status while (true) { @@ -806,19 +980,35 @@ srs_error_t SrsGb28181RtmpMuxer::do_cycle() return srs_error_wrap(err, "gb28181 rtmp muxer cycle"); } - //demix ps to h264/aac, to rtmp - while(!ps_queue.empty()){ - SrsPsRtpPacket* pkt = ps_queue.front(); - if (pkt){ - if ((err = ps_demixer->on_ps_stream(pkt->payload->bytes(), - pkt->payload->length(), pkt->timestamp, pkt->ssrc)) != srs_success){ - srs_warn("gb28181: demix ps stream error:%s", srs_error_desc(err).c_str()); - srs_freep(err); - }; + SrsGb28181Config config = gb28181_manger->get_gb28181_config(); + + if (config.jitterbuffer_enable){ + + if(jitter_buffer->FoundFrame(cur_timestamp)){ + jitter_buffer->GetPsFrame(ps_buffer, buffer_size, cur_timestamp); + + if (buffer_size > 0){ + if ((err = ps_demixer->on_ps_stream(ps_buffer, buffer_size, cur_timestamp, 0)) != srs_success){ + srs_warn("gb28181: demix ps stream error:%s", srs_error_desc(err).c_str()); + srs_freep(err); + }; + } + } + }else { + //demix ps to h264/aac, to rtmp + while(!ps_queue.empty()){ + SrsPsRtpPacket* pkt = ps_queue.front(); + if (pkt){ + if ((err = ps_demixer->on_ps_stream(pkt->payload->bytes(), + pkt->payload->length(), pkt->timestamp, pkt->ssrc)) != srs_success){ + srs_warn("gb28181: demix ps stream error:%s", srs_error_desc(err).c_str()); + srs_freep(err); + }; + } + ps_queue.pop(); + //must be free pkt + srs_freep(pkt); } - ps_queue.pop(); - //must be free pkt - srs_freep(pkt); } if (pprint->can_print()) { @@ -842,7 +1032,7 @@ srs_error_t SrsGb28181RtmpMuxer::do_cycle() channel->set_rtp_peer_ip(""); } - SrsGb28181Config config = gb28181_manger->get_gb28181_config(); + if (duration > config.rtp_idle_timeout){ srs_trace("gb28181: client id=%s, stream idle timeout, stop!!!", channel_id.c_str()); break; @@ -863,11 +1053,7 @@ srs_error_t SrsGb28181RtmpMuxer::do_cycle() rtmp_close(); } - if (ps_queue.empty()){ - srs_cond_timedwait(wait_ps_queue, 200 * SRS_UTIME_MILLISECONDS); - }else { - srs_cond_timedwait(wait_ps_queue, 10 * SRS_UTIME_MILLISECONDS); - } + srs_cond_timedwait(wait_ps_queue, 10 * SRS_UTIME_MILLISECONDS); } return err; @@ -883,6 +1069,14 @@ void SrsGb28181RtmpMuxer::stop() close(); } + +void SrsGb28181RtmpMuxer::insert_jitterbuffer(SrsPsRtpPacket *pkt) +{ + recv_rtp_stream_time = srs_get_system_time(); + jitter_buffer->InsertPacket(*pkt, pkt->payload->bytes(), pkt->payload->length(), NULL); + srs_cond_signal(wait_ps_queue); +} + void SrsGb28181RtmpMuxer::ps_packet_enqueue(SrsPsRtpPacket *pkt) { srs_assert(pkt); @@ -894,7 +1088,7 @@ void SrsGb28181RtmpMuxer::ps_packet_enqueue(SrsPsRtpPacket *pkt) uint32_t size = ps_queue.size(); if (size > 100){ srs_warn("gb28181: rtmpmuxer too much queue data, need to clear!!!"); - while(ps_queue.empty()) { + while(!ps_queue.empty()) { SrsPsRtpPacket* pkt = ps_queue.front(); ps_queue.pop(); srs_freep(pkt); @@ -929,14 +1123,16 @@ srs_error_t SrsGb28181RtmpMuxer::on_rtp_video(SrsSimpleStream *stream, int64_t f { srs_error_t err = srs_success; - // ensure rtmp connected. - if ((err = connect()) != srs_success) { - //after the connection fails, need to clear flag - //and send the av header again next time - h264_sps = ""; - h264_pps = ""; - aac_specific_config = ""; - return srs_error_wrap(err, "connect"); + if (!source_publish){ + // ensure rtmp connected. + if ((err = connect()) != srs_success) { + //after the connection fails, need to clear flag + //and send the av header again next time + h264_sps = ""; + h264_pps = ""; + aac_specific_config = ""; + return srs_error_wrap(err, "connect"); + } } if ((err = vjitter->correct(fpts)) != srs_success) { @@ -948,72 +1144,158 @@ srs_error_t SrsGb28181RtmpMuxer::on_rtp_video(SrsSimpleStream *stream, int64_t f uint32_t pts = (uint32_t)(fpts / 90); srs_info("gb28181rtmpmuxer: on_rtp_video dts=%u", dts); + if (true) { + char *data = stream->bytes(); + int length = stream->length(); - - SrsBuffer *avs = new SrsBuffer(stream->bytes(), stream->length()); - SrsAutoFree(SrsBuffer, avs); - // send each frame. - while (!avs->empty()) { - char* frame = NULL; - int frame_size = 0; - if ((err = avc->annexb_demux(avs, &frame, &frame_size)) != srs_success) { - return srs_error_wrap(err, "demux annexb"); - } - - // 5bits, 7.3.1 NAL unit syntax, - // ISO_IEC_14496-10-AVC-2003.pdf, page 44. - // 7: SPS, 8: PPS, 5: I Frame, 1: P Frame - SrsAvcNaluType nal_unit_type = (SrsAvcNaluType)(frame[0] & 0x1f); - - // ignore the nalu type sei(6) aud(9) - if (nal_unit_type == SrsAvcNaluTypeAccessUnitDelimiter || - nal_unit_type == SrsAvcNaluTypeSEI) { - continue; - } - - // for sps - if (avc->is_sps(frame, frame_size)) { - std::string sps; - if ((err = avc->sps_demux(frame, frame_size, sps)) != srs_success) { - return srs_error_wrap(err, "demux sps"); - } - - if (h264_sps == sps) { - continue; - } - h264_sps = sps; - - if ((err = write_h264_sps_pps(dts, pts)) != srs_success) { - return srs_error_wrap(err, "write sps/pps"); - } - continue; - } - - // for pps - if (avc->is_pps(frame, frame_size)) { - std::string pps; - if ((err = avc->pps_demux(frame, frame_size, pps)) != srs_success) { - return srs_error_wrap(err, "demux pps"); - } - - if (h264_pps == pps) { - continue; - } - h264_pps = pps; - - if ((err = write_h264_sps_pps(dts, pts)) != srs_success) { - return srs_error_wrap(err, "write sps/pps"); - } - continue; - } - - // ibp frame. - srs_info("gb28181: demux avc ibp frame size=%d, dts=%d", frame_size, dts); - if ((err = write_h264_ipb_frame(frame, frame_size, dts, pts)) != srs_success) { - return srs_error_wrap(err, "write frame"); - } + err = replace_startcode_with_nalulen(data, length, dts, pts); } - + + return err; +} + +srs_error_t SrsGb28181RtmpMuxer::write_h264_ipb_frame2(char *frame, int frame_size, uint32_t pts, uint32_t dts) +{ + srs_error_t err = srs_success; + + SrsAvcNaluType nal_unit_type = (SrsAvcNaluType)(frame[0] & 0x1f); + // ignore the nalu type sei(6) aud(9) + if (nal_unit_type == SrsAvcNaluTypeAccessUnitDelimiter || + nal_unit_type == SrsAvcNaluTypeSEI) { + return err; + } + + // for sps + if (avc->is_sps(frame, frame_size)) { + std::string sps; + if ((err = avc->sps_demux(frame, frame_size, sps)) != srs_success) { + return srs_error_wrap(err, "demux sps"); + } + + if (h264_sps == sps) { + return err; + } + h264_sps = sps; + + if ((err = write_h264_sps_pps(dts, pts)) != srs_success) { + return srs_error_wrap(err, "write sps/pps"); + } + return err; + } + + // for pps + if (avc->is_pps(frame, frame_size)) { + std::string pps; + if ((err = avc->pps_demux(frame, frame_size, pps)) != srs_success) { + return srs_error_wrap(err, "demux pps"); + } + + if (h264_pps == pps) { + return err; + } + h264_pps = pps; + + if ((err = write_h264_sps_pps(dts, pts)) != srs_success) { + return srs_error_wrap(err, "write sps/pps"); + } + return err; + } + + srs_info("gb28181: demux avc ibp frame size=%d, dts=%d", frame_size, dts); + if ((err = write_h264_ipb_frame(frame, frame_size, dts, pts)) != srs_success) { + return srs_error_wrap(err, "write frame"); + } + + return err; +} + + srs_error_t SrsGb28181RtmpMuxer::replace_startcode_with_nalulen(char *video_data, int &size, uint32_t pts, uint32_t dts) + { + srs_error_t err = srs_success; + + int index = 0; + std::list list_index; + + for(; index < size; index++){ + if (video_data[index] == 0x00 && video_data[index+1] == 0x00 && + video_data[index+2] == 0x00 && video_data[index+3] == 0x01){ + list_index.push_back(index); + } + + if (index > (size-4)) + break; + } + + if (list_index.size() == 1){ + int cur_pos = list_index.front(); + list_index.pop_front(); + + //0001xxxxxxxxxx + //xxxx0001xxxxxxx + uint32_t naluLen = size - cur_pos; + char *p = (char*)&naluLen; + + video_data[cur_pos] = p[3]; + video_data[cur_pos+1] = p[2]; + video_data[cur_pos+2] = p[1]; + video_data[cur_pos+3] = p[0]; + + char *frame = video_data + cur_pos + 4; + int frame_size = naluLen; + + err = write_h264_ipb_frame2(frame, frame_size, dts, pts); + + }else if (list_index.size() > 1){ + int pre_pos = list_index.front(); + list_index.pop_front(); + int first_pos = pre_pos; + + while(list_index.size() > 0){ + int cur_pos = list_index.front(); + list_index.pop_front(); + + //pre=========cur====================== + //0001xxxxxxxx0001xxxxxxxx0001xxxxxxxxx + //xxxxxxxxxxxx0001xxxxxxxx0001xxxxxxxxx + uint32_t naluLen = cur_pos - pre_pos - 4; + char *p = (char*)&naluLen; + + video_data[pre_pos] = p[3]; + video_data[pre_pos+1] = p[2]; + video_data[pre_pos+2] = p[1]; + video_data[pre_pos+3] = p[0]; + + char *frame = video_data + pre_pos + 4; + int frame_size = naluLen; + + pre_pos = cur_pos; + err = write_h264_ipb_frame2(frame, frame_size, dts, pts); + } + + //========================pre========== + //0001xxxxxxxx0001xxxxxxxx0001xxxxxxxxx + if (first_pos != pre_pos){ + + uint32_t naluLen = size - pre_pos - 4; + char *p = (char*)&naluLen; + + video_data[pre_pos] = p[3]; + video_data[pre_pos+1] = p[2]; + video_data[pre_pos+2] = p[1]; + video_data[pre_pos+3] = p[0]; + + char *frame = video_data + pre_pos + 4; + int frame_size = naluLen; + + err = write_h264_ipb_frame2(frame, frame_size, dts, pts); + } + }else{ + //xxxxxxxxxxxxxxxxxxx + char *frame = video_data; + int frame_size = size; + err = write_h264_ipb_frame2(frame, frame_size, dts, pts); + } + return err; } @@ -1021,16 +1303,18 @@ srs_error_t SrsGb28181RtmpMuxer::on_rtp_audio(SrsSimpleStream* stream, int64_t f { srs_error_t err = srs_success; - // ensure rtmp connected. - if ((err = connect()) != srs_success) { - //after the connection fails, need to clear flag - //and send the av header again next time - h264_sps = ""; - h264_pps = ""; - aac_specific_config = ""; - return srs_error_wrap(err, "connect"); + if (!source_publish){ + // ensure rtmp connected. + if ((err = connect()) != srs_success) { + //after the connection fails, need to clear flag + //and send the av header again next time + h264_sps = ""; + h264_pps = ""; + aac_specific_config = ""; + return srs_error_wrap(err, "connect"); + } } - + if ((err = ajitter->correct(fdts)) != srs_success) { return srs_error_wrap(err, "jitter"); } @@ -1110,6 +1394,10 @@ srs_error_t SrsGb28181RtmpMuxer::write_h264_sps_pps(uint32_t dts, uint32_t pts) { srs_error_t err = srs_success; + if (h264_sps == "" || h264_pps == ""){ + return err; + } + // h264 raw to h264 packet. std::string sh; if ((err = avc->mux_sequence_header(h264_sps, h264_pps, dts, pts, sh)) != srs_success) { @@ -1131,11 +1419,10 @@ srs_error_t SrsGb28181RtmpMuxer::write_h264_sps_pps(uint32_t dts, uint32_t pts) return srs_error_wrap(err, "write packet"); } - return err; } -srs_error_t SrsGb28181RtmpMuxer::write_h264_ipb_frame(char* frame, int frame_size, uint32_t dts, uint32_t pts) +srs_error_t SrsGb28181RtmpMuxer::write_h264_ipb_frame(char* frame, int frame_size, uint32_t dts, uint32_t pts, bool writelen) { srs_error_t err = srs_success; @@ -1151,8 +1438,13 @@ srs_error_t SrsGb28181RtmpMuxer::write_h264_ipb_frame(char* frame, int frame_siz } std::string ibp; - if ((err = avc->mux_ipb_frame(frame, frame_size, ibp)) != srs_success) { - return srs_error_wrap(err, "mux ibp frame"); + + if (writelen){ + if ((err = avc->mux_ipb_frame(frame, frame_size, ibp)) != srs_success) { + return srs_error_wrap(err, "mux ibp frame"); + } + }else{ + ibp = string(frame, frame_size); } int8_t avc_packet_type = SrsVideoAvcFrameTraitNALU; @@ -1183,6 +1475,10 @@ srs_error_t SrsGb28181RtmpMuxer::write_audio_raw_frame(char* frame, int frame_si srs_error_t SrsGb28181RtmpMuxer::rtmp_write_packet(char type, uint32_t timestamp, char* data, int size) { srs_error_t err = srs_success; + + if (source_publish){ + return rtmp_write_packet_by_source(type, timestamp, data, size); + } if ((err = connect()) != srs_success) { return srs_error_wrap(err, "connect"); @@ -1202,6 +1498,41 @@ srs_error_t SrsGb28181RtmpMuxer::rtmp_write_packet(char type, uint32_t timestamp close(); return srs_error_wrap(err, "write message"); } + return err; +} + +srs_error_t SrsGb28181RtmpMuxer::rtmp_write_packet_by_source(char type, uint32_t timestamp, char* data, int size) +{ + srs_error_t err = srs_success; + + send_rtmp_stream_time = srs_get_system_time(); + + //create a source that will process stream without the need for internal rtmpclient + if (type == SrsFrameTypeAudio) { + SrsMessageHeader header; + header.message_type = RTMP_MSG_AudioMessage; + // TODO: FIXME: Maybe the tbn is not 90k. + header.timestamp = timestamp & 0x3fffffff; + + SrsCommonMessage* shared_video = new SrsCommonMessage(); + SrsAutoFree(SrsCommonMessage, shared_video); + + // TODO: FIXME: Check error. + shared_video->create(&header, data, size); + source->on_audio(shared_video); + }else if(type == SrsFrameTypeVideo) { + SrsMessageHeader header; + header.message_type = RTMP_MSG_VideoMessage; + // TODO: FIXME: Maybe the tbn is not 90k. + header.timestamp = timestamp & 0x3fffffff; + + SrsCommonMessage* shared_video = new SrsCommonMessage(); + SrsAutoFree(SrsCommonMessage, shared_video); + + // TODO: FIXME: Check error. + shared_video->create(&header, data, size); + source->on_video(shared_video); + } return err; } @@ -1248,6 +1579,10 @@ void SrsGb28181RtmpMuxer::close() h264_sps = ""; h264_pps = ""; aac_specific_config = ""; + + if (source_publish && !source){ + source->on_unpublish(); + } } void SrsGb28181RtmpMuxer::rtmp_close(){ @@ -1320,9 +1655,10 @@ void SrsGb28181StreamChannel::dumps(SrsJsonObject* obj) SrsGb28181Manger* _srs_gb28181 = NULL; //SrsGb28181Manger -SrsGb28181Manger::SrsGb28181Manger(SrsConfDirective* c) +SrsGb28181Manger::SrsGb28181Manger(SrsServer *s, SrsConfDirective* c) { // TODO: FIXME: support reload. + server = s; config = new SrsGb28181Config(c); manager = new SrsCoroutineManager(); } @@ -1330,11 +1666,10 @@ SrsGb28181Manger::SrsGb28181Manger(SrsConfDirective* c) SrsGb28181Manger::~SrsGb28181Manger() { used_ports.clear(); - + destroy(); + srs_freep(manager); srs_freep(config); - - destroy(); } srs_error_t SrsGb28181Manger::initialize() @@ -1403,7 +1738,7 @@ uint32_t SrsGb28181Manger::generate_ssrc(std::string id) return ssrc; } -srs_error_t SrsGb28181Manger::fetch_or_create_rtmpmuxer(std::string id, SrsGb28181RtmpMuxer** gb28181) +srs_error_t SrsGb28181Manger::fetch_or_create_rtmpmuxer(std::string id, SrsRequest *req, SrsGb28181RtmpMuxer** gb28181) { srs_error_t err = srs_success; @@ -1414,6 +1749,10 @@ srs_error_t SrsGb28181Manger::fetch_or_create_rtmpmuxer(std::string id, SrsGb28 } muxer = new SrsGb28181RtmpMuxer(this, id, config->audio_enable, config->wait_keyframe); + if ((err = muxer->initialize(server, req)) != srs_success) { + return srs_error_wrap(err, "gb28181: rtmp muxer initialize %s", id.c_str()); + } + if ((err = muxer->serve()) != srs_success) { return srs_error_wrap(err, "gb28181: rtmp muxer serve %s", id.c_str()); } @@ -1569,13 +1908,6 @@ srs_error_t SrsGb28181Manger::create_stream_channel(SrsGb28181StreamChannel *cha return err; } - //create on rtmp muxer, gb28181 stream to rtmp - - if ((err = fetch_or_create_rtmpmuxer(id, &muxer)) != srs_success){ - srs_warn("gb28181: create rtmp muxer error, %s", srs_error_desc(err).c_str()); - return err; - } - //Start RTP listening port, receive gb28181 stream, //fixed is mux port, //random is random allocation port @@ -1611,7 +1943,6 @@ srs_error_t SrsGb28181Manger::create_stream_channel(SrsGb28181StreamChannel *cha //of the string value of the id ssrc = generate_ssrc(id); } - rtmpmuxer_map_by_ssrc(muxer, ssrc); //generate RTMP push stream address, //if the app and stream in the API are empty, @@ -1621,6 +1952,8 @@ srs_error_t SrsGb28181Manger::create_stream_channel(SrsGb28181StreamChannel *cha string app = channel->get_app(); string stream = channel->get_stream(); + SrsRequest request; + if (true) { string tcUrl, stream_name; @@ -1670,8 +2003,19 @@ srs_error_t SrsGb28181Manger::create_stream_channel(SrsGb28181StreamChannel *cha channel->set_ip(config->host); std::string play_url = srs_generate_rtmp_url(config->host, rtmp_port, "", "", app, stream_name, ""); channel->set_rtmp_url(play_url); + + request.app = app; + request.stream = stream_name; + //request.vhost = config->host; } + //create on rtmp muxer, gb28181 stream to rtmp + if ((err = fetch_or_create_rtmpmuxer(id, &request, &muxer)) != srs_success){ + srs_warn("gb28181: create rtmp muxer error, %s", srs_error_desc(err).c_str()); + return err; + } + + rtmpmuxer_map_by_ssrc(muxer, ssrc); muxer->set_rtmp_url(url); srs_trace("gb28181: create new stream channel id:%s rtmp url=%s", id.c_str(), url.c_str()); diff --git a/trunk/src/app/srs_app_gb28181.hpp b/trunk/src/app/srs_app_gb28181.hpp index 9f551a26a..c179084d8 100644 --- a/trunk/src/app/srs_app_gb28181.hpp +++ b/trunk/src/app/srs_app_gb28181.hpp @@ -41,6 +41,9 @@ #include #include #include +#include +#include +#include #define RTP_PORT_MODE_FIXED "fixed" #define RTP_PORT_MODE_RANDOM "random" @@ -66,6 +69,10 @@ class SrsGb28181PsRtpProcessor; class SrsGb28181SipService; class SrsGb28181StreamChannel; class SrsGb28181SipSession; +class SrsPsJitterBuffer; +class SrsServer; +class SrsSource; +class SrsRequest; //ps rtp header packet parse class SrsPsRtpPacket: public SrsRtpPacket @@ -73,6 +80,7 @@ class SrsPsRtpPacket: public SrsRtpPacket public: SrsPsRtpPacket(); virtual ~SrsPsRtpPacket(); + bool isFirstPacket; public: virtual srs_error_t decode(SrsBuffer* stream); }; @@ -129,9 +137,15 @@ private: bool can_send_ps_av_packet(); void dispose(); void clear_pre_packet(); + SrsGb28181RtmpMuxer* create_rtmpmuxer(std::string channel_id, uint32_t ssrc); + srs_error_t rtmpmuxer_enqueue_data(SrsGb28181RtmpMuxer *muxer, uint32_t ssrc, + int peer_port, std::string address_string, SrsPsRtpPacket *pkt); // Interface ISrsUdpHandler public: virtual srs_error_t on_udp_packet(const sockaddr* from, const int fromlen, char* buf, int nb_buf); +public: + virtual srs_error_t on_rtp_packet_jitter(const sockaddr* from, const int fromlen, char* buf, int nb_buf); + virtual srs_error_t on_rtp_packet(const sockaddr* from, const int fromlen, char* buf, int nb_buf); }; //ps stream processing parsing interface @@ -242,6 +256,15 @@ private: SrsRawAacStream* aac; std::string aac_specific_config; + SrsRequest* req; + SrsSource* source; + SrsServer* server; + + SrsPsJitterBuffer *jitter_buffer; + char *ps_buffer; + + bool source_publish; + public: std::queue ps_queue; @@ -252,6 +275,7 @@ public: public: virtual srs_error_t serve(); virtual void stop(); + srs_error_t initialize(SrsServer* s, SrsRequest* r); virtual std::string get_channel_id(); virtual void ps_packet_enqueue(SrsPsRtpPacket *pkt); @@ -265,6 +289,8 @@ public: virtual SrsGb28181StreamChannel get_channel(); srs_utime_t get_recv_stream_time(); + void insert_jitterbuffer(SrsPsRtpPacket *pkt); + private: virtual srs_error_t do_cycle(); virtual void destroy(); @@ -277,10 +303,14 @@ public: virtual srs_error_t on_rtp_video(SrsSimpleStream* stream, int64_t dts); virtual srs_error_t on_rtp_audio(SrsSimpleStream* stream, int64_t dts); private: + + srs_error_t replace_startcode_with_nalulen(char *video_data, int &size, uint32_t pts, uint32_t dts); + srs_error_t write_h264_ipb_frame2(char *frame, int frame_size, uint32_t pts, uint32_t dts); virtual srs_error_t write_h264_sps_pps(uint32_t dts, uint32_t pts); - virtual srs_error_t write_h264_ipb_frame(char* frame, int frame_size, uint32_t dts, uint32_t pts); + virtual srs_error_t write_h264_ipb_frame(char* frame, int frame_size, uint32_t dts, uint32_t pts, bool b = true); virtual srs_error_t write_audio_raw_frame(char* frame, int frame_size, SrsRawAacStreamCodec* codec, uint32_t dts); virtual srs_error_t rtmp_write_packet(char type, uint32_t timestamp, char* data, int size); + virtual srs_error_t rtmp_write_packet_by_source(char type, uint32_t timestamp, char* data, int size); private: // Connect to RTMP server. virtual srs_error_t connect(); @@ -304,6 +334,7 @@ public: int rtp_port_max; int rtp_mux_port; bool auto_create_channel; + bool jitterbuffer_enable; //sip config int sip_port; @@ -395,12 +426,13 @@ private: std::map rtmpmuxers; SrsCoroutineManager* manager; SrsGb28181SipService* sip_service; + SrsServer* server; public: - SrsGb28181Manger(SrsConfDirective* c); + SrsGb28181Manger(SrsServer* s, SrsConfDirective* c); virtual ~SrsGb28181Manger(); public: - srs_error_t fetch_or_create_rtmpmuxer(std::string id, SrsGb28181RtmpMuxer** gb28181); + srs_error_t fetch_or_create_rtmpmuxer(std::string id, SrsRequest *req, SrsGb28181RtmpMuxer** gb28181); SrsGb28181RtmpMuxer* fetch_rtmpmuxer(std::string id); SrsGb28181RtmpMuxer* fetch_rtmpmuxer_by_ssrc(uint32_t ssrc); void rtmpmuxer_map_by_ssrc(SrsGb28181RtmpMuxer*muxer, uint32_t ssrc); diff --git a/trunk/src/app/srs_app_gb28181_jitbuffer.cpp b/trunk/src/app/srs_app_gb28181_jitbuffer.cpp new file mode 100644 index 000000000..b4bc85776 --- /dev/null +++ b/trunk/src/app/srs_app_gb28181_jitbuffer.cpp @@ -0,0 +1,1705 @@ +/** + * The MIT License (MIT) + * + * Copyright (c) 2013-2020 Lixin + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +#include + +#include +#include +#include +#include +#include + + +using namespace std; + +// Use this rtt if no value has been reported. +static const int64_t kDefaultRtt = 200; + +// Request a keyframe if no continuous frame has been received for this +// number of milliseconds and NACKs are disabled. +static const int64_t kMaxDiscontinuousFramesTime = 1000; + +typedef std::pair FrameListPair; + +bool IsKeyFrame(FrameListPair pair) +{ + return pair.second->GetFrameType() == kVideoFrameKey; +} + +bool HasNonEmptyState(FrameListPair pair) +{ + return pair.second->GetState() != kStateEmpty; +} + +void FrameList::InsertFrame(SrsPsFrameBuffer* frame) +{ + insert(rbegin().base(), FrameListPair(frame->GetTimeStamp(), frame)); +} + +SrsPsFrameBuffer* FrameList::PopFrame(uint32_t timestamp) +{ + FrameList::iterator it = find(timestamp); + + if (it == end()) { + return NULL; + } + + SrsPsFrameBuffer* frame = it->second; + erase(it); + return frame; +} + +SrsPsFrameBuffer* FrameList::Front() const +{ + return begin()->second; +} + +SrsPsFrameBuffer* FrameList::FrontNext() const +{ + FrameList::const_iterator it = begin(); + it++; + + if (it != end()) + { + return it->second; + } + + return NULL; +} + + +SrsPsFrameBuffer* FrameList::Back() const +{ + return rbegin()->second; +} + +int FrameList::RecycleFramesUntilKeyFrame(FrameList::iterator* key_frame_it, + UnorderedFrameList* free_frames) +{ + int drop_count = 0; + FrameList::iterator it = begin(); + + while (!empty()) { + // Throw at least one frame. + it->second->Reset(); + free_frames->push_back(it->second); + erase(it++); + ++drop_count; + + if (it != end() && it->second->GetFrameType() == kVideoFrameKey) { + *key_frame_it = it; + return drop_count; + } + } + + *key_frame_it = end(); + return drop_count; +} + +void FrameList::CleanUpOldOrEmptyFrames(PsDecodingState* decoding_state, UnorderedFrameList* free_frames) +{ + while (!empty()) { + SrsPsFrameBuffer* oldest_frame = Front(); + bool remove_frame = false; + + if (oldest_frame->GetState() == kStateEmpty && size() > 1) { + // This frame is empty, try to update the last decoded state and drop it + // if successful. + remove_frame = decoding_state->UpdateEmptyFrame(oldest_frame); + } else { + remove_frame = decoding_state->IsOldFrame(oldest_frame); + } + + if (!remove_frame) { + break; + } + + free_frames->push_back(oldest_frame); + erase(begin()); + } +} + +void FrameList::Reset(UnorderedFrameList* free_frames) +{ + while (!empty()) { + begin()->second->Reset(); + free_frames->push_back(begin()->second); + erase(begin()); + } +} + + +VCMPacket::VCMPacket() + : + payloadType(0), + timestamp(0), + ntp_time_ms_(0), + seqNum(0), + dataPtr(NULL), + sizeBytes(0), + markerBit(false), + frameType(kEmptyFrame), + //codec(kVideoCodecUnknown), + isFirstPacket(false), + //completeNALU(kNaluUnset), + insertStartCode(false), + width(0), + height(0) + //codecSpecificHeader() +{ +} + + +VCMPacket::VCMPacket(const uint8_t* ptr, + size_t size, + uint16_t seq, + uint32_t ts, + bool mBit) : + payloadType(0), + timestamp(ts), + ntp_time_ms_(0), + seqNum(seq), + dataPtr(ptr), + sizeBytes(size), + markerBit(mBit), + + frameType(kVideoFrameDelta), + //codec(kVideoCodecUnknown), + isFirstPacket(false), + //completeNALU(kNaluComplete), + insertStartCode(false), + width(0), + height(0) + //codecSpecificHeader() +{} + +void VCMPacket::Reset() +{ + payloadType = 0; + timestamp = 0; + ntp_time_ms_ = 0; + seqNum = 0; + dataPtr = NULL; + sizeBytes = 0; + markerBit = false; + frameType = kEmptyFrame; + //codec = kVideoCodecUnknown; + isFirstPacket = false; + //completeNALU = kNaluUnset; + insertStartCode = false; + width = 0; + height = 0; + //memset(&codecSpecificHeader, 0, sizeof(RTPVideoHeader)); +} + + +SrsPsFrameBuffer::SrsPsFrameBuffer() +{ + empty_seq_num_low_ = 0; + empty_seq_num_high_ = 0; + first_packet_seq_num_ = 0; + last_packet_seq_num_ = 0; + complete_ = false; + decodable_ = false; + timeStamp_ = 0; + frame_type_ = kEmptyFrame; + decode_error_mode_ = kNoErrors; + _length = 0; + _size = 0; + _buffer = NULL; +} + +SrsPsFrameBuffer::~SrsPsFrameBuffer() +{ + srs_freepa(_buffer); +} + +void SrsPsFrameBuffer::Reset() +{ + //session_nack_ = false; + complete_ = false; + decodable_ = false; + frame_type_ = kVideoFrameDelta; + packets_.clear(); + empty_seq_num_low_ = -1; + empty_seq_num_high_ = -1; + first_packet_seq_num_ = -1; + last_packet_seq_num_ = -1; + _length = 0; +} + +size_t SrsPsFrameBuffer::Length() const +{ + return _length; +} + +PsFrameBufferEnum SrsPsFrameBuffer::InsertPacket(const VCMPacket& packet, const FrameData& frame_data) +{ + if (packets_.size() == kMaxPacketsInSession) { + srs_error("Max number of packets per frame has been reached."); + return kSizeError; + } + + if (packets_.size() == 0){ + timeStamp_ = packet.timestamp; + } + + uint32_t requiredSizeBytes = Length() + packet.sizeBytes; + + if (requiredSizeBytes >= _size) { + const uint8_t* prevBuffer = _buffer; + const uint32_t increments = requiredSizeBytes / + kBufferIncStepSizeBytes + + (requiredSizeBytes % + kBufferIncStepSizeBytes > 0); + const uint32_t newSize = _size + + increments * kBufferIncStepSizeBytes; + + if (newSize > kMaxJBFrameSizeBytes) { + srs_error("Failed to insert packet due to frame being too big."); + return kSizeError; + } + + VerifyAndAllocate(newSize); + UpdateDataPointers(prevBuffer, _buffer); + } + + // Find the position of this packet in the packet list in sequence number + // order and insert it. Loop over the list in reverse order. + ReversePacketIterator rit = packets_.rbegin(); + + for (; rit != packets_.rend(); ++rit) + if (LatestSequenceNumber(packet.seqNum, (*rit).seqNum) == packet.seqNum) { + break; + } + + // Check for duplicate packets. + if (rit != packets_.rend() && + (*rit).seqNum == packet.seqNum && (*rit).sizeBytes > 0) { + return kDuplicatePacket; + } + + if ((packet.isFirstPacket == 0)&& + (first_packet_seq_num_ == -1 || + IsNewerSequenceNumber(first_packet_seq_num_, packet.seqNum))) { + first_packet_seq_num_ = packet.seqNum; + }else if (first_packet_seq_num_ != -1 && + IsNewerSequenceNumber(first_packet_seq_num_, packet.seqNum)) { + srs_warn("Received packet with a sequence number which is out of frame boundaries"); + return kDuplicatePacket; + } + + if (packet.markerBit && + (last_packet_seq_num_ == -1 || + IsNewerSequenceNumber(packet.seqNum, last_packet_seq_num_))) { + last_packet_seq_num_ = packet.seqNum; + } else if (last_packet_seq_num_ != -1 && + IsNewerSequenceNumber(packet.seqNum, last_packet_seq_num_)) { + srs_warn("Received packet with a sequence number which is out of frame boundaries"); + return kDuplicatePacket; + } + + // The insert operation invalidates the iterator |rit|. + PacketIterator packet_list_it = packets_.insert(rit.base(), packet); + + //size_t returnLength = (*packet_list_it).sizeBytes; + size_t returnLength = InsertBuffer(_buffer, packet_list_it); + + // update length + _length = Length() + static_cast(returnLength); + UpdateCompleteSession(); + + if (decode_error_mode_ == kWithErrors) { + decodable_ = true; + } else if (decode_error_mode_ == kSelectiveErrors) { + UpdateDecodableSession(frame_data); + } + + if (complete()) { + state_ = kStateComplete; + return kCompleteSession; + } else if (decodable()) { + state_ = kStateDecodable; + return kDecodableSession; + } else if (!complete()) { + state_ = kStateIncomplete; + return kIncomplete; + } + + return kIncomplete; +} + +void SrsPsFrameBuffer::VerifyAndAllocate(const uint32_t minimumSize) +{ + if (minimumSize > _size) { + // create buffer of sufficient size + uint8_t* newBuffer = new uint8_t[minimumSize]; + + if (_buffer) { + // copy old data + memcpy(newBuffer, _buffer, _size); + delete [] _buffer; + } + + srs_info("SrsPsFrameBuffer::VerifyAndAllocate oldbuffer=%d newbuffer=%d, minimumSize=%d, size=%d", + _buffer, newBuffer, minimumSize, _size); + + _buffer = newBuffer; + _size = minimumSize; + } +} + +void SrsPsFrameBuffer::UpdateDataPointers(const uint8_t* old_base_ptr, + const uint8_t* new_base_ptr) +{ + for (PacketIterator it = packets_.begin(); it != packets_.end(); ++it) + if ((*it).dataPtr != NULL) { + //assert(old_base_ptr != NULL && new_base_ptr != NULL); + (*it).dataPtr = new_base_ptr + ((*it).dataPtr - old_base_ptr); + } +} + + +size_t SrsPsFrameBuffer::InsertBuffer(uint8_t* frame_buffer, + PacketIterator packet_it) +{ + VCMPacket& packet = *packet_it; + PacketIterator it; + + // Calculate the offset into the frame buffer for this packet. + size_t offset = 0; + + for (it = packets_.begin(); it != packet_it; ++it) { + offset += (*it).sizeBytes; + } + + // Set the data pointer to pointing to the start of this packet in the + // frame buffer. + const uint8_t* packet_buffer = packet.dataPtr; + packet.dataPtr = frame_buffer + offset; + + ShiftSubsequentPackets( + packet_it, + packet.sizeBytes); + + packet.sizeBytes = Insert(packet_buffer, + packet.sizeBytes, + const_cast(packet.dataPtr)); + return packet.sizeBytes; +} + +size_t SrsPsFrameBuffer::Insert(const uint8_t* buffer, + size_t length, + uint8_t* frame_buffer) +{ + memcpy(frame_buffer, buffer, length); + return length; +} + +void SrsPsFrameBuffer::ShiftSubsequentPackets(PacketIterator it, + int steps_to_shift) +{ + ++it; + + if (it == packets_.end()) { + return; + } + + uint8_t* first_packet_ptr = const_cast((*it).dataPtr); + int shift_length = 0; + + // Calculate the total move length and move the data pointers in advance. + for (; it != packets_.end(); ++it) { + shift_length += (*it).sizeBytes; + + if ((*it).dataPtr != NULL) { + (*it).dataPtr += steps_to_shift; + } + } + + memmove(first_packet_ptr + steps_to_shift, first_packet_ptr, shift_length); +} + +void SrsPsFrameBuffer::UpdateCompleteSession() +{ + if (HaveFirstPacket() && HaveLastPacket()) { + // Do we have all the packets in this session? + bool complete_session = true; + PacketIterator it = packets_.begin(); + PacketIterator prev_it = it; + ++it; + + for (; it != packets_.end(); ++it) { + if (!InSequence(it, prev_it)) { + complete_session = false; + break; + } + + prev_it = it; + } + + complete_ = complete_session; + } +} + +bool SrsPsFrameBuffer::HaveFirstPacket() const +{ + return !packets_.empty() && (first_packet_seq_num_ != -1); +} + +bool SrsPsFrameBuffer::HaveLastPacket() const +{ + return !packets_.empty() && (last_packet_seq_num_ != -1); +} + +bool SrsPsFrameBuffer::InSequence(const PacketIterator& packet_it, + const PacketIterator& prev_packet_it) +{ + // If the two iterators are pointing to the same packet they are considered + // to be in sequence. + return (packet_it == prev_packet_it || + (static_cast((*prev_packet_it).seqNum + 1) == + (*packet_it).seqNum)); +} + +void SrsPsFrameBuffer::UpdateDecodableSession(const FrameData& frame_data) +{ + // Irrelevant if session is already complete or decodable + if (complete_ || decodable_) { + return; + } + + // TODO(agalusza): Account for bursty loss. + // TODO(agalusza): Refine these values to better approximate optimal ones. + // Do not decode frames if the RTT is lower than this. + const int64_t kRttThreshold = 100; + // Do not decode frames if the number of packets is between these two + // thresholds. + const float kLowPacketPercentageThreshold = 0.2f; + const float kHighPacketPercentageThreshold = 0.8f; + + if (frame_data.rtt_ms < kRttThreshold + || !HaveFirstPacket() + || (NumPackets() <= kHighPacketPercentageThreshold + * frame_data.rolling_average_packets_per_frame + && NumPackets() > kLowPacketPercentageThreshold + * frame_data.rolling_average_packets_per_frame)) { + return; + } + + decodable_ = true; +} + +bool SrsPsFrameBuffer::complete() const +{ + return complete_; +} + +bool SrsPsFrameBuffer::decodable() const +{ + return decodable_; +} + +int SrsPsFrameBuffer::NumPackets() const +{ + return packets_.size(); +} + +uint32_t SrsPsFrameBuffer::GetTimeStamp() const +{ + return timeStamp_; +} + +FrameType SrsPsFrameBuffer::GetFrameType() const +{ + return frame_type_; +} + +PsFrameBufferStateEnum SrsPsFrameBuffer::GetState() const +{ + return state_; +} + +int32_t SrsPsFrameBuffer::GetHighSeqNum() const +{ + if (packets_.empty()) { + return empty_seq_num_high_; + } + + if (empty_seq_num_high_ == -1) { + return packets_.back().seqNum; + } + + return LatestSequenceNumber(packets_.back().seqNum, empty_seq_num_high_); + +} + +int32_t SrsPsFrameBuffer::GetLowSeqNum() const +{ + if (packets_.empty()) { + return empty_seq_num_low_; + } + + return packets_.front().seqNum; +} + +const uint8_t* SrsPsFrameBuffer::Buffer() const +{ + return _buffer; +} + + +void SrsPsFrameBuffer::InformOfEmptyPacket(uint16_t seq_num) +{ + // Empty packets may be FEC or filler packets. They are sequential and + // follow the data packets, therefore, we should only keep track of the high + // and low sequence numbers and may assume that the packets in between are + // empty packets belonging to the same frame (timestamp). + if (empty_seq_num_high_ == -1) { + empty_seq_num_high_ = seq_num; + } else { + empty_seq_num_high_ = LatestSequenceNumber(seq_num, empty_seq_num_high_); + } + + if (empty_seq_num_low_ == -1 || IsNewerSequenceNumber(empty_seq_num_low_, + seq_num)) { + empty_seq_num_low_ = seq_num; + } +} + + +size_t SrsPsFrameBuffer::DeletePacketData(PacketIterator start, PacketIterator end) +{ + size_t bytes_to_delete = 0; // The number of bytes to delete. + PacketIterator packet_after_end = end; + //++packet_after_end; + + // Get the number of bytes to delete. + // Clear the size of these packets. + for (PacketIterator it = start; it != packet_after_end; ++it) { + bytes_to_delete += (*it).sizeBytes; + (*it).sizeBytes = 0; + (*it).dataPtr = NULL; + } + + if (bytes_to_delete > 0) { + ShiftSubsequentPackets(end, -static_cast(bytes_to_delete)); + } + + return bytes_to_delete; +} + +size_t SrsPsFrameBuffer::MakeDecodable() +{ + size_t return_length = 0; + + if (packets_.empty()) { + return 0; + } + + PacketIterator begin = packets_.begin(); + PacketIterator end = packets_.end(); + return_length += DeletePacketData(begin, end); + + return return_length; +} + +void SrsPsFrameBuffer::PrepareForDecode(bool continuous) +{ + + size_t bytes_removed = MakeDecodable(); + _length -= bytes_removed; + + // Transfer frame information to EncodedFrame and create any codec + // specific information. + //_frameType = ConvertFrameType(_sessionInfo.FrameType()); + //_completeFrame = _sessionInfo.complete(); + //_missingFrame = !continuous; +} + + + + bool SrsPsFrameBuffer::DeletePacket(int &count) + { + return true; + } + + +///////////////////////////////////////////////////////////////////////////// + +PsDecodingState::PsDecodingState() + : sequence_num_(0), + time_stamp_(0), + //picture_id_(kNoPictureId), + //temporal_id_(kNoTemporalIdx), + //tl0_pic_id_(kNoTl0PicIdx), + full_sync_(true), + in_initial_state_(true), + m_firstPacket(false) {} + +PsDecodingState::~PsDecodingState() {} + +void PsDecodingState::Reset() +{ + // TODO(mikhal): Verify - not always would want to reset the sync + sequence_num_ = 0; + time_stamp_ = 0; + //picture_id_ = kNoPictureId; + //temporal_id_ = kNoTemporalIdx; + //tl0_pic_id_ = kNoTl0PicIdx; + full_sync_ = true; + in_initial_state_ = true; +} + +uint32_t PsDecodingState::time_stamp() const +{ + return time_stamp_; +} + +uint16_t PsDecodingState::sequence_num() const +{ + return sequence_num_; +} + +bool PsDecodingState::IsOldFrame(const SrsPsFrameBuffer* frame) const +{ + //assert(frame != NULL); + if (frame == NULL) { + return false; + } + + if (in_initial_state_) { + return false; + } + + return !IsNewerTimestamp(frame->GetTimeStamp(), time_stamp_); +} + +bool PsDecodingState::IsOldPacket(const VCMPacket* packet) +{ + //assert(packet != NULL); + if (packet == NULL) { + return false; + } + + if (in_initial_state_) { + return false; + } + + if (!m_firstPacket) { + m_firstPacket = true; + time_stamp_ = packet->timestamp - 1; + return false; + } + + return !IsNewerTimestamp(packet->timestamp, time_stamp_); +} + +void PsDecodingState::SetState(const SrsPsFrameBuffer* frame) +{ + //assert(frame != NULL && frame->GetHighSeqNum() >= 0); + UpdateSyncState(frame); + sequence_num_ = static_cast(frame->GetHighSeqNum()); + time_stamp_ = frame->GetTimeStamp(); + in_initial_state_ = false; +} + +void PsDecodingState::CopyFrom(const PsDecodingState& state) +{ + sequence_num_ = state.sequence_num_; + time_stamp_ = state.time_stamp_; + full_sync_ = state.full_sync_; + in_initial_state_ = state.in_initial_state_; +} + +bool PsDecodingState::UpdateEmptyFrame(const SrsPsFrameBuffer* frame) +{ + bool empty_packet = frame->GetHighSeqNum() == frame->GetLowSeqNum(); + + if (in_initial_state_ && empty_packet) { + // Drop empty packets as long as we are in the initial state. + return true; + } + + if ((empty_packet && ContinuousSeqNum(frame->GetHighSeqNum())) || + ContinuousFrame(frame)) { + // Continuous empty packets or continuous frames can be dropped if we + // advance the sequence number. + sequence_num_ = frame->GetHighSeqNum(); + time_stamp_ = frame->GetTimeStamp(); + return true; + } + + return false; +} + +void PsDecodingState::UpdateOldPacket(const VCMPacket* packet) +{ + //assert(packet != NULL); + if (packet == NULL) { + return; + } + + if (packet->timestamp == time_stamp_) { + // Late packet belonging to the last decoded frame - make sure we update the + // last decoded sequence number. + sequence_num_ = LatestSequenceNumber(packet->seqNum, sequence_num_); + } +} + +void PsDecodingState::SetSeqNum(uint16_t new_seq_num) +{ + sequence_num_ = new_seq_num; +} + +bool PsDecodingState::in_initial_state() const +{ + return in_initial_state_; +} + +bool PsDecodingState::full_sync() const +{ + return full_sync_; +} + +void PsDecodingState::UpdateSyncState(const SrsPsFrameBuffer* frame) +{ + if (in_initial_state_) { + return; + } +} + +bool PsDecodingState::ContinuousFrame(const SrsPsFrameBuffer* frame) const +{ + // Check continuity based on the following hierarchy: + // - Temporal layers (stop here if out of sync). + // - Picture Id when available. + // - Sequence numbers. + // Return true when in initial state. + // Note that when a method is not applicable it will return false. + //assert(frame != NULL); + if (frame == NULL) { + return false; + } + + // A key frame is always considered continuous as it doesn't refer to any + // frames and therefore won't introduce any errors even if prior frames are + // missing. + if (frame->GetFrameType() == kVideoFrameKey) { + return true; + } + + // When in the initial state we always require a key frame to start decoding. + if (in_initial_state_) { + return false; + } + + return ContinuousSeqNum(static_cast(frame->GetLowSeqNum())); +} + +bool PsDecodingState::ContinuousSeqNum(uint16_t seq_num) const +{ + return seq_num == static_cast(sequence_num_ + 1); +} + +SrsPsJitterBuffer::SrsPsJitterBuffer(std::string key): + running_(false), + max_number_of_frames_(kStartNumberOfFrames), + free_frames_(), + decodable_frames_(), + incomplete_frames_(), + last_decoded_state_(), + first_packet_since_reset_(true), + incoming_frame_rate_(0), + incoming_frame_count_(0), + time_last_incoming_frame_count_(0), + incoming_bit_count_(0), + incoming_bit_rate_(0), + num_consecutive_old_packets_(0), + num_packets_(0), + num_packets_free_(0), + num_duplicated_packets_(0), + num_discarded_packets_(0), + time_first_packet_ms_(0), + //jitter_estimate_(clock), + //inter_frame_delay_(clock_->TimeInMilliseconds()), + rtt_ms_(kDefaultRtt), + nack_mode_(kNoNack), + low_rtt_nack_threshold_ms_(-1), + high_rtt_nack_threshold_ms_(-1), + missing_sequence_numbers_(SequenceNumberLessThan()), + nack_seq_nums_(), + max_nack_list_size_(0), + max_packet_age_to_nack_(0), + max_incomplete_time_ms_(0), + decode_error_mode_(kNoErrors), + average_packets_per_frame_(0.0f), + frame_counter_(0), + key_(key) +{ + for (int i = 0; i < kStartNumberOfFrames; i++) { + free_frames_.push_back(new SrsPsFrameBuffer()); + } + + wait_cond_t = srs_cond_new(); +} + +SrsPsJitterBuffer::~SrsPsJitterBuffer() +{ + for (UnorderedFrameList::iterator it = free_frames_.begin(); + it != free_frames_.end(); ++it) { + delete *it; + } + + for (FrameList::iterator it = incomplete_frames_.begin(); + it != incomplete_frames_.end(); ++it) { + delete it->second; + } + + for (FrameList::iterator it = decodable_frames_.begin(); + it != decodable_frames_.end(); ++it) { + delete it->second; + } + + srs_cond_destroy(wait_cond_t); +} + +void SrsPsJitterBuffer::SetDecodeErrorMode(PsDecodeErrorMode error_mode) +{ + decode_error_mode_ = error_mode; +} + +void SrsPsJitterBuffer::Flush() +{ + //CriticalSectionScoped cs(crit_sect_); + decodable_frames_.Reset(&free_frames_); + incomplete_frames_.Reset(&free_frames_); + last_decoded_state_.Reset(); // TODO(mikhal): sync reset. + //frame_event_->Reset(); + num_consecutive_old_packets_ = 0; + // Also reset the jitter and delay estimates + //jitter_estimate_.Reset(); + //inter_frame_delay_.Reset(clock_->TimeInMilliseconds()); + //waiting_for_completion_.frame_size = 0; + //waiting_for_completion_.timestamp = 0; + //waiting_for_completion_.latest_packet_time = -1; + first_packet_since_reset_ = true; + missing_sequence_numbers_.clear(); +} + + + +PsFrameBufferEnum SrsPsJitterBuffer::InsertPacket(const SrsPsRtpPacket &pkt, char *buf, int size, + bool* retransmitted) +{ + + const VCMPacket packet((const uint8_t*)buf, size, + pkt.sequence_number, pkt.timestamp, pkt.marker); + + ++num_packets_; + + if (num_packets_ == 1) { + time_first_packet_ms_ = srs_update_system_time(); + } + + //Does this packet belong to an old frame? + // if (last_decoded_state_.IsOldPacket(&packet)) { + + // //return kOldPacket; + // } + + //num_consecutive_old_packets_ = 0; + + SrsPsFrameBuffer* frame; + FrameList* frame_list; + + const PsFrameBufferEnum error = GetFrame(packet, &frame, &frame_list); + + if (error != kNoError) { + return error; + } + + + srs_utime_t now_ms = srs_update_system_time(); + + FrameData frame_data; + frame_data.rtt_ms = 0; //rtt_ms_; + frame_data.rolling_average_packets_per_frame = 25;//average_packets_per_frame_; + + PsFrameBufferEnum buffer_state = frame->InsertPacket(packet, frame_data); + + if (buffer_state > 0) { + incoming_bit_count_ += packet.sizeBytes << 3; + + if (first_packet_since_reset_) { + latest_received_sequence_number_ = packet.seqNum; + first_packet_since_reset_ = false; + } else { + // if (IsPacketRetransmitted(packet)) { + // frame->IncrementNackCount(); + // } + + UpdateNackList(packet.seqNum); + + latest_received_sequence_number_ = LatestSequenceNumber( + latest_received_sequence_number_, packet.seqNum); + } + } + + // Is the frame already in the decodable list? + bool continuous = IsContinuous(*frame); + + switch (buffer_state) { + case kGeneralError: + case kTimeStampError: + case kSizeError: { + free_frames_.push_back(frame); + break; + } + + case kCompleteSession: { + //CountFrame(*frame); + // if (previous_state != kStateDecodable && + // previous_state != kStateComplete) { + // /*CountFrame(*frame);*/ //????????????????????�?? by ylr + // if (continuous) { + // // Signal that we have a complete session. + // frame_event_->Set(); + // } + // } + } + + // Note: There is no break here - continuing to kDecodableSession. + case kDecodableSession: { + // *retransmitted = (frame->GetNackCount() > 0); + + if (true || continuous) { + decodable_frames_.InsertFrame(frame); + FindAndInsertContinuousFrames(*frame); + } else { + incomplete_frames_.InsertFrame(frame); + + // If NACKs are enabled, keyframes are triggered by |GetNackList|. + // if (nack_mode_ == kNoNack && NonContinuousOrIncompleteDuration() > + // 90 * kMaxDiscontinuousFramesTime) { + // return kFlushIndicator; + // } + } + + break; + } + + case kIncomplete: { + if (frame->GetState() == kStateEmpty && + last_decoded_state_.UpdateEmptyFrame(frame)) { + free_frames_.push_back(frame); + return kNoError; + } else { + incomplete_frames_.InsertFrame(frame); + + // If NACKs are enabled, keyframes are triggered by |GetNackList|. + // if (nack_mode_ == kNoNack && NonContinuousOrIncompleteDuration() > + // 90 * kMaxDiscontinuousFramesTime) { + // return kFlushIndicator; + // } + } + + break; + } + + case kNoError: + case kOutOfBoundsPacket: + case kDuplicatePacket: { + // Put back the frame where it came from. + if (frame_list != NULL) { + frame_list->InsertFrame(frame); + } else { + free_frames_.push_back(frame); + } + + ++num_duplicated_packets_; + break; + } + + case kFlushIndicator:{ + free_frames_.push_back(frame); + } + return kFlushIndicator; + + default: + assert(false); + } + + return buffer_state; +} + +// Gets frame to use for this timestamp. If no match, get empty frame. +PsFrameBufferEnum SrsPsJitterBuffer::GetFrame(const VCMPacket& packet, + SrsPsFrameBuffer** frame, + FrameList** frame_list) +{ + *frame = incomplete_frames_.PopFrame(packet.timestamp); + + if (*frame != NULL) { + *frame_list = &incomplete_frames_; + return kNoError; + } + + *frame = decodable_frames_.PopFrame(packet.timestamp); + + if (*frame != NULL) { + *frame_list = &decodable_frames_; + return kNoError; + } + + *frame_list = NULL; + // No match, return empty frame. + *frame = GetEmptyFrame(); + + if (*frame == NULL) { + // No free frame! Try to reclaim some... + bool found_key_frame = RecycleFramesUntilKeyFrame(); + *frame = GetEmptyFrame(); + assert(*frame); + + if (!found_key_frame) { + free_frames_.push_back(*frame); + return kFlushIndicator; + } + } + + (*frame)->Reset(); + return kNoError; +} + +SrsPsFrameBuffer* SrsPsJitterBuffer::GetEmptyFrame() +{ + if (free_frames_.empty()) { + if (!TryToIncreaseJitterBufferSize()) { + return NULL; + } + } + + SrsPsFrameBuffer* frame = free_frames_.front(); + free_frames_.pop_front(); + return frame; +} + +bool SrsPsJitterBuffer::TryToIncreaseJitterBufferSize() +{ + if (max_number_of_frames_ >= kMaxNumberOfFrames) { + return false; + } + + free_frames_.push_back(new SrsPsFrameBuffer()); + ++max_number_of_frames_; + return true; +} + +// Recycle oldest frames up to a key frame, used if jitter buffer is completely +// full. +bool SrsPsJitterBuffer::RecycleFramesUntilKeyFrame() +{ + // First release incomplete frames, and only release decodable frames if there + // are no incomplete ones. + FrameList::iterator key_frame_it; + bool key_frame_found = false; + int dropped_frames = 0; + dropped_frames += incomplete_frames_.RecycleFramesUntilKeyFrame( + &key_frame_it, &free_frames_); + key_frame_found = key_frame_it != incomplete_frames_.end(); + + if (dropped_frames == 0) { + dropped_frames += decodable_frames_.RecycleFramesUntilKeyFrame( + &key_frame_it, &free_frames_); + key_frame_found = key_frame_it != decodable_frames_.end(); + } + + if (key_frame_found) { + //LOG(LS_INFO) << "Found key frame while dropping frames."; + // Reset last decoded state to make sure the next frame decoded is a key + // frame, and start NACKing from here. + last_decoded_state_.Reset(); + DropPacketsFromNackList(EstimatedLowSequenceNumber(*key_frame_it->second)); + } else if (decodable_frames_.empty()) { + // All frames dropped. Reset the decoding state and clear missing sequence + // numbers as we're starting fresh. + last_decoded_state_.Reset(); + missing_sequence_numbers_.clear(); + } + + return key_frame_found; +} + +bool SrsPsJitterBuffer::IsContinuousInState(const SrsPsFrameBuffer& frame, + const PsDecodingState& decoding_state) const +{ + if (decode_error_mode_ == kWithErrors) { + return true; + } + + // Is this frame (complete or decodable) and continuous? + // kStateDecodable will never be set when decode_error_mode_ is false + // as SessionInfo determines this state based on the error mode (and frame + // completeness). + return (frame.GetState() == kStateComplete || + frame.GetState() == kStateDecodable) && + decoding_state.ContinuousFrame(&frame); +} + +bool SrsPsJitterBuffer::IsContinuous(const SrsPsFrameBuffer& frame) const +{ + if (IsContinuousInState(frame, last_decoded_state_)) { + return true; + } + + PsDecodingState decoding_state; + decoding_state.CopyFrom(last_decoded_state_); + + for (FrameList::const_iterator it = decodable_frames_.begin(); + it != decodable_frames_.end(); ++it) { + SrsPsFrameBuffer* decodable_frame = it->second; + + if (IsNewerTimestamp(decodable_frame->GetTimeStamp(), frame.GetTimeStamp())) { + break; + } + + decoding_state.SetState(decodable_frame); + + if (IsContinuousInState(frame, decoding_state)) { + return true; + } + } + + return false; +} + +void SrsPsJitterBuffer::FindAndInsertContinuousFrames(const SrsPsFrameBuffer& new_frame) +{ + PsDecodingState decoding_state; + decoding_state.CopyFrom(last_decoded_state_); + decoding_state.SetState(&new_frame); + + // When temporal layers are available, we search for a complete or decodable + // frame until we hit one of the following: + // 1. Continuous base or sync layer. + // 2. The end of the list was reached. + for (FrameList::iterator it = incomplete_frames_.begin(); + it != incomplete_frames_.end();) { + SrsPsFrameBuffer* frame = it->second; + + if (IsNewerTimestamp(new_frame.GetTimeStamp(), frame->GetTimeStamp())) { + ++it; + continue; + } + + if (IsContinuousInState(*frame, decoding_state)) { + decodable_frames_.InsertFrame(frame); + incomplete_frames_.erase(it++); + decoding_state.SetState(frame); + } else { + ++it; + } + } +} + +// Must be called under the critical section |crit_sect_|. +void SrsPsJitterBuffer::CleanUpOldOrEmptyFrames() +{ + decodable_frames_.CleanUpOldOrEmptyFrames(&last_decoded_state_, + &free_frames_); + incomplete_frames_.CleanUpOldOrEmptyFrames(&last_decoded_state_, + &free_frames_); + + if (!last_decoded_state_.in_initial_state()) { + //DropPacketsFromNackList(last_decoded_state_.sequence_num()); + } +} + +// Returns immediately or a |max_wait_time_ms| ms event hang waiting for a +// complete frame, |max_wait_time_ms| decided by caller. +bool SrsPsJitterBuffer::NextCompleteTimestamp(uint32_t max_wait_time_ms, uint32_t* timestamp) +{ + // crit_sect_->Enter(); + + // if (!running_) { + // crit_sect_->Leave(); + // return false; + // } + + CleanUpOldOrEmptyFrames(); + + if (decodable_frames_.empty() || + decodable_frames_.Front()->GetState() != kStateComplete) { + const int64_t end_wait_time_ms = srs_update_system_time() + + max_wait_time_ms * SRS_UTIME_MILLISECONDS; + int64_t wait_time_ms = max_wait_time_ms * SRS_UTIME_MILLISECONDS; + + while (wait_time_ms > 0) { + int ret = srs_cond_timedwait(wait_cond_t, wait_time_ms); + if (ret == 0) { + // Finding oldest frame ready for decoder. + CleanUpOldOrEmptyFrames(); + + if (decodable_frames_.empty() || + decodable_frames_.Front()->GetState() != kStateComplete) { + wait_time_ms = end_wait_time_ms - srs_update_system_time(); + } else { + break; + } + } else { + break; + } + } + + // Inside |crit_sect_|. + } else { + // We already have a frame, reset the event. + //frame_event_->Reset(); + } + + if (decodable_frames_.empty() || + decodable_frames_.Front()->GetState() != kStateComplete) { + //crit_sect_->Leave(); + return false; + } + + *timestamp = decodable_frames_.Front()->GetTimeStamp(); + //crit_sect_->Leave(); + return true; +} + +bool SrsPsJitterBuffer::NextMaybeIncompleteTimestamp(uint32_t* timestamp) +{ + if (decode_error_mode_ == kNoErrors) { + srs_warn("gb28181 SrsJitterBuffer::NextMaybeIncompleteTimestamp decode_error_mode_ %d", decode_error_mode_); + // No point to continue, as we are not decoding with errors. + return false; + } + + CleanUpOldOrEmptyFrames(); + + SrsPsFrameBuffer* oldest_frame; + + if (decodable_frames_.empty()) { + if (incomplete_frames_.size() <= 1) { + return false; + } + + oldest_frame = incomplete_frames_.Front(); + PsFrameBufferStateEnum oldest_frame_state = oldest_frame->GetState(); + + SrsPsFrameBuffer* next_frame; + next_frame = incomplete_frames_.FrontNext(); + + if (oldest_frame_state != kStateComplete && next_frame && + IsNewerSequenceNumber(next_frame->GetLowSeqNum(), oldest_frame->GetHighSeqNum()) && + next_frame->NumPackets() > 0 ) { + oldest_frame_state = kStateComplete; + } + + // Frame will only be removed from buffer if it is complete (or decodable). + if (oldest_frame_state < kStateComplete) { + int oldest_frame_hight_seq = oldest_frame->GetHighSeqNum(); + int next_frame_low_seq = next_frame->GetLowSeqNum(); + + srs_warn("gb28181 SrsPsJitterBuffer::NextMaybeIncompleteTimestamp key(%s) incomplete oldest_frame (%u,%d)->(%u,%d)", + key_.c_str(), oldest_frame->GetTimeStamp(), oldest_frame_hight_seq, + next_frame->GetTimeStamp(), next_frame_low_seq); + return false; + } + } else { + oldest_frame = decodable_frames_.Front(); + + // If we have exactly one frame in the buffer, release it only if it is + // complete. We know decodable_frames_ is not empty due to the previous + // check. + if (decodable_frames_.size() == 1 && incomplete_frames_.empty() + && oldest_frame->GetState() != kStateComplete) { + return false; + } + } + + *timestamp = oldest_frame->GetTimeStamp(); + return true; +} + +SrsPsFrameBuffer* SrsPsJitterBuffer::ExtractAndSetDecode(uint32_t timestamp) +{ + // Extract the frame with the desired timestamp. + SrsPsFrameBuffer* frame = decodable_frames_.PopFrame(timestamp); + bool continuous = true; + + if (!frame) { + frame = incomplete_frames_.PopFrame(timestamp); + + if (frame) { + continuous = last_decoded_state_.ContinuousFrame(frame); + } else { + return NULL; + } + } + + // The state must be changed to decoding before cleaning up zero sized + // frames to avoid empty frames being cleaned up and then given to the + // decoder. Propagates the missing_frame bit. + //frame->PrepareForDecode(continuous); + + // We have a frame - update the last decoded state and nack list. + last_decoded_state_.SetState(frame); + //DropPacketsFromNackList(last_decoded_state_.sequence_num()); + + // if ((*frame).IsSessionComplete()) { + // //UpdateAveragePacketsPerFrame(frame->NumPackets()); + // } + + return frame; +} + +// Release frame when done with decoding. Should never be used to release +// frames from within the jitter buffer. +void SrsPsJitterBuffer::ReleaseFrame(SrsPsFrameBuffer* frame) +{ + //CriticalSectionScoped cs(crit_sect_); + //VCMFrameBuffer* frame_buffer = static_cast(frame); + + if (frame) { + free_frames_.push_back(frame); + } +} + +bool SrsPsJitterBuffer::FoundFrame(uint32_t& time_stamp) +{ + + bool found_frame = NextCompleteTimestamp(0, &time_stamp); + + if (!found_frame) { + found_frame = NextMaybeIncompleteTimestamp(&time_stamp); + } + + return found_frame; +} + +bool SrsPsJitterBuffer::GetPsFrame(char *buffer, int &size, const uint32_t time_stamp) +{ + SrsPsFrameBuffer* frame = ExtractAndSetDecode(time_stamp); + + if (frame == NULL) { + return false; + } + + if (buffer == NULL){ + return false; + } + + size = frame->Length(); + const uint8_t *frame_buffer = frame->Buffer(); + memcpy(buffer, frame_buffer, size); + + frame->PrepareForDecode(false); + ReleaseFrame(frame); + return true; +} + + +SrsPsFrameBuffer* SrsPsJitterBuffer::NextFrame() const +{ + if (!decodable_frames_.empty()) { + return decodable_frames_.Front(); + } + + if (!incomplete_frames_.empty()) { + return incomplete_frames_.Front(); + } + + return NULL; +} + +bool SrsPsJitterBuffer::UpdateNackList(uint16_t sequence_number) +{ + if (nack_mode_ == kNoNack) { + return true; + } + + // Make sure we don't add packets which are already too old to be decoded. + if (!last_decoded_state_.in_initial_state()) { + latest_received_sequence_number_ = LatestSequenceNumber( + latest_received_sequence_number_, + last_decoded_state_.sequence_num()); + } + + if (IsNewerSequenceNumber(sequence_number, + latest_received_sequence_number_)) { + // Push any missing sequence numbers to the NACK list. + for (uint16_t i = latest_received_sequence_number_ + 1; + IsNewerSequenceNumber(sequence_number, i); ++i) { + missing_sequence_numbers_.insert(missing_sequence_numbers_.end(), i); + } + + /* + if (TooLargeNackList() && !HandleTooLargeNackList()) { + srs_warn("gb28181: SrsPsJitterBuffer key(%s) requesting key frame due to too large NACK list.", key_.c_str()); + return false; + } + + if (MissingTooOldPacket(sequence_number) && + !HandleTooOldPackets(sequence_number)) { + srs_warn("gb28181: SrsPsJitterBuffer key(%s) requesting key frame due to missing too old packets", key_.c_str()); + return false; + } + */ + } else { + missing_sequence_numbers_.erase(sequence_number); + } + + return true; +} + +bool SrsPsJitterBuffer::TooLargeNackList() const +{ + return missing_sequence_numbers_.size() > max_nack_list_size_; +} + +bool SrsPsJitterBuffer::HandleTooLargeNackList() +{ + // Recycle frames until the NACK list is small enough. It is likely cheaper to + // request a key frame than to retransmit this many missing packets. + srs_warn("gb28181: SrsPsJitterBuffer NACK list has grown too large: %d > %d", + missing_sequence_numbers_.size(), max_nack_list_size_); + bool key_frame_found = false; + + while (TooLargeNackList()) { + key_frame_found = RecycleFramesUntilKeyFrame(); + } + + return key_frame_found; +} + +bool SrsPsJitterBuffer::MissingTooOldPacket(uint16_t latest_sequence_number) const +{ + if (missing_sequence_numbers_.empty()) { + return false; + } + + const uint16_t age_of_oldest_missing_packet = latest_sequence_number - + *missing_sequence_numbers_.begin(); + // Recycle frames if the NACK list contains too old sequence numbers as + // the packets may have already been dropped by the sender. + return age_of_oldest_missing_packet > max_packet_age_to_nack_; +} + +bool SrsPsJitterBuffer::HandleTooOldPackets(uint16_t latest_sequence_number) +{ + bool key_frame_found = false; + const uint16_t age_of_oldest_missing_packet = latest_sequence_number - + *missing_sequence_numbers_.begin(); + srs_warn("gb28181: SrsPsJitterBuffer NACK list contains too old sequence numbers: %d > %d", + age_of_oldest_missing_packet, + max_packet_age_to_nack_); + + while (MissingTooOldPacket(latest_sequence_number)) { + key_frame_found = RecycleFramesUntilKeyFrame(); + } + + return key_frame_found; +} + +void SrsPsJitterBuffer::DropPacketsFromNackList(uint16_t last_decoded_sequence_number) +{ + // Erase all sequence numbers from the NACK list which we won't need any + // longer. + missing_sequence_numbers_.erase(missing_sequence_numbers_.begin(), + missing_sequence_numbers_.upper_bound( + last_decoded_sequence_number)); +} + +void SrsPsJitterBuffer::SetNackMode(PsNackMode mode, + int64_t low_rtt_nack_threshold_ms, + int64_t high_rtt_nack_threshold_ms) +{ + nack_mode_ = mode; + + if (mode == kNoNack) { + missing_sequence_numbers_.clear(); + } + + assert(low_rtt_nack_threshold_ms >= -1 && high_rtt_nack_threshold_ms >= -1); + assert(high_rtt_nack_threshold_ms == -1 || + low_rtt_nack_threshold_ms <= high_rtt_nack_threshold_ms); + assert(low_rtt_nack_threshold_ms > -1 || high_rtt_nack_threshold_ms == -1); + low_rtt_nack_threshold_ms_ = low_rtt_nack_threshold_ms; + high_rtt_nack_threshold_ms_ = high_rtt_nack_threshold_ms; + + // Don't set a high start rtt if high_rtt_nack_threshold_ms_ is used, to not + // disable NACK in hybrid mode. + if (rtt_ms_ == kDefaultRtt && high_rtt_nack_threshold_ms_ != -1) { + rtt_ms_ = 0; + } + + // if (!WaitForRetransmissions()) { + // jitter_estimate_.ResetNackCount(); + // } +} + +void SrsPsJitterBuffer::SetNackSettings(size_t max_nack_list_size, + int max_packet_age_to_nack, + int max_incomplete_time_ms) +{ + assert(max_packet_age_to_nack >= 0); + assert(max_incomplete_time_ms_ >= 0); + max_nack_list_size_ = max_nack_list_size; + max_packet_age_to_nack_ = max_packet_age_to_nack; + max_incomplete_time_ms_ = max_incomplete_time_ms; + nack_seq_nums_.resize(max_nack_list_size_); +} + +PsNackMode SrsPsJitterBuffer::nack_mode() const +{ + return nack_mode_; +} + + +int SrsPsJitterBuffer::NonContinuousOrIncompleteDuration() +{ + if (incomplete_frames_.empty()) { + return 0; + } + + uint32_t start_timestamp = incomplete_frames_.Front()->GetTimeStamp(); + + if (!decodable_frames_.empty()) { + start_timestamp = decodable_frames_.Back()->GetTimeStamp(); + } + + return incomplete_frames_.Back()->GetTimeStamp() - start_timestamp; +} + +uint16_t SrsPsJitterBuffer::EstimatedLowSequenceNumber(const SrsPsFrameBuffer& frame) const +{ + assert(frame.GetLowSeqNum() >= 0); + + if (frame.HaveFirstPacket()) { + return frame.GetLowSeqNum(); + } + + // This estimate is not accurate if more than one packet with lower sequence + // number is lost. + return frame.GetLowSeqNum() - 1; +} + +uint16_t* SrsPsJitterBuffer::GetNackList(uint16_t* nack_list_size, + bool* request_key_frame) +{ + //CriticalSectionScoped cs(crit_sect_); + *request_key_frame = false; + + if (nack_mode_ == kNoNack) { + *nack_list_size = 0; + return NULL; + } + + if (last_decoded_state_.in_initial_state()) { + SrsPsFrameBuffer* next_frame = NextFrame(); + const bool first_frame_is_key = next_frame && + //next_frame->FrameType() == kVideoFrameKey && + next_frame->HaveFirstPacket(); + + if (!first_frame_is_key) { + bool have_non_empty_frame = decodable_frames_.end() != find_if( + decodable_frames_.begin(), decodable_frames_.end(), + HasNonEmptyState); + + if (!have_non_empty_frame) { + have_non_empty_frame = incomplete_frames_.end() != find_if( + incomplete_frames_.begin(), incomplete_frames_.end(), + HasNonEmptyState); + } + + bool found_key_frame = RecycleFramesUntilKeyFrame(); + + if (!found_key_frame) { + *request_key_frame = have_non_empty_frame; + *nack_list_size = 0; + return NULL; + } + } + } + + if (TooLargeNackList()) { + *request_key_frame = !HandleTooLargeNackList(); + } + + if (max_incomplete_time_ms_ > 0) { + int non_continuous_incomplete_duration = + NonContinuousOrIncompleteDuration(); + + if (non_continuous_incomplete_duration > 90 * max_incomplete_time_ms_) { + // LOG_F(LS_WARNING) << "Too long non-decodable duration: " + // << non_continuous_incomplete_duration << " > " + // << 90 * max_incomplete_time_ms_; + FrameList::reverse_iterator rit = find_if(incomplete_frames_.rbegin(), + incomplete_frames_.rend(), IsKeyFrame); + + if (rit == incomplete_frames_.rend()) { + // Request a key frame if we don't have one already. + *request_key_frame = true; + *nack_list_size = 0; + return NULL; + } else { + // Skip to the last key frame. If it's incomplete we will start + // NACKing it. + // Note that the estimated low sequence number is correct for VP8 + // streams because only the first packet of a key frame is marked. + last_decoded_state_.Reset(); + DropPacketsFromNackList(EstimatedLowSequenceNumber(*rit->second)); + } + } + } + + unsigned int i = 0; + SequenceNumberSet::iterator it = missing_sequence_numbers_.begin(); + + for (; it != missing_sequence_numbers_.end(); ++it, ++i) { + nack_seq_nums_[i] = *it; + } + + *nack_list_size = i; + return &nack_seq_nums_[0]; +} + +bool SrsPsJitterBuffer::WaitForRetransmissions() +{ + if (nack_mode_ == kNoNack) { + // NACK disabled -> don't wait for retransmissions. + return false; + } + + // Evaluate if the RTT is higher than |high_rtt_nack_threshold_ms_|, and in + // that case we don't wait for retransmissions. + if (high_rtt_nack_threshold_ms_ >= 0 && + rtt_ms_ >= high_rtt_nack_threshold_ms_) { + return false; + } + + return true; +} diff --git a/trunk/src/app/srs_app_gb28181_jitbuffer.hpp b/trunk/src/app/srs_app_gb28181_jitbuffer.hpp new file mode 100644 index 000000000..7a7fbe2e2 --- /dev/null +++ b/trunk/src/app/srs_app_gb28181_jitbuffer.hpp @@ -0,0 +1,461 @@ +/** + * The MIT License (MIT) + * + * Copyright (c) 2013-2020 Lixin + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +#ifndef SRS_APP_GB28181_JITBUFFER_HPP +#define SRS_APP_GB28181_JITBUFFER_HPP + +#include + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +class SrsPsRtpPacket; +class SrsPsFrameBuffer; +class PsDecodingState; +class SrsGb28181RtmpMuxer; +class VCMPacket; + +///jittbuffer + +enum FrameType { + kEmptyFrame = 0, + kAudioFrameSpeech = 1, + kAudioFrameCN = 2, + kVideoFrameKey = 3, // independent frame + kVideoFrameDelta = 4, // depends on the previus frame + kVideoFrameGolden = 5, // depends on a old known previus frame + kVideoFrameAltRef = 6 +}; + +// Used to indicate which decode with errors mode should be used. +enum PsDecodeErrorMode { + kNoErrors, // Never decode with errors. Video will freeze + // if nack is disabled. + kSelectiveErrors, // Frames that are determined decodable in + // VCMSessionInfo may be decoded with missing + // packets. As not all incomplete frames will be + // decodable, video will freeze if nack is disabled. + kWithErrors // Release frames as needed. Errors may be + // introduced as some encoded frames may not be + // complete. +}; + +// Used to estimate rolling average of packets per frame. +static const float kFastConvergeMultiplier = 0.4f; +static const float kNormalConvergeMultiplier = 0.2f; + +enum { kMaxNumberOfFrames = 300 }; +enum { kStartNumberOfFrames = 6 }; +enum { kMaxVideoDelayMs = 10000 }; +enum { kPacketsPerFrameMultiplier = 5 }; +enum { kFastConvergeThreshold = 5}; + +enum PsJitterBufferEnum { + kMaxConsecutiveOldFrames = 60, + kMaxConsecutiveOldPackets = 300, + kMaxPacketsInSession = 800, + kBufferIncStepSizeBytes = 30000, // >20 packets. + kMaxJBFrameSizeBytes = 4000000 // sanity don't go above 4Mbyte. +}; + +enum PsFrameBufferEnum { + kOutOfBoundsPacket = -7, + kNotInitialized = -6, + kOldPacket = -5, + kGeneralError = -4, + kFlushIndicator = -3, // Indicator that a flush has occurred. + kTimeStampError = -2, + kSizeError = -1, + kNoError = 0, + kIncomplete = 1, // Frame incomplete. + kCompleteSession = 3, // at least one layer in the frame complete. + kDecodableSession = 4, // Frame incomplete, but ready to be decoded + kDuplicatePacket = 5 // We're receiving a duplicate packet. +}; + +enum PsFrameBufferStateEnum { + kStateEmpty, // frame popped by the RTP receiver + kStateIncomplete, // frame that have one or more packet(s) stored + kStateComplete, // frame that have all packets + kStateDecodable // Hybrid mode - frame can be decoded +}; + +enum PsNackMode { + kNack, + kNoNack +}; + +// Used to pass data from jitter buffer to session info. +// This data is then used in determining whether a frame is decodable. +struct FrameData { + int64_t rtt_ms; + float rolling_average_packets_per_frame; +}; + +inline bool IsNewerSequenceNumber(uint16_t sequence_number, + uint16_t prev_sequence_number) +{ + return sequence_number != prev_sequence_number && + static_cast(sequence_number - prev_sequence_number) < 0x8000; +} + +inline bool IsNewerTimestamp(uint32_t timestamp, uint32_t prev_timestamp) +{ + return timestamp != prev_timestamp && + static_cast(timestamp - prev_timestamp) < 0x80000000; +} + +inline uint16_t LatestSequenceNumber(uint16_t sequence_number1, + uint16_t sequence_number2) +{ + return IsNewerSequenceNumber(sequence_number1, sequence_number2) + ? sequence_number1 + : sequence_number2; +} + +inline uint32_t LatestTimestamp(uint32_t timestamp1, uint32_t timestamp2) +{ + return IsNewerTimestamp(timestamp1, timestamp2) ? timestamp1 : timestamp2; +} + +typedef std::list UnorderedFrameList; + +class TimestampLessThan { +public: + bool operator() (const uint32_t& timestamp1, + const uint32_t& timestamp2) const + { + return IsNewerTimestamp(timestamp2, timestamp1); + } +}; + +class FrameList + : public std::map { +public: + void InsertFrame(SrsPsFrameBuffer* frame); + SrsPsFrameBuffer* PopFrame(uint32_t timestamp); + SrsPsFrameBuffer* Front() const; + SrsPsFrameBuffer* FrontNext() const; + SrsPsFrameBuffer* Back() const; + int RecycleFramesUntilKeyFrame(FrameList::iterator* key_frame_it, + UnorderedFrameList* free_frames); + void CleanUpOldOrEmptyFrames(PsDecodingState* decoding_state, UnorderedFrameList* free_frames); + void Reset(UnorderedFrameList* free_frames); +}; + + +class VCMPacket { +public: + VCMPacket(); + VCMPacket(const uint8_t* ptr, + size_t size, + uint16_t seqNum, + uint32_t timestamp, + bool markerBit); + + void Reset(); + + uint8_t payloadType; + uint32_t timestamp; + // NTP time of the capture time in local timebase in milliseconds. + int64_t ntp_time_ms_; + uint16_t seqNum; + const uint8_t* dataPtr; + size_t sizeBytes; + bool markerBit; + + FrameType frameType; + //cloopenwebrtc::VideoCodecType codec; + + bool isFirstPacket; // Is this first packet in a frame. + //VCMNaluCompleteness completeNALU; // Default is kNaluIncomplete. + bool insertStartCode; // True if a start code should be inserted before this + // packet. + int width; + int height; + //RTPVideoHeader codecSpecificHeader; +}; + +class SrsPsFrameBuffer { +public: + SrsPsFrameBuffer(); + virtual ~SrsPsFrameBuffer(); + +public: + PsFrameBufferEnum InsertPacket(const VCMPacket& packet, const FrameData& frame_data); + void UpdateCompleteSession(); + void UpdateDecodableSession(const FrameData& frame_data); + bool HaveFirstPacket() const; + bool HaveLastPacket() const; + void Reset(); + + uint32_t GetTimeStamp() const; + FrameType GetFrameType() const; + PsFrameBufferStateEnum GetState() const; + + int32_t GetHighSeqNum() const; + int32_t GetLowSeqNum() const; + size_t Length() const; + const uint8_t* Buffer() const; + + int NumPackets() const; + void InformOfEmptyPacket(uint16_t seq_num); + + bool complete() const; + bool decodable() const; + + bool GetPsPlayload(SrsSimpleStream **ps_data, int &count); + bool DeletePacket(int &count); + void PrepareForDecode(bool continuous); + +private: + + typedef std::list PacketList; + typedef PacketList::iterator PacketIterator; + typedef PacketList::const_iterator PacketIteratorConst; + typedef PacketList::reverse_iterator ReversePacketIterator; + + bool InSequence(const PacketIterator& packet_it, + const PacketIterator& prev_packet_it); + + size_t InsertBuffer(uint8_t* frame_buffer, PacketIterator packet_it); + size_t Insert(const uint8_t* buffer, size_t length, uint8_t* frame_buffer); + void ShiftSubsequentPackets(PacketIterator it, int steps_to_shift); + void VerifyAndAllocate(const uint32_t minimumSize); + void UpdateDataPointers(const uint8_t* old_base_ptr, const uint8_t* new_base_ptr); + size_t DeletePacketData(PacketIterator start, PacketIterator end); + size_t MakeDecodable(); + + + PacketList packets_; + int empty_seq_num_low_; + int empty_seq_num_high_; + + int first_packet_seq_num_; + int last_packet_seq_num_; + + bool complete_; + bool decodable_; + + uint32_t timeStamp_; + FrameType frame_type_; + + PsDecodeErrorMode decode_error_mode_; + PsFrameBufferStateEnum state_; + + uint16_t nackCount_; + int64_t latestPacketTimeMs_; + + // The payload. + uint8_t* _buffer; + size_t _size; + size_t _length; +}; + +class PsDecodingState { +public: + PsDecodingState(); + ~PsDecodingState(); + // Check for old frame + bool IsOldFrame(const SrsPsFrameBuffer* frame) const; + // Check for old packet + bool IsOldPacket(const VCMPacket* packet); + // Check for frame continuity based on current decoded state. Use best method + // possible, i.e. temporal info, picture ID or sequence number. + bool ContinuousFrame(const SrsPsFrameBuffer* frame) const; + void SetState(const SrsPsFrameBuffer* frame); + void CopyFrom(const PsDecodingState& state); + bool UpdateEmptyFrame(const SrsPsFrameBuffer* frame); + // Update the sequence number if the timestamp matches current state and the + // sequence number is higher than the current one. This accounts for packets + // arriving late. + void UpdateOldPacket(const VCMPacket* packet); + void SetSeqNum(uint16_t new_seq_num); + void Reset(); + uint32_t time_stamp() const; + uint16_t sequence_num() const; + // Return true if at initial state. + bool in_initial_state() const; + // Return true when sync is on - decode all layers. + bool full_sync() const; + +private: + void UpdateSyncState(const SrsPsFrameBuffer* frame); + // Designated continuity functions + //bool ContinuousPictureId(int picture_id) const; + bool ContinuousSeqNum(uint16_t seq_num) const; + //bool ContinuousLayer(int temporal_id, int tl0_pic_id) const; + //bool UsingPictureId(const SrsPsFrameBuffer* frame) const; + + // Keep state of last decoded frame. + // TODO(mikhal/stefan): create designated classes to handle these types. + uint16_t sequence_num_; + uint32_t time_stamp_; + int picture_id_; + int temporal_id_; + int tl0_pic_id_; + bool full_sync_; // Sync flag when temporal layers are used. + bool in_initial_state_; + + bool m_firstPacket; +}; + +class SrsPsJitterBuffer +{ +public: + SrsPsJitterBuffer(std::string key); + virtual ~SrsPsJitterBuffer(); + +public: + srs_error_t start(); + void Reset(); + PsFrameBufferEnum InsertPacket(const SrsPsRtpPacket &packet, char *buf, int size, bool* retransmitted); + void ReleaseFrame(SrsPsFrameBuffer* frame); + bool FoundFrame(uint32_t& time_stamp); + bool GetPsFrame(char *buffer, int &size, const uint32_t time_stamp); + void SetDecodeErrorMode(PsDecodeErrorMode error_mode); + void SetNackMode(PsNackMode mode,int64_t low_rtt_nack_threshold_ms, + int64_t high_rtt_nack_threshold_ms); + void SetNackSettings(size_t max_nack_list_size,int max_packet_age_to_nack, + int max_incomplete_time_ms); + uint16_t* GetNackList(uint16_t* nack_list_size, bool* request_key_frame); + void Flush(); + +private: + + PsFrameBufferEnum GetFrame(const VCMPacket& packet, SrsPsFrameBuffer** frame, + FrameList** frame_list); + SrsPsFrameBuffer* GetEmptyFrame(); + bool NextCompleteTimestamp(uint32_t max_wait_time_ms, uint32_t* timestamp); + bool NextMaybeIncompleteTimestamp(uint32_t* timestamp); + SrsPsFrameBuffer* ExtractAndSetDecode(uint32_t timestamp); + SrsPsFrameBuffer* NextFrame() const; + + + bool TryToIncreaseJitterBufferSize(); + bool RecycleFramesUntilKeyFrame(); + bool IsContinuous(const SrsPsFrameBuffer& frame) const; + bool IsContinuousInState(const SrsPsFrameBuffer& frame, + const PsDecodingState& decoding_state) const; + void FindAndInsertContinuousFrames(const SrsPsFrameBuffer& new_frame); + void CleanUpOldOrEmptyFrames(); + + //nack + bool UpdateNackList(uint16_t sequence_number); + bool TooLargeNackList() const; + bool HandleTooLargeNackList(); + bool MissingTooOldPacket(uint16_t latest_sequence_number) const; + bool HandleTooOldPackets(uint16_t latest_sequence_number); + void DropPacketsFromNackList(uint16_t last_decoded_sequence_number); + PsNackMode nack_mode() const; + int NonContinuousOrIncompleteDuration(); + uint16_t EstimatedLowSequenceNumber(const SrsPsFrameBuffer& frame) const; + bool WaitForRetransmissions(); + +private: + class SequenceNumberLessThan { + public: + bool operator() (const uint16_t& sequence_number1, + const uint16_t& sequence_number2) const + { + return IsNewerSequenceNumber(sequence_number2, sequence_number1); + } + }; + typedef std::set SequenceNumberSet; + + std::string key_; + + srs_cond_t wait_cond_t; + // If we are running (have started) or not. + bool running_; + // Number of allocated frames. + int max_number_of_frames_; + UnorderedFrameList free_frames_; + FrameList decodable_frames_; + FrameList incomplete_frames_; + PsDecodingState last_decoded_state_; + bool first_packet_since_reset_; + + // Statistics. + //VCMReceiveStatisticsCallback* stats_callback_ GUARDED_BY(crit_sect_); + // Frame counts for each type (key, delta, ...) + //FrameCounts receive_statistics_; + // Latest calculated frame rates of incoming stream. + unsigned int incoming_frame_rate_; + unsigned int incoming_frame_count_; + int64_t time_last_incoming_frame_count_; + unsigned int incoming_bit_count_; + unsigned int incoming_bit_rate_; + // Number of frames in a row that have been too old. + int num_consecutive_old_frames_; + // Number of packets in a row that have been too old. + int num_consecutive_old_packets_; + // Number of packets received. + int num_packets_; + int num_packets_free_; + // Number of duplicated packets received. + int num_duplicated_packets_; + // Number of packets discarded by the jitter buffer. + int num_discarded_packets_; + // Time when first packet is received. + int64_t time_first_packet_ms_; + + // Jitter estimation. + // Filter for estimating jitter. + //VCMJitterEstimator jitter_estimate_; + // Calculates network delays used for jitter calculations. + //VCMInterFrameDelay inter_frame_delay_; + //VCMJitterSample waiting_for_completion_; + int64_t rtt_ms_; + + // NACK and retransmissions. + PsNackMode nack_mode_; + int64_t low_rtt_nack_threshold_ms_; + int64_t high_rtt_nack_threshold_ms_; + // Holds the internal NACK list (the missing sequence numbers). + SequenceNumberSet missing_sequence_numbers_; + uint16_t latest_received_sequence_number_; + std::vector nack_seq_nums_; + size_t max_nack_list_size_; + int max_packet_age_to_nack_; // Measured in sequence numbers. + int max_incomplete_time_ms_; + + PsDecodeErrorMode decode_error_mode_; + // Estimated rolling average of packets per frame + float average_packets_per_frame_; + // average_packets_per_frame converges fast if we have fewer than this many + // frames. + int frame_counter_; +}; + +#endif + diff --git a/trunk/src/app/srs_app_gb28181_sip.cpp b/trunk/src/app/srs_app_gb28181_sip.cpp index 169cd1fca..b31264bcf 100644 --- a/trunk/src/app/srs_app_gb28181_sip.cpp +++ b/trunk/src/app/srs_app_gb28181_sip.cpp @@ -101,6 +101,8 @@ SrsGb28181SipSession::SrsGb28181SipSession(SrsGb28181SipService *c, SrsSipReques _fromlen = 0; _sip_cseq = 100; + + lock_list = srs_mutex_new(); } SrsGb28181SipSession::~SrsGb28181SipSession() @@ -110,6 +112,7 @@ SrsGb28181SipSession::~SrsGb28181SipSession() srs_freep(req); srs_freep(trd); srs_freep(pprint); + srs_mutex_destroy(lock_list); } srs_error_t SrsGb28181SipSession::serve() @@ -126,6 +129,7 @@ srs_error_t SrsGb28181SipSession::serve() void SrsGb28181SipSession::destroy() { //destory all device + SrsLocker(lock_list); std::map::iterator it; for (it = _device_list.begin(); it != _device_list.end(); ++it) { srs_freep(it->second); @@ -162,6 +166,7 @@ srs_error_t SrsGb28181SipSession::do_cycle() if (_register_status == SrsGb28181SipSessionRegisterOk && _alive_status == SrsGb28181SipSessionAliveOk) { + SrsLocker(lock_list); std::map::iterator it; for (it = _device_list.begin(); it != _device_list.end(); it++) { SrsGb28181Device *device = it->second; @@ -260,6 +265,7 @@ srs_error_t SrsGb28181SipSession::do_cycle() (reg_duration / SRS_UTIME_SECONDS), (alive_duration / SRS_UTIME_SECONDS)); + SrsLocker(lock_list); std::map::iterator it; for (it = _device_list.begin(); it != _device_list.end(); it++) { SrsGb28181Device *device = it->second; @@ -309,6 +315,7 @@ srs_error_t SrsGb28181SipSession::cycle() void SrsGb28181SipSession::update_device_list(std::map lst) { + SrsLocker(lock_list); std::map::iterator it; for (it = lst.begin(); it != lst.end(); ++it) { std::string id = it->first; @@ -335,6 +342,7 @@ void SrsGb28181SipSession::update_device_list(std::map SrsGb28181Device* SrsGb28181SipSession::get_device_info(std::string chid) { + SrsLocker(lock_list); if (_device_list.find(chid) != _device_list.end()){ return _device_list[chid]; } @@ -343,6 +351,7 @@ SrsGb28181Device* SrsGb28181SipSession::get_device_info(std::string chid) void SrsGb28181SipSession::dumps(SrsJsonObject* obj) { + SrsLocker(lock_list); obj->set("id", SrsJsonAny::str(_session_id.c_str())); obj->set("device_sumnum", SrsJsonAny::integer(_device_list.size())); @@ -379,7 +388,7 @@ SrsGb28181SipService::SrsGb28181SipService(SrsConfDirective* c) // TODO: FIXME: support reload. config = new SrsGb28181Config(c); sip = new SrsSipStack(); - + if (_srs_gb28181){ _srs_gb28181->set_sip_service(this); } diff --git a/trunk/src/app/srs_app_gb28181_sip.hpp b/trunk/src/app/srs_app_gb28181_sip.hpp index 10d180ee8..dafe75879 100644 --- a/trunk/src/app/srs_app_gb28181_sip.hpp +++ b/trunk/src/app/srs_app_gb28181_sip.hpp @@ -94,6 +94,8 @@ private: //std::map _device_status; int _sip_cseq; + srs_mutex_t lock_list; + public: SrsGb28181SipSession(SrsGb28181SipService *c, SrsSipRequest* r); virtual ~SrsGb28181SipSession(); diff --git a/trunk/src/app/srs_app_listener.cpp b/trunk/src/app/srs_app_listener.cpp index 3f190418d..06adfaae0 100755 --- a/trunk/src/app/srs_app_listener.cpp +++ b/trunk/src/app/srs_app_listener.cpp @@ -42,6 +42,7 @@ using namespace std; #include #include + // set the max packet size. #define SRS_UDP_MAX_PACKET_SIZE 65535 @@ -116,6 +117,47 @@ srs_netfd_t SrsUdpListener::stfd() return lfd; } +void SrsUdpListener::set_socket_buffer() +{ + int default_sndbuf = 0; + // TODO: FIXME: Config it. + int expect_sndbuf = 1024*1024*10; // 10M + int actual_sndbuf = expect_sndbuf; + int r0_sndbuf = 0; + if (true) { + socklen_t opt_len = sizeof(default_sndbuf); + getsockopt(fd(), SOL_SOCKET, SO_SNDBUF, (void*)&default_sndbuf, &opt_len); + + if ((r0_sndbuf = setsockopt(fd(), SOL_SOCKET, SO_SNDBUF, (void*)&actual_sndbuf, sizeof(actual_sndbuf))) < 0) { + srs_warn("set SO_SNDBUF failed, expect=%d, r0=%d", expect_sndbuf, r0_sndbuf); + } + + opt_len = sizeof(actual_sndbuf); + getsockopt(fd(), SOL_SOCKET, SO_SNDBUF, (void*)&actual_sndbuf, &opt_len); + } + + int default_rcvbuf = 0; + // TODO: FIXME: Config it. + int expect_rcvbuf = 1024*1024*10; // 10M + int actual_rcvbuf = expect_rcvbuf; + int r0_rcvbuf = 0; + if (true) { + socklen_t opt_len = sizeof(default_rcvbuf); + getsockopt(fd(), SOL_SOCKET, SO_RCVBUF, (void*)&default_rcvbuf, &opt_len); + + if ((r0_rcvbuf = setsockopt(fd(), SOL_SOCKET, SO_RCVBUF, (void*)&actual_rcvbuf, sizeof(actual_rcvbuf))) < 0) { + srs_warn("set SO_RCVBUF failed, expect=%d, r0=%d", expect_rcvbuf, r0_rcvbuf); + } + + opt_len = sizeof(actual_rcvbuf); + getsockopt(fd(), SOL_SOCKET, SO_RCVBUF, (void*)&actual_rcvbuf, &opt_len); + } + + srs_trace("UDP #%d LISTEN at %s:%d, SO_SNDBUF(default=%d, expect=%d, actual=%d, r0=%d), SO_RCVBUF(default=%d, expect=%d, actual=%d, r0=%d)", + srs_netfd_fileno(lfd), ip.c_str(), port, default_sndbuf, expect_sndbuf, actual_sndbuf, r0_sndbuf, default_rcvbuf, expect_rcvbuf, actual_rcvbuf, r0_rcvbuf); +} + + srs_error_t SrsUdpListener::listen() { srs_error_t err = srs_success; @@ -124,6 +166,8 @@ srs_error_t SrsUdpListener::listen() return srs_error_wrap(err, "listen %s:%d", ip.c_str(), port); } + set_socket_buffer(); + handler->set_stfd(lfd); srs_freep(trd); @@ -138,7 +182,7 @@ srs_error_t SrsUdpListener::listen() srs_error_t SrsUdpListener::cycle() { srs_error_t err = srs_success; - + while (true) { if ((err = trd->pull()) != srs_success) { return srs_error_wrap(err, "udp listener"); @@ -158,7 +202,7 @@ srs_error_t SrsUdpListener::cycle() && buf[19] == 0x63 && buf[20] == 0x6b) { continue; } - + if ((err = handler->on_udp_packet((const sockaddr*)&from, nb_from, buf, nread)) != srs_success) { return srs_error_wrap(err, "handle packet %d bytes", nread); } diff --git a/trunk/src/app/srs_app_listener.hpp b/trunk/src/app/srs_app_listener.hpp index 672ed01b8..7a673f2b7 100644 --- a/trunk/src/app/srs_app_listener.hpp +++ b/trunk/src/app/srs_app_listener.hpp @@ -102,6 +102,8 @@ public: public: virtual int fd(); virtual srs_netfd_t stfd(); +private: + void set_socket_buffer(); public: virtual srs_error_t listen(); // Interface ISrsReusableThreadHandler. diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index a13cbae37..8046a1ad1 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -1413,7 +1413,7 @@ srs_error_t SrsServer::listen_stream_caster() #ifdef SRS_GB28181 //init global gb28181 manger if (_srs_gb28181 == NULL){ - _srs_gb28181 = new SrsGb28181Manger(stream_caster); + _srs_gb28181 = new SrsGb28181Manger(this, stream_caster); if ((err = _srs_gb28181->initialize()) != srs_success){ return err; } diff --git a/trunk/src/core/srs_core_version4.hpp b/trunk/src/core/srs_core_version4.hpp index 96b9826ef..c4a6a0df1 100644 --- a/trunk/src/core/srs_core_version4.hpp +++ b/trunk/src/core/srs_core_version4.hpp @@ -24,6 +24,6 @@ #ifndef SRS_CORE_VERSION4_HPP #define SRS_CORE_VERSION4_HPP -#define SRS_VERSION4_REVISION 29 +#define SRS_VERSION4_REVISION 30 #endif diff --git a/trunk/src/protocol/srs_sip_stack.cpp b/trunk/src/protocol/srs_sip_stack.cpp index 65a383dfd..30235a159 100644 --- a/trunk/src/protocol/srs_sip_stack.cpp +++ b/trunk/src/protocol/srs_sip_stack.cpp @@ -637,7 +637,7 @@ srs_error_t SrsSipStack::do_parse_request(SrsSipRequest* req, const char* recv_m //map key:devicd_id value:status for(int i=0 ; i< (int)vec_device_id.size(); i++){ std::string status = ""; - if ((int)vec_device_id.size() > i) { + if ((int)vec_device_status.size() > i) { status = vec_device_status.at(i); }