1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

refine pithy print to more easyer to use 2.0.121.

This commit is contained in:
winlin 2015-02-19 18:56:21 +08:00
parent 9d233db27e
commit 1102c7a58f
23 changed files with 242 additions and 332 deletions

View file

@ -16,10 +16,11 @@ Download from ossrs.net:
SRS(SIMPLE RTMP Server) over state-threads created in 2013.10.
SRS delivers rtmp/hls live on x86/x64/arm/mips linux,
SRS delivers rtmp/hls/http live on x86/x64/arm/mips linux,
supports origin/edge/vhost and transcode/ingest and dvr/forward
and http-api/http-callback/reload, introduces tracable
session-oriented log, exports client srs-librtmp,
with stream caster to push MPEGTS-over-UDP/RTSP to SRS,
provides EN/CN wiki and the most simple architecture.
SRS focus on small problem domain, which is the most complex for all software(see OOAD).
@ -529,6 +530,7 @@ Supported operating systems and hardware:
### SRS 2.0 history
* v2.0, 2015-02-19, refine pithy print to more easyer to use 2.0.121.
* v2.0, 2015-02-18, fix [#133](https://github.com/winlinvip/simple-rtmp-server/issues/133), support push rtsp to srs. 2.0.120.
* v2.0, 2015-02-17, the join maybe failed, should use a variable to ensure thread terminated. 2.0.119.
* v2.0, 2015-02-15, for [#304](https://github.com/winlinvip/simple-rtmp-server/issues/304), support config default acodec/vcodec. 2.0.118.

View file

@ -1227,26 +1227,4 @@ vhost removed.srs.com {
# config for the pithy print,
# which always print constant message specified by interval,
# whatever the clients in concurrency.
pithy_print {
# shared print interval for all publish clients, in milliseconds.
# default: 10000
publish 10000;
# shared print interval for all play clients, in milliseconds.
# default: 10000
play 10000;
# shared print interval for all forwarders, in milliseconds.
# default: 10000
forwarder 10000;
# shared print interval for all encoders, in milliseconds.
# default: 10000
encoder 10000;
# shared print interval for all ingesters, in milliseconds.
# default: 10000
ingester 10000;
# shared print interval for all hls, in milliseconds.
# default: 10000
hls 10000;
# shared print interval for all edge, in milliseconds.
# default: 10000
edge 10000;
}
pithy_print_ms 10000;

View file

@ -502,16 +502,16 @@ int SrsConfig::reload_conf(SrsConfig* conf)
srs_trace("reload srs_log_file success.");
}
// merge config: pithy_print
if (!srs_directive_equals(root->get("pithy_print"), old_root->get("pithy_print"))) {
// merge config: pithy_print_ms
if (!srs_directive_equals(root->get("pithy_print_ms"), old_root->get("pithy_print_ms"))) {
for (it = subscribes.begin(); it != subscribes.end(); ++it) {
ISrsReloadHandler* subscribe = *it;
if ((ret = subscribe->on_reload_pithy_print()) != ERROR_SUCCESS) {
srs_error("notify subscribes pithy_print listen failed. ret=%d", ret);
srs_error("notify subscribes pithy_print_ms listen failed. ret=%d", ret);
return ret;
}
}
srs_trace("reload pithy_print success.");
srs_trace("reload pithy_print_ms success.");
}
// merge config: http_api
@ -1322,7 +1322,7 @@ int SrsConfig::check_config()
if (n != "listen" && n != "pid" && n != "chunk_size" && n != "ff_log_dir"
&& n != "srs_log_tank" && n != "srs_log_level" && n != "srs_log_file"
&& n != "max_connections" && n != "daemon" && n != "heartbeat"
&& n != "http_api" && n != "stats" && n != "vhost" && n != "pithy_print"
&& n != "http_api" && n != "stats" && n != "vhost" && n != "pithy_print_ms"
&& n != "http_stream" && n != "http_server" && n != "stream_caster")
{
ret = ERROR_SYSTEM_CONFIG_INVALID;
@ -1376,19 +1376,6 @@ int SrsConfig::check_config()
}
}
}
if (true) {
SrsConfDirective* conf = get_pithy_print();
for (int i = 0; conf && i < (int)conf->directives.size(); i++) {
string n = conf->at(i)->name;
if (n != "publish" && n != "play" && n != "forwarder"
&& n != "encoder" && n != "ingester" && n != "hls" && n != "edge"
) {
ret = ERROR_SYSTEM_CONFIG_INVALID;
srs_error("unsupported pithy_print directive %s, ret=%d", n.c_str(), ret);
return ret;
}
}
}
for (int n = 0; n < (int)stream_casters.size(); n++) {
SrsConfDirective* stream_caster = stream_casters[n];
for (int i = 0; stream_caster && i < (int)stream_caster->directives.size(); i++) {
@ -1884,111 +1871,11 @@ string SrsConfig::get_pid_file()
return conf->arg0();
}
SrsConfDirective* SrsConfig::get_pithy_print()
int SrsConfig::get_pithy_print_ms()
{
return root->get("pithy_print");
}
int SrsConfig::get_pithy_print_publish()
{
SrsConfDirective* pithy = get_pithy_print();
if (!pithy) {
return SRS_CONF_DEFAULT_STAGE_PUBLISH_USER_INTERVAL_MS;
}
pithy = pithy->get("publish");
if (!pithy) {
return SRS_CONF_DEFAULT_STAGE_PUBLISH_USER_INTERVAL_MS;
}
return ::atoi(pithy->arg0().c_str());
}
int SrsConfig::get_pithy_print_forwarder()
{
SrsConfDirective* pithy = get_pithy_print();
if (!pithy) {
return SRS_CONF_DEFAULT_STAGE_FORWARDER_INTERVAL_MS;
}
pithy = pithy->get("forwarder");
if (!pithy) {
return SRS_CONF_DEFAULT_STAGE_FORWARDER_INTERVAL_MS;
}
return ::atoi(pithy->arg0().c_str());
}
int SrsConfig::get_pithy_print_encoder()
{
SrsConfDirective* pithy = get_pithy_print();
if (!pithy) {
return SRS_CONF_DEFAULT_STAGE_ENCODER_INTERVAL_MS;
}
pithy = pithy->get("encoder");
if (!pithy) {
return SRS_CONF_DEFAULT_STAGE_ENCODER_INTERVAL_MS;
}
return ::atoi(pithy->arg0().c_str());
}
int SrsConfig::get_pithy_print_ingester()
{
SrsConfDirective* pithy = get_pithy_print();
if (!pithy) {
return SRS_CONF_DEFAULT_STAGE_INGESTER_INTERVAL_MS;
}
pithy = pithy->get("ingester");
if (!pithy) {
return SRS_CONF_DEFAULT_STAGE_INGESTER_INTERVAL_MS;
}
return ::atoi(pithy->arg0().c_str());
}
int SrsConfig::get_pithy_print_hls()
{
SrsConfDirective* pithy = get_pithy_print();
if (!pithy) {
return SRS_CONF_DEFAULT_STAGE_HLS_INTERVAL_MS;
}
pithy = pithy->get("hls");
if (!pithy) {
return SRS_CONF_DEFAULT_STAGE_HLS_INTERVAL_MS;
}
return ::atoi(pithy->arg0().c_str());
}
int SrsConfig::get_pithy_print_play()
{
SrsConfDirective* pithy = get_pithy_print();
if (!pithy) {
return SRS_CONF_DEFAULT_STAGE_PLAY_USER_INTERVAL_MS;
}
pithy = pithy->get("play");
if (!pithy) {
return SRS_CONF_DEFAULT_STAGE_PLAY_USER_INTERVAL_MS;
}
return ::atoi(pithy->arg0().c_str());
}
int SrsConfig::get_pithy_print_edge()
{
SrsConfDirective* pithy = get_pithy_print();
if (!pithy) {
return SRS_CONF_DEFAULT_STAGE_EDGE_INTERVAL_MS;
}
pithy = pithy->get("edge");
if (!pithy) {
return SRS_CONF_DEFAULT_STAGE_EDGE_INTERVAL_MS;
SrsConfDirective* pithy = root->get("pithy_print_ms");
if (!pithy || pithy->arg0().empty()) {
return SRS_CONF_DEFAULT_PITHY_PRINT_MS;
}
return ::atoi(pithy->arg0().c_str());

View file

@ -91,13 +91,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#define SRS_CONF_DEFAULT_STATS_NETWORK_DEVICE_INDEX 0
#define SRS_CONF_DEFAULT_STAGE_PLAY_USER_INTERVAL_MS 10000
#define SRS_CONF_DEFAULT_STAGE_PUBLISH_USER_INTERVAL_MS 10000
#define SRS_CONF_DEFAULT_STAGE_FORWARDER_INTERVAL_MS 10000
#define SRS_CONF_DEFAULT_STAGE_ENCODER_INTERVAL_MS 10000
#define SRS_CONF_DEFAULT_STAGE_INGESTER_INTERVAL_MS 10000
#define SRS_CONF_DEFAULT_STAGE_HLS_INTERVAL_MS 10000
#define SRS_CONF_DEFAULT_STAGE_EDGE_INTERVAL_MS 10000
#define SRS_CONF_DEFAULT_PITHY_PRINT_MS 10000
#define SRS_CONF_DEFAULT_INGEST_TYPE_FILE "file"
#define SRS_CONF_DEFAULT_INGEST_TYPE_STREAM "stream"
@ -419,46 +413,12 @@ public:
* user can use different pid file for each process.
*/
virtual std::string get_pid_file();
// pithy print
private:
virtual SrsConfDirective* get_pithy_print();
public:
/**
* get the pithy print interval for publish, in ms,
* the publish(flash/FMLE) message print.
* get pithy print pulse ms,
* for example, all rtmp connections only print one message
* every this interval in ms.
*/
virtual int get_pithy_print_publish();
/**
* get the pithy print interval for forwarder, in ms,
* the forwarder message print, for SRS forward stream to other servers.
*/
virtual int get_pithy_print_forwarder();
/**
* get the pithy print interval for encoder, in ms,
* the encoder message print, for FFMPEG transcoder.
*/
virtual int get_pithy_print_encoder();
/**
* get the pithy print interval for ingester, in ms,
* the ingest used FFMPEG, or your tools, to read and transcode other stream
* to RTMP to SRS.
*/
virtual int get_pithy_print_ingester();
/**
* get the pithy print interval for HLS, in ms,
* the HLS used for IOS/android/PC, SRS will mux RTMP to HLS.
*/
virtual int get_pithy_print_hls();
/**
* get the pithy print interval for Play, in ms,
* the play is client or edge playing RTMP stream
*/
virtual int get_pithy_print_play();
/**
* get the pithy print interval for edge, in ms,
* the edge will get stream from upnode.
*/
virtual int get_pithy_print_edge();
virtual int get_pithy_print_ms();
// stream_caster section
public:
/**

View file

@ -167,17 +167,18 @@ int SrsEdgeIngester::ingest()
client->set_recv_timeout(SRS_EDGE_INGESTER_TIMEOUT_US);
SrsPithyPrint pithy_print(SRS_CONSTS_STAGE_EDGE);
SrsPithyPrint* pprint = SrsPithyPrint::create_edge();
SrsAutoFree(SrsPithyPrint, pprint);
while (pthread->can_loop()) {
pithy_print.elapse();
pprint->elapse();
// pithy print
if (pithy_print.can_print()) {
if (pprint->can_print()) {
kbps->sample();
srs_trace("<- "SRS_CONSTS_LOG_EDGE_PLAY
" time=%"PRId64", okbps=%d,%d,%d, ikbps=%d,%d,%d",
pithy_print.age(),
pprint->age(),
kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(),
kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m());
}
@ -473,7 +474,8 @@ int SrsEdgeForwarder::cycle()
client->set_recv_timeout(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US);
SrsPithyPrint pithy_print(SRS_CONSTS_STAGE_EDGE);
SrsPithyPrint* pprint = SrsPithyPrint::create_edge();
SrsAutoFree(SrsPithyPrint, pprint);
SrsMessageArray msgs(SYS_MAX_EDGE_SEND_MSGS);
@ -506,14 +508,14 @@ int SrsEdgeForwarder::cycle()
return ret;
}
pithy_print.elapse();
pprint->elapse();
// pithy print
if (pithy_print.can_print()) {
if (pprint->can_print()) {
kbps->sample();
srs_trace("-> "SRS_CONSTS_LOG_EDGE_PUBLISH
" time=%"PRId64", msgs=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d",
pithy_print.age(), count,
pprint->age(), count,
kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(),
kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m());
}

View file

@ -45,7 +45,7 @@ static std::vector<std::string> _transcoded_url;
SrsEncoder::SrsEncoder()
{
pthread = new SrsThread("encoder", this, SRS_RTMP_ENCODER_SLEEP_US, true);
pithy_print = new SrsPithyPrint(SRS_CONSTS_STAGE_ENCODER);
pprint = SrsPithyPrint::create_encoder();
}
SrsEncoder::~SrsEncoder()
@ -53,7 +53,7 @@ SrsEncoder::~SrsEncoder()
on_unpublish();
srs_freep(pthread);
srs_freep(pithy_print);
srs_freep(pprint);
}
int SrsEncoder::on_publish(SrsRequest* req)
@ -113,8 +113,7 @@ int SrsEncoder::cycle()
}
// pithy print
encoder();
pithy_print->elapse();
show_encode_log_message();
return ret;
}
@ -324,13 +323,15 @@ int SrsEncoder::initialize_ffmpeg(SrsFFMPEG* ffmpeg, SrsRequest* req, SrsConfDir
return ret;
}
void SrsEncoder::encoder()
void SrsEncoder::show_encode_log_message()
{
pprint->elapse();
// reportable
if (pithy_print->can_print()) {
if (pprint->can_print()) {
// TODO: FIXME: show more info.
srs_trace("-> "SRS_CONSTS_LOG_ENCODER" time=%"PRId64", encoders=%d, input=%s",
pithy_print->age(), (int)ffmpegs.size(), input_stream_name.c_str());
pprint->age(), (int)ffmpegs.size(), input_stream_name.c_str());
}
}

View file

@ -52,7 +52,7 @@ private:
std::vector<SrsFFMPEG*> ffmpegs;
private:
SrsThread* pthread;
SrsPithyPrint* pithy_print;
SrsPithyPrint* pprint;
public:
SrsEncoder();
virtual ~SrsEncoder();
@ -69,7 +69,7 @@ private:
virtual int parse_scope_engines(SrsRequest* req);
virtual int parse_ffmpeg(SrsRequest* req, SrsConfDirective* conf);
virtual int initialize_ffmpeg(SrsFFMPEG* ffmpeg, SrsRequest* req, SrsConfDirective* engine);
virtual void encoder();
virtual void show_encode_log_message();
};
#endif

View file

@ -43,6 +43,7 @@ using namespace std;
#include <srs_app_utility.hpp>
#include <srs_rtmp_amf0.hpp>
#include <srs_kernel_codec.hpp>
#include <srs_core_autofree.hpp>
// when error, forwarder sleep for a while and retry.
#define SRS_FORWARDER_SLEEP_US (int64_t)(3*1000*1000LL)
@ -386,7 +387,8 @@ int SrsForwarder::forward()
client->set_recv_timeout(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US);
SrsPithyPrint pithy_print(SRS_CONSTS_STAGE_FORWARDER);
SrsPithyPrint* pprint = SrsPithyPrint::create_forwarder();
SrsAutoFree(SrsPithyPrint, pprint);
SrsMessageArray msgs(SYS_MAX_FORWARD_SEND_MSGS);
@ -406,7 +408,7 @@ int SrsForwarder::forward()
}
while (pthread->can_loop()) {
pithy_print.elapse();
pprint->elapse();
// read from client.
if (true) {
@ -431,11 +433,11 @@ int SrsForwarder::forward()
}
// pithy print
if (pithy_print.can_print()) {
if (pprint->can_print()) {
kbps->sample();
srs_trace("-> "SRS_CONSTS_LOG_FOWARDER
" time=%"PRId64", msgs=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d",
pithy_print.age(), count,
pprint->age(), count,
kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(),
kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m());
}

View file

@ -825,7 +825,7 @@ SrsHls::SrsHls(SrsSource* s, ISrsHlsHandler* h)
muxer = new SrsHlsMuxer(h);
hls_cache = new SrsHlsCache();
pithy_print = new SrsPithyPrint(SRS_CONSTS_STAGE_HLS);
pprint = SrsPithyPrint::create_hls();
stream_dts = 0;
}
@ -838,7 +838,7 @@ SrsHls::~SrsHls()
srs_freep(muxer);
srs_freep(hls_cache);
srs_freep(pithy_print);
srs_freep(pprint);
}
int SrsHls::on_publish(SrsRequest* req)
@ -1012,24 +1012,25 @@ int SrsHls::on_video(SrsSharedPtrMessage* __video)
return ret;
}
hls_mux();
// pithy print message.
hls_show_mux_log();
return ret;
}
void SrsHls::hls_mux()
void SrsHls::hls_show_mux_log()
{
pprint->elapse();
// reportable
if (pithy_print->can_print()) {
if (pprint->can_print()) {
// the run time is not equals to stream time,
// @see: https://github.com/winlinvip/simple-rtmp-server/issues/81#issuecomment-48100994
// it's ok.
srs_trace("-> "SRS_CONSTS_LOG_HLS
" time=%"PRId64", stream dts=%"PRId64"(%"PRId64"ms), sequence_no=%d",
pithy_print->age(), stream_dts, stream_dts / 90, muxer->sequence_no());
pprint->age(), stream_dts, stream_dts / 90, muxer->sequence_no());
}
pithy_print->elapse();
}
#endif

View file

@ -309,7 +309,7 @@ private:
SrsAvcAacCodec* codec;
SrsCodecSample* sample;
SrsRtmpJitter* jitter;
SrsPithyPrint* pithy_print;
SrsPithyPrint* pprint;
/**
* we store the stream dts,
* for when we notice the hls cache to publish,
@ -353,7 +353,7 @@ public:
*/
virtual int on_video(SrsSharedPtrMessage* __video);
private:
virtual void hls_mux();
virtual void hls_show_mux_log();
};
#endif

View file

@ -47,6 +47,7 @@ using namespace std;
#include <srs_kernel_aac.hpp>
#include <srs_kernel_mp3.hpp>
#include <srs_kernel_ts.hpp>
#include <srs_app_pithy_print.hpp>
SrsVodStream::SrsVodStream(string root_dir)
: SrsGoHttpFileServer(root_dir)
@ -249,8 +250,10 @@ int SrsStreamCache::cycle()
}
SrsAutoFree(SrsConsumer, consumer);
SrsPithyPrint* pprint = SrsPithyPrint::create_http_stream_cache();
SrsAutoFree(SrsPithyPrint, pprint);
SrsMessageArray msgs(SRS_PERF_MW_MSGS);
// TODO: FIMXE: add pithy print.
// TODO: FIXME: support reload.
double fast_cache = _srs_config->get_vhost_http_remux_fast_cache(req->vhost);
@ -259,6 +262,8 @@ int SrsStreamCache::cycle()
}
while (true) {
pprint->elapse();
// get messages from consumer.
// each msg in msgs.msgs must be free, for the SrsMessageArray never free them.
int count = 0;
@ -275,8 +280,11 @@ int SrsStreamCache::cycle()
// ignore when nothing got.
continue;
}
srs_info("http: got %d msgs, min=%d, mw=%d", count,
SRS_PERF_MW_MIN_MSGS, SRS_CONSTS_RTMP_PULSE_TIMEOUT_US / 1000);
if (pprint->can_print()) {
srs_trace("-> "SRS_CONSTS_LOG_HTTP_STREAM_CACHE" http: got %d msgs, age=%d, min=%d, mw=%d",
pprint->age(), count, SRS_PERF_MW_MIN_MSGS, SRS_CONSTS_RTMP_PULSE_TIMEOUT_US / 1000);
}
// free the messages.
for (int i = 0; i < count; i++) {
@ -592,8 +600,10 @@ int SrsLiveStream::serve_http(ISrsGoHttpResponseWriter* w, SrsHttpMessage* r)
SrsAutoFree(SrsConsumer, consumer);
srs_verbose("http: consumer created success.");
SrsPithyPrint* pprint = SrsPithyPrint::create_http_stream();
SrsAutoFree(SrsPithyPrint, pprint);
SrsMessageArray msgs(SRS_PERF_MW_MSGS);
// TODO: FIMXE: add pithy print.
// the memory writer.
SrsStreamWriter writer(w);
@ -611,6 +621,8 @@ int SrsLiveStream::serve_http(ISrsGoHttpResponseWriter* w, SrsHttpMessage* r)
}
while (true) {
pprint->elapse();
// get messages from consumer.
// each msg in msgs.msgs must be free, for the SrsMessageArray never free them.
int count = 0;
@ -627,8 +639,11 @@ int SrsLiveStream::serve_http(ISrsGoHttpResponseWriter* w, SrsHttpMessage* r)
// ignore when nothing got.
continue;
}
srs_info("http: got %d msgs, min=%d, mw=%d", count,
SRS_PERF_MW_MIN_MSGS, SRS_CONSTS_RTMP_PULSE_TIMEOUT_US / 1000);
if (pprint->can_print()) {
srs_info("-> "SRS_CONSTS_LOG_HTTP_STREAM" http: got %d msgs, age=%d, min=%d, mw=%d",
pprint->age(), count, SRS_PERF_MW_MIN_MSGS, SRS_CONSTS_RTMP_PULSE_TIMEOUT_US / 1000);
}
// sendout all messages.
ret = streaming_send_messages(enc, msgs.msgs, count);

View file

@ -55,7 +55,7 @@ SrsIngester::SrsIngester()
_srs_config->subscribe(this);
pthread = new SrsThread("ingest", this, SRS_AUTO_INGESTER_SLEEP_US, true);
pithy_print = new SrsPithyPrint(SRS_CONSTS_STAGE_INGESTER);
pprint = SrsPithyPrint::create_ingester();
}
SrsIngester::~SrsIngester()
@ -186,8 +186,7 @@ int SrsIngester::cycle()
}
// pithy print
ingester();
pithy_print->elapse();
show_ingest_log_message();
return ret;
}
@ -340,17 +339,19 @@ int SrsIngester::initialize_ffmpeg(SrsFFMPEG* ffmpeg, SrsConfDirective* vhost, S
return ret;
}
void SrsIngester::ingester()
void SrsIngester::show_ingest_log_message()
{
pprint->elapse();
if ((int)ingesters.size() <= 0) {
return;
}
// reportable
if (pithy_print->can_print()) {
if (pprint->can_print()) {
// TODO: FIXME: show more info.
srs_trace("-> "SRS_CONSTS_LOG_INGESTER
" time=%"PRId64", ingesters=%d", pithy_print->age(), (int)ingesters.size());
" time=%"PRId64", ingesters=%d", pprint->age(), (int)ingesters.size());
}
}

View file

@ -65,7 +65,7 @@ private:
std::vector<SrsIngesterFFMPEG*> ingesters;
private:
SrsThread* pthread;
SrsPithyPrint* pithy_print;
SrsPithyPrint* pprint;
public:
SrsIngester();
virtual ~SrsIngester();
@ -82,7 +82,7 @@ private:
virtual int parse_ingesters(SrsConfDirective* vhost);
virtual int parse_engines(SrsConfDirective* vhost, SrsConfDirective* ingest);
virtual int initialize_ffmpeg(SrsFFMPEG* ffmpeg, SrsConfDirective* vhost, SrsConfDirective* ingest, SrsConfDirective* engine);
virtual void ingester();
virtual void show_ingest_log_message();
// interface ISrsReloadHandler.
public:
virtual int on_reload_vhost_removed(std::string vhost);

View file

@ -48,6 +48,7 @@ using namespace std;
#include <srs_app_utility.hpp>
#include <srs_rtmp_amf0.hpp>
#include <srs_raw_avc.hpp>
#include <srs_app_pithy_print.hpp>
SrsMpegtsQueue::SrsMpegtsQueue()
{
@ -140,6 +141,7 @@ SrsMpegtsOverUdp::SrsMpegtsOverUdp(SrsConfDirective* c)
h264_pps_changed = false;
h264_sps_pps_sent = false;
queue = new SrsMpegtsQueue();
pprint = SrsPithyPrint::create_caster();
}
SrsMpegtsOverUdp::~SrsMpegtsOverUdp()
@ -152,6 +154,7 @@ SrsMpegtsOverUdp::~SrsMpegtsOverUdp()
srs_freep(avc);
srs_freep(aac);
srs_freep(queue);
srs_freep(pprint);
}
int SrsMpegtsOverUdp::on_udp_packet(sockaddr_in* from, char* buf, int nb_buf)
@ -246,6 +249,8 @@ int SrsMpegtsOverUdp::on_ts_message(SrsTsMessage* msg)
{
int ret = ERROR_SUCCESS;
pprint->elapse();
// about the bytes of msg, specified by elementary stream which indicates by PES_packet_data_byte and stream_id
// for example, when SrsTsStream of SrsTsChannel indicates stream_type is SrsTsStreamVideoMpeg4 and SrsTsStreamAudioMpeg4,
// the elementary stream can be mux in "2.11 Carriage of ISO/IEC 14496 data" in hls-mpeg-ts-iso13818-1.pdf, page 103
@ -283,11 +288,12 @@ int SrsMpegtsOverUdp::on_ts_message(SrsTsMessage* msg)
// 14496-2 video stream number xxxx
// ((stream_id >> 4) & 0x0f) == SrsTsPESStreamIdVideo
// TODO: FIXME: support pithy print.
srs_info("mpegts: got %s stream=%s, dts=%"PRId64", pts=%"PRId64", size=%d, us=%d, cc=%d, sid=%#x(%s-%d)",
(msg->channel->apply == SrsTsPidApplyVideo)? "Video":"Audio", srs_ts_stream2string(msg->channel->stream).c_str(),
msg->dts, msg->pts, msg->payload->length(), msg->packet->payload_unit_start_indicator, msg->continuity_counter, msg->sid,
msg->is_audio()? "A":msg->is_video()? "V":"N", msg->stream_number());
if (pprint->can_print()) {
srs_trace("<- "SRS_CONSTS_LOG_STREAM_CASTER" mpegts: got %s age=%d stream=%s, dts=%"PRId64", pts=%"PRId64", size=%d, us=%d, cc=%d, sid=%#x(%s-%d)",
(msg->channel->apply == SrsTsPidApplyVideo)? "Video":"Audio", pprint->age(), srs_ts_stream2string(msg->channel->stream).c_str(),
msg->dts, msg->pts, msg->payload->length(), msg->packet->payload_unit_start_indicator, msg->continuity_counter, msg->sid,
msg->is_audio()? "A":msg->is_video()? "V":"N", msg->stream_number());
}
// when not audio/video, or not adts/annexb format, donot support.
if (msg->stream_number() != 0) {
@ -557,9 +563,10 @@ int SrsMpegtsOverUdp::rtmp_write_packet(char type, u_int32_t timestamp, char* da
break;
}
// TODO: FIXME: use pithy print.
srs_info("mpegts: send msg %s dts=%"PRId64", size=%d",
msg->is_audio()? "A":msg->is_video()? "V":"N", msg->timestamp, msg->size);
if (pprint->can_print()) {
srs_trace("mpegts: send msg %s age=%d, dts=%"PRId64", size=%d",
msg->is_audio()? "A":msg->is_video()? "V":"N", pprint->age(), msg->timestamp, msg->size);
}
// send out encoded msg.
if ((ret = client->send_and_free_message(msg, stream_id)) != ERROR_SUCCESS) {

View file

@ -47,6 +47,7 @@ class SrsRawH264Stream;
class SrsSharedPtrMessage;
class SrsRawAacStream;
class SrsRawAacStreamCodec;
class SrsPithyPrint;
#include <srs_app_st.hpp>
#include <srs_kernel_ts.hpp>
@ -101,6 +102,7 @@ private:
std::string aac_specific_config;
private:
SrsMpegtsQueue* queue;
SrsPithyPrint* pprint;
public:
SrsMpegtsOverUdp(SrsConfDirective* c);
virtual ~SrsMpegtsOverUdp();

View file

@ -31,8 +31,6 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_kernel_error.hpp>
#include <srs_kernel_utility.hpp>
#define SRS_CONSTS_STAGE_DEFAULT_INTERVAL_MS 1200
SrsStageInfo::SrsStageInfo(int _stage_id)
{
stage_id = _stage_id;
@ -51,40 +49,7 @@ SrsStageInfo::~SrsStageInfo()
void SrsStageInfo::update_print_time()
{
switch (stage_id) {
case SRS_CONSTS_STAGE_PLAY_USER: {
pithy_print_time_ms = _srs_config->get_pithy_print_play();
break;
}
case SRS_CONSTS_STAGE_PUBLISH_USER: {
pithy_print_time_ms = _srs_config->get_pithy_print_publish();
break;
}
case SRS_CONSTS_STAGE_FORWARDER: {
pithy_print_time_ms = _srs_config->get_pithy_print_forwarder();
break;
}
case SRS_CONSTS_STAGE_ENCODER: {
pithy_print_time_ms = _srs_config->get_pithy_print_encoder();
break;
}
case SRS_CONSTS_STAGE_INGESTER: {
pithy_print_time_ms = _srs_config->get_pithy_print_ingester();
break;
}
case SRS_CONSTS_STAGE_EDGE: {
pithy_print_time_ms = _srs_config->get_pithy_print_edge();
break;
}
case SRS_CONSTS_STAGE_HLS: {
pithy_print_time_ms = _srs_config->get_pithy_print_hls();
break;
}
default: {
pithy_print_time_ms = SRS_CONSTS_STAGE_DEFAULT_INTERVAL_MS;
break;
}
}
pithy_print_time_ms = _srs_config->get_pithy_print_ms();
}
void SrsStageInfo::elapse(int64_t diff)
@ -120,6 +85,80 @@ SrsPithyPrint::SrsPithyPrint(int _stage_id)
_age = 0;
}
///////////////////////////////////////////////////////////
// pithy-print consts values
///////////////////////////////////////////////////////////
// the pithy stage for all play clients.
#define SRS_CONSTS_STAGE_PLAY_USER 1
// the pithy stage for all publish clients.
#define SRS_CONSTS_STAGE_PUBLISH_USER 2
// the pithy stage for all forward clients.
#define SRS_CONSTS_STAGE_FORWARDER 3
// the pithy stage for all encoders.
#define SRS_CONSTS_STAGE_ENCODER 4
// the pithy stage for all hls.
#define SRS_CONSTS_STAGE_HLS 5
// the pithy stage for all ingesters.
#define SRS_CONSTS_STAGE_INGESTER 6
// the pithy stage for all edge.
#define SRS_CONSTS_STAGE_EDGE 7
// the pithy stage for all stream caster.
#define SRS_CONSTS_STAGE_CASTER 8
// the pithy stage for all http stream.
#define SRS_CONSTS_STAGE_HTTP_STREAM 9
// the pithy stage for all http stream cache.
#define SRS_CONSTS_STAGE_HTTP_STREAM_CACHE 10
SrsPithyPrint* SrsPithyPrint::create_rtmp_play()
{
return new SrsPithyPrint(SRS_CONSTS_STAGE_PLAY_USER);
}
SrsPithyPrint* SrsPithyPrint::create_rtmp_publish()
{
return new SrsPithyPrint(SRS_CONSTS_STAGE_PUBLISH_USER);
}
SrsPithyPrint* SrsPithyPrint::create_hls()
{
return new SrsPithyPrint(SRS_CONSTS_STAGE_HLS);
}
SrsPithyPrint* SrsPithyPrint::create_forwarder()
{
return new SrsPithyPrint(SRS_CONSTS_STAGE_FORWARDER);
}
SrsPithyPrint* SrsPithyPrint::create_encoder()
{
return new SrsPithyPrint(SRS_CONSTS_STAGE_ENCODER);
}
SrsPithyPrint* SrsPithyPrint::create_ingester()
{
return new SrsPithyPrint(SRS_CONSTS_STAGE_INGESTER);
}
SrsPithyPrint* SrsPithyPrint::create_edge()
{
return new SrsPithyPrint(SRS_CONSTS_STAGE_EDGE);
}
SrsPithyPrint* SrsPithyPrint::create_caster()
{
return new SrsPithyPrint(SRS_CONSTS_STAGE_CASTER);
}
SrsPithyPrint* SrsPithyPrint::create_http_stream()
{
return new SrsPithyPrint(SRS_CONSTS_STAGE_HTTP_STREAM);
}
SrsPithyPrint* SrsPithyPrint::create_http_stream_cache()
{
return new SrsPithyPrint(SRS_CONSTS_STAGE_HTTP_STREAM_CACHE);
}
SrsPithyPrint::~SrsPithyPrint()
{
leave_stage();

View file

@ -32,6 +32,9 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_app_reload.hpp>
/**
* the stage info to calc the age.
*/
class SrsStageInfo : public ISrsReloadHandler
{
public:
@ -56,6 +59,17 @@ public:
* the print time in a stage is constant and not changed.
* for example, stage #1 for all play clients, print time is 3s,
* if there is 10clients, then all clients should print in 10*3s.
* Usage:
SrsPithyPrint* pprint = SrsPithyPrint::create_rtmp_play();
SrsAutoFree(SrsPithyPrint, pprint);
while (true) {
pprint->elapse();
if (pprint->can_print()) {
// print pithy message.
// user can get the elapse time by: pprint->age()
}
// read and write RTMP messages.
}
*/
class SrsPithyPrint
{
@ -65,11 +79,19 @@ private:
// in ms.
int64_t _age;
int64_t previous_tick;
public:
/**
* @param _stage_id defined in SRS_CONSTS_STAGE_xxx, eg. SRS_CONSTS_STAGE_PLAY_USER.
*/
private:
SrsPithyPrint(int _stage_id);
public:
static SrsPithyPrint* create_rtmp_play();
static SrsPithyPrint* create_rtmp_publish();
static SrsPithyPrint* create_hls();
static SrsPithyPrint* create_forwarder();
static SrsPithyPrint* create_encoder();
static SrsPithyPrint* create_ingester();
static SrsPithyPrint* create_edge();
static SrsPithyPrint* create_caster();
static SrsPithyPrint* create_http_stream();
static SrsPithyPrint* create_http_stream_cache();
virtual ~SrsPithyPrint();
private:
/**

View file

@ -608,7 +608,9 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRe
srs_verbose("check play_refer success.");
// initialize other components
SrsPithyPrint pithy_print(SRS_CONSTS_STAGE_PLAY_USER);
SrsPithyPrint* pprint = SrsPithyPrint::create_rtmp_play();
SrsAutoFree(SrsPithyPrint, pprint);
SrsMessageArray msgs(SRS_PERF_MW_MSGS);
bool user_specified_duration_to_stop = (req->duration > 0);
int64_t starttime = -1;
@ -621,6 +623,9 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRe
change_mw_sleep(_srs_config->get_mw_sleep_ms(req->vhost));
while (true) {
// collect elapse for pithy print.
pprint->elapse();
// to use isolate thread to recv, can improve about 33% performance.
// @see: https://github.com/winlinvip/simple-rtmp-server/issues/196
// @see: https://github.com/winlinvip/simple-rtmp-server/issues/217
@ -644,9 +649,6 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRe
return ret;
}
// collect elapse for pithy print.
pithy_print.elapse();
#ifdef SRS_PERF_QUEUE_COND_WAIT
// for send wait time debug
srs_verbose("send thread now=%"PRId64"us, wait %dms", srs_update_system_time_ms(), mw_sleep);
@ -675,11 +677,11 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRe
}
// reportable
if (pithy_print.can_print()) {
if (pprint->can_print()) {
kbps->sample();
srs_trace("-> "SRS_CONSTS_LOG_PLAY
" time=%"PRId64", msgs=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d, mw=%d",
pithy_print.age(), count,
pprint->age(), count,
kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(),
kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m(),
mw_sleep
@ -828,7 +830,8 @@ int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd)
}
srs_verbose("check publish_refer success.");
SrsPithyPrint pithy_print(SRS_CONSTS_STAGE_PUBLISH_USER);
SrsPithyPrint* pprint = SrsPithyPrint::create_rtmp_publish();
SrsAutoFree(SrsPithyPrint, pprint);
bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost);
@ -850,6 +853,8 @@ int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd)
int64_t nb_msgs = 0;
while (true) {
pprint->elapse();
// cond wait for error.
trd->wait(SRS_CONSTS_RTMP_RECV_TIMEOUT_US / 1000);
@ -870,15 +875,13 @@ int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd)
}
nb_msgs = trd->nb_msgs();
pithy_print.elapse();
// reportable
if (pithy_print.can_print()) {
if (pprint->can_print()) {
kbps->sample();
bool mr = _srs_config->get_mr_enabled(req->vhost);
int mr_sleep = _srs_config->get_mr_sleep_ms(req->vhost);
srs_trace("<- "SRS_CONSTS_LOG_CLIENT_PUBLISH
" time=%"PRId64", okbps=%d,%d,%d, ikbps=%d,%d,%d, mr=%d/%d", pithy_print.age(),
" time=%"PRId64", okbps=%d,%d,%d, ikbps=%d,%d,%d, mr=%d/%d", pprint->age(),
kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(),
kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m(),
mr, mr_sleep

View file

@ -41,6 +41,7 @@ using namespace std;
#include <srs_kernel_utility.hpp>
#include <srs_raw_avc.hpp>
#include <srs_kernel_codec.hpp>
#include <srs_app_pithy_print.hpp>
#ifdef SRS_AUTO_STREAM_CASTER
@ -51,12 +52,14 @@ SrsRtpConn::SrsRtpConn(SrsRtspConn* r, int p, int sid)
stream_id = sid;
listener = new SrsUdpListener(this, p);
cache = new SrsRtpPacket();
pprint = SrsPithyPrint::create_caster();
}
SrsRtpConn::~SrsRtpConn()
{
srs_freep(listener);
srs_freep(cache);
srs_freep(pprint);
}
int SrsRtpConn::port()
@ -73,6 +76,8 @@ int SrsRtpConn::on_udp_packet(sockaddr_in* from, char* buf, int nb_buf)
{
int ret = ERROR_SUCCESS;
pprint->elapse();
if (true) {
SrsStream stream;
@ -92,9 +97,9 @@ int SrsRtpConn::on_udp_packet(sockaddr_in* from, char* buf, int nb_buf)
}
cache->copy(&pkt);
cache->payload->append(pkt.payload->bytes(), pkt.payload->length());
if (!cache->completed) {
srs_trace("rtsp: rtp chunked %dB, vt=%d/%u, sts=%u/%#x/%#x, paylod=%dB",
nb_buf, cache->version, cache->payload_type, cache->sequence_number, cache->timestamp, cache->ssrc,
if (!cache->completed && pprint->can_print()) {
srs_trace("<- "SRS_CONSTS_LOG_STREAM_CASTER" rtsp: rtp chunked %dB, age=%d, vt=%d/%u, sts=%u/%#x/%#x, paylod=%dB",
nb_buf, pprint->age(), cache->version, cache->payload_type, cache->sequence_number, cache->timestamp, cache->ssrc,
cache->payload->length()
);
return ret;
@ -106,10 +111,12 @@ int SrsRtpConn::on_udp_packet(sockaddr_in* from, char* buf, int nb_buf)
}
}
srs_trace("rtsp: rtp #%d %dB, vt=%d/%u, sts=%u/%u/%#x, paylod=%dB, chunked=%d",
stream_id, nb_buf, cache->version, cache->payload_type, cache->sequence_number, cache->timestamp, cache->ssrc,
cache->payload->length(), cache->chunked
);
if (pprint->can_print()) {
srs_trace("<- "SRS_CONSTS_LOG_STREAM_CASTER" rtsp: rtp #%d %dB, age=%d, vt=%d/%u, sts=%u/%u/%#x, paylod=%dB, chunked=%d",
stream_id, nb_buf, pprint->age(), cache->version, cache->payload_type, cache->sequence_number, cache->timestamp, cache->ssrc,
cache->payload->length(), cache->chunked
);
}
// always free it.
SrsAutoFree(SrsRtpPacket, cache);

View file

@ -55,6 +55,7 @@ class SrsRawAacStreamCodec;
class SrsSharedPtrMessage;
class SrsCodecSample;
class SrsSimpleBuffer;
class SrsPithyPrint;
/**
* a rtp connection which transport a stream.
@ -62,6 +63,7 @@ class SrsSimpleBuffer;
class SrsRtpConn: public ISrsUdpHandler
{
private:
SrsPithyPrint* pprint;
SrsUdpListener* listener;
SrsRtspConn* rtsp;
SrsRtpPacket* cache;
@ -117,7 +119,6 @@ private:
std::string output_template;
std::string rtsp_tcUrl;
std::string rtsp_stream;
private:
std::string session;
// video stream.

View file

@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
// current release version
#define VERSION_MAJOR 2
#define VERSION_MINOR 0
#define VERSION_REVISION 120
#define VERSION_REVISION 121
// server info.
#define RTMP_SIG_SRS_KEY "SRS"

View file

@ -162,31 +162,12 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#define SRS_CONSTS_LOG_HLS "HLS"
// encoder log id.
#define SRS_CONSTS_LOG_ENCODER "ENC"
///////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////
// pithy-print consts values
///////////////////////////////////////////////////////////
// the pithy stage for all play clients.
#define SRS_CONSTS_STAGE_PLAY_USER 1
// the pithy stage for all publish clients.
#define SRS_CONSTS_STAGE_PUBLISH_USER 2
// the pithy stage for all forward clients.
#define SRS_CONSTS_STAGE_FORWARDER 3
// the pithy stage for all encoders.
#define SRS_CONSTS_STAGE_ENCODER 4
// the pithy stage for all hls.
#define SRS_CONSTS_STAGE_HLS 5
// the pithy stage for all ingesters.
#define SRS_CONSTS_STAGE_INGESTER 6
// the pithy stage for all edge.
#define SRS_CONSTS_STAGE_EDGE 7
// http stream log id.
#define SRS_CONSTS_LOG_HTTP_STREAM "HTS"
// http stream cache log id.
#define SRS_CONSTS_LOG_HTTP_STREAM_CACHE "HTC"
// stream caster log id.
#define SRS_CONSTS_LOG_STREAM_CASTER "SCS"
///////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////

View file

@ -233,7 +233,6 @@ int SrsRtpPacket::decode_97(SrsStream* stream)
return ret;
}
int nb_samples = au_size / 2;
int required_size = 0;
// append left bytes to payload.