1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

Refactor code for merge_nalus and gso

This commit is contained in:
winlin 2020-04-13 16:50:24 +08:00
parent 048301d9eb
commit 4400896395
5 changed files with 149 additions and 33 deletions

View file

@ -449,6 +449,25 @@ srs_error_t SrsDtlsSession::unprotect_rtcp(char* out_buf, const char* in_buf, in
return srs_error_new(ERROR_RTC_SRTP_UNPROTECT, "rtcp unprotect failed");
}
SrsRtcPackets::SrsRtcPackets(bool gso, bool merge_nalus)
{
is_gso = gso;
should_merge_nalus = merge_nalus;
nn_rtp_pkts = nn_samples = 0;
nn_audios = nn_videos = 0;
}
SrsRtcPackets::~SrsRtcPackets()
{
vector<SrsRtpPacket2*>::iterator it;
for (it = packets.begin(); it != packets.end(); ++it ) {
SrsRtpPacket2* packet = *it;
srs_freep(packet);
}
packets.clear();
}
SrsRtcSenderThread::SrsRtcSenderThread(SrsRtcSession* s, SrsUdpMuxSocket* u, int parent_cid)
: sendonly_ukt(NULL)
{
@ -457,15 +476,21 @@ SrsRtcSenderThread::SrsRtcSenderThread(SrsRtcSession* s, SrsUdpMuxSocket* u, int
rtc_session = s;
sendonly_ukt = u->copy_sendonly();
gso = false;
merge_nalus = false;
audio_timestamp = 0;
audio_sequence = 0;
video_sequence = 0;
_srs_config->subscribe(this);
}
SrsRtcSenderThread::~SrsRtcSenderThread()
{
_srs_config->unsubscribe(this);
srs_freep(trd);
srs_freep(sendonly_ukt);
}
@ -480,9 +505,35 @@ srs_error_t SrsRtcSenderThread::initialize(const uint32_t& vssrc, const uint32_t
video_payload_type = v_pt;
audio_payload_type = a_pt;
gso = _srs_config->get_rtc_server_gso();
merge_nalus = _srs_config->get_rtc_server_merge_nalus();
srs_trace("RTC sender video(ssrc=%d, pt=%d), audio(ssrc=%d, pt=%d), package(gso=%d, merge_nalus=%d)",
video_ssrc, video_payload_type, audio_ssrc, audio_payload_type, gso, merge_nalus);
return err;
}
srs_error_t SrsRtcSenderThread::on_reload_rtc_server()
{
if (true) {
bool v = _srs_config->get_rtc_server_gso();
if (gso != v) {
srs_trace("Reload gso %d=>%d", gso, v);
gso = v;
}
}
if (true) {
bool v = _srs_config->get_rtc_server_merge_nalus();
if (merge_nalus != v) {
srs_trace("Reload merge_nalus %d=>%d", merge_nalus, v);
merge_nalus = v;
}
}
return srs_success;
}
int SrsRtcSenderThread::cid()
{
return trd->cid();
@ -580,28 +631,27 @@ srs_error_t SrsRtcSenderThread::cycle()
continue;
}
int nn_rtp_pkts = 0;
if ((err = send_messages(sendonly_ukt, source, msgs.msgs, msg_count, &nn_rtp_pkts)) != srs_success) {
SrsRtcPackets pkts(gso, merge_nalus);
if ((err = send_messages(sendonly_ukt, source, msgs.msgs, msg_count, pkts)) != srs_success) {
srs_warn("send err %s", srs_error_summary(err).c_str()); srs_error_reset(err);
}
int nn = 0;
for (int i = 0; i < msg_count; i++) {
SrsSharedPtrMessage* msg = msgs.msgs[i];
nn += msg->size;
srs_freep(msg);
}
pprint->elapse();
if (pprint->can_print()) {
// TODO: FIXME: Print stat like frame/s, packet/s, loss_packets.
srs_trace("-> RTC PLAY %d msgs, %d packets, %d bytes", msg_count, nn_rtp_pkts, nn);
srs_trace("-> RTC PLAY %d msgs, %d samples, %d packets, %d audios, %d videos, %d bytes",
msg_count, pkts.nn_samples, pkts.nn_rtp_pkts, pkts.nn_audios, pkts.nn_videos, pkts.nn_bytes);
}
}
}
srs_error_t SrsRtcSenderThread::send_messages(
SrsUdpMuxSocket* skt, SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, int* pnn_rtp_pkts
SrsUdpMuxSocket* skt, SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcPackets& packets
) {
srs_error_t err = srs_success;
@ -610,53 +660,53 @@ srs_error_t SrsRtcSenderThread::send_messages(
}
// Covert kernel messages to RTP packets.
vector<SrsRtpPacket2*> packets;
if ((err = messages_to_packets(source, msgs, nb_msgs, packets)) != srs_success) {
for (int j = 0; j < (int)packets.size(); j++) {
SrsRtpPacket2* packet = packets[j];
srs_freep(packet);
}
return err;
}
// Send out RTP packets
*pnn_rtp_pkts += (int)packets.size();
err = send_packets(skt, packets);
packets.nn_rtp_pkts = (int)packets.packets.size();
for (int j = 0; j < (int)packets.size(); j++) {
SrsRtpPacket2* packet = packets[j];
srs_freep(packet);
// Send out RTP packets
if ((err = send_packets(skt, packets)) != srs_success) {
return err;
}
return err;
}
srs_error_t SrsRtcSenderThread::messages_to_packets(
SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, vector<SrsRtpPacket2*>& packets
SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcPackets& packets
) {
srs_error_t err = srs_success;
for (int i = 0; i < nb_msgs; i++) {
SrsSharedPtrMessage* msg = msgs[i];
packets.nn_bytes += msg->size;
packets.nn_samples += msg->nn_extra_payloads() + msg->nn_samples();
SrsRtpPacket2* packet = NULL;
if (msg->is_audio()) {
packets.nn_audios++;
for (int i = 0; i < msg->nn_extra_payloads(); i++) {
SrsSample* sample = msg->extra_payloads() + i;
if ((err = packet_opus(sample, &packet)) != srs_success) {
return srs_error_wrap(err, "opus package");
}
packets.push_back(packet);
packets.packets.push_back(packet);
}
continue;
}
packets.nn_videos++;
// Well, for each IDR, we append a SPS/PPS before it, which is packaged in STAP-A.
if (msg->has_idr()) {
if ((err = packet_stap_a(source, msg, &packet)) != srs_success) {
return srs_error_wrap(err, "packet stap-a");
}
packets.push_back(packet);
packets.packets.push_back(packet);
}
for (int i = 0; i < msg->nn_samples(); i++) {
@ -677,14 +727,14 @@ srs_error_t SrsRtcSenderThread::messages_to_packets(
if (i == msg->nn_samples() - 1) {
packet->rtp_header.set_marker(true);
}
packets.push_back(packet);
packets.packets.push_back(packet);
} else {
if ((err = packet_fu_a(msg, sample, kRtpMaxPayloadSize, packets)) != srs_success) {
return srs_error_wrap(err, "packet fu-a");
}
if (i == msg->nn_samples() - 1) {
packets.back()->rtp_header.set_marker(true);
packets.packets.back()->rtp_header.set_marker(true);
}
}
}
@ -693,11 +743,12 @@ srs_error_t SrsRtcSenderThread::messages_to_packets(
return err;
}
srs_error_t SrsRtcSenderThread::send_packets(SrsUdpMuxSocket* skt, vector<SrsRtpPacket2*>& packets)
srs_error_t SrsRtcSenderThread::send_packets(SrsUdpMuxSocket* skt, SrsRtcPackets& packets)
{
srs_error_t err = srs_success;
for (vector<SrsRtpPacket2*>::iterator it = packets.begin(); it != packets.end(); ++it) {
vector<SrsRtpPacket2*>::iterator it;
for (it = packets.packets.begin(); it != packets.packets.end(); ++it) {
SrsRtpPacket2* packet = *it;
ISrsUdpSender* sender = skt->sender();
@ -766,7 +817,7 @@ srs_error_t SrsRtcSenderThread::packet_opus(SrsSample* sample, SrsRtpPacket2** p
return err;
}
srs_error_t SrsRtcSenderThread::packet_fu_a(SrsSharedPtrMessage* msg, SrsSample* sample, int fu_payload_size, vector<SrsRtpPacket2*>& packets)
srs_error_t SrsRtcSenderThread::packet_fu_a(SrsSharedPtrMessage* msg, SrsSample* sample, int fu_payload_size, SrsRtcPackets& packets)
{
srs_error_t err = srs_success;
@ -780,7 +831,7 @@ srs_error_t SrsRtcSenderThread::packet_fu_a(SrsSharedPtrMessage* msg, SrsSample*
int packet_size = srs_min(nb_left, fu_payload_size);
SrsRtpPacket2* packet = new SrsRtpPacket2();
packets.push_back(packet);
packets.packets.push_back(packet);
packet->rtp_header.set_timestamp(msg->timestamp * 90);
packet->rtp_header.set_sequence(video_sequence++);
@ -1513,8 +1564,8 @@ srs_error_t SrsUdpMuxSender::on_reload_rtc_server()
{
int v = _srs_config->get_rtc_server_sendmmsg();
if (max_sendmmsg != v) {
srs_trace("Reload max_sendmmsg %d=>%d", max_sendmmsg, v);
max_sendmmsg = v;
srs_trace("Reload max_sendmmsg=%d", max_sendmmsg);
}
return srs_success;