1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00
This commit is contained in:
xiaozhihong 2020-04-13 14:29:19 +08:00
commit 8687c50dc4
34 changed files with 1582 additions and 594 deletions

View file

@ -388,16 +388,13 @@ srs_error_t SrsAudioRecode::initialize()
srs_error_t SrsAudioRecode::recode(SrsSample *pkt, char **buf, int *buf_len, int &n)
{
srs_error_t err = srs_success;
static char decode_buffer[kPacketBufMax];
static char resample_buffer[kFrameBufMax];
static char encode_buffer[kPacketBufMax];
if (!dec_) {
return srs_error_new(ERROR_RTC_RTP_MUXER, "dec_ nullptr");
}
int decode_len = kPacketBufMax;
static char decode_buffer[kPacketBufMax];
if ((err = dec_->decode(pkt, decode_buffer, decode_len)) != srs_success) {
return srs_error_new(ERROR_RTC_RTP_MUXER, "decode error");
}
@ -412,15 +409,18 @@ srs_error_t SrsAudioRecode::recode(SrsSample *pkt, char **buf, int *buf_len, int
if (!resample_) {
return srs_error_new(ERROR_RTC_RTP_MUXER, "SrsAudioResample failed");
}
resample_->initialize();
if ((err = resample_->initialize()) != srs_success) {
return srs_error_wrap(err, "init resample");
}
}
SrsSample pcm;
pcm.bytes = decode_buffer;
pcm.size = decode_len;
int resample_len = kFrameBufMax;
static char resample_buffer[kFrameBufMax];
if ((err = resample_->resample(&pcm, resample_buffer, resample_len)) != srs_success) {
return srs_error_new(ERROR_RTC_RTP_MUXER, "decode error");
return srs_error_new(ERROR_RTC_RTP_MUXER, "resample error");
}
n = 0;
@ -445,8 +445,9 @@ srs_error_t SrsAudioRecode::recode(SrsSample *pkt, char **buf, int *buf_len, int
int encode_len;
pcm.bytes = (char *)data_;
pcm.size = size_;
static char encode_buffer[kPacketBufMax];
if ((err = enc_->encode(&pcm, encode_buffer, encode_len)) != srs_success) {
return srs_error_new(ERROR_RTC_RTP_MUXER, "decode error");
return srs_error_new(ERROR_RTC_RTP_MUXER, "encode error");
}
memcpy(buf[n], encode_buffer, encode_len);

View file

@ -1540,6 +1540,11 @@ srs_error_t SrsConfig::reload_conf(SrsConfig* conf)
if ((err = reload_http_stream(old_root)) != srs_success) {
return srs_error_wrap(err, "http steram");;
}
// Merge config: rtc_server
if ((err = reload_rtc_server(old_root)) != srs_success) {
return srs_error_wrap(err, "http steram");;
}
// TODO: FIXME: support reload stream_caster.
@ -1697,6 +1702,40 @@ srs_error_t SrsConfig::reload_http_stream(SrsConfDirective* old_root)
return err;
}
srs_error_t SrsConfig::reload_rtc_server(SrsConfDirective* old_root)
{
srs_error_t err = srs_success;
// merge config.
std::vector<ISrsReloadHandler*>::iterator it;
// state graph
// old_rtc_server new_rtc_server
// ENABLED => ENABLED (modified)
SrsConfDirective* new_rtc_server = root->get("rtc_server");
SrsConfDirective* old_rtc_server = old_root->get("rtc_server");
// TODO: FIXME: Support disable or enable reloading.
// ENABLED => ENABLED (modified)
if (get_rtc_server_enabled(old_rtc_server) && get_rtc_server_enabled(new_rtc_server)
&& !srs_directive_equals(old_rtc_server, new_rtc_server)
) {
for (it = subscribes.begin(); it != subscribes.end(); ++it) {
ISrsReloadHandler* subscribe = *it;
if ((err = subscribe->on_reload_rtc_server()) != srs_success) {
return srs_error_wrap(err, "rtc server enabled");
}
}
srs_trace("reload rtc server success.");
return err;
}
srs_trace("reload rtc server success, nothing changed.");
return err;
}
srs_error_t SrsConfig::reload_transcode(SrsConfDirective* new_vhost, SrsConfDirective* old_vhost)
{
srs_error_t err = srs_success;
@ -3575,7 +3614,7 @@ srs_error_t SrsConfig::check_normal_config()
for (int i = 0; conf && i < (int)conf->directives.size(); i++) {
string n = conf->at(i)->name;
if (n != "enabled" && n != "listen" && n != "dir" && n != "candidate" && n != "ecdsa"
&& n != "sendmmsg" && n != "encrypt") {
&& n != "sendmmsg" && n != "encrypt" && n != "reuseport") {
return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal rtc_server.%s", n.c_str());
}
}
@ -4706,6 +4745,33 @@ int SrsConfig::get_rtc_server_sendmmsg()
#endif
}
int SrsConfig::get_rtc_server_reuseport()
{
#if defined(SO_REUSEPORT)
static int DEFAULT = 4;
#else
static int DEFAULT = 1;
#endif
SrsConfDirective* conf = root->get("rtc_server");
if (!conf) {
return DEFAULT;
}
conf = conf->get("reuseport");
if (!conf || conf->arg0().empty()) {
return DEFAULT;
}
int reuseport = ::atoi(conf->arg0().c_str());
#if !defined(SO_REUSEPORT)
srs_warn("REUSEPORT not supported, reset %d to %d", reuseport, DEFAULT);
reuseport = DEFAULT
#endif
return reuseport;
}
SrsConfDirective* SrsConfig::get_rtc(string vhost)
{
SrsConfDirective* conf = get_vhost(vhost);

View file

@ -333,6 +333,8 @@ private:
// Reload the http_stream section of config.
// TODO: FIXME: rename to http_server.
virtual srs_error_t reload_http_stream(SrsConfDirective* old_root);
// Reload the rtc_server section of config.
virtual srs_error_t reload_rtc_server(SrsConfDirective* old_root);
// Reload the transcode section of vhost of config.
virtual srs_error_t reload_transcode(SrsConfDirective* new_vhost, SrsConfDirective* old_vhost);
// Reload the ingest section of vhost of config.
@ -525,6 +527,7 @@ public:
virtual bool get_rtc_server_ecdsa();
virtual int get_rtc_server_sendmmsg();
virtual bool get_rtc_server_encrypt();
virtual int get_rtc_server_reuseport();
SrsConfDirective* get_rtc(std::string vhost);
bool get_rtc_enabled(std::string vhost);

View file

@ -40,6 +40,7 @@ using namespace std;
#include <srs_kernel_error.hpp>
#include <srs_app_server.hpp>
#include <srs_app_utility.hpp>
#include <srs_kernel_utility.hpp>
// set the max packet size.
#define SRS_UDP_MAX_PACKET_SIZE 65535
@ -235,12 +236,21 @@ srs_error_t SrsTcpListener::cycle()
return err;
}
SrsUdpMuxSocket::SrsUdpMuxSocket(srs_netfd_t fd)
ISrsUdpSender::ISrsUdpSender()
{
}
ISrsUdpSender::~ISrsUdpSender()
{
}
SrsUdpMuxSocket::SrsUdpMuxSocket(ISrsUdpSender* h, srs_netfd_t fd)
{
nb_buf = SRS_UDP_MAX_PACKET_SIZE;
buf = new char[nb_buf];
nread = 0;
handler = h;
lfd = fd;
fromlen = 0;
@ -253,7 +263,7 @@ SrsUdpMuxSocket::~SrsUdpMuxSocket()
SrsUdpMuxSocket* SrsUdpMuxSocket::copy_sendonly()
{
SrsUdpMuxSocket* sendonly = new SrsUdpMuxSocket(lfd);
SrsUdpMuxSocket* sendonly = new SrsUdpMuxSocket(handler, lfd);
// Don't copy buffer
srs_freepa(sendonly->buf);
@ -339,16 +349,18 @@ std::string SrsUdpMuxSocket::get_peer_id()
return string(id_buf, len);
}
SrsUdpMuxListener::SrsUdpMuxListener(ISrsUdpMuxHandler* h, std::string i, int p)
SrsUdpMuxListener::SrsUdpMuxListener(ISrsUdpMuxHandler* h, ISrsUdpSender* s, std::string i, int p)
{
handler = h;
sender = s;
ip = i;
port = p;
lfd = NULL;
nb_buf = SRS_UDP_MAX_PACKET_SIZE;
buf = new char[nb_buf];
trd = new SrsDummyCoroutine();
}
@ -390,60 +402,108 @@ srs_error_t SrsUdpMuxListener::listen()
void SrsUdpMuxListener::set_socket_buffer()
{
int sndbuf_size = 0;
socklen_t opt_len = sizeof(sndbuf_size);
getsockopt(fd(), SOL_SOCKET, SO_SNDBUF, (void*)&sndbuf_size, &opt_len);
srs_trace("default udp remux socket sndbuf=%d", sndbuf_size);
int default_sndbuf = 0;
// TODO: FIXME: Config it.
int expect_sndbuf = 1024*1024*10; // 10M
int actual_sndbuf = expect_sndbuf;
int r0_sndbuf = 0;
if (true) {
socklen_t opt_len = sizeof(default_sndbuf);
getsockopt(fd(), SOL_SOCKET, SO_SNDBUF, (void*)&default_sndbuf, &opt_len);
sndbuf_size = 1024*1024*10; // 10M
if (setsockopt(fd(), SOL_SOCKET, SO_SNDBUF, (void*)&sndbuf_size, sizeof(sndbuf_size)) < 0) {
srs_warn("set sock opt SO_SNDBUFFORCE failed");
if ((r0_sndbuf = setsockopt(fd(), SOL_SOCKET, SO_SNDBUF, (void*)&actual_sndbuf, sizeof(actual_sndbuf))) < 0) {
srs_warn("set SO_SNDBUF failed, expect=%d, r0=%d", expect_sndbuf, r0_sndbuf);
}
opt_len = sizeof(actual_sndbuf);
getsockopt(fd(), SOL_SOCKET, SO_SNDBUF, (void*)&actual_sndbuf, &opt_len);
}
opt_len = sizeof(sndbuf_size);
getsockopt(fd(), SOL_SOCKET, SO_SNDBUF, (void*)&sndbuf_size, &opt_len);
srs_trace("udp remux socket sndbuf=%d", sndbuf_size);
int default_rcvbuf = 0;
// TODO: FIXME: Config it.
int expect_rcvbuf = 1024*1024*10; // 10M
int actual_rcvbuf = expect_rcvbuf;
int r0_rcvbuf = 0;
if (true) {
socklen_t opt_len = sizeof(default_rcvbuf);
getsockopt(fd(), SOL_SOCKET, SO_RCVBUF, (void*)&default_rcvbuf, &opt_len);
int rcvbuf_size = 0;
opt_len = sizeof(rcvbuf_size);
getsockopt(fd(), SOL_SOCKET, SO_RCVBUF, (void*)&rcvbuf_size, &opt_len);
srs_trace("default udp remux socket rcvbuf=%d", rcvbuf_size);
if ((r0_rcvbuf = setsockopt(fd(), SOL_SOCKET, SO_RCVBUF, (void*)&actual_rcvbuf, sizeof(actual_rcvbuf))) < 0) {
srs_warn("set SO_RCVBUF failed, expect=%d, r0=%d", expect_rcvbuf, r0_rcvbuf);
}
rcvbuf_size = 1024*1024*10; // 10M
if (setsockopt(fd(), SOL_SOCKET, SO_RCVBUF, (void*)&rcvbuf_size, sizeof(rcvbuf_size)) < 0) {
srs_warn("set sock opt SO_RCVBUFFORCE failed");
opt_len = sizeof(actual_rcvbuf);
getsockopt(fd(), SOL_SOCKET, SO_RCVBUF, (void*)&actual_rcvbuf, &opt_len);
}
opt_len = sizeof(rcvbuf_size);
getsockopt(fd(), SOL_SOCKET, SO_RCVBUF, (void*)&rcvbuf_size, &opt_len);
srs_trace("udp remux socket rcvbuf=%d", rcvbuf_size);
srs_trace("UDP #%d LISTEN at %s:%d, SO_SNDBUF(default=%d, expect=%d, actual=%d, r0=%d), SO_RCVBUF(default=%d, expect=%d, actual=%d, r0=%d)",
srs_netfd_fileno(lfd), ip.c_str(), port, default_sndbuf, expect_sndbuf, actual_sndbuf, r0_sndbuf, default_rcvbuf, expect_rcvbuf, actual_rcvbuf, r0_rcvbuf);
}
srs_error_t SrsUdpMuxListener::cycle()
{
srs_error_t err = srs_success;
SrsPithyPrint* pprint = SrsPithyPrint::create_rtc_recv(srs_netfd_fileno(lfd));
SrsAutoFree(SrsPithyPrint, pprint);
uint64_t nn_msgs = 0;
uint64_t nn_msgs_stage = 0;
uint64_t nn_msgs_last = 0;
uint64_t nn_loop = 0;
srs_utime_t time_last = srs_get_system_time();
while (true) {
if ((err = trd->pull()) != srs_success) {
return srs_error_wrap(err, "udp listener");
}
}
SrsUdpMuxSocket udp_mux_skt(lfd);
nn_loop++;
int nread = udp_mux_skt.recvfrom(SRS_UTIME_NO_TIMEOUT);
SrsUdpMuxSocket skt(sender, lfd);
int nread = skt.recvfrom(SRS_UTIME_NO_TIMEOUT);
if (nread <= 0) {
if (nread < 0) {
srs_warn("udp recv error");
}
// remux udp never return
continue;
}
}
nn_msgs++;
nn_msgs_stage++;
if ((err = handler->on_udp_packet(&udp_mux_skt)) != srs_success) {
if ((err = handler->on_udp_packet(&skt)) != srs_success) {
// remux udp never return
srs_warn("udp packet handler error:%s", srs_error_desc(err).c_str());
continue;
}
srs_error_reset(err);
}
pprint->elapse();
if (pprint->can_print()) {
int pps_average = 0; int pps_last = 0;
if (true) {
if (srs_get_system_time() > srs_get_system_startup_time()) {
pps_average = (int)(nn_msgs * SRS_UTIME_SECONDS / (srs_get_system_time() - srs_get_system_startup_time()));
}
if (srs_get_system_time() > time_last) {
pps_last = (int)((nn_msgs - nn_msgs_last) * SRS_UTIME_SECONDS / (srs_get_system_time() - time_last));
}
}
string pps_unit = "";
if (pps_last > 10000 || pps_average > 10000) {
pps_unit = "(w)"; pps_last /= 10000; pps_average /= 10000;
} else if (pps_last > 1000 || pps_average > 1000) {
pps_unit = "(k)"; pps_last /= 10000; pps_average /= 10000;
}
srs_trace("<- RTC #%d RECV %" PRId64 ", pps %d/%d%s, schedule %" PRId64,
srs_netfd_fileno(lfd), nn_msgs_stage, pps_average, pps_last, pps_unit.c_str(), nn_loop);
nn_msgs_last = nn_msgs; time_last = srs_get_system_time();
nn_loop = 0; nn_msgs_stage = 0;
}
if (SrsUdpPacketRecvCycleInterval > 0) {
srs_usleep(SrsUdpPacketRecvCycleInterval);

View file

@ -69,7 +69,7 @@ public:
virtual ~ISrsUdpMuxHandler();
public:
virtual srs_error_t on_stfd_change(srs_netfd_t fd);
virtual srs_error_t on_udp_packet(SrsUdpMuxSocket* udp_mux_skt) = 0;
virtual srs_error_t on_udp_packet(SrsUdpMuxSocket* skt) = 0;
};
// The tcp connection handler.
@ -131,9 +131,22 @@ public:
virtual srs_error_t cycle();
};
class ISrsUdpSender
{
public:
ISrsUdpSender();
virtual ~ISrsUdpSender();
public:
// Fetch a mmsghdr from sender's cache.
virtual srs_error_t fetch(mmsghdr** pphdr) = 0;
// Notify the sender to send out the msg.
virtual srs_error_t sendmmsg(mmsghdr* hdr) = 0;
};
class SrsUdpMuxSocket
{
private:
ISrsUdpSender* handler;
char* buf;
int nb_buf;
int nread;
@ -143,7 +156,7 @@ private:
std::string peer_ip;
int peer_port;
public:
SrsUdpMuxSocket(srs_netfd_t fd);
SrsUdpMuxSocket(ISrsUdpSender* h, srs_netfd_t fd);
virtual ~SrsUdpMuxSocket();
int recvfrom(srs_utime_t timeout);
@ -160,6 +173,7 @@ public:
std::string get_peer_id();
public:
SrsUdpMuxSocket* copy_sendonly();
ISrsUdpSender* sender() { return handler; };
private:
// Don't allow copy, user copy_sendonly instead
SrsUdpMuxSocket(const SrsUdpMuxSocket& rhs);
@ -170,6 +184,7 @@ class SrsUdpMuxListener : public ISrsCoroutineHandler
{
protected:
srs_netfd_t lfd;
ISrsUdpSender* sender;
SrsCoroutine* trd;
protected:
char* buf;
@ -179,7 +194,7 @@ protected:
std::string ip;
int port;
public:
SrsUdpMuxListener(ISrsUdpMuxHandler* h, std::string i, int p);
SrsUdpMuxListener(ISrsUdpMuxHandler* h, ISrsUdpSender* s, std::string i, int p);
virtual ~SrsUdpMuxListener();
public:
virtual int fd();

View file

@ -112,6 +112,10 @@ SrsPithyPrint::SrsPithyPrint(int _stage_id)
#define SRS_CONSTS_STAGE_EXEC 11
// for the rtc play
#define SRS_CONSTS_STAGE_RTC_PLAY 12
// for the rtc send
#define SRS_CONSTS_STAGE_RTC_SEND 13
// for the rtc recv
#define SRS_CONSTS_STAGE_RTC_RECV 14
SrsPithyPrint* SrsPithyPrint::create_rtmp_play()
{
@ -173,6 +177,16 @@ SrsPithyPrint* SrsPithyPrint::create_rtc_play()
return new SrsPithyPrint(SRS_CONSTS_STAGE_RTC_PLAY);
}
SrsPithyPrint* SrsPithyPrint::create_rtc_send(int fd)
{
return new SrsPithyPrint(fd<<16 | SRS_CONSTS_STAGE_RTC_SEND);
}
SrsPithyPrint* SrsPithyPrint::create_rtc_recv(int fd)
{
return new SrsPithyPrint(fd<<16 | SRS_CONSTS_STAGE_RTC_RECV);
}
SrsPithyPrint::~SrsPithyPrint()
{
leave_stage();

View file

@ -88,6 +88,9 @@ public:
static SrsPithyPrint* create_http_stream();
static SrsPithyPrint* create_http_stream_cache();
static SrsPithyPrint* create_rtc_play();
// For RTC sender and receiver, we create printer for each fd.
static SrsPithyPrint* create_rtc_send(int fd);
static SrsPithyPrint* create_rtc_recv(int fd);
virtual ~SrsPithyPrint();
private:
// Enter the specified stage, return the client id.

View file

@ -115,6 +115,11 @@ srs_error_t ISrsReloadHandler::on_reload_http_stream_crossdomain()
return srs_success;
}
srs_error_t ISrsReloadHandler::on_reload_rtc_server()
{
return srs_success;
}
srs_error_t ISrsReloadHandler::on_reload_vhost_http_updated()
{
return srs_success;

View file

@ -55,6 +55,7 @@ public:
virtual srs_error_t on_reload_http_stream_disabled();
virtual srs_error_t on_reload_http_stream_updated();
virtual srs_error_t on_reload_http_stream_crossdomain();
virtual srs_error_t on_reload_rtc_server();
public:
// TODO: FIXME: should rename to http_static
virtual srs_error_t on_reload_vhost_http_updated();

View file

@ -54,7 +54,7 @@ using namespace std;
#include <srs_app_audio_recode.hpp>
// TODO: Add this function into SrsRtpMux class.
srs_error_t aac_raw_append_adts_header(SrsSharedPtrMessage* shared_audio, SrsFormat* format, SrsBuffer** stream_ptr)
srs_error_t aac_raw_append_adts_header(SrsSharedPtrMessage* shared_audio, SrsFormat* format, char** pbuf, int* pnn_buf)
{
srs_error_t err = srs_success;
@ -62,37 +62,33 @@ srs_error_t aac_raw_append_adts_header(SrsSharedPtrMessage* shared_audio, SrsFor
return err;
}
if (stream_ptr == NULL) {
return srs_error_new(ERROR_RTC_RTP_MUXER, "adts");
}
if (format->audio->nb_samples != 1) {
return srs_error_new(ERROR_RTC_RTP_MUXER, "adts");
}
int nb_buf = format->audio->samples[0].size + 7;
char* buf = new char[nb_buf];
SrsBuffer* stream = new SrsBuffer(buf, nb_buf);
SrsBuffer stream(buf, nb_buf);
// TODO: Add comment.
stream->write_1bytes(0xFF);
stream->write_1bytes(0xF9);
stream->write_1bytes(((format->acodec->aac_object - 1) << 6) | ((format->acodec->aac_sample_rate & 0x0F) << 2) | ((format->acodec->aac_channels & 0x04) >> 2));
stream->write_1bytes(((format->acodec->aac_channels & 0x03) << 6) | ((nb_buf >> 11) & 0x03));
stream->write_1bytes((nb_buf >> 3) & 0xFF);
stream->write_1bytes(((nb_buf & 0x07) << 5) | 0x1F);
stream->write_1bytes(0xFC);
stream.write_1bytes(0xFF);
stream.write_1bytes(0xF9);
stream.write_1bytes(((format->acodec->aac_object - 1) << 6) | ((format->acodec->aac_sample_rate & 0x0F) << 2) | ((format->acodec->aac_channels & 0x04) >> 2));
stream.write_1bytes(((format->acodec->aac_channels & 0x03) << 6) | ((nb_buf >> 11) & 0x03));
stream.write_1bytes((nb_buf >> 3) & 0xFF);
stream.write_1bytes(((nb_buf & 0x07) << 5) | 0x1F);
stream.write_1bytes(0xFC);
stream->write_bytes(format->audio->samples[0].bytes, format->audio->samples[0].size);
stream.write_bytes(format->audio->samples[0].bytes, format->audio->samples[0].size);
*stream_ptr = stream;
*pbuf = buf;
*pnn_buf = nb_buf;
return err;
}
SrsRtpH264Muxer::SrsRtpH264Muxer()
{
sequence = 0;
discard_bframe = false;
}
@ -100,269 +96,114 @@ SrsRtpH264Muxer::~SrsRtpH264Muxer()
{
}
srs_error_t SrsRtpH264Muxer::frame_to_packet(SrsSharedPtrMessage* shared_frame, SrsFormat* format)
srs_error_t SrsRtpH264Muxer::filter(SrsSharedPtrMessage* shared_frame, SrsFormat* format)
{
srs_error_t err = srs_success;
if (format->is_avc_sequence_header()) {
sps.assign(format->vcodec->sequenceParameterSetNALUnit.data(), format->vcodec->sequenceParameterSetNALUnit.size());
pps.assign(format->vcodec->pictureParameterSetNALUnit.data(), format->vcodec->pictureParameterSetNALUnit.size());
// only collect SPS/PPS.
// If IDR, we will insert SPS/PPS before IDR frame.
if (format->video && format->video->has_idr) {
shared_frame->set_has_idr(true);
}
// Update samples to shared frame.
for (int i = 0; i < format->video->nb_samples; ++i) {
SrsSample* sample = &format->video->samples[i];
// Because RTC does not support B-frame, so we will drop them.
// TODO: Drop B-frame in better way, which not cause picture corruption.
if (discard_bframe) {
if ((err = sample->parse_bframe()) != srs_success) {
return srs_error_wrap(err, "parse bframe");
}
if (sample->bframe) {
continue;
}
}
}
if (format->video->nb_samples <= 0) {
return err;
}
vector<SrsRtpSharedPacket*> rtp_packet_vec;
for (int i = 0; i < format->video->nb_samples; ++i) {
SrsSample sample = format->video->samples[i];
uint8_t header = sample.bytes[0];
uint8_t nal_type = header & kNalTypeMask;
// TODO: Use config to determine should check avc stream.
if (nal_type == SrsAvcNaluTypeNonIDR || nal_type == SrsAvcNaluTypeDataPartitionA || nal_type == SrsAvcNaluTypeIDR) {
SrsBuffer* stream = new SrsBuffer(sample.bytes, sample.size);
SrsAutoFree(SrsBuffer, stream);
// Skip nalu header.
stream->skip(1);
SrsBitBuffer bitstream(stream);
int32_t first_mb_in_slice = 0;
if ((err = srs_avc_nalu_read_uev(&bitstream, first_mb_in_slice)) != srs_success) {
return srs_error_wrap(err, "nalu read uev");
}
int32_t slice_type = 0;
if ((err = srs_avc_nalu_read_uev(&bitstream, slice_type)) != srs_success) {
return srs_error_wrap(err, "nalu read uev");
}
srs_verbose("nal_type=%d, slice type=%d", nal_type, slice_type);
if (slice_type == SrsAvcSliceTypeB || slice_type == SrsAvcSliceTypeB1) {
if (discard_bframe) {
continue;
}
}
}
if (sample.size <= kRtpMaxPayloadSize) {
if ((err = packet_single_nalu(shared_frame, format, &sample, rtp_packet_vec)) != srs_success) {
return srs_error_wrap(err, "packet single nalu");
}
} else {
if ((err = packet_fu_a(shared_frame, format, &sample, rtp_packet_vec)) != srs_success) {
return srs_error_wrap(err, "packet fu-a");
}
}
}
if (! rtp_packet_vec.empty()) {
// At the end of the frame, set marker bit.
// One frame may have multi nals. Set the marker bit in the last nal end, no the end of the nal.
if ((err = rtp_packet_vec.back()->modify_rtp_header_marker(true)) != srs_success) {
return srs_error_wrap(err, "set marker");
}
}
shared_frame->set_rtp_packets(rtp_packet_vec);
return err;
}
srs_error_t SrsRtpH264Muxer::packet_fu_a(SrsSharedPtrMessage* shared_frame, SrsFormat* format, SrsSample* sample, vector<SrsRtpSharedPacket*>& rtp_packet_vec)
{
srs_error_t err = srs_success;
char* p = sample->bytes + 1;
int nb_left = sample->size - 1;
uint8_t header = sample->bytes[0];
uint8_t nal_type = header & kNalTypeMask;
if (nal_type == SrsAvcNaluTypeIDR) {
if ((err = packet_stap_a(sps, pps, shared_frame, rtp_packet_vec)) != srs_success) {
return srs_error_wrap(err, "packet stap-a");
}
}
int num_of_packet = (sample->size - 1 + kRtpMaxPayloadSize) / kRtpMaxPayloadSize;
for (int i = 0; i < num_of_packet; ++i) {
char buf[kRtpPacketSize];
SrsBuffer* stream = new SrsBuffer(buf, kRtpPacketSize);
SrsAutoFree(SrsBuffer, stream);
int packet_size = min(nb_left, kRtpMaxPayloadSize);
// fu-indicate
uint8_t fu_indicate = kFuA;
fu_indicate |= (header & (~kNalTypeMask));
stream->write_1bytes(fu_indicate);
uint8_t fu_header = nal_type;
if (i == 0)
fu_header |= kStart;
if (i == num_of_packet - 1)
fu_header |= kEnd;
stream->write_1bytes(fu_header);
stream->write_bytes(p, packet_size);
p += packet_size;
nb_left -= packet_size;
srs_verbose("rtp fu-a nalu, size=%u, seq=%u, timestamp=%lu", sample->size, sequence, (shared_frame->timestamp * 90));
SrsRtpSharedPacket* rtp_shared_pkt = new SrsRtpSharedPacket();
if ((err = rtp_shared_pkt->create((shared_frame->timestamp * 90), sequence++, kVideoSSRC, kH264PayloadType, stream->data(), stream->pos())) != srs_success) {
return srs_error_wrap(err, "rtp packet encode");
}
rtp_packet_vec.push_back(rtp_shared_pkt);
}
return err;
}
srs_error_t SrsRtpH264Muxer::packet_single_nalu(SrsSharedPtrMessage* shared_frame, SrsFormat* format, SrsSample* sample, vector<SrsRtpSharedPacket*>& rtp_packet_vec)
{
srs_error_t err = srs_success;
uint8_t header = sample->bytes[0];
uint8_t nal_type = header & kNalTypeMask;
if (nal_type == SrsAvcNaluTypeIDR) {
if ((err = packet_stap_a(sps, pps, shared_frame, rtp_packet_vec)) != srs_success) {
return srs_error_wrap(err, "packet stap-a");
}
}
srs_verbose("rtp single nalu, size=%u, seq=%u, timestamp=%lu", sample->size, sequence, (shared_frame->timestamp * 90));
SrsRtpSharedPacket* rtp_shared_pkt = new SrsRtpSharedPacket();
if ((err = rtp_shared_pkt->create((shared_frame->timestamp * 90), sequence++, kVideoSSRC, kH264PayloadType, sample->bytes, sample->size)) != srs_success) {
return srs_error_wrap(err, "rtp packet encode");
}
rtp_packet_vec.push_back(rtp_shared_pkt);
return err;
}
srs_error_t SrsRtpH264Muxer::packet_stap_a(const string &sps, const string& pps, SrsSharedPtrMessage* shared_frame, vector<SrsRtpSharedPacket*>& rtp_packet_vec)
{
srs_error_t err = srs_success;
if (sps.empty() || pps.empty()) {
return srs_error_new(ERROR_RTC_RTP_MUXER, "sps/pps empty");
}
uint8_t header = sps[0];
uint8_t nal_type = header & kNalTypeMask;
char buf[kRtpPacketSize];
SrsBuffer* stream = new SrsBuffer(buf, kRtpPacketSize);
SrsAutoFree(SrsBuffer, stream);
// stap-a header
uint8_t stap_a_header = kStapA;
stap_a_header |= (nal_type & (~kNalTypeMask));
stream->write_1bytes(stap_a_header);
stream->write_2bytes(sps.size());
stream->write_bytes((char*)sps.data(), sps.size());
stream->write_2bytes(pps.size());
stream->write_bytes((char*)pps.data(), pps.size());
srs_verbose("rtp stap-a nalu, size=%u, seq=%u, timestamp=%lu", (sps.size() + pps.size()), sequence, (shared_frame->timestamp * 90));
SrsRtpSharedPacket* rtp_shared_pkt = new SrsRtpSharedPacket();
if ((err = rtp_shared_pkt->create((shared_frame->timestamp * 90), sequence++, kVideoSSRC, kH264PayloadType, stream->data(), stream->pos())) != srs_success) {
return srs_error_wrap(err, "rtp packet encode");
}
rtp_packet_vec.push_back(rtp_shared_pkt);
shared_frame->set_samples(format->video->samples, format->video->nb_samples);
return err;
}
SrsRtpOpusMuxer::SrsRtpOpusMuxer()
{
sequence = 0;
timestamp = 0;
transcode = NULL;
codec = NULL;
}
SrsRtpOpusMuxer::~SrsRtpOpusMuxer()
{
if (transcode) {
delete transcode;
transcode = NULL;
}
srs_freep(codec);
}
srs_error_t SrsRtpOpusMuxer::initialize()
{
srs_error_t err = srs_success;
transcode = new SrsAudioRecode(kChannel, kSamplerate);
if (!transcode) {
codec = new SrsAudioRecode(kChannel, kSamplerate);
if (!codec) {
return srs_error_new(ERROR_RTC_RTP_MUXER, "SrsAacOpus init failed");
}
transcode->initialize();
if ((err = codec->initialize()) != srs_success) {
return srs_error_wrap(err, "init codec");
}
return err;
}
srs_error_t SrsRtpOpusMuxer::frame_to_packet(SrsSharedPtrMessage* shared_audio, SrsFormat* format, SrsBuffer* stream)
// An AAC packet may be transcoded to many OPUS packets.
const int kMaxOpusPackets = 8;
// The max size for each OPUS packet.
const int kMaxOpusPacketSize = 4096;
srs_error_t SrsRtpOpusMuxer::transcode(SrsSharedPtrMessage* shared_audio, char* adts_audio, int nn_adts_audio)
{
srs_error_t err = srs_success;
vector<SrsRtpSharedPacket*> rtp_packet_vec;
// Opus packet cache.
static char* opus_payloads[kMaxOpusPackets];
char* data_ptr[kArrayLength];
static char data_array[kArrayLength][kArrayBuffer];
int elen[kArrayLength], number = 0;
static bool initialized = false;
if (!initialized) {
initialized = true;
data_ptr[0] = &data_array[0][0];
for (int i = 1; i < kArrayLength; i++) {
data_ptr[i] = data_array[i];
static char opus_packets_cache[kMaxOpusPackets][kMaxOpusPacketSize];
opus_payloads[0] = &opus_packets_cache[0][0];
for (int i = 1; i < kMaxOpusPackets; i++) {
opus_payloads[i] = opus_packets_cache[i];
}
}
SrsSample pkt;
pkt.bytes = stream->data();
pkt.size = stream->pos();
// Transcode an aac packet to many opus packets.
SrsSample aac;
aac.bytes = adts_audio;
aac.size = nn_adts_audio;
if ((err = transcode->recode(&pkt, data_ptr, elen, number)) != srs_success) {
int nn_opus_packets = 0;
int opus_sizes[kMaxOpusPackets];
if ((err = codec->recode(&aac, opus_payloads, opus_sizes, nn_opus_packets)) != srs_success) {
return srs_error_wrap(err, "recode error");
}
for (int i = 0; i < number; i++) {
SrsSample sample;
sample.size = elen[i];
sample.bytes = data_ptr[i];
packet_opus(shared_audio, &sample, rtp_packet_vec);
// Save OPUS packets in shared message.
if (nn_opus_packets <= 0) {
return err;
}
shared_audio->set_rtp_packets(rtp_packet_vec);
return err;
}
srs_error_t SrsRtpOpusMuxer::packet_opus(SrsSharedPtrMessage* shared_frame, SrsSample* sample, std::vector<SrsRtpSharedPacket*>& rtp_packet_vec)
{
srs_error_t err = srs_success;
SrsRtpSharedPacket* rtp_shared_pkt = new SrsRtpSharedPacket();
rtp_shared_pkt->rtp_header.set_marker(true);
if ((err = rtp_shared_pkt->create(timestamp, sequence++, kAudioSSRC, kOpusPayloadType, sample->bytes, sample->size)) != srs_success) {
return srs_error_wrap(err, "rtp packet encode");
SrsSample samples[nn_opus_packets];
for (int i = 0; i < nn_opus_packets; i++) {
SrsSample* p = samples + i;
p->size = opus_sizes[i];
p->bytes = new char[p->size];
memcpy(p->bytes, opus_payloads[i], p->size);
}
// TODO: FIXME: Why 960? Need Refactoring?
timestamp += 960;
rtp_packet_vec.push_back(rtp_shared_pkt);
shared_audio->set_extra_payloads(samples, nn_opus_packets);
return err;
}
@ -485,17 +326,16 @@ srs_error_t SrsRtc::on_audio(SrsSharedPtrMessage* shared_audio, SrsFormat* forma
// ignore sequence header
srs_assert(format->audio);
SrsBuffer* stream = NULL;
SrsAutoFree(SrsBuffer, stream);
if ((err = aac_raw_append_adts_header(shared_audio, format, &stream)) != srs_success) {
char* adts_audio = NULL;
int nn_adts_audio = 0;
// TODO: FIXME: Reserve 7 bytes header when create shared message.
if ((err = aac_raw_append_adts_header(shared_audio, format, &adts_audio, &nn_adts_audio)) != srs_success) {
return srs_error_wrap(err, "aac append header");
}
if (stream) {
char* stream_data = stream->data();
SrsAutoFreeA(char, stream_data);
return rtp_opus_muxer->frame_to_packet(shared_audio, format, stream);
if (adts_audio) {
err = rtp_opus_muxer->transcode(shared_audio, adts_audio, nn_adts_audio);
srs_freep(adts_audio);
}
return err;
@ -522,5 +362,5 @@ srs_error_t SrsRtc::on_video(SrsSharedPtrMessage* shared_video, SrsFormat* forma
// ignore info frame,
// @see https://github.com/ossrs/srs/issues/288#issuecomment-69863909
srs_assert(format->video);
return rtp_h264_muxer->frame_to_packet(shared_video, format);
return rtp_h264_muxer->filter(shared_video, format);
}

View file

@ -39,30 +39,15 @@ class SrsOriginHub;
class SrsAudioRecode;
class SrsBuffer;
// Rtp packet max payload size, not include rtp header.
// Must left some bytes to payload header, rtp header, udp header, ip header.
const int kRtpMaxPayloadSize = 1200;
// The RTP packet max size, should never exceed this size.
const int kRtpPacketSize = 1500;
// Payload type will rewrite in srs_app_rtc_conn.cpp when send to client.
const uint8_t kOpusPayloadType = 111;
const uint8_t kH264PayloadType = 102;
// H.264 nalu header type mask.
const uint8_t kNalTypeMask = 0x1F;
// @see: https://tools.ietf.org/html/rfc6184#section-5.2
const uint8_t kStapA = 24;
const uint8_t kFuA = 28;
// @see: https://tools.ietf.org/html/rfc6184#section-5.8
const uint8_t kStart = 0x80; // Fu-header start bit
const uint8_t kEnd = 0x40; // Fu-header end bit
const int kChannel = 2;
const int kSamplerate = 48000;
const int kArrayLength = 8;
const int kArrayBuffer = 4096;
// SSRC will rewrite in srs_app_rtc_conn.cpp when send to client.
const uint32_t kAudioSSRC = 1;
@ -71,39 +56,26 @@ const uint32_t kVideoSSRC = 2;
// TODO: Define interface class like ISrsRtpMuxer
class SrsRtpH264Muxer
{
private:
uint16_t sequence;
std::string sps;
std::string pps;
public:
bool discard_bframe;
public:
SrsRtpH264Muxer();
virtual ~SrsRtpH264Muxer();
public:
srs_error_t frame_to_packet(SrsSharedPtrMessage* shared_video, SrsFormat* format);
private:
srs_error_t packet_fu_a(SrsSharedPtrMessage* shared_frame, SrsFormat* format, SrsSample* sample, std::vector<SrsRtpSharedPacket*>& rtp_packet_vec);
srs_error_t packet_single_nalu(SrsSharedPtrMessage* shared_frame, SrsFormat* format, SrsSample* sample, std::vector<SrsRtpSharedPacket*>& rtp_packet_vec);
srs_error_t packet_stap_a(const std::string &sps, const std::string& pps, SrsSharedPtrMessage* shared_frame, std::vector<SrsRtpSharedPacket*>& rtp_packet_vec);
srs_error_t filter(SrsSharedPtrMessage* shared_video, SrsFormat* format);
};
// TODO: FIXME: It's not a muxer, but a transcoder.
class SrsRtpOpusMuxer
{
private:
// TODO: FIXME: How to handle timestamp overflow?
uint32_t timestamp;
uint16_t sequence;
SrsAudioRecode* transcode;
SrsAudioRecode* codec;
public:
SrsRtpOpusMuxer();
virtual ~SrsRtpOpusMuxer();
virtual srs_error_t initialize();
public:
srs_error_t frame_to_packet(SrsSharedPtrMessage* shared_audio, SrsFormat* format, SrsBuffer* stream);
private:
srs_error_t packet_opus(SrsSharedPtrMessage* shared_frame, SrsSample* sample, std::vector<SrsRtpSharedPacket*>& rtp_packet_vec);
srs_error_t transcode(SrsSharedPtrMessage* shared_audio, char* adts_audio, int nn_adts_audio);
};
class SrsRtc

File diff suppressed because it is too large Load diff

View file

@ -32,6 +32,7 @@
#include <srs_app_hybrid.hpp>
#include <srs_app_hourglass.hpp>
#include <srs_app_sdp.hpp>
#include <srs_app_reload.hpp>
#include <string>
#include <map>
@ -48,6 +49,7 @@ class SrsRtcServer;
class SrsRtcSession;
class SrsSharedPtrMessage;
class SrsSource;
class SrsRtpPacket2;
const uint8_t kSR = 200;
const uint8_t kRR = 201;
@ -98,16 +100,17 @@ public:
srs_error_t initialize(const SrsRequest& req);
srs_error_t on_dtls(SrsUdpMuxSocket* udp_mux_skt);
srs_error_t on_dtls_handshake_done(SrsUdpMuxSocket* udp_mux_skt);
srs_error_t on_dtls(SrsUdpMuxSocket* skt);
srs_error_t on_dtls_handshake_done(SrsUdpMuxSocket* skt);
srs_error_t on_dtls_application_data(const char* data, const int len);
public:
srs_error_t protect_rtp(char* protected_buf, const char* ori_buf, int& nb_protected_buf);
srs_error_t protect_rtp2(char* buf, int* pnn_buf, SrsRtpPacket2* pkt);
srs_error_t unprotect_rtp(char* unprotected_buf, const char* ori_buf, int& nb_unprotected_buf);
srs_error_t protect_rtcp(char* protected_buf, const char* ori_buf, int& nb_protected_buf);
srs_error_t unprotect_rtcp(char* unprotected_buf, const char* ori_buf, int& nb_unprotected_buf);
private:
srs_error_t handshake(SrsUdpMuxSocket* udp_mux_skt);
srs_error_t handshake(SrsUdpMuxSocket* skt);
private:
srs_error_t srtp_initialize();
srs_error_t srtp_send_init();
@ -125,6 +128,12 @@ private:
uint32_t audio_ssrc;
uint16_t video_payload_type;
uint16_t audio_payload_type;
private:
// TODO: FIXME: How to handle timestamp overflow?
uint32_t audio_timestamp;
uint16_t audio_sequence;
private:
uint16_t video_sequence;
public:
SrsUdpMuxSocket* sendonly_ukt;
public:
@ -139,11 +148,18 @@ public:
virtual void stop();
virtual void stop_loop();
public:
virtual srs_error_t cycle();
void update_sendonly_socket(SrsUdpMuxSocket* skt);
public:
void update_sendonly_socket(SrsUdpMuxSocket* ukt);
virtual srs_error_t cycle();
private:
void send_and_free_messages(SrsSharedPtrMessage** msgs, int nb_msgs, SrsUdpMuxSocket* udp_mux_skt);
srs_error_t send_messages(SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsUdpMuxSocket* skt, int* pnn, int* pnn_rtp_pkts);
srs_error_t send_packet(SrsRtpPacket2* pkt, SrsUdpMuxSocket* skt);
private:
srs_error_t packet_opus(SrsSample* sample, SrsRtpPacket2** ppacket);
private:
srs_error_t packet_fu_a(SrsSharedPtrMessage* msg, SrsSample* sample, int fu_payload_size, std::vector<SrsRtpPacket2*>& packets);
srs_error_t packet_single_nalu(SrsSharedPtrMessage* msg, SrsSample* sample, SrsRtpPacket2** ppacket);
srs_error_t packet_stap_a(SrsSource* source, SrsSharedPtrMessage* msg, SrsRtpPacket2** ppacket);
};
class SrsRtcSession
@ -194,40 +210,73 @@ public:
void switch_to_context();
public:
srs_error_t on_stun(SrsUdpMuxSocket* udp_mux_skt, SrsStunPacket* stun_req);
srs_error_t on_dtls(SrsUdpMuxSocket* udp_mux_skt);
srs_error_t on_rtcp(SrsUdpMuxSocket* udp_mux_skt);
srs_error_t on_stun(SrsUdpMuxSocket* skt, SrsStunPacket* stun_req);
srs_error_t on_dtls(SrsUdpMuxSocket* skt);
srs_error_t on_rtcp(SrsUdpMuxSocket* skt);
public:
srs_error_t send_client_hello(SrsUdpMuxSocket* udp_mux_skt);
srs_error_t on_connection_established(SrsUdpMuxSocket* udp_mux_skt);
srs_error_t start_play(SrsUdpMuxSocket* udp_mux_skt);
srs_error_t send_client_hello(SrsUdpMuxSocket* skt);
srs_error_t on_connection_established(SrsUdpMuxSocket* skt);
srs_error_t start_play(SrsUdpMuxSocket* skt);
public:
bool is_stun_timeout();
private:
srs_error_t check_source();
private:
srs_error_t on_binding_request(SrsUdpMuxSocket* udp_mux_skt, SrsStunPacket* stun_req);
srs_error_t on_binding_request(SrsUdpMuxSocket* skt, SrsStunPacket* stun_req);
private:
<<<<<<< HEAD
srs_error_t on_rtcp_feedback(char* buf, int nb_buf, SrsUdpMuxSocket* udp_mux_skt);
srs_error_t on_rtcp_ps_feedback(char* buf, int nb_buf, SrsUdpMuxSocket* udp_mux_skt);
srs_error_t on_rtcp_receiver_report(char* buf, int nb_buf, SrsUdpMuxSocket* udp_mux_skt);
// Internal only.
public:
void send_and_free_messages(srs_netfd_t stfd, sockaddr_in* addr, socklen_t addrlen, char* buf, int length);
=======
srs_error_t on_rtcp_feedback(char* buf, int nb_buf, SrsUdpMuxSocket* skt);
srs_error_t on_rtcp_ps_feedback(char* buf, int nb_buf, SrsUdpMuxSocket* skt);
srs_error_t on_rtcp_receiver_report(char* buf, int nb_buf, SrsUdpMuxSocket* skt);
>>>>>>> upstream/feature/rtc
};
class SrsRtcServer : virtual public ISrsUdpMuxHandler, virtual public ISrsHourGlass, virtual public ISrsCoroutineHandler
class SrsUdpMuxSender : virtual public ISrsUdpSender, virtual public ISrsCoroutineHandler, virtual public ISrsReloadHandler
{
private:
SrsUdpMuxListener* listener;
SrsHourGlass* timer;
private:
srs_netfd_t lfd;
SrsRtcServer* server;
SrsCoroutine* trd;
private:
srs_cond_t cond;
bool waiting_msgs;
// TODO: FIXME: Support multiple stfd.
srs_netfd_t mmstfd;
std::vector<mmsghdr> mmhdrs;
// Hotspot msgs, we are working on it.
// @remark We will wait util all messages are ready.
std::vector<mmsghdr> hotspot;
// Cache msgs, for other coroutines to fill it.
std::vector<mmsghdr> cache;
int cache_pos;
// The max number of messages for sendmmsg. If 1, we use sendmsg to send.
int max_sendmmsg;
public:
SrsUdpMuxSender(SrsRtcServer* s);
virtual ~SrsUdpMuxSender();
public:
virtual srs_error_t initialize(srs_netfd_t fd);
private:
void free_mhdrs(std::vector<mmsghdr>& mhdrs);
public:
virtual srs_error_t fetch(mmsghdr** pphdr);
virtual srs_error_t sendmmsg(mmsghdr* hdr);
virtual srs_error_t cycle();
// interface ISrsReloadHandler
public:
virtual srs_error_t on_reload_rtc_server();
};
class SrsRtcServer : virtual public ISrsUdpMuxHandler, virtual public ISrsHourGlass
{
private:
SrsHourGlass* timer;
std::vector<SrsUdpMuxListener*> listeners;
std::vector<SrsUdpMuxSender*> senders;
private:
std::map<std::string, SrsRtcSession*> map_username_session; // key: username(local_ufrag + ":" + remote_ufrag)
std::map<std::string, SrsRtcSession*> map_id_session; // key: peerip(ip + ":" + port)
@ -240,27 +289,31 @@ public:
// TODO: FIXME: Support gracefully quit.
// TODO: FIXME: Support reload.
virtual srs_error_t listen_udp();
virtual srs_error_t on_udp_packet(SrsUdpMuxSocket* udp_mux_skt);
virtual srs_error_t on_udp_packet(SrsUdpMuxSocket* skt);
public:
virtual srs_error_t listen_api();
SrsRtcSession* create_rtc_session(const SrsRequest& req, const SrsSdp& remote_sdp, SrsSdp& local_sdp, const std::string& mock_eip);
bool insert_into_id_sessions(const std::string& peer_id, SrsRtcSession* rtc_session);
void check_and_clean_timeout_session();
int nn_sessions() { return (int)map_username_session.size(); }
private:
srs_error_t on_stun(SrsUdpMuxSocket* udp_mux_skt);
srs_error_t on_dtls(SrsUdpMuxSocket* udp_mux_skt);
srs_error_t on_rtp_or_rtcp(SrsUdpMuxSocket* udp_mux_skt);
srs_error_t on_stun(SrsUdpMuxSocket* skt);
srs_error_t on_dtls(SrsUdpMuxSocket* skt);
srs_error_t on_rtp_or_rtcp(SrsUdpMuxSocket* skt);
private:
SrsRtcSession* find_rtc_session_by_username(const std::string& ufrag);
SrsRtcSession* find_rtc_session_by_peer_id(const std::string& peer_id);
// interface ISrsHourGlass
public:
virtual srs_error_t notify(int type, srs_utime_t interval, srs_utime_t tick);
<<<<<<< HEAD
// Internal only.
public:
void send_and_free_messages(srs_netfd_t stfd, sockaddr_in* addr, socklen_t addrlen, char* buf, int length);
void free_messages(std::vector<mmsghdr>& hdrs);
virtual srs_error_t cycle();
=======
>>>>>>> upstream/feature/rtc
};
// The RTC server adapter.

View file

@ -78,6 +78,14 @@ srs_error_t parse_h264_fmtp(const std::string& fmtp, H264SpecificParam& h264_par
if (kv[0] == "profile-level-id") {
h264_param.profile_level_id = kv[1];
} else if (kv[0] == "packetization-mode") {
// 6.3. Non-Interleaved Mode
// This mode is in use when the value of the OPTIONAL packetization-mode
// media type parameter is equal to 1. This mode SHOULD be supported.
// It is primarily intended for low-delay applications. Only single NAL
// unit packets, STAP-As, and FU-As MAY be used in this mode. STAP-Bs,
// MTAPs, and FU-Bs MUST NOT be used. The transmission order of NAL
// units MUST comply with the NAL unit decoding order.
// @see https://tools.ietf.org/html/rfc6184#section-6.3
h264_param.packetization_mode = kv[1];
} else if (kv[0] == "level-asymmetry-allowed") {
h264_param.level_asymmerty_allow = kv[1];

View file

@ -1143,7 +1143,8 @@ srs_error_t SrsOriginHub::on_video(SrsSharedPtrMessage* shared_video, bool is_se
// TODO: FIXME: Refactor to move to rtp?
// Save the RTP packets for find_rtp_packet() to rtx or restore it.
source->rtp_queue->push(msg->rtp_packets);
// TODO: FIXME: Remove dead code.
//source->rtp_queue->push(msg->rtp_packets);
#endif
if ((err = hls->on_video(msg, format)) != srs_success) {
@ -2718,4 +2719,9 @@ SrsRtpSharedPacket* SrsSource::find_rtp_packet(const uint16_t& seq)
{
return rtp_queue->find(seq);
}
SrsMetaCache* SrsSource::cached_meta()
{
return meta;
}
#endif

View file

@ -327,6 +327,7 @@ public:
#ifdef SRS_AUTO_RTC
// To find the RTP packet for RTX or restore.
// TODO: FIXME: Should queue RTP packets in connection level.
class SrsRtpPacketQueue
{
private:
@ -634,6 +635,8 @@ public:
#ifdef SRS_AUTO_RTC
// Find rtp packet by sequence
SrsRtpSharedPacket* find_rtp_packet(const uint16_t& seq);
// Get the cached meta, as such the sps/pps.
SrsMetaCache* cached_meta();
#endif
};