mirror of
https://github.com/ossrs/srs.git
synced 2025-02-14 12:21:55 +00:00
use send_min_interval for stream control. 2.0.183
This commit is contained in:
parent
c02da33c11
commit
ae37fa8f3e
14 changed files with 133 additions and 26 deletions
|
@ -342,6 +342,7 @@ Remark:
|
|||
|
||||
## History
|
||||
|
||||
* v2.0, 2015-08-14, use send_min_interval for stream control. 2.0.183
|
||||
* v2.0, 2015-08-12, enable the SRS_PERF_TCP_NODELAY and add config tcp_nodelay. 2.0.182
|
||||
* v2.0, 2015-08-11, for [#442](https://github.com/simple-rtmp-server/srs/issues/442) support kickoff connected client. 2.0.181
|
||||
* v2.0, 2015-07-21, for [#169](https://github.com/simple-rtmp-server/srs/issues/169) support default values for transcode. 2.0.180
|
||||
|
|
|
@ -858,6 +858,27 @@ vhost min.delay.com {
|
|||
tcp_nodelay on;
|
||||
}
|
||||
|
||||
# the vhost to control the stream delivery feature
|
||||
vhost stream.control.com {
|
||||
# @see vhost mrw.srs.com for detail.
|
||||
min_latency on;
|
||||
mr {
|
||||
enabled off;
|
||||
}
|
||||
mw_latency 100;
|
||||
# @see vhost min.delay.com
|
||||
queue_length 10;
|
||||
tcp_nodelay on;
|
||||
# the minimal packets send interval in ms,
|
||||
# used to control the ndiff of stream by srs_rtmp_dump,
|
||||
# for example, some device can only accept some stream which
|
||||
# delivery packets in constant interval(not cbr).
|
||||
# @remark 0 to disable the minimal interval.
|
||||
# @remark >0 to make the srs to send message one by one.
|
||||
# default: 0
|
||||
send_min_interval 3;
|
||||
}
|
||||
|
||||
# the vhost for antisuck.
|
||||
vhost refer.anti_suck.com {
|
||||
# the common refer for play and publish.
|
||||
|
|
|
@ -257,6 +257,7 @@ int main(int argc, char** argv)
|
|||
}
|
||||
|
||||
u_int32_t pre_timestamp = 0;
|
||||
int64_t pre_now = srs_utils_time_ms();
|
||||
for (;;) {
|
||||
int size;
|
||||
char type;
|
||||
|
@ -268,11 +269,12 @@ int main(int argc, char** argv)
|
|||
goto rtmp_destroy;
|
||||
}
|
||||
|
||||
if (srs_human_print_rtmp_packet2(type, timestamp, data, size, pre_timestamp) != 0) {
|
||||
if (srs_human_print_rtmp_packet3(type, timestamp, data, size, pre_timestamp, pre_now) != 0) {
|
||||
srs_human_trace("print rtmp packet failed.");
|
||||
goto rtmp_destroy;
|
||||
}
|
||||
pre_timestamp = timestamp;
|
||||
pre_now = srs_utils_time_ms();
|
||||
|
||||
// we only write some types of messages to flv file.
|
||||
int is_flv_msg = type == SRS_RTMP_TYPE_AUDIO
|
||||
|
|
|
@ -757,6 +757,17 @@ int SrsConfig::reload_vhost(SrsConfDirective* old_root)
|
|||
}
|
||||
srs_trace("vhost %s reload mw success.", vhost.c_str());
|
||||
}
|
||||
// smi(send_min_interval), only one per vhost
|
||||
if (!srs_directive_equals(new_vhost->get("send_min_interval"), old_vhost->get("send_min_interval"))) {
|
||||
for (it = subscribes.begin(); it != subscribes.end(); ++it) {
|
||||
ISrsReloadHandler* subscribe = *it;
|
||||
if ((ret = subscribe->on_reload_vhost_smi(vhost)) != ERROR_SUCCESS) {
|
||||
srs_error("vhost %s notify subscribes smi failed. ret=%d", vhost.c_str(), ret);
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
srs_trace("vhost %s reload smi success.", vhost.c_str());
|
||||
}
|
||||
// min_latency, only one per vhost
|
||||
if (!srs_directive_equals(new_vhost->get("min_latency"), old_vhost->get("min_latency"))) {
|
||||
for (it = subscribes.begin(); it != subscribes.end(); ++it) {
|
||||
|
@ -1750,7 +1761,7 @@ int SrsConfig::check_config()
|
|||
&& n != "time_jitter" && n != "mix_correct"
|
||||
&& n != "atc" && n != "atc_auto"
|
||||
&& n != "debug_srs_upnode"
|
||||
&& n != "mr" && n != "mw_latency" && n != "min_latency" && n != "tcp_nodelay"
|
||||
&& n != "mr" && n != "mw_latency" && n != "min_latency" && n != "tcp_nodelay" && n != "send_min_interval"
|
||||
&& n != "security" && n != "http_remux"
|
||||
&& n != "http" && n != "http_static"
|
||||
&& n != "hds"
|
||||
|
@ -2506,6 +2517,23 @@ bool SrsConfig::get_tcp_nodelay(string vhost)
|
|||
return SRS_CONF_PERFER_FALSE(conf->arg0());
|
||||
}
|
||||
|
||||
int SrsConfig::get_send_min_interval(string vhost)
|
||||
{
|
||||
static int DEFAULT = 0;
|
||||
|
||||
SrsConfDirective* conf = get_vhost(vhost);
|
||||
if (!conf) {
|
||||
return DEFAULT;
|
||||
}
|
||||
|
||||
conf = conf->get("send_min_interval");
|
||||
if (!conf || conf->arg0().empty()) {
|
||||
return DEFAULT;
|
||||
}
|
||||
|
||||
return ::atoi(conf->arg0().c_str());
|
||||
}
|
||||
|
||||
int SrsConfig::get_global_chunk_size()
|
||||
{
|
||||
SrsConfDirective* conf = root->get("chunk_size");
|
||||
|
|
|
@ -526,6 +526,10 @@ public:
|
|||
* whether enable tcp nodelay for all clients of vhost.
|
||||
*/
|
||||
virtual bool get_tcp_nodelay(std::string vhost);
|
||||
/**
|
||||
* the minimal send interval in ms.
|
||||
*/
|
||||
virtual int get_send_min_interval(std::string vhost);
|
||||
private:
|
||||
/**
|
||||
* get the global chunk size.
|
||||
|
|
|
@ -170,6 +170,11 @@ int ISrsReloadHandler::on_reload_vhost_mw(string /*vhost*/)
|
|||
return ERROR_SUCCESS;
|
||||
}
|
||||
|
||||
int ISrsReloadHandler::on_reload_vhost_smi(string /*vhost*/)
|
||||
{
|
||||
return ERROR_SUCCESS;
|
||||
}
|
||||
|
||||
int ISrsReloadHandler::on_reload_vhost_realtime(string /*vhost*/)
|
||||
{
|
||||
return ERROR_SUCCESS;
|
||||
|
|
|
@ -73,6 +73,7 @@ public:
|
|||
virtual int on_reload_vhost_dvr(std::string vhost);
|
||||
virtual int on_reload_vhost_mr(std::string vhost);
|
||||
virtual int on_reload_vhost_mw(std::string vhost);
|
||||
virtual int on_reload_vhost_smi(std::string vhost);
|
||||
virtual int on_reload_vhost_realtime(std::string vhost);
|
||||
virtual int on_reload_vhost_chunk_size(std::string vhost);
|
||||
virtual int on_reload_vhost_transcode(std::string vhost);
|
||||
|
|
|
@ -93,6 +93,7 @@ SrsRtmpConn::SrsRtmpConn(SrsServer* svr, st_netfd_t c)
|
|||
mw_sleep = SRS_PERF_MW_SLEEP;
|
||||
mw_enabled = false;
|
||||
realtime = SRS_PERF_MIN_LATENCY_ENABLED;
|
||||
send_min_interval = 0;
|
||||
|
||||
_srs_config->subscribe(this);
|
||||
}
|
||||
|
@ -247,6 +248,23 @@ int SrsRtmpConn::on_reload_vhost_mw(string vhost)
|
|||
return ret;
|
||||
}
|
||||
|
||||
int SrsRtmpConn::on_reload_vhost_smi(string vhost)
|
||||
{
|
||||
int ret = ERROR_SUCCESS;
|
||||
|
||||
if (req->vhost != vhost) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
int smi = _srs_config->get_send_min_interval(vhost);
|
||||
if (smi != send_min_interval) {
|
||||
srs_trace("apply smi %d=>%d", send_min_interval, smi);
|
||||
send_min_interval = smi;
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int SrsRtmpConn::on_reload_vhost_realtime(string vhost)
|
||||
{
|
||||
int ret = ERROR_SUCCESS;
|
||||
|
@ -589,10 +607,15 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRe
|
|||
// when mw_sleep changed, resize the socket send buffer.
|
||||
mw_enabled = true;
|
||||
change_mw_sleep(_srs_config->get_mw_sleep_ms(req->vhost));
|
||||
// initialize the send_min_interval
|
||||
send_min_interval = _srs_config->get_send_min_interval(req->vhost);
|
||||
|
||||
// set the sock options.
|
||||
set_sock_options();
|
||||
|
||||
srs_trace("start play smi=%d, mw_sleep=%d, mw_enabled=%d, realtime=%d",
|
||||
send_min_interval, mw_sleep, mw_enabled, realtime);
|
||||
|
||||
while (!disposed) {
|
||||
// collect elapse for pithy print.
|
||||
pprint->elapse();
|
||||
|
@ -641,7 +664,8 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRe
|
|||
|
||||
// get messages from consumer.
|
||||
// each msg in msgs.msgs must be free, for the SrsMessageArray never free them.
|
||||
int count = 0;
|
||||
// @remark when enable send_min_interval, only fetch one message a time.
|
||||
int count = send_min_interval? 1 : 0;
|
||||
if ((ret = consumer->dump_packets(&msgs, count)) != ERROR_SUCCESS) {
|
||||
srs_error("get messages from consumer failed. ret=%d", ret);
|
||||
return ret;
|
||||
|
@ -716,6 +740,11 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRe
|
|||
return ret;
|
||||
}
|
||||
}
|
||||
|
||||
// apply the minimal interval for delivery stream in ms.
|
||||
if (send_min_interval > 0) {
|
||||
st_usleep(send_min_interval * 1000);
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
|
|
|
@ -85,6 +85,8 @@ private:
|
|||
// for realtime
|
||||
// @see https://github.com/simple-rtmp-server/srs/issues/257
|
||||
bool realtime;
|
||||
// the minimal interval in ms for delivery stream.
|
||||
int send_min_interval;
|
||||
public:
|
||||
SrsRtmpConn(SrsServer* svr, st_netfd_t c);
|
||||
virtual ~SrsRtmpConn();
|
||||
|
@ -96,6 +98,7 @@ protected:
|
|||
public:
|
||||
virtual int on_reload_vhost_removed(std::string vhost);
|
||||
virtual int on_reload_vhost_mw(std::string vhost);
|
||||
virtual int on_reload_vhost_smi(std::string vhost);
|
||||
virtual int on_reload_vhost_realtime(std::string vhost);
|
||||
// interface IKbpsDelta
|
||||
public:
|
||||
|
|
|
@ -301,7 +301,8 @@ int SrsMessageQueue::dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, in
|
|||
}
|
||||
|
||||
srs_assert(max_count > 0);
|
||||
count = srs_min(max_count, nb_msgs);
|
||||
// when count is 0, dumps all; otherwise, dumps no more than count.
|
||||
count = srs_min(max_count, count? count : nb_msgs);
|
||||
|
||||
SrsSharedPtrMessage** omsgs = msgs.data();
|
||||
for (int i = 0; i < count; i++) {
|
||||
|
|
|
@ -173,11 +173,12 @@ public:
|
|||
*/
|
||||
virtual int enqueue(SrsSharedPtrMessage* msg, bool* is_overflow = NULL);
|
||||
/**
|
||||
* get packets in consumer queue.
|
||||
* @pmsgs SrsSharedPtrMessage*[], used to store the msgs, user must alloc it.
|
||||
* @count the count in array, output param.
|
||||
* @max_count the max count to dequeue, must be positive.
|
||||
*/
|
||||
* get packets in consumer queue.
|
||||
* @pmsgs SrsSharedPtrMessage*[], used to store the msgs, user must alloc it.
|
||||
* @count the count in array, input and output param.
|
||||
* @max_count the max count to dequeue, must be positive.
|
||||
* @remark user can specifies the count to get specified msgs; 0 to get all if possible.
|
||||
*/
|
||||
virtual int dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& count);
|
||||
/**
|
||||
* dumps packets to consumer, use specified args.
|
||||
|
@ -256,10 +257,11 @@ public:
|
|||
*/
|
||||
virtual int enqueue(SrsSharedPtrMessage* shared_msg, bool atc, SrsRtmpJitterAlgorithm ag);
|
||||
/**
|
||||
* get packets in consumer queue.
|
||||
* @param msgs the msgs array to dump packets to send.
|
||||
* @param count the count in array, output param.
|
||||
*/
|
||||
* get packets in consumer queue.
|
||||
* @param msgs the msgs array to dump packets to send.
|
||||
* @param count the count in array, intput and output param.
|
||||
* @remark user can specifies the count to get specified msgs; 0 to get all if possible.
|
||||
*/
|
||||
virtual int dump_packets(SrsMessageArray* msgs, int& count);
|
||||
#ifdef SRS_PERF_QUEUE_COND_WAIT
|
||||
/**
|
||||
|
|
|
@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
|||
// current release version
|
||||
#define VERSION_MAJOR 2
|
||||
#define VERSION_MINOR 0
|
||||
#define VERSION_REVISION 182
|
||||
#define VERSION_REVISION 183
|
||||
|
||||
// server info.
|
||||
#define RTMP_SIG_SRS_KEY "SRS"
|
||||
|
|
|
@ -1915,8 +1915,7 @@ void srs_amf0_strict_array_append(srs_amf0_t amf0, srs_amf0_t value)
|
|||
|
||||
int64_t srs_utils_time_ms()
|
||||
{
|
||||
srs_update_system_time_ms();
|
||||
return srs_get_system_time_ms();
|
||||
return srs_update_system_time_ms();
|
||||
}
|
||||
|
||||
int64_t srs_utils_send_bytes(srs_rtmp_t rtmp)
|
||||
|
@ -2320,6 +2319,11 @@ int srs_human_print_rtmp_packet(char type, u_int32_t timestamp, char* data, int
|
|||
}
|
||||
|
||||
int srs_human_print_rtmp_packet2(char type, u_int32_t timestamp, char* data, int size, u_int32_t pre_timestamp)
|
||||
{
|
||||
return srs_human_print_rtmp_packet3(type, timestamp, data, size, pre_timestamp, 0);
|
||||
}
|
||||
|
||||
int srs_human_print_rtmp_packet3(char type, u_int32_t timestamp, char* data, int size, u_int32_t pre_timestamp, int64_t pre_now)
|
||||
{
|
||||
int ret = ERROR_SUCCESS;
|
||||
|
||||
|
@ -2328,24 +2332,29 @@ int srs_human_print_rtmp_packet2(char type, u_int32_t timestamp, char* data, int
|
|||
diff = (int)timestamp - (int)pre_timestamp;
|
||||
}
|
||||
|
||||
int ndiff = 0;
|
||||
if (pre_now > 0) {
|
||||
ndiff = (int)(srs_utils_time_ms() - pre_now);
|
||||
}
|
||||
|
||||
u_int32_t pts;
|
||||
if (srs_utils_parse_timestamp(timestamp, type, data, size, &pts) != 0) {
|
||||
srs_human_trace("Rtmp packet type=%s, dts=%d, diff=%d, size=%d, DecodeError",
|
||||
srs_human_flv_tag_type2string(type), timestamp, diff, size
|
||||
srs_human_trace("Rtmp packet type=%s, dts=%d, diff=%d, ndiff=%d, size=%d, DecodeError",
|
||||
srs_human_flv_tag_type2string(type), timestamp, diff, ndiff, size
|
||||
);
|
||||
return ret;
|
||||
}
|
||||
|
||||
if (type == SRS_RTMP_TYPE_VIDEO) {
|
||||
srs_human_trace("Video packet type=%s, dts=%d, pts=%d, diff=%d, size=%d, %s(%s,%s)",
|
||||
srs_human_flv_tag_type2string(type), timestamp, pts, diff, size,
|
||||
srs_human_trace("Video packet type=%s, dts=%d, pts=%d, diff=%d, ndiff=%d, size=%d, %s(%s,%s)",
|
||||
srs_human_flv_tag_type2string(type), timestamp, pts, diff, ndiff, size,
|
||||
srs_human_flv_video_codec_id2string(srs_utils_flv_video_codec_id(data, size)),
|
||||
srs_human_flv_video_avc_packet_type2string(srs_utils_flv_video_avc_packet_type(data, size)),
|
||||
srs_human_flv_video_frame_type2string(srs_utils_flv_video_frame_type(data, size))
|
||||
);
|
||||
} else if (type == SRS_RTMP_TYPE_AUDIO) {
|
||||
srs_human_trace("Audio packet type=%s, dts=%d, pts=%d, diff=%d, size=%d, %s(%s,%s,%s,%s)",
|
||||
srs_human_flv_tag_type2string(type), timestamp, pts, diff, size,
|
||||
srs_human_trace("Audio packet type=%s, dts=%d, pts=%d, diff=%d, ndiff=%d, size=%d, %s(%s,%s,%s,%s)",
|
||||
srs_human_flv_tag_type2string(type), timestamp, pts, diff, ndiff, size,
|
||||
srs_human_flv_audio_sound_format2string(srs_utils_flv_audio_sound_format(data, size)),
|
||||
srs_human_flv_audio_sound_rate2string(srs_utils_flv_audio_sound_rate(data, size)),
|
||||
srs_human_flv_audio_sound_size2string(srs_utils_flv_audio_sound_size(data, size)),
|
||||
|
@ -2353,8 +2362,8 @@ int srs_human_print_rtmp_packet2(char type, u_int32_t timestamp, char* data, int
|
|||
srs_human_flv_audio_aac_packet_type2string(srs_utils_flv_audio_aac_packet_type(data, size))
|
||||
);
|
||||
} else if (type == SRS_RTMP_TYPE_SCRIPT) {
|
||||
srs_human_verbose("Data packet type=%s, time=%d, diff=%d, size=%d",
|
||||
srs_human_flv_tag_type2string(type), timestamp, diff, size);
|
||||
srs_human_verbose("Data packet type=%s, time=%d, diff=%d, ndiff=%d, size=%d",
|
||||
srs_human_flv_tag_type2string(type), timestamp, diff, ndiff, size);
|
||||
int nparsed = 0;
|
||||
while (nparsed < size) {
|
||||
int nb_parsed_this = 0;
|
||||
|
@ -2370,8 +2379,8 @@ int srs_human_print_rtmp_packet2(char type, u_int32_t timestamp, char* data, int
|
|||
srs_freep(amf0_str);
|
||||
}
|
||||
} else {
|
||||
srs_human_trace("Rtmp packet type=%#x, dts=%d, pts=%d, diff=%d, size=%d",
|
||||
type, timestamp, pts, diff, size);
|
||||
srs_human_trace("Rtmp packet type=%#x, dts=%d, pts=%d, diff=%d, ndiff=%d, size=%d",
|
||||
type, timestamp, pts, diff, ndiff, size);
|
||||
}
|
||||
|
||||
return ret;
|
||||
|
|
|
@ -904,6 +904,7 @@ extern const char* srs_human_flv_audio_aac_packet_type2string(char aac_packet_ty
|
|||
*/
|
||||
extern int srs_human_print_rtmp_packet(char type, u_int32_t timestamp, char* data, int size);
|
||||
extern int srs_human_print_rtmp_packet2(char type, u_int32_t timestamp, char* data, int size, u_int32_t pre_timestamp);
|
||||
extern int srs_human_print_rtmp_packet3(char type, u_int32_t timestamp, char* data, int size, u_int32_t pre_timestamp, int64_t pre_now);
|
||||
|
||||
// log to console, for use srs-librtmp application.
|
||||
extern const char* srs_human_format_time();
|
||||
|
|
Loading…
Reference in a new issue