1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

merge upstream feature/rtc, solve conflict

This commit is contained in:
xiaozhihong 2020-04-19 15:02:21 +08:00
commit 749503a12e
28 changed files with 1173 additions and 439 deletions

View file

@ -3618,7 +3618,8 @@ srs_error_t SrsConfig::check_normal_config()
for (int i = 0; conf && i < (int)conf->directives.size(); i++) {
string n = conf->at(i)->name;
if (n != "enabled" && n != "listen" && n != "dir" && n != "candidate" && n != "ecdsa"
&& n != "sendmmsg" && n != "encrypt" && n != "reuseport" && n != "gso" && n != "merge_nalus") {
&& n != "sendmmsg" && n != "encrypt" && n != "reuseport" && n != "gso" && n != "merge_nalus"
&& n != "padding" && n != "perf_stat" && n != "queue_length") {
return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal rtc_server.%s", n.c_str());
}
}
@ -3784,7 +3785,8 @@ srs_error_t SrsConfig::check_normal_config()
for (int j = 0; j < (int)conf->directives.size(); j++) {
string m = conf->at(j)->name;
if (m != "time_jitter" && m != "mix_correct" && m != "atc" && m != "atc_auto" && m != "mw_latency"
&& m != "gop_cache" && m != "queue_length" && m != "send_min_interval" && m != "reduce_sequence_header") {
&& m != "gop_cache" && m != "queue_length" && m != "send_min_interval" && m != "reduce_sequence_header"
&& m != "mw_msgs") {
return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal vhost.play.%s of %s", m.c_str(), vhost->arg0().c_str());
}
}
@ -4840,6 +4842,57 @@ bool SrsConfig::get_rtc_server_gso2()
return SRS_CONF_PERFER_TRUE(conf->arg0());
}
int SrsConfig::get_rtc_server_padding()
{
static int DEFAULT = 127;
SrsConfDirective* conf = root->get("rtc_server");
if (!conf) {
return DEFAULT;
}
conf = conf->get("padding");
if (!conf || conf->arg0().empty()) {
return DEFAULT;
}
return srs_min(127, ::atoi(conf->arg0().c_str()));
}
bool SrsConfig::get_rtc_server_perf_stat()
{
static bool DEFAULT = true;
SrsConfDirective* conf = root->get("rtc_server");
if (!conf) {
return DEFAULT;
}
conf = conf->get("perf_stat");
if (!conf || conf->arg0().empty()) {
return DEFAULT;
}
return SRS_CONF_PERFER_TRUE(conf->arg0());
}
int SrsConfig::get_rtc_server_queue_length()
{
static int DEFAULT = 2000;
SrsConfDirective* conf = root->get("rtc_server");
if (!conf) {
return DEFAULT;
}
conf = conf->get("queue_length");
if (!conf || conf->arg0().empty()) {
return DEFAULT;
}
return ::atoi(conf->arg0().c_str());
}
SrsConfDirective* SrsConfig::get_rtc(string vhost)
{
SrsConfDirective* conf = get_vhost(vhost);
@ -5367,8 +5420,48 @@ srs_utime_t SrsConfig::get_mw_sleep(string vhost, bool is_rtc)
if (!conf || conf->arg0().empty()) {
return DEFAULT;
}
int v = ::atoi(conf->arg0().c_str());
if (is_rtc && v > 0) {
srs_warn("For RTC, we ignore mw_latency");
return 0;
}
return (srs_utime_t)(::atoi(conf->arg0().c_str()) * SRS_UTIME_MILLISECONDS);
return (srs_utime_t)(v * SRS_UTIME_MILLISECONDS);
}
int SrsConfig::get_mw_msgs(string vhost, bool is_realtime, bool is_rtc)
{
int DEFAULT = SRS_PERF_MW_MIN_MSGS;
if (is_rtc) {
DEFAULT = SRS_PERF_MW_MIN_MSGS_FOR_RTC;
}
if (is_realtime) {
DEFAULT = SRS_PERF_MW_MIN_MSGS_REALTIME;
}
SrsConfDirective* conf = get_vhost(vhost);
if (!conf) {
return DEFAULT;
}
conf = conf->get("play");
if (!conf) {
return DEFAULT;
}
conf = conf->get("mw_msgs");
if (!conf || conf->arg0().empty()) {
return DEFAULT;
}
int v = ::atoi(conf->arg0().c_str());
if (v > SRS_PERF_MW_MSGS) {
srs_warn("reset mw_msgs %d to max %d", v, SRS_PERF_MW_MSGS);
v = SRS_PERF_MW_MSGS;
}
return v;
}
bool SrsConfig::get_realtime_enabled(string vhost, bool is_rtc)

View file

@ -535,6 +535,11 @@ public:
virtual bool get_rtc_server_gso();
private:
virtual bool get_rtc_server_gso2();
public:
virtual int get_rtc_server_padding();
virtual bool get_rtc_server_perf_stat();
virtual int get_rtc_server_queue_length();
public:
SrsConfDirective* get_rtc(std::string vhost);
bool get_rtc_enabled(std::string vhost);
@ -625,6 +630,10 @@ public:
// @param vhost, the vhost to get the mw sleep time.
// TODO: FIXME: add utest for mw config.
virtual srs_utime_t get_mw_sleep(std::string vhost, bool is_rtc = false);
// Get the mw_msgs, mw wait time in packets for vhost.
// @param vhost, the vhost to get the mw sleep msgs.
// TODO: FIXME: add utest for mw config.
virtual int get_mw_msgs(std::string vhost, bool is_realtime, bool is_rtc = false);
// Whether min latency mode enabled.
// @param vhost, the vhost to get the min_latency.
// TODO: FIXME: add utest for min_latency.

View file

@ -869,6 +869,7 @@ srs_error_t SrsGoApiRtcPlay::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMe
api = prop->to_str();
}
// TODO: FIXME: Parse vhost.
// Parse app and stream from streamurl.
string app;
string stream_name;
@ -909,6 +910,13 @@ srs_error_t SrsGoApiRtcPlay::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMe
request.app = app;
request.stream = stream_name;
// TODO: FIXME: Parse vhost.
// discovery vhost, resolve the vhost from config
SrsConfDirective* parsed_vhost = _srs_config->get_vhost("");
if (parsed_vhost) {
request.vhost = parsed_vhost->arg0();
}
// TODO: FIXME: Maybe need a better name?
// TODO: FIXME: When server enabled, but vhost disabled, should report error.
SrsRtcSession* rtc_session = rtc_server->create_rtc_session(request, remote_sdp, local_sdp, eip);
@ -1615,14 +1623,23 @@ srs_error_t SrsGoApiPerf::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage*
SrsStatistic* stat = SrsStatistic::instance();
string target = r->query_get("target");
srs_trace("query target=%s", target.c_str());
string reset = r->query_get("reset");
srs_trace("query target=%s, reset=%s, rtc_stat_enabled=%d", target.c_str(), reset.c_str(),
_srs_config->get_rtc_server_perf_stat());
if (true) {
SrsJsonObject* p = SrsJsonAny::object();
data->set("query", p);
p->set("target", SrsJsonAny::str(target.c_str()));
p->set("help", SrsJsonAny::str("?target=avframes|rtc|rtp|gso|writev_iovs|sendmmsg"));
p->set("reset", SrsJsonAny::str(reset.c_str()));
p->set("help", SrsJsonAny::str("?target=avframes|rtc|rtp|gso|writev_iovs|sendmmsg|bytes|dropped"));
p->set("help2", SrsJsonAny::str("?reset=all"));
}
if (!reset.empty()) {
stat->reset_perf();
return srs_api_response(w, r, obj->dumps());
}
if (target.empty() || target == "avframes") {
@ -1679,6 +1696,24 @@ srs_error_t SrsGoApiPerf::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage*
}
}
if (target.empty() || target == "bytes") {
SrsJsonObject* p = SrsJsonAny::object();
data->set("bytes", p);
if ((err = stat->dumps_perf_bytes(p)) != srs_success) {
int code = srs_error_code(err); srs_error_reset(err);
return srs_api_response_code(w, r, code);
}
}
if (target.empty() || target == "dropped") {
SrsJsonObject* p = SrsJsonAny::object();
data->set("dropped", p);
if ((err = stat->dumps_perf_dropped(p)) != srs_success) {
int code = srs_error_code(err); srs_error_reset(err);
return srs_api_response_code(w, r, code);
}
}
return srs_api_response(w, r, obj->dumps());
}

View file

@ -660,7 +660,8 @@ srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess
if ((err = consumer->dump_packets(&msgs, count)) != srs_success) {
return srs_error_wrap(err, "consumer dump packets");
}
// TODO: FIXME: Support merged-write wait.
if (count <= 0) {
// Directly use sleep, donot use consumer wait, because we couldn't awake consumer.
srs_usleep(mw_sleep);

View file

@ -141,6 +141,11 @@ public:
virtual srs_error_t fetch(mmsghdr** pphdr) = 0;
// Notify the sender to send out the msg.
virtual srs_error_t sendmmsg(mmsghdr* hdr) = 0;
// Whether sender exceed the max queue, that is, overflow.
virtual bool overflow() = 0;
// Set the queue extra ratio, for example, when mw_msgs > 0, we need larger queue.
// For r, 100 means x1, 200 means x2.
virtual void set_extra_ratio(int r) = 0;
};
class SrsUdpMuxSocket

View file

@ -195,15 +195,19 @@ srs_error_t SrsRtpOpusMuxer::transcode(SrsSharedPtrMessage* shared_audio, char*
return err;
}
int nn_max_extra_payload = 0;
SrsSample samples[nn_opus_packets];
for (int i = 0; i < nn_opus_packets; i++) {
SrsSample* p = samples + i;
p->size = opus_sizes[i];
p->bytes = new char[p->size];
memcpy(p->bytes, opus_payloads[i], p->size);
nn_max_extra_payload = srs_max(nn_max_extra_payload, p->size);
}
shared_audio->set_extra_payloads(samples, nn_opus_packets);
shared_audio->set_max_extra_payload(nn_max_extra_payload);
return err;
}

File diff suppressed because it is too large Load diff

View file

@ -50,6 +50,7 @@ class SrsRtcSession;
class SrsSharedPtrMessage;
class SrsSource;
class SrsRtpPacket2;
class ISrsUdpSender;
const uint8_t kSR = 200;
const uint8_t kRR = 201;
@ -124,8 +125,18 @@ public:
bool use_gso;
bool should_merge_nalus;
public:
// The total bytes of RTP packets.
#if defined(SRS_DEBUG)
// Debug id.
uint32_t debug_id;
#endif
public:
// The total bytes of AVFrame packets.
int nn_bytes;
// The total bytes of RTP packets.
int nn_rtp_bytes;
// The total padded bytes.
int nn_padding_bytes;
public:
// The RTP packets send out by sendmmsg or sendmsg. Note that if many packets group to
// one msghdr by GSO, it's only one RTP packet, because we only send once.
int nn_rtp_pkts;
@ -138,11 +149,24 @@ public:
int nn_audios;
// The original video messages.
int nn_videos;
// The number of padded packet.
int nn_paddings;
// The number of dropped messages.
int nn_dropped;
private:
int cursor;
int nn_cache;
SrsRtpPacket2* cache;
public:
std::vector<SrsRtpPacket2*> packets;
public:
SrsRtcPackets(bool gso, bool merge_nalus);
SrsRtcPackets(int nn_cache_max);
virtual ~SrsRtcPackets();
public:
void reset(bool gso, bool merge_nalus);
SrsRtpPacket2* fetch();
SrsRtpPacket2* back();
int size();
int capacity();
SrsRtpPacket2* at(int index);
};
class SrsRtcSenderThread : virtual public ISrsCoroutineHandler, virtual public ISrsReloadHandler
@ -164,8 +188,16 @@ private:
uint16_t video_sequence;
public:
SrsUdpMuxSocket* sendonly_ukt;
private:
ISrsUdpSender* sender;
private:
bool merge_nalus;
bool gso;
int max_padding;
private:
srs_utime_t mw_sleep;
int mw_msgs;
bool realtime;
public:
SrsRtcSenderThread(SrsRtcSession* s, SrsUdpMuxSocket* u, int parent_cid);
virtual ~SrsRtcSenderThread();
@ -174,6 +206,8 @@ public:
// interface ISrsReloadHandler
public:
virtual srs_error_t on_reload_rtc_server();
virtual srs_error_t on_reload_vhost_play(std::string vhost);
virtual srs_error_t on_reload_vhost_realtime(std::string vhost);
public:
virtual int cid();
public:
@ -185,17 +219,17 @@ public:
public:
virtual srs_error_t cycle();
private:
srs_error_t send_messages(SrsUdpMuxSocket* skt, SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcPackets& packets);
srs_error_t send_messages(SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcPackets& packets);
srs_error_t messages_to_packets(SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcPackets& packets);
srs_error_t send_packets(SrsUdpMuxSocket* skt, SrsRtcPackets& packets);
srs_error_t send_packets_gso(SrsUdpMuxSocket* skt, SrsRtcPackets& packets);
srs_error_t send_packets(SrsRtcPackets& packets);
srs_error_t send_packets_gso(SrsRtcPackets& packets);
private:
srs_error_t packet_opus(SrsSample* sample, SrsRtpPacket2** ppacket);
srs_error_t packet_opus(SrsSample* sample, SrsRtcPackets& packets, int nn_max_payload);
private:
srs_error_t packet_fu_a(SrsSharedPtrMessage* msg, SrsSample* sample, int fu_payload_size, SrsRtcPackets& packets);
srs_error_t packet_nalus(SrsSharedPtrMessage* msg, SrsRtcPackets& packets);
srs_error_t packet_single_nalu(SrsSharedPtrMessage* msg, SrsSample* sample, SrsRtpPacket2** ppacket);
srs_error_t packet_stap_a(SrsSource* source, SrsSharedPtrMessage* msg, SrsRtpPacket2** ppacket);
srs_error_t packet_single_nalu(SrsSharedPtrMessage* msg, SrsSample* sample, SrsRtcPackets& packets);
srs_error_t packet_stap_a(SrsSource* source, SrsSharedPtrMessage* msg, SrsRtcPackets& packets);
};
class SrsRtcSession
@ -274,6 +308,9 @@ private:
private:
srs_cond_t cond;
bool waiting_msgs;
bool gso;
int nn_senders;
private:
// Hotspot msgs, we are working on it.
// @remark We will wait util all messages are ready.
std::vector<mmsghdr> hotspot;
@ -282,16 +319,24 @@ private:
int cache_pos;
// The max number of messages for sendmmsg. If 1, we use sendmsg to send.
int max_sendmmsg;
// The total queue length, for each sender.
int queue_length;
// The extra queue ratio.
int extra_ratio;
int extra_queue;
public:
SrsUdpMuxSender(SrsRtcServer* s);
virtual ~SrsUdpMuxSender();
public:
virtual srs_error_t initialize(srs_netfd_t fd);
virtual srs_error_t initialize(srs_netfd_t fd, int senders);
private:
void free_mhdrs(std::vector<mmsghdr>& mhdrs);
public:
virtual srs_error_t fetch(mmsghdr** pphdr);
virtual srs_error_t sendmmsg(mmsghdr* hdr);
virtual bool overflow();
virtual void set_extra_ratio(int r);
public:
virtual srs_error_t cycle();
// interface ISrsReloadHandler
public:

View file

@ -116,7 +116,7 @@ SrsRtmpConn::SrsRtmpConn(SrsServer* svr, srs_netfd_t c, string cip) : SrsConnect
wakable = NULL;
mw_sleep = SRS_PERF_MW_SLEEP;
mw_enabled = false;
mw_msgs = 0;
realtime = SRS_PERF_MIN_LATENCY_ENABLED;
send_min_interval = 0;
tcp_nodelay = false;
@ -264,6 +264,10 @@ srs_error_t SrsRtmpConn::on_reload_vhost_play(string vhost)
send_min_interval = v;
}
}
mw_msgs = _srs_config->get_mw_msgs(req->vhost, realtime);
mw_sleep = _srs_config->get_mw_sleep(req->vhost);
set_socket_buffer(mw_sleep);
return err;
}
@ -298,6 +302,10 @@ srs_error_t SrsRtmpConn::on_reload_vhost_realtime(string vhost)
srs_trace("realtime changed %d=>%d", realtime, realtime_enabled);
realtime = realtime_enabled;
}
mw_msgs = _srs_config->get_mw_msgs(req->vhost, realtime);
mw_sleep = _srs_config->get_mw_sleep(req->vhost);
set_socket_buffer(mw_sleep);
return err;
}
@ -689,18 +697,19 @@ srs_error_t SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, Sr
SrsMessageArray msgs(SRS_PERF_MW_MSGS);
bool user_specified_duration_to_stop = (req->duration > 0);
int64_t starttime = -1;
// setup the realtime.
realtime = _srs_config->get_realtime_enabled(req->vhost);
// setup the mw config.
// when mw_sleep changed, resize the socket send buffer.
mw_enabled = true;
change_mw_sleep(_srs_config->get_mw_sleep(req->vhost));
mw_msgs = _srs_config->get_mw_msgs(req->vhost, realtime);
mw_sleep = _srs_config->get_mw_sleep(req->vhost);
set_socket_buffer(mw_sleep);
// initialize the send_min_interval
send_min_interval = _srs_config->get_send_min_interval(req->vhost);
srs_trace("start play smi=%dms, mw_sleep=%d, mw_enabled=%d, realtime=%d, tcp_nodelay=%d",
srsu2msi(send_min_interval), srsu2msi(mw_sleep), mw_enabled, realtime, tcp_nodelay);
srs_trace("start play smi=%dms, mw_sleep=%d, mw_msgs=%d, realtime=%d, tcp_nodelay=%d",
srsu2msi(send_min_interval), srsu2msi(mw_sleep), mw_msgs, realtime, tcp_nodelay);
while (true) {
// when source is set to expired, disconnect it.
@ -730,13 +739,7 @@ srs_error_t SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, Sr
// wait for message to incoming.
// @see https://github.com/ossrs/srs/issues/251
// @see https://github.com/ossrs/srs/issues/257
if (realtime) {
// for realtime, min required msgs is 0, send when got one+ msgs.
consumer->wait(SRS_PERF_MW_MIN_MSGS_REALTIME, mw_sleep);
} else {
// for no-realtime, got some msgs then send.
consumer->wait(SRS_PERF_MW_MIN_MSGS, mw_sleep);
}
consumer->wait(mw_msgs, mw_sleep);
#endif
// get messages from consumer.
@ -750,9 +753,9 @@ srs_error_t SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, Sr
// reportable
if (pprint->can_print()) {
kbps->sample();
srs_trace("-> " SRS_CONSTS_LOG_PLAY " time=%d, msgs=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d, mw=%d",
srs_trace("-> " SRS_CONSTS_LOG_PLAY " time=%d, msgs=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d, mw=%d/%d",
(int)pprint->age(), count, kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(),
kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m(), srsu2msi(mw_sleep));
kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m(), srsu2msi(mw_sleep), mw_msgs);
}
if (count <= 0) {
@ -1114,16 +1117,6 @@ srs_error_t SrsRtmpConn::process_play_control_msg(SrsConsumer* consumer, SrsComm
return err;
}
void SrsRtmpConn::change_mw_sleep(srs_utime_t sleep_v)
{
if (!mw_enabled) {
return;
}
set_socket_buffer(sleep_v);
mw_sleep = sleep_v;
}
void SrsRtmpConn::set_sock_options()
{
SrsRequest* req = info->req;

View file

@ -102,8 +102,7 @@ private:
srs_utime_t duration;
// The MR(merged-write) sleep time in srs_utime_t.
srs_utime_t mw_sleep;
// The MR(merged-write) only enabled for play.
int mw_enabled;
int mw_msgs;
// For realtime
// @see https://github.com/ossrs/srs/issues/257
bool realtime;
@ -149,7 +148,6 @@ private:
virtual srs_error_t handle_publish_message(SrsSource* source, SrsCommonMessage* msg);
virtual srs_error_t process_publish_message(SrsSource* source, SrsCommonMessage* msg);
virtual srs_error_t process_play_control_msg(SrsConsumer* consumer, SrsCommonMessage* msg);
virtual void change_mw_sleep(srs_utime_t sleep_v);
virtual void set_sock_options();
private:
virtual srs_error_t check_edge_token_traverse_auth();

View file

@ -269,9 +269,17 @@ void SrsMessageQueue::set_queue_size(srs_utime_t queue_size)
max_queue_size = queue_size;
}
srs_error_t SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg, bool* is_overflow)
srs_error_t SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg, bool* is_overflow, bool pass_timestamp)
{
srs_error_t err = srs_success;
msgs.push_back(msg);
// For RTC, we never care about the timestamp and duration, so we never shrink queue here,
// but we will drop messages in each consumer coroutine.
if (pass_timestamp) {
return err;
}
if (msg->is_av()) {
if (av_start_time == -1) {
@ -281,8 +289,6 @@ srs_error_t SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg, bool* is_overflow
av_end_time = srs_utime_t(msg->timestamp * SRS_UTIME_MILLISECONDS);
}
msgs.push_back(msg);
while (av_end_time - av_start_time > max_queue_size) {
// notice the caller queue already overflow and shrinked.
if (is_overflow) {
@ -295,7 +301,7 @@ srs_error_t SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg, bool* is_overflow
return err;
}
srs_error_t SrsMessageQueue::dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& count)
srs_error_t SrsMessageQueue::dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& count, bool pass_timestamp)
{
srs_error_t err = srs_success;
@ -308,13 +314,15 @@ srs_error_t SrsMessageQueue::dump_packets(int max_count, SrsSharedPtrMessage** p
count = srs_min(max_count, nb_msgs);
SrsSharedPtrMessage** omsgs = msgs.data();
for (int i = 0; i < count; i++) {
pmsgs[i] = omsgs[i];
memcpy(pmsgs, omsgs, count * sizeof(SrsSharedPtrMessage*));
// For RTC, we enable pass_timestamp mode, which never care about the timestamp and duration,
// so we do not have to update the start time here.
if (!pass_timestamp) {
SrsSharedPtrMessage* last = omsgs[count - 1];
av_start_time = srs_utime_t(last->timestamp * SRS_UTIME_MILLISECONDS);
}
SrsSharedPtrMessage* last = omsgs[count - 1];
av_start_time = srs_utime_t(last->timestamp * SRS_UTIME_MILLISECONDS);
if (count >= nb_msgs) {
// the pmsgs is big enough and clear msgs at most time.
msgs.clear();
@ -433,6 +441,8 @@ SrsConsumer::SrsConsumer(SrsSource* s, SrsConnection* c)
mw_duration = 0;
mw_waiting = false;
#endif
pass_timestamp = false;
}
SrsConsumer::~SrsConsumer()
@ -466,20 +476,35 @@ srs_error_t SrsConsumer::enqueue(SrsSharedPtrMessage* shared_msg, bool atc, SrsR
srs_error_t err = srs_success;
SrsSharedPtrMessage* msg = shared_msg->copy();
if (!atc) {
// For RTC, we enable pass_timestamp mode, which never correct or depends on monotonic increasing of
// timestamp. And in RTC, the audio and video timebase can be different, so we ignore time_jitter here.
if (!pass_timestamp && !atc) {
if ((err = jitter->correct(msg, ag)) != srs_success) {
return srs_error_wrap(err, "consume message");
}
}
if ((err = queue->enqueue(msg, NULL)) != srs_success) {
// Put message in queue, here we may enable pass_timestamp mode.
if ((err = queue->enqueue(msg, NULL, pass_timestamp)) != srs_success) {
return srs_error_wrap(err, "enqueue message");
}
#ifdef SRS_PERF_QUEUE_COND_WAIT
// fire the mw when msgs is enough.
if (mw_waiting) {
// For RTC, we use pass_timestamp mode, we don't care about the timestamp in queue,
// so we only check the messages in queue.
if (pass_timestamp) {
if (queue->size() > mw_min_msgs) {
srs_cond_signal(mw_wait);
mw_waiting = false;
return err;
}
return err;
}
// For RTMP, we wait for messages and duration.
srs_utime_t duration = queue->duration();
bool match_min_msgs = queue->size() > mw_min_msgs;
@ -529,7 +554,7 @@ srs_error_t SrsConsumer::dump_packets(SrsMessageArray* msgs, int& count)
}
// pump msgs from queue.
if ((err = queue->dump_packets(max, msgs->msgs, count)) != srs_success) {
if ((err = queue->dump_packets(max, msgs->msgs, count, pass_timestamp)) != srs_success) {
return srs_error_wrap(err, "dump packets");
}

View file

@ -151,12 +151,13 @@ public:
// Enqueue the message, the timestamp always monotonically.
// @param msg, the msg to enqueue, user never free it whatever the return code.
// @param is_overflow, whether overflow and shrinked. NULL to ignore.
virtual srs_error_t enqueue(SrsSharedPtrMessage* msg, bool* is_overflow = NULL);
// @remark If pass_timestamp, we never shrink and never care about the timestamp or duration.
virtual srs_error_t enqueue(SrsSharedPtrMessage* msg, bool* is_overflow = NULL, bool pass_timestamp = false);
// Get packets in consumer queue.
// @pmsgs SrsSharedPtrMessage*[], used to store the msgs, user must alloc it.
// @count the count in array, output param.
// @max_count the max count to dequeue, must be positive.
virtual srs_error_t dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& count);
virtual srs_error_t dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& count, bool pass_timestamp = false);
// Dumps packets to consumer, use specified args.
// @remark the atc/tba/tbv/ag are same to SrsConsumer.enqueue().
virtual srs_error_t dump_packets(SrsConsumer* consumer, bool atc, SrsRtmpJitterAlgorithm ag);
@ -203,10 +204,17 @@ private:
int mw_min_msgs;
srs_utime_t mw_duration;
#endif
private:
// For RTC, we never use jitter to correct timestamp.
// But we should not change the atc or time_jitter for source or RTMP.
// @remark In this mode, we also never check the queue by timstamp, but only by count.
bool pass_timestamp;
public:
SrsConsumer(SrsSource* s, SrsConnection* c);
virtual ~SrsConsumer();
public:
// Use pass timestamp mode.
void enable_pass_timestamp() { pass_timestamp = true; }
// Set the size of queue.
virtual void set_queue_size(srs_utime_t queue_size);
// when source id changed, notice client to print.

View file

@ -271,6 +271,8 @@ SrsStatistic::SrsStatistic()
perf_gso = new SrsStatisticCategory();
perf_rtp = new SrsStatisticCategory();
perf_rtc = new SrsStatisticCategory();
perf_bytes = new SrsStatisticCategory();
perf_dropped = new SrsStatisticCategory();
}
SrsStatistic::~SrsStatistic()
@ -311,6 +313,8 @@ SrsStatistic::~SrsStatistic()
srs_freep(perf_gso);
srs_freep(perf_rtp);
srs_freep(perf_rtc);
srs_freep(perf_bytes);
srs_freep(perf_dropped);
}
SrsStatistic* SrsStatistic::instance()
@ -641,7 +645,7 @@ srs_error_t SrsStatistic::dumps_perf_writev_iovs(SrsJsonObject* obj)
return dumps_perf(perf_iovs, obj);
}
void SrsStatistic::perf_sendmmsg_on_packets(int nb_packets)
void SrsStatistic::perf_on_sendmmsg_packets(int nb_packets)
{
perf_on_packets(perf_sendmmsg, nb_packets);
}
@ -651,6 +655,73 @@ srs_error_t SrsStatistic::dumps_perf_sendmmsg(SrsJsonObject* obj)
return dumps_perf(perf_sendmmsg, obj);
}
void SrsStatistic::perf_on_rtc_bytes(int nn_bytes, int nn_rtp_bytes, int nn_padding)
{
// a: AVFrame bytes.
// b: RTC bytes.
// c: RTC paddings.
perf_bytes->a += nn_bytes;
perf_bytes->b += nn_rtp_bytes;
perf_bytes->c += nn_padding;
perf_bytes->nn += nn_rtp_bytes;
}
srs_error_t SrsStatistic::dumps_perf_bytes(SrsJsonObject* obj)
{
obj->set("avframe_bytes", SrsJsonAny::integer(perf_bytes->a));
obj->set("rtc_bytes", SrsJsonAny::integer(perf_bytes->b));
obj->set("rtc_padding", SrsJsonAny::integer(perf_bytes->c));
obj->set("nn", SrsJsonAny::integer(perf_bytes->nn));
return srs_success;
}
void SrsStatistic::perf_on_dropped(int nn_msgs, int nn_rtc, int nn_dropped)
{
// a: System AVFrames.
// b: RTC frames.
// c: Dropped frames.
perf_dropped->a += nn_msgs;
perf_dropped->b += nn_rtc;
perf_dropped->c += nn_dropped;
perf_dropped->nn += nn_dropped;
}
srs_error_t SrsStatistic::dumps_perf_dropped(SrsJsonObject* obj)
{
obj->set("avframes", SrsJsonAny::integer(perf_dropped->a));
obj->set("rtc_frames", SrsJsonAny::integer(perf_dropped->b));
obj->set("rtc_dropeed", SrsJsonAny::integer(perf_dropped->c));
obj->set("nn", SrsJsonAny::integer(perf_dropped->nn));
return srs_success;
}
void SrsStatistic::reset_perf()
{
srs_freep(perf_iovs);
srs_freep(perf_msgs);
srs_freep(perf_sendmmsg);
srs_freep(perf_gso);
srs_freep(perf_rtp);
srs_freep(perf_rtc);
srs_freep(perf_bytes);
srs_freep(perf_dropped);
perf_iovs = new SrsStatisticCategory();
perf_msgs = new SrsStatisticCategory();
perf_sendmmsg = new SrsStatisticCategory();
perf_gso = new SrsStatisticCategory();
perf_rtp = new SrsStatisticCategory();
perf_rtc = new SrsStatisticCategory();
perf_bytes = new SrsStatisticCategory();
perf_dropped = new SrsStatisticCategory();
}
void SrsStatistic::perf_on_packets(SrsStatisticCategory* p, int nb_msgs)
{
// The range for stat:

View file

@ -174,6 +174,8 @@ private:
SrsStatisticCategory* perf_gso;
SrsStatisticCategory* perf_rtp;
SrsStatisticCategory* perf_rtc;
SrsStatisticCategory* perf_bytes;
SrsStatisticCategory* perf_dropped;
private:
SrsStatistic();
virtual ~SrsStatistic();
@ -245,13 +247,11 @@ public:
// Stat for packets merged written, nb_packets is the number of RTP packets.
// For example, a RTC/opus packet maybe package to three RTP packets.
virtual void perf_on_rtp_packets(int nb_packets);
// Dumps the perf statistic data for RTP packets, for performance analysis.
virtual srs_error_t dumps_perf_rtp_packets(SrsJsonObject* obj);
public:
// Stat for packets UDP GSO, nb_packets is the merged RTP packets.
// For example, three RTP/audio packets maybe GSO to one msghdr.
virtual void perf_on_gso_packets(int nb_packets);
// Dumps the perf statistic data for UDP GSO, for performance analysis.
virtual srs_error_t dumps_perf_gso(SrsJsonObject* obj);
public:
// Stat for TCP writev, nb_iovs is the total number of iovec.
@ -259,9 +259,19 @@ public:
virtual srs_error_t dumps_perf_writev_iovs(SrsJsonObject* obj);
public:
// Stat for packets UDP sendmmsg, nb_packets is the vlen for sendmmsg.
virtual void perf_sendmmsg_on_packets(int nb_packets);
// Dumps the perf statistic data for UDP sendmmsg, for performance analysis.
virtual void perf_on_sendmmsg_packets(int nb_packets);
virtual srs_error_t dumps_perf_sendmmsg(SrsJsonObject* obj);
public:
// Stat for bytes, nn_bytes is the size of bytes, nb_padding is padding bytes.
virtual void perf_on_rtc_bytes(int nn_bytes, int nn_rtp_bytes, int nn_padding);
virtual srs_error_t dumps_perf_bytes(SrsJsonObject* obj);
public:
// Stat for rtc messages, nn_rtc is rtc messages, nn_dropped is dropped messages.
virtual void perf_on_dropped(int nn_msgs, int nn_rtc, int nn_dropped);
virtual srs_error_t dumps_perf_dropped(SrsJsonObject* obj);
public:
// Reset all perf stat data.
virtual void reset_perf();
private:
virtual void perf_on_packets(SrsStatisticCategory* p, int nb_msgs);
virtual srs_error_t dumps_perf(SrsStatisticCategory* p, SrsJsonObject* obj);