1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-02-15 04:42:04 +00:00

Refactor RTP sender

This commit is contained in:
winlin 2020-04-13 15:37:32 +08:00
parent 756826756a
commit 048301d9eb
2 changed files with 43 additions and 41 deletions

View file

@ -581,7 +581,7 @@ srs_error_t SrsRtcSenderThread::cycle()
}
int nn_rtp_pkts = 0;
if ((err = send_messages(source, msgs.msgs, msg_count, sendonly_ukt, &nn_rtp_pkts)) != srs_success) {
if ((err = send_messages(sendonly_ukt, source, msgs.msgs, msg_count, &nn_rtp_pkts)) != srs_success) {
srs_warn("send err %s", srs_error_summary(err).c_str()); srs_error_reset(err);
}
@ -601,8 +601,7 @@ srs_error_t SrsRtcSenderThread::cycle()
}
srs_error_t SrsRtcSenderThread::send_messages(
SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs,
SrsUdpMuxSocket* skt, int* pnn_rtp_pkts
SrsUdpMuxSocket* skt, SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, int* pnn_rtp_pkts
) {
srs_error_t err = srs_success;
@ -620,13 +619,12 @@ srs_error_t SrsRtcSenderThread::send_messages(
return err;
}
// Send out RTP packets
*pnn_rtp_pkts += (int)packets.size();
err = send_packets(skt, packets);
for (int j = 0; j < (int)packets.size(); j++) {
SrsRtpPacket2* packet = packets[j];
if ((err = send_packet(packet, skt)) != srs_success) {
srs_warn("send err %s", srs_error_summary(err).c_str()); srs_error_reset(err);
}
srs_freep(packet);
}
@ -695,10 +693,12 @@ srs_error_t SrsRtcSenderThread::messages_to_packets(
return err;
}
srs_error_t SrsRtcSenderThread::send_packet(SrsRtpPacket2* pkt, SrsUdpMuxSocket* skt)
srs_error_t SrsRtcSenderThread::send_packets(SrsUdpMuxSocket* skt, vector<SrsRtpPacket2*>& packets)
{
srs_error_t err = srs_success;
for (vector<SrsRtpPacket2*>::iterator it = packets.begin(); it != packets.end(); ++it) {
SrsRtpPacket2* packet = *it;
ISrsUdpSender* sender = skt->sender();
// Fetch a cached message from queue.
@ -713,7 +713,7 @@ srs_error_t SrsRtcSenderThread::send_packet(SrsRtpPacket2* pkt, SrsUdpMuxSocket*
// Marshal packet to bytes.
if (true) {
SrsBuffer stream(buf, length);
if ((err = pkt->encode(&stream)) != srs_success) {
if ((err = packet->encode(&stream)) != srs_success) {
return srs_error_wrap(err, "encode packet");
}
length = stream.pos();
@ -737,6 +737,8 @@ srs_error_t SrsRtcSenderThread::send_packet(SrsRtpPacket2* pkt, SrsUdpMuxSocket*
if ((err = sender->sendmmsg(mhdr)) != srs_success) {
return srs_error_wrap(err, "send msghdr");
}
}
return err;
}

View file

@ -152,9 +152,9 @@ public:
public:
virtual srs_error_t cycle();
private:
srs_error_t send_messages(SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsUdpMuxSocket* skt, int* pnn_rtp_pkts);
srs_error_t send_messages(SrsUdpMuxSocket* skt, SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, int* pnn_rtp_pkts);
srs_error_t messages_to_packets(SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, std::vector<SrsRtpPacket2*>& packets);
srs_error_t send_packet(SrsRtpPacket2* pkt, SrsUdpMuxSocket* skt);
srs_error_t send_packets(SrsUdpMuxSocket* skt, std::vector<SrsRtpPacket2*>& packets);
private:
srs_error_t packet_opus(SrsSample* sample, SrsRtpPacket2** ppacket);
private: