1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

SquashSRS4: Remove object cache and stat api

This commit is contained in:
winlin 2021-05-14 18:17:42 +08:00
parent f711eb79ed
commit 6a980683f7
44 changed files with 141 additions and 1277 deletions

View file

@ -178,7 +178,7 @@ SrsRtcConsumer::~SrsRtcConsumer()
vector<SrsRtpPacket2*>::iterator it;
for (it = queue.begin(); it != queue.end(); ++it) {
SrsRtpPacket2* pkt = *it;
_srs_rtp_cache->recycle(pkt);
srs_freep(pkt);
}
srs_cond_destroy(mw_wait);
@ -661,19 +661,6 @@ srs_error_t SrsRtcStream::on_timer(srs_utime_t interval)
return err;
}
SrsRtpPacketCacheHelper::SrsRtpPacketCacheHelper()
{
pkt = _srs_rtp_cache->allocate();
// We MUST reset the packet, when got from cache.
pkt->reset();
}
SrsRtpPacketCacheHelper::~SrsRtpPacketCacheHelper()
{
_srs_rtp_cache->recycle(pkt);
}
#ifdef SRS_FFMPEG_FIT
SrsRtcFromRtmpBridger::SrsRtcFromRtmpBridger(SrsRtcStream* source)
{
@ -844,45 +831,46 @@ srs_error_t SrsRtcFromRtmpBridger::on_audio(SrsSharedPtrMessage* msg)
return err;
}
srs_error_t SrsRtcFromRtmpBridger::transcode(SrsAudioFrame* pkt)
srs_error_t SrsRtcFromRtmpBridger::transcode(SrsAudioFrame* audio)
{
srs_error_t err = srs_success;
std::vector<SrsAudioFrame *> out_pkts;
if ((err = codec_->transcode(pkt, out_pkts)) != srs_success) {
std::vector<SrsAudioFrame *> out_audios;
if ((err = codec_->transcode(audio, out_audios)) != srs_success) {
return srs_error_wrap(err, "recode error");
}
// Save OPUS packets in shared message.
if (out_pkts.empty()) {
if (out_audios.empty()) {
return err;
}
for (std::vector<SrsAudioFrame *>::iterator it = out_pkts.begin(); it != out_pkts.end(); ++it) {
SrsRtpPacketCacheHelper* helper = new SrsRtpPacketCacheHelper();
SrsAutoFree(SrsRtpPacketCacheHelper, helper);
for (std::vector<SrsAudioFrame*>::iterator it = out_audios.begin(); it != out_audios.end(); ++it) {
SrsAudioFrame* out_audio = *it;
if ((err = package_opus(*it, helper)) != srs_success) {
SrsRtpPacket2* pkt = new SrsRtpPacket2();
SrsAutoFree(SrsRtpPacket2, pkt);
if ((err = package_opus(out_audio, pkt)) != srs_success) {
err = srs_error_wrap(err, "package opus");
break;
}
if ((err = source_->on_rtp(helper->pkt)) != srs_success) {
if ((err = source_->on_rtp(pkt)) != srs_success) {
err = srs_error_wrap(err, "consume opus");
break;
}
}
codec_->free_frames(out_pkts);
codec_->free_frames(out_audios);
return err;
}
srs_error_t SrsRtcFromRtmpBridger::package_opus(SrsAudioFrame* audio, SrsRtpPacketCacheHelper* helper)
srs_error_t SrsRtcFromRtmpBridger::package_opus(SrsAudioFrame* audio, SrsRtpPacket2* pkt)
{
srs_error_t err = srs_success;
SrsRtpPacket2* pkt = helper->pkt;
pkt->header.set_payload_type(kAudioPayloadType);
pkt->header.set_ssrc(audio_ssrc);
pkt->frame_type = SrsFrameTypeAudio;
@ -890,7 +878,7 @@ srs_error_t SrsRtcFromRtmpBridger::package_opus(SrsAudioFrame* audio, SrsRtpPack
pkt->header.set_sequence(audio_sequence++);
pkt->header.set_timestamp(audio->dts * 48);
SrsRtpRawPayload* raw = _srs_rtp_raw_cache->allocate();
SrsRtpRawPayload* raw = new SrsRtpRawPayload();
pkt->set_payload(raw, SrsRtpPacketPayloadTypeRaw);
srs_assert(audio->nb_samples == 1);
@ -923,22 +911,22 @@ srs_error_t SrsRtcFromRtmpBridger::on_video(SrsSharedPtrMessage* msg)
// Well, for each IDR, we append a SPS/PPS before it, which is packaged in STAP-A.
if (has_idr) {
SrsRtpPacketCacheHelper* helper = new SrsRtpPacketCacheHelper();
SrsAutoFree(SrsRtpPacketCacheHelper, helper);
SrsRtpPacket2* pkt = new SrsRtpPacket2();
SrsAutoFree(SrsRtpPacket2, pkt);
if ((err = package_stap_a(source_, msg, helper)) != srs_success) {
if ((err = package_stap_a(source_, msg, pkt)) != srs_success) {
return srs_error_wrap(err, "package stap-a");
}
if ((err = source_->on_rtp(helper->pkt)) != srs_success) {
if ((err = source_->on_rtp(pkt)) != srs_success) {
return srs_error_wrap(err, "consume sps/pps");
}
}
// If merge Nalus, we pcakges all NALUs(samples) as one NALU, in a RTP or FUA packet.
vector<SrsRtpPacketCacheHelper*> helpers;
vector<SrsRtpPacket2*> pkts;
if (merge_nalus && nn_samples > 1) {
if ((err = package_nalus(msg, samples, helpers)) != srs_success) {
if ((err = package_nalus(msg, samples, pkts)) != srs_success) {
return srs_error_wrap(err, "package nalus as one");
}
} else {
@ -953,22 +941,22 @@ srs_error_t SrsRtcFromRtmpBridger::on_video(SrsSharedPtrMessage* msg)
}
if (sample->size <= kRtpMaxPayloadSize) {
if ((err = package_single_nalu(msg, sample, helpers)) != srs_success) {
if ((err = package_single_nalu(msg, sample, pkts)) != srs_success) {
return srs_error_wrap(err, "package single nalu");
}
} else {
if ((err = package_fu_a(msg, sample, kRtpMaxPayloadSize, helpers)) != srs_success) {
if ((err = package_fu_a(msg, sample, kRtpMaxPayloadSize, pkts)) != srs_success) {
return srs_error_wrap(err, "package fu-a");
}
}
}
}
if (!helpers.empty()) {
helpers.back()->pkt->header.set_marker(true);
if (!pkts.empty()) {
pkts.back()->header.set_marker(true);
}
return consume_packets(helpers);
return consume_packets(pkts);
}
srs_error_t SrsRtcFromRtmpBridger::filter(SrsSharedPtrMessage* msg, SrsFormat* format, bool& has_idr, vector<SrsSample*>& samples)
@ -1001,7 +989,7 @@ srs_error_t SrsRtcFromRtmpBridger::filter(SrsSharedPtrMessage* msg, SrsFormat* f
return err;
}
srs_error_t SrsRtcFromRtmpBridger::package_stap_a(SrsRtcStream* source, SrsSharedPtrMessage* msg, SrsRtpPacketCacheHelper* helper)
srs_error_t SrsRtcFromRtmpBridger::package_stap_a(SrsRtcStream* source, SrsSharedPtrMessage* msg, SrsRtpPacket2* pkt)
{
srs_error_t err = srs_success;
@ -1017,7 +1005,6 @@ srs_error_t SrsRtcFromRtmpBridger::package_stap_a(SrsRtcStream* source, SrsShare
return srs_error_new(ERROR_RTC_RTP_MUXER, "sps/pps empty");
}
SrsRtpPacket2* pkt = helper->pkt;
pkt->header.set_payload_type(kVideoPayloadType);
pkt->header.set_ssrc(video_ssrc);
pkt->frame_type = SrsFrameTypeVideo;
@ -1061,7 +1048,7 @@ srs_error_t SrsRtcFromRtmpBridger::package_stap_a(SrsRtcStream* source, SrsShare
return err;
}
srs_error_t SrsRtcFromRtmpBridger::package_nalus(SrsSharedPtrMessage* msg, const vector<SrsSample*>& samples, vector<SrsRtpPacketCacheHelper*>& helpers)
srs_error_t SrsRtcFromRtmpBridger::package_nalus(SrsSharedPtrMessage* msg, const vector<SrsSample*>& samples, vector<SrsRtpPacket2*>& pkts)
{
srs_error_t err = srs_success;
@ -1097,10 +1084,9 @@ srs_error_t SrsRtcFromRtmpBridger::package_nalus(SrsSharedPtrMessage* msg, const
if (nn_bytes < kRtpMaxPayloadSize) {
// Package NALUs in a single RTP packet.
SrsRtpPacketCacheHelper* helper = new SrsRtpPacketCacheHelper();
helpers.push_back(helper);
SrsRtpPacket2* pkt = new SrsRtpPacket2();
pkts.push_back(pkt);
SrsRtpPacket2* pkt = helper->pkt;
pkt->header.set_payload_type(kVideoPayloadType);
pkt->header.set_ssrc(video_ssrc);
pkt->frame_type = SrsFrameTypeVideo;
@ -1132,10 +1118,9 @@ srs_error_t SrsRtcFromRtmpBridger::package_nalus(SrsSharedPtrMessage* msg, const
return srs_error_wrap(err, "read samples %d bytes, left %d, total %d", packet_size, nb_left, nn_bytes);
}
SrsRtpPacketCacheHelper* helper = new SrsRtpPacketCacheHelper();
helpers.push_back(helper);
SrsRtpPacket2* pkt = new SrsRtpPacket2();
pkts.push_back(pkt);
SrsRtpPacket2* pkt = helper->pkt;
pkt->header.set_payload_type(kVideoPayloadType);
pkt->header.set_ssrc(video_ssrc);
pkt->frame_type = SrsFrameTypeVideo;
@ -1159,21 +1144,20 @@ srs_error_t SrsRtcFromRtmpBridger::package_nalus(SrsSharedPtrMessage* msg, const
}
// Single NAL Unit Packet @see https://tools.ietf.org/html/rfc6184#section-5.6
srs_error_t SrsRtcFromRtmpBridger::package_single_nalu(SrsSharedPtrMessage* msg, SrsSample* sample, vector<SrsRtpPacketCacheHelper*>& helpers)
srs_error_t SrsRtcFromRtmpBridger::package_single_nalu(SrsSharedPtrMessage* msg, SrsSample* sample, vector<SrsRtpPacket2*>& pkts)
{
srs_error_t err = srs_success;
SrsRtpPacketCacheHelper* helper = new SrsRtpPacketCacheHelper();
helpers.push_back(helper);
SrsRtpPacket2* pkt = new SrsRtpPacket2();
pkts.push_back(pkt);
SrsRtpPacket2* pkt = helper->pkt;
pkt->header.set_payload_type(kVideoPayloadType);
pkt->header.set_ssrc(video_ssrc);
pkt->frame_type = SrsFrameTypeVideo;
pkt->header.set_sequence(video_sequence++);
pkt->header.set_timestamp(msg->timestamp * 90);
SrsRtpRawPayload* raw = _srs_rtp_raw_cache->allocate();
SrsRtpRawPayload* raw = new SrsRtpRawPayload();
pkt->set_payload(raw, SrsRtpPacketPayloadTypeRaw);
raw->payload = sample->bytes;
@ -1184,7 +1168,7 @@ srs_error_t SrsRtcFromRtmpBridger::package_single_nalu(SrsSharedPtrMessage* msg,
return err;
}
srs_error_t SrsRtcFromRtmpBridger::package_fu_a(SrsSharedPtrMessage* msg, SrsSample* sample, int fu_payload_size, vector<SrsRtpPacketCacheHelper*>& helpers)
srs_error_t SrsRtcFromRtmpBridger::package_fu_a(SrsSharedPtrMessage* msg, SrsSample* sample, int fu_payload_size, vector<SrsRtpPacket2*>& pkts)
{
srs_error_t err = srs_success;
@ -1197,17 +1181,16 @@ srs_error_t SrsRtcFromRtmpBridger::package_fu_a(SrsSharedPtrMessage* msg, SrsSam
for (int i = 0; i < num_of_packet; ++i) {
int packet_size = srs_min(nb_left, fu_payload_size);
SrsRtpPacketCacheHelper* helper = new SrsRtpPacketCacheHelper();
helpers.push_back(helper);
SrsRtpPacket2* pkt = new SrsRtpPacket2();
pkts.push_back(pkt);
SrsRtpPacket2* pkt = helper->pkt;
pkt->header.set_payload_type(kVideoPayloadType);
pkt->header.set_ssrc(video_ssrc);
pkt->frame_type = SrsFrameTypeVideo;
pkt->header.set_sequence(video_sequence++);
pkt->header.set_timestamp(msg->timestamp * 90);
SrsRtpFUAPayload2* fua = _srs_rtp_fua_cache->allocate();
SrsRtpFUAPayload2* fua = new SrsRtpFUAPayload2();
pkt->set_payload(fua, SrsRtpPacketPayloadTypeFUA2);
fua->nri = (SrsAvcNaluType)header;
@ -1227,22 +1210,22 @@ srs_error_t SrsRtcFromRtmpBridger::package_fu_a(SrsSharedPtrMessage* msg, SrsSam
return err;
}
srs_error_t SrsRtcFromRtmpBridger::consume_packets(vector<SrsRtpPacketCacheHelper*>& helpers)
srs_error_t SrsRtcFromRtmpBridger::consume_packets(vector<SrsRtpPacket2*>& pkts)
{
srs_error_t err = srs_success;
// TODO: FIXME: Consume a range of packets.
for (int i = 0; i < (int)helpers.size(); i++) {
SrsRtpPacketCacheHelper* helper = helpers[i];
if ((err = source_->on_rtp(helper->pkt)) != srs_success) {
for (int i = 0; i < (int)pkts.size(); i++) {
SrsRtpPacket2* pkt = pkts[i];
if ((err = source_->on_rtp(pkt)) != srs_success) {
err = srs_error_wrap(err, "consume sps/pps");
break;
}
}
for (int i = 0; i < (int)helpers.size(); i++) {
SrsRtpPacketCacheHelper* helper = helpers[i];
srs_freep(helper);
for (int i = 0; i < (int)pkts.size(); i++) {
SrsRtpPacket2* pkt = pkts[i];
srs_freep(pkt);
}
return err;
@ -2192,29 +2175,10 @@ SrsRtcTrackDescription* SrsRtcStreamDescription::find_track_description_by_ssrc(
return NULL;
}
SrsRtcTrackStatistic::SrsRtcTrackStatistic()
{
packets = 0;
last_packets = 0;
bytes = 0;
last_bytes = 0;
nacks = 0;
last_nacks = 0;
padding_packets = 0;
last_padding_packets = 0;
padding_bytes = 0;
last_padding_bytes = 0;
replay_packets = 0;
last_replay_packets = 0;
replay_bytes = 0;
last_replay_bytes = 0;
}
SrsRtcRecvTrack::SrsRtcRecvTrack(SrsRtcConnection* session, SrsRtcTrackDescription* track_desc, bool is_audio)
{
session_ = session;
track_desc_ = track_desc->copy();
statistic_ = new SrsRtcTrackStatistic();
nack_no_copy_ = false;
if (is_audio) {
@ -2233,7 +2197,6 @@ SrsRtcRecvTrack::~SrsRtcRecvTrack()
srs_freep(rtp_queue_);
srs_freep(nack_receiver_);
srs_freep(track_desc_);
srs_freep(statistic_);
}
bool SrsRtcRecvTrack::has_ssrc(uint32_t ssrc)
@ -2341,7 +2304,6 @@ srs_error_t SrsRtcRecvTrack::do_check_send_nacks(uint32_t& timeout_nacks)
uint32_t sent_nacks = 0;
session_->check_send_nacks(nack_receiver_, track_desc_->ssrc_, sent_nacks, timeout_nacks);
statistic_->nacks += sent_nacks;
return err;
}
@ -2362,7 +2324,7 @@ void SrsRtcAudioRecvTrack::on_before_decode_payload(SrsRtpPacket2* pkt, SrsBuffe
return;
}
*ppayload = _srs_rtp_raw_cache->allocate();
*ppayload = new SrsRtpRawPayload();
*ppt = SrsRtpPacketPayloadTypeRaw;
}
@ -2370,13 +2332,6 @@ srs_error_t SrsRtcAudioRecvTrack::on_rtp(SrsRtcStream* source, SrsRtpPacket2* pk
{
srs_error_t err = srs_success;
// connection level statistic
session_->stat_->nn_in_audios++;
// track level statistic
statistic_->packets++;
statistic_->bytes += pkt->nb_bytes();
if ((err = source->on_rtp(pkt)) != srs_success) {
return srs_error_wrap(err, "source on rtp");
}
@ -2421,10 +2376,10 @@ void SrsRtcVideoRecvTrack::on_before_decode_payload(SrsRtpPacket2* pkt, SrsBuffe
*ppayload = new SrsRtpSTAPPayload();
*ppt = SrsRtpPacketPayloadTypeSTAP;
} else if (v == kFuA) {
*ppayload = _srs_rtp_fua_cache->allocate();
*ppayload = new SrsRtpFUAPayload2();
*ppt = SrsRtpPacketPayloadTypeFUA2;
} else {
*ppayload = _srs_rtp_raw_cache->allocate();
*ppayload = new SrsRtpRawPayload();
*ppt = SrsRtpPacketPayloadTypeRaw;
}
}
@ -2433,13 +2388,6 @@ srs_error_t SrsRtcVideoRecvTrack::on_rtp(SrsRtcStream* source, SrsRtpPacket2* pk
{
srs_error_t err = srs_success;
// connection level statistic
session_->stat_->nn_in_videos++;
// track level statistic
statistic_->packets++;
statistic_->bytes += pkt->nb_bytes();
pkt->frame_type = SrsFrameTypeVideo;
if ((err = source->on_rtp(pkt)) != srs_success) {
@ -2475,7 +2423,6 @@ SrsRtcSendTrack::SrsRtcSendTrack(SrsRtcConnection* session, SrsRtcTrackDescripti
{
session_ = session;
track_desc_ = track_desc->copy();
statistic_ = new SrsRtcTrackStatistic();
nack_no_copy_ = false;
if (is_audio) {
@ -2491,7 +2438,6 @@ SrsRtcSendTrack::~SrsRtcSendTrack()
{
srs_freep(rtp_queue_);
srs_freep(track_desc_);
srs_freep(statistic_);
srs_freep(nack_epp);
}
@ -2568,10 +2514,6 @@ srs_error_t SrsRtcSendTrack::on_recv_nack(const vector<uint16_t>& lost_seqs)
++_srs_pps_rnack2->sugar;
SrsRtcTrackStatistic* statistic = statistic_;
statistic->nacks++;
for(int i = 0; i < (int)lost_seqs.size(); ++i) {
uint16_t seq = lost_seqs.at(i);
SrsRtpPacket2* pkt = fetch_rtp_packet(seq);
@ -2624,14 +2566,6 @@ srs_error_t SrsRtcAudioSendTrack::on_rtp(SrsRtpPacket2* pkt)
// TODO: FIXME: Should update PT for RTX.
}
// Update stats.
session_->stat_->nn_out_audios++;
// track level statistic
// TODO: FIXME: if send packets failed, statistic is no correct.
statistic_->packets++;
statistic_->bytes += pkt->nb_bytes();
if ((err = session_->do_send_packet(pkt)) != srs_success) {
return srs_error_wrap(err, "raw send");
}
@ -2662,8 +2596,6 @@ srs_error_t SrsRtcVideoSendTrack::on_rtp(SrsRtpPacket2* pkt)
if (!track_desc_->is_active_) {
return err;
}
SrsRtcTrackStatistic* statistic = statistic_;
pkt->header.set_ssrc(track_desc_->ssrc_);
@ -2678,14 +2610,6 @@ srs_error_t SrsRtcVideoSendTrack::on_rtp(SrsRtpPacket2* pkt)
// TODO: FIXME: Should update PT for RTX.
}
// Update stats.
session_->stat_->nn_out_videos++;
// track level statistic
// TODO: FIXME: if send packets failed, statistic is no correct.
statistic->packets++;
statistic->bytes += pkt->nb_bytes();
if ((err = session_->do_send_packet(pkt)) != srs_success) {
return srs_error_wrap(err, "raw send");
}