1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

For #1638, #307, merge john, add NACK suport, remove debug code, verbose log

This commit is contained in:
winlin 2020-03-14 22:51:35 +08:00
commit 60c8b37f05
10 changed files with 421 additions and 134 deletions

View file

@ -31,6 +31,7 @@ using namespace std;
#include <srs_rtmp_stack.hpp>
#include <srs_protocol_amf0.hpp>
#include <srs_kernel_codec.hpp>
#include <srs_kernel_rtp.hpp>
#include <srs_app_hls.hpp>
#include <srs_app_rtp.hpp>
#include <srs_app_forward.hpp>
@ -326,6 +327,7 @@ srs_error_t SrsMessageQueue::dump_packets(int max_count, SrsSharedPtrMessage** p
return err;
}
srs_error_t SrsMessageQueue::dump_packets(SrsConsumer* consumer, bool atc, SrsRtmpJitterAlgorithm ag)
{
srs_error_t err = srs_success;
@ -815,6 +817,55 @@ SrsSharedPtrMessage* SrsMixQueue::pop()
return msg;
}
SrsRtpPacketQueue::SrsRtpPacketQueue()
{
}
SrsRtpPacketQueue::~SrsRtpPacketQueue()
{
clear();
}
void SrsRtpPacketQueue::clear()
{
map<uint16_t, SrsRtpSharedPacket*>::iterator iter = pkt_queue.begin();
while (iter != pkt_queue.end()) {
srs_freep(iter->second);
pkt_queue.erase(iter++);
}
}
void SrsRtpPacketQueue::push(std::vector<SrsRtpSharedPacket*>& pkts)
{
for (int i = 0; i < pkts.size(); ++i) {
insert(pkts[i]->sequence, pkts[i]);
}
}
void SrsRtpPacketQueue::insert(const uint16_t& sequence, SrsRtpSharedPacket* pkt)
{
pkt_queue.insert(make_pair(sequence, pkt->copy()));
if (pkt_queue.size() >= 3000) {
srs_freep(pkt_queue.begin()->second);
pkt_queue.erase(pkt_queue.begin());
}
}
SrsRtpSharedPacket* SrsRtpPacketQueue::find(const uint16_t& sequence)
{
if (pkt_queue.empty()) {
return NULL;
}
SrsRtpSharedPacket* pkt = NULL;
map<uint16_t, SrsRtpSharedPacket*>::iterator iter = pkt_queue.find(sequence);
if (iter != pkt_queue.end()) {
pkt = iter->second->copy();
}
return pkt;
}
SrsOriginHub::SrsOriginHub()
{
source = NULL;
@ -1076,6 +1127,8 @@ srs_error_t SrsOriginHub::on_video(SrsSharedPtrMessage* shared_video, bool is_se
srs_error_reset(err);
rtp->on_unpublish();
}
source->rtp_queue->push(msg->rtp_packets);
if ((err = hls->on_video(msg, format)) != srs_success) {
// apply the error strategy for hls.
@ -1728,7 +1781,6 @@ srs_error_t SrsSourceManager::fetch_or_create(SrsRequest* r, ISrsSourceHandler*
SrsSource* source = NULL;
if ((source = fetch(r)) != NULL) {
srs_trace("found source");
*pps = source;
return err;
}
@ -1848,6 +1900,7 @@ SrsSource::SrsSource()
jitter_algorithm = SrsRtmpJitterAlgorithmOFF;
mix_correct = false;
mix_queue = new SrsMixQueue();
rtp_queue = new SrsRtpPacketQueue();
_can_publish = true;
_pre_source_id = _source_id = -1;
@ -1877,6 +1930,7 @@ SrsSource::~SrsSource()
srs_freep(hub);
srs_freep(meta);
srs_freep(mix_queue);
srs_freep(rtp_queue);
srs_freep(play_edge);
srs_freep(publish_edge);
@ -2634,3 +2688,7 @@ string SrsSource::get_curr_origin()
return play_edge->get_curr_origin();
}
SrsRtpSharedPacket* SrsSource::find_rtp_packet(const uint16_t& seq)
{
return rtp_queue->find(seq);
}