1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-02-12 19:31:53 +00:00

performance refine, support 3k+ connections(270kbps). 0.9.130

This commit is contained in:
winlin 2014-06-22 20:01:25 +08:00
parent e9c96af91a
commit 1ae3e6c64c
18 changed files with 230 additions and 162 deletions

View file

@ -241,6 +241,7 @@ Supported operating systems and hardware:
* 2013-10-17, Created.<br/> * 2013-10-17, Created.<br/>
## History ## History
* v1.0, 2014-06-22, performance refine, support 3k+ connections(270kbps). 0.9.130
* v1.0, 2014-06-21, support edge [token traverse](https://github.com/winlinvip/simple-rtmp-server/wiki/DRM#tokentraverse), fix [#104](https://github.com/winlinvip/simple-rtmp-server/issues/104). 0.9.129 * v1.0, 2014-06-21, support edge [token traverse](https://github.com/winlinvip/simple-rtmp-server/wiki/DRM#tokentraverse), fix [#104](https://github.com/winlinvip/simple-rtmp-server/issues/104). 0.9.129
* v1.0, 2014-06-19, add connections count to api summaries. 0.9.127 * v1.0, 2014-06-19, add connections count to api summaries. 0.9.127
* v1.0, 2014-06-19, add srs bytes and kbps to api summaries. 0.9.126 * v1.0, 2014-06-19, add srs bytes and kbps to api summaries. 0.9.126

View file

@ -36,7 +36,7 @@ srs_log_level trace;
srs_log_file ./objs/srs.log; srs_log_file ./objs/srs.log;
# the max connections. # the max connections.
# if exceed the max connections, server will drop the new connection. # if exceed the max connections, server will drop the new connection.
# default: 2000 # default: 12345
max_connections 1000; max_connections 1000;
# whether start as deamon # whether start as deamon
# @remark: donot support reload. # @remark: donot support reload.

2
trunk/configure vendored
View file

@ -456,7 +456,7 @@ MODULE_ID="RTMP"
MODULE_DEPENDS=("CORE" "KERNEL") MODULE_DEPENDS=("CORE" "KERNEL")
ModuleLibIncs=(${SRS_OBJS} ${LibSSLRoot}) ModuleLibIncs=(${SRS_OBJS} ${LibSSLRoot})
MODULE_FILES=("srs_protocol_amf0" "srs_protocol_io" "srs_protocol_rtmp_stack" "srs_protocol_rtmp" MODULE_FILES=("srs_protocol_amf0" "srs_protocol_io" "srs_protocol_rtmp_stack" "srs_protocol_rtmp"
"srs_protocol_handshake" "srs_protocol_utility") "srs_protocol_handshake" "srs_protocol_utility" "srs_protocol_msg_array")
RTMP_INCS="src/rtmp"; MODULE_DIR=${RTMP_INCS} . auto/modules.sh RTMP_INCS="src/rtmp"; MODULE_DIR=${RTMP_INCS} . auto/modules.sh
RTMP_OBJS="${MODULE_OBJS[@]}" RTMP_OBJS="${MODULE_OBJS[@]}"
# #

View file

@ -1403,7 +1403,7 @@ int SrsConfig::get_max_connections()
SrsConfDirective* conf = root->get("max_connections"); SrsConfDirective* conf = root->get("max_connections");
if (!conf || conf->arg0().empty()) { if (!conf || conf->arg0().empty()) {
return 2000; return SRS_CONF_DEFAULT_MAX_CONNECTIONS;
} }
return ::atoi(conf->arg0().c_str()); return ::atoi(conf->arg0().c_str());

View file

@ -38,6 +38,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#define SRS_CONF_DEFAULT_PID_FILE "./objs/srs.pid" #define SRS_CONF_DEFAULT_PID_FILE "./objs/srs.pid"
#define SRS_DEFAULT_CONF "conf/srs.conf" #define SRS_DEFAULT_CONF "conf/srs.conf"
#define SRS_CONF_DEFAULT_MAX_CONNECTIONS 12345
#define SRS_CONF_DEFAULT_HLS_PATH "./objs/nginx/html" #define SRS_CONF_DEFAULT_HLS_PATH "./objs/nginx/html"
#define SRS_CONF_DEFAULT_HLS_FRAGMENT 10 #define SRS_CONF_DEFAULT_HLS_FRAGMENT 10
#define SRS_CONF_DEFAULT_HLS_WINDOW 60 #define SRS_CONF_DEFAULT_HLS_WINDOW 60

View file

@ -44,6 +44,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_app_socket.hpp> #include <srs_app_socket.hpp>
#include <srs_app_kbps.hpp> #include <srs_app_kbps.hpp>
#include <srs_kernel_utility.hpp> #include <srs_kernel_utility.hpp>
#include <srs_protocol_msg_array.hpp>
// when error, edge ingester sleep for a while and retry. // when error, edge ingester sleep for a while and retry.
#define SRS_EDGE_INGESTER_SLEEP_US (int64_t)(1*1000*1000LL) #define SRS_EDGE_INGESTER_SLEEP_US (int64_t)(1*1000*1000LL)
@ -431,6 +432,7 @@ void SrsEdgeForwarder::stop()
kbps->set_io(NULL, NULL); kbps->set_io(NULL, NULL);
} }
#define SYS_MAX_EDGE_SEND_MSGS 128
int SrsEdgeForwarder::cycle() int SrsEdgeForwarder::cycle()
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
@ -438,6 +440,8 @@ int SrsEdgeForwarder::cycle()
client->set_recv_timeout(SRS_PULSE_TIMEOUT_US); client->set_recv_timeout(SRS_PULSE_TIMEOUT_US);
SrsPithyPrint pithy_print(SRS_STAGE_EDGE); SrsPithyPrint pithy_print(SRS_STAGE_EDGE);
SrsSharedPtrMessageArray msgs(SYS_MAX_EDGE_SEND_MSGS);
while (pthread->can_loop()) { while (pthread->can_loop()) {
// switch to other st-threads. // switch to other st-threads.
@ -465,8 +469,7 @@ int SrsEdgeForwarder::cycle()
// forward all messages. // forward all messages.
int count = 0; int count = 0;
SrsSharedPtrMessage** msgs = NULL; if ((ret = queue->dump_packets(msgs.size, msgs.msgs, count)) != ERROR_SUCCESS) {
if ((ret = queue->get_packets(0, msgs, count)) != ERROR_SUCCESS) {
srs_error("get message to forward to origin failed. ret=%d", ret); srs_error("get message to forward to origin failed. ret=%d", ret);
return ret; return ret;
} }
@ -488,16 +491,15 @@ int SrsEdgeForwarder::cycle()
srs_verbose("no packets to forward."); srs_verbose("no packets to forward.");
continue; continue;
} }
SrsAutoFreeArray(SrsSharedPtrMessage, msgs, count);
// all msgs to forward to origin. // all msgs to forward to origin.
// @remark, becareful, all msgs must be free explicitly, // @remark, becareful, all msgs must be free explicitly,
// free by send_and_free_message or srs_freep. // free by send_and_free_message or srs_freep.
for (int i = 0; i < count; i++) { for (int i = 0; i < count; i++) {
SrsSharedPtrMessage* msg = msgs[i]; SrsSharedPtrMessage* msg = msgs.msgs[i];
srs_assert(msg); srs_assert(msg);
msgs[i] = NULL; msgs.msgs[i] = NULL;
if ((ret = client->send_and_free_message(msg, stream_id)) != ERROR_SUCCESS) { if ((ret = client->send_and_free_message(msg, stream_id)) != ERROR_SUCCESS) {
srs_error("edge publish forwarder send message to server failed. ret=%d", ret); srs_error("edge publish forwarder send message to server failed. ret=%d", ret);

View file

@ -41,6 +41,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_protocol_rtmp.hpp> #include <srs_protocol_rtmp.hpp>
#include <srs_app_kbps.hpp> #include <srs_app_kbps.hpp>
#include <srs_kernel_utility.hpp> #include <srs_kernel_utility.hpp>
#include <srs_protocol_msg_array.hpp>
// when error, forwarder sleep for a while and retry. // when error, forwarder sleep for a while and retry.
#define SRS_FORWARDER_SLEEP_US (int64_t)(3*1000*1000LL) #define SRS_FORWARDER_SLEEP_US (int64_t)(3*1000*1000LL)
@ -309,6 +310,7 @@ int SrsForwarder::connect_server()
return ret; return ret;
} }
#define SYS_MAX_FORWARD_SEND_MSGS 128
int SrsForwarder::forward() int SrsForwarder::forward()
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
@ -317,6 +319,8 @@ int SrsForwarder::forward()
SrsPithyPrint pithy_print(SRS_STAGE_FORWARDER); SrsPithyPrint pithy_print(SRS_STAGE_FORWARDER);
SrsSharedPtrMessageArray msgs(SYS_MAX_FORWARD_SEND_MSGS);
while (pthread->can_loop()) { while (pthread->can_loop()) {
// switch to other st-threads. // switch to other st-threads.
st_usleep(0); st_usleep(0);
@ -339,8 +343,7 @@ int SrsForwarder::forward()
// forward all messages. // forward all messages.
int count = 0; int count = 0;
SrsSharedPtrMessage** msgs = NULL; if ((ret = queue->dump_packets(msgs.size, msgs.msgs, count)) != ERROR_SUCCESS) {
if ((ret = queue->get_packets(0, msgs, count)) != ERROR_SUCCESS) {
srs_error("get message to forward failed. ret=%d", ret); srs_error("get message to forward failed. ret=%d", ret);
return ret; return ret;
} }
@ -360,16 +363,15 @@ int SrsForwarder::forward()
srs_verbose("no packets to forward."); srs_verbose("no packets to forward.");
continue; continue;
} }
SrsAutoFreeArray(SrsSharedPtrMessage, msgs, count);
// all msgs to forward. // all msgs to forward.
// @remark, becareful, all msgs must be free explicitly, // @remark, becareful, all msgs must be free explicitly,
// free by send_and_free_message or srs_freep. // free by send_and_free_message or srs_freep.
for (int i = 0; i < count; i++) { for (int i = 0; i < count; i++) {
SrsSharedPtrMessage* msg = msgs[i]; SrsSharedPtrMessage* msg = msgs.msgs[i];
srs_assert(msg); srs_assert(msg);
msgs[i] = NULL; msgs.msgs[i] = NULL;
if ((ret = client->send_and_free_message(msg, stream_id)) != ERROR_SUCCESS) { if ((ret = client->send_and_free_message(msg, stream_id)) != ERROR_SUCCESS) {
srs_error("forwarder send message to server failed. ret=%d", ret); srs_error("forwarder send message to server failed. ret=%d", ret);

View file

@ -50,6 +50,7 @@ using namespace std;
#include <srs_app_utility.hpp> #include <srs_app_utility.hpp>
#include <srs_protocol_utility.hpp> #include <srs_protocol_utility.hpp>
#include <srs_kernel_utility.hpp> #include <srs_kernel_utility.hpp>
#include <srs_protocol_msg_array.hpp>
// when stream is busy, for example, streaming is already // when stream is busy, for example, streaming is already
// publishing, when a new client to request to publish, // publishing, when a new client to request to publish,
@ -382,7 +383,7 @@ int SrsRtmpConn::stream_service_cycle()
} }
srs_info("start to publish stream %s success", req->stream.c_str()); srs_info("start to publish stream %s success", req->stream.c_str());
ret = fmle_publish(source); ret = fmle_publishing(source);
// when edge, notice edge to change state. // when edge, notice edge to change state.
// when origin, notice all service to unpublish. // when origin, notice all service to unpublish.
@ -416,7 +417,7 @@ int SrsRtmpConn::stream_service_cycle()
} }
srs_info("flash start to publish stream %s success", req->stream.c_str()); srs_info("flash start to publish stream %s success", req->stream.c_str());
ret = flash_publish(source); ret = flash_publishing(source);
// when edge, notice edge to change state. // when edge, notice edge to change state.
// when origin, notice all service to unpublish. // when origin, notice all service to unpublish.
@ -476,6 +477,8 @@ int SrsRtmpConn::check_vhost()
return ret; return ret;
} }
#define SYS_MAX_PLAY_SEND_MSGS 128
int SrsRtmpConn::playing(SrsSource* source) int SrsRtmpConn::playing(SrsSource* source)
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
@ -499,38 +502,43 @@ int SrsRtmpConn::playing(SrsSource* source)
rtmp->set_recv_timeout(SRS_PULSE_TIMEOUT_US); rtmp->set_recv_timeout(SRS_PULSE_TIMEOUT_US);
SrsPithyPrint pithy_print(SRS_STAGE_PLAY_USER); SrsPithyPrint pithy_print(SRS_STAGE_PLAY_USER);
SrsSharedPtrMessageArray msgs(SYS_MAX_PLAY_SEND_MSGS);
bool user_specified_duration_to_stop = (req->duration > 0);
int64_t starttime = -1; int64_t starttime = -1;
while (true) { while (true) {
// switch to other st-threads. // collect elapse for pithy print.
st_usleep(0);
pithy_print.elapse(); pithy_print.elapse();
// read from client. // read from client.
if (true) { if (true) {
SrsMessage* msg = NULL; SrsMessage* msg = NULL;
ret = rtmp->recv_message(&msg); ret = rtmp->recv_message(&msg);
srs_verbose("play loop recv message. ret=%d", ret); srs_verbose("play loop recv message. ret=%d", ret);
if (ret != ERROR_SUCCESS && ret != ERROR_SOCKET_TIMEOUT) {
if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) { if (ret == ERROR_SOCKET_TIMEOUT) {
// it's ok, do nothing.
ret = ERROR_SUCCESS;
} else if (ret != ERROR_SUCCESS) {
if (!srs_is_client_gracefully_close(ret)) {
srs_error("recv client control message failed. ret=%d", ret); srs_error("recv client control message failed. ret=%d", ret);
} }
return ret; return ret;
} } else {
if ((ret = process_play_control_msg(consumer, msg)) != ERROR_SUCCESS) { if ((ret = process_play_control_msg(consumer, msg)) != ERROR_SUCCESS) {
if (!srs_is_system_control_error(ret)) { if (!srs_is_system_control_error(ret)) {
srs_error("process play control message failed. ret=%d", ret); srs_error("process play control message failed. ret=%d", ret);
}
return ret;
} }
return ret;
} }
} }
// get messages from consumer. // get messages from consumer.
SrsSharedPtrMessage** msgs = NULL;
int count = 0; int count = 0;
if ((ret = consumer->get_packets(0, msgs, count)) != ERROR_SUCCESS) { if ((ret = consumer->dump_packets(msgs.size, msgs.msgs, count)) != ERROR_SUCCESS) {
srs_error("get messages from consumer failed. ret=%d", ret); srs_error("get messages from consumer failed. ret=%d", ret);
return ret; return ret;
} }
@ -545,32 +553,29 @@ int SrsRtmpConn::playing(SrsSource* source)
kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m()); kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m());
} }
if (count <= 0) {
srs_verbose("no packets in queue.");
continue;
}
SrsAutoFreeArray(SrsSharedPtrMessage, msgs, count);
// sendout messages // sendout messages
// @remark, becareful, all msgs must be free explicitly, // @remark, becareful, all msgs must be free explicitly,
// free by send_and_free_message or srs_freep. // free by send_and_free_message or srs_freep.
for (int i = 0; i < count; i++) { for (int i = 0; i < count; i++) {
SrsSharedPtrMessage* msg = msgs[i]; SrsSharedPtrMessage* msg = msgs.msgs[i];
// the send_message will free the msg, // the send_message will free the msg,
// so set the msgs[i] to NULL. // so set the msgs[i] to NULL.
msgs[i] = NULL; msgs.msgs[i] = NULL;
srs_assert(msg); // only when user specifies the duration,
// we start to collect the durations for each message.
// foreach msg, collect the duration. if (user_specified_duration_to_stop) {
// @remark: never use msg when sent it, for the protocol sdk will free it. // foreach msg, collect the duration.
if (starttime < 0 || starttime > msg->header.timestamp) { // @remark: never use msg when sent it, for the protocol sdk will free it.
if (starttime < 0 || starttime > msg->header.timestamp) {
starttime = msg->header.timestamp;
}
duration += msg->header.timestamp - starttime;
starttime = msg->header.timestamp; starttime = msg->header.timestamp;
} }
duration += msg->header.timestamp - starttime;
starttime = msg->header.timestamp;
// no need to assert msg, for the rtmp will assert it.
if ((ret = rtmp->send_and_free_message(msg, res->stream_id)) != ERROR_SUCCESS) { if ((ret = rtmp->send_and_free_message(msg, res->stream_id)) != ERROR_SUCCESS) {
srs_error("send message to client failed. ret=%d", ret); srs_error("send message to client failed. ret=%d", ret);
return ret; return ret;
@ -579,17 +584,22 @@ int SrsRtmpConn::playing(SrsSource* source)
// if duration specified, and exceed it, stop play live. // if duration specified, and exceed it, stop play live.
// @see: https://github.com/winlinvip/simple-rtmp-server/issues/45 // @see: https://github.com/winlinvip/simple-rtmp-server/issues/45
if (req->duration > 0 && duration >= (int64_t)req->duration) { if (user_specified_duration_to_stop) {
ret = ERROR_RTMP_DURATION_EXCEED; if (duration >= (int64_t)req->duration) {
srs_trace("stop live for duration exceed. ret=%d", ret); ret = ERROR_RTMP_DURATION_EXCEED;
return ret; srs_trace("stop live for duration exceed. ret=%d", ret);
return ret;
}
} }
// switch to other threads, to anti dead loop.
st_usleep(0);
} }
return ret; return ret;
} }
int SrsRtmpConn::fmle_publish(SrsSource* source) int SrsRtmpConn::fmle_publishing(SrsSource* source)
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
@ -668,7 +678,7 @@ int SrsRtmpConn::fmle_publish(SrsSource* source)
return ret; return ret;
} }
int SrsRtmpConn::flash_publish(SrsSource* source) int SrsRtmpConn::flash_publishing(SrsSource* source)
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;

View file

@ -48,6 +48,7 @@ class SrsHttpHooks;
class SrsBandwidth; class SrsBandwidth;
class SrsKbps; class SrsKbps;
class SrsRtmpClient; class SrsRtmpClient;
class SrsSharedPtrMessage;
/** /**
* the client provides the main logic control for RTMP clients. * the client provides the main logic control for RTMP clients.
@ -87,8 +88,8 @@ private:
virtual int stream_service_cycle(); virtual int stream_service_cycle();
virtual int check_vhost(); virtual int check_vhost();
virtual int playing(SrsSource* source); virtual int playing(SrsSource* source);
virtual int fmle_publish(SrsSource* source); virtual int fmle_publishing(SrsSource* source);
virtual int flash_publish(SrsSource* source); virtual int flash_publishing(SrsSource* source);
virtual int process_publish_message(SrsSource* source, SrsMessage* msg, bool vhost_is_edge); virtual int process_publish_message(SrsSource* source, SrsMessage* msg, bool vhost_is_edge);
virtual int process_play_control_msg(SrsConsumer* consumer, SrsMessage* msg); virtual int process_play_control_msg(SrsConsumer* consumer, SrsMessage* msg);
private: private:

View file

@ -136,7 +136,7 @@ int SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg)
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
if (msg->header.is_video() || msg->header.is_audio()) { if (msg->header.is_audio() || msg->header.is_video()) {
if (av_start_time == -1) { if (av_start_time == -1) {
av_start_time = msg->header.timestamp; av_start_time = msg->header.timestamp;
} }
@ -153,7 +153,7 @@ int SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg)
return ret; return ret;
} }
int SrsMessageQueue::get_packets(int max_count, SrsSharedPtrMessage**& pmsgs, int& count) int SrsMessageQueue::dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& count)
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
@ -161,17 +161,8 @@ int SrsMessageQueue::get_packets(int max_count, SrsSharedPtrMessage**& pmsgs, in
return ret; return ret;
} }
if (max_count == 0) { srs_assert(max_count > 0);
count = (int)msgs.size(); count = srs_min(max_count, (int)msgs.size());
} else {
count = srs_min(max_count, (int)msgs.size());
}
if (count <= 0) {
return ret;
}
pmsgs = new SrsSharedPtrMessage*[count];
for (int i = 0; i < count; i++) { for (int i = 0; i < count; i++) {
pmsgs[i] = msgs[i]; pmsgs[i] = msgs[i];
@ -275,11 +266,11 @@ int SrsConsumer::get_time()
return jitter->get_time(); return jitter->get_time();
} }
int SrsConsumer::enqueue(SrsSharedPtrMessage* msg, int tba, int tbv) int SrsConsumer::enqueue(SrsSharedPtrMessage* msg, bool atc, int tba, int tbv)
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
if (!source->is_atc()) { if (!atc) {
if ((ret = jitter->correct(msg, tba, tbv)) != ERROR_SUCCESS) { if ((ret = jitter->correct(msg, tba, tbv)) != ERROR_SUCCESS) {
srs_freep(msg); srs_freep(msg);
return ret; return ret;
@ -293,8 +284,10 @@ int SrsConsumer::enqueue(SrsSharedPtrMessage* msg, int tba, int tbv)
return ret; return ret;
} }
int SrsConsumer::get_packets(int max_count, SrsSharedPtrMessage**& pmsgs, int& count) int SrsConsumer::dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& count)
{ {
srs_assert(max_count > 0);
if (should_update_source_id) { if (should_update_source_id) {
srs_trace("update source_id=%d", source->source_id()); srs_trace("update source_id=%d", source->source_id());
should_update_source_id = false; should_update_source_id = false;
@ -305,7 +298,7 @@ int SrsConsumer::get_packets(int max_count, SrsSharedPtrMessage**& pmsgs, int& c
return ERROR_SUCCESS; return ERROR_SUCCESS;
} }
return queue->get_packets(max_count, pmsgs, count); return queue->dump_packets(max_count, pmsgs, count);
} }
int SrsConsumer::on_play_client_pause(bool is_pause) int SrsConsumer::on_play_client_pause(bool is_pause)
@ -391,14 +384,15 @@ void SrsGopCache::clear()
cached_video_count = 0; cached_video_count = 0;
} }
int SrsGopCache::dump(SrsConsumer* consumer, int tba, int tbv) int SrsGopCache::dump(SrsConsumer* consumer, bool atc, int tba, int tbv)
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
std::vector<SrsSharedPtrMessage*>::iterator it; std::vector<SrsSharedPtrMessage*>::iterator it;
for (it = gop_cache.begin(); it != gop_cache.end(); ++it) { for (it = gop_cache.begin(); it != gop_cache.end(); ++it) {
SrsSharedPtrMessage* msg = *it; SrsSharedPtrMessage* msg = *it;
if ((ret = consumer->enqueue(msg->copy(), tba, tbv)) != ERROR_SUCCESS) { SrsSharedPtrMessage* copy = msg->copy();
if ((ret = consumer->enqueue(copy, atc, tba, tbv)) != ERROR_SUCCESS) {
srs_error("dispatch cached gop failed. ret=%d", ret); srs_error("dispatch cached gop failed. ret=%d", ret);
return ret; return ret;
} }
@ -926,7 +920,8 @@ int SrsSource::on_meta_data(SrsMessage* msg, SrsOnMetaDataPacket* metadata)
std::vector<SrsConsumer*>::iterator it; std::vector<SrsConsumer*>::iterator it;
for (it = consumers.begin(); it != consumers.end(); ++it) { for (it = consumers.begin(); it != consumers.end(); ++it) {
SrsConsumer* consumer = *it; SrsConsumer* consumer = *it;
if ((ret = consumer->enqueue(cache_metadata->copy(), sample_rate, frame_rate)) != ERROR_SUCCESS) { SrsSharedPtrMessage* copy = cache_metadata->copy();
if ((ret = consumer->enqueue(copy, atc, sample_rate, frame_rate)) != ERROR_SUCCESS) {
srs_error("dispatch the metadata failed. ret=%d", ret); srs_error("dispatch the metadata failed. ret=%d", ret);
return ret; return ret;
} }
@ -987,17 +982,17 @@ int SrsSource::on_audio(SrsMessage* audio)
// copy to all consumer // copy to all consumer
if (true) { if (true) {
std::vector<SrsConsumer*>::iterator it; for (int i = 0; i < (int)consumers.size(); i++) {
for (it = consumers.begin(); it != consumers.end(); ++it) { SrsConsumer* consumer = consumers.at(i);
SrsConsumer* consumer = *it; SrsSharedPtrMessage* copy = msg->copy();
if ((ret = consumer->enqueue(msg->copy(), sample_rate, frame_rate)) != ERROR_SUCCESS) { if ((ret = consumer->enqueue(copy, atc, sample_rate, frame_rate)) != ERROR_SUCCESS) {
srs_error("dispatch the audio failed. ret=%d", ret); srs_error("dispatch the audio failed. ret=%d", ret);
return ret; return ret;
} }
} }
srs_info("dispatch audio success."); srs_info("dispatch audio success.");
} }
// copy to all forwarders. // copy to all forwarders.
if (true) { if (true) {
std::vector<SrsForwarder*>::iterator it; std::vector<SrsForwarder*>::iterator it;
@ -1077,10 +1072,10 @@ int SrsSource::on_video(SrsMessage* video)
// copy to all consumer // copy to all consumer
if (true) { if (true) {
std::vector<SrsConsumer*>::iterator it; for (int i = 0; i < (int)consumers.size(); i++) {
for (it = consumers.begin(); it != consumers.end(); ++it) { SrsConsumer* consumer = consumers.at(i);
SrsConsumer* consumer = *it; SrsSharedPtrMessage* copy = msg->copy();
if ((ret = consumer->enqueue(msg->copy(), sample_rate, frame_rate)) != ERROR_SUCCESS) { if ((ret = consumer->enqueue(copy, atc, sample_rate, frame_rate)) != ERROR_SUCCESS) {
srs_error("dispatch the video failed. ret=%d", ret); srs_error("dispatch the video failed. ret=%d", ret);
return ret; return ret;
} }
@ -1327,27 +1322,27 @@ void SrsSource::on_unpublish()
} }
// copy metadata. // copy metadata.
if (cache_metadata && (ret = consumer->enqueue(cache_metadata->copy(), sample_rate, frame_rate)) != ERROR_SUCCESS) { if (cache_metadata && (ret = consumer->enqueue(cache_metadata->copy(), atc, sample_rate, frame_rate)) != ERROR_SUCCESS) {
srs_error("dispatch metadata failed. ret=%d", ret); srs_error("dispatch metadata failed. ret=%d", ret);
return ret; return ret;
} }
srs_info("dispatch metadata success"); srs_info("dispatch metadata success");
// copy sequence header // copy sequence header
if (cache_sh_video && (ret = consumer->enqueue(cache_sh_video->copy(), sample_rate, frame_rate)) != ERROR_SUCCESS) { if (cache_sh_video && (ret = consumer->enqueue(cache_sh_video->copy(), atc, sample_rate, frame_rate)) != ERROR_SUCCESS) {
srs_error("dispatch video sequence header failed. ret=%d", ret); srs_error("dispatch video sequence header failed. ret=%d", ret);
return ret; return ret;
} }
srs_info("dispatch video sequence header success"); srs_info("dispatch video sequence header success");
if (cache_sh_audio && (ret = consumer->enqueue(cache_sh_audio->copy(), sample_rate, frame_rate)) != ERROR_SUCCESS) { if (cache_sh_audio && (ret = consumer->enqueue(cache_sh_audio->copy(), atc, sample_rate, frame_rate)) != ERROR_SUCCESS) {
srs_error("dispatch audio sequence header failed. ret=%d", ret); srs_error("dispatch audio sequence header failed. ret=%d", ret);
return ret; return ret;
} }
srs_info("dispatch audio sequence header success"); srs_info("dispatch audio sequence header success");
// copy gop cache to client. // copy gop cache to client.
if ((ret = gop_cache->dump(consumer, sample_rate, frame_rate)) != ERROR_SUCCESS) { if ((ret = gop_cache->dump(consumer, atc, sample_rate, frame_rate)) != ERROR_SUCCESS) {
return ret; return ret;
} }
@ -1375,11 +1370,6 @@ void SrsSource::set_cache(bool enabled)
gop_cache->set(enabled); gop_cache->set(enabled);
} }
bool SrsSource::is_atc()
{
return atc;
}
int SrsSource::on_edge_start_play() int SrsSource::on_edge_start_play()
{ {
return play_edge->on_client_play(); return play_edge->on_client_play();

View file

@ -110,11 +110,11 @@ public:
virtual int enqueue(SrsSharedPtrMessage* msg); virtual int enqueue(SrsSharedPtrMessage* msg);
/** /**
* get packets in consumer queue. * get packets in consumer queue.
* @pmsgs SrsMessages*[], output the prt array. * @pmsgs SrsMessages*[], used to store the msgs, user must alloc it.
* @count the count in array. * @count the count in array, output param.
* @max_count the max count to dequeue, 0 to dequeue all. * @max_count the max count to dequeue, must be positive.
*/ */
virtual int get_packets(int max_count, SrsSharedPtrMessage**& pmsgs, int& count); virtual int dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& count);
private: private:
/** /**
* remove a gop from the front. * remove a gop from the front.
@ -155,19 +155,20 @@ public:
virtual int get_time(); virtual int get_time();
/** /**
* enqueue an shared ptr message. * enqueue an shared ptr message.
* @param whether atc, donot use jitter correct if true.
* @param tba timebase of audio. * @param tba timebase of audio.
* used to calc the audio time delta if time-jitter detected. * used to calc the audio time delta if time-jitter detected.
* @param tbv timebase of video. * @param tbv timebase of video.
* used to calc the video time delta if time-jitter detected. * used to calc the video time delta if time-jitter detected.
*/ */
virtual int enqueue(SrsSharedPtrMessage* msg, int tba, int tbv); virtual int enqueue(SrsSharedPtrMessage* msg, bool atc, int tba, int tbv);
/** /**
* get packets in consumer queue. * get packets in consumer queue.
* @pmsgs SrsMessages*[], output the prt array. * @pmsgs SrsMessages*[], used to store the msgs, user must alloc it.
* @count the count in array. * @count the count in array, output param.
* @max_count the max count to dequeue, 0 to dequeue all. * @max_count the max count to dequeue, must be positive.
*/ */
virtual int get_packets(int max_count, SrsSharedPtrMessage**& pmsgs, int& count); virtual int dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& count);
/** /**
* when client send the pause message. * when client send the pause message.
*/ */
@ -208,7 +209,7 @@ public:
*/ */
virtual int cache(SrsSharedPtrMessage* msg); virtual int cache(SrsSharedPtrMessage* msg);
virtual void clear(); virtual void clear();
virtual int dump(SrsConsumer* consumer, int tba, int tbv); virtual int dump(SrsConsumer* consumer, bool atc, int tba, int tbv);
/** /**
* used for atc to get the time of gop cache, * used for atc to get the time of gop cache,
* the atc will adjust the sequence header timestamp to gop cache. * the atc will adjust the sequence header timestamp to gop cache.
@ -346,8 +347,6 @@ public:
virtual void set_cache(bool enabled); virtual void set_cache(bool enabled);
// internal // internal
public: public:
// for consumer, atc feature.
virtual bool is_atc();
// for edge, when play edge stream, check the state // for edge, when play edge stream, check the state
virtual int on_edge_start_play(); virtual int on_edge_start_play();
// for edge, when publish edge stream, check the state // for edge, when publish edge stream, check the state

View file

@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
// current release version // current release version
#define VERSION_MAJOR "0" #define VERSION_MAJOR "0"
#define VERSION_MINOR "9" #define VERSION_MINOR "9"
#define VERSION_REVISION "129" #define VERSION_REVISION "130"
#define RTMP_SIG_SRS_VERSION VERSION_MAJOR"."VERSION_MINOR"."VERSION_REVISION #define RTMP_SIG_SRS_VERSION VERSION_MAJOR"."VERSION_MINOR"."VERSION_REVISION
// server info. // server info.
#define RTMP_SIG_SRS_KEY "SRS" #define RTMP_SIG_SRS_KEY "SRS"

View file

@ -66,53 +66,4 @@ public:
} }
}; };
/**
* auto free the array ptrs, for example, MyClass* msgs[10],
* which stores 10 MyClass* objects, this class will:
* 1. free each MyClass* in array.
* 2. free the msgs itself.
* 3. set msgs to NULL.
* @remark, MyClass* msgs[] equals to MyClass**, the ptr array equals ptr to ptr.
* Usage:
* MyClass* msgs[10];
* // ...... use msgs.
* SrsAutoFreeArray(MyClass, msgs, 10);
*/
#define SrsAutoFreeArray(className, instance, size) \
__SrsAutoFreeArray<className> _auto_free_array_##instance(&instance, size)
template<class T>
class __SrsAutoFreeArray
{
private:
T*** ptr;
int size;
public:
/**
* auto delete the ptr array.
*/
__SrsAutoFreeArray(T*** _ptr, int _size) {
ptr = _ptr;
size = _size;
}
virtual ~__SrsAutoFreeArray() {
if (ptr == NULL || *ptr == NULL) {
return;
}
T** arr = *ptr;
for (int i = 0; i < size; i++) {
T* pobj = arr[i];
if (pobj) {
delete pobj;
arr[i] = NULL;
}
}
delete arr;
*ptr = NULL;
}
};
#endif #endif

View file

@ -0,0 +1,51 @@
/*
The MIT License (MIT)
Copyright (c) 2013-2014 winlin
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#include <srs_protocol_msg_array.hpp>
#include <srs_protocol_rtmp_stack.hpp>
SrsSharedPtrMessageArray::SrsSharedPtrMessageArray(int _size)
{
srs_assert(_size > 0);
msgs = new SrsSharedPtrMessage*[_size];
size = _size;
// initialize
for (int i = 0; i < _size; i++) {
msgs[i] = NULL;
}
}
SrsSharedPtrMessageArray::~SrsSharedPtrMessageArray()
{
// cleanup
for (int i = 0; i < size; i++) {
SrsSharedPtrMessage* msg = msgs[i];
srs_freep(msg);
}
srs_freep(msgs);
}

View file

@ -0,0 +1,53 @@
/*
The MIT License (MIT)
Copyright (c) 2013-2014 winlin
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#ifndef SRS_RTMP_PROTOCOL_MSG_ARRAY_HPP
#define SRS_RTMP_PROTOCOL_MSG_ARRAY_HPP
/*
#include <srs_protocol_msg_array.hpp>
*/
#include <srs_core.hpp>
class SrsSharedPtrMessage;
/**
* the class to auto free the shared ptr message array.
*/
class SrsSharedPtrMessageArray
{
public:
/**
* when user already send the msg in msgs, please set to NULL,
* for instance, msg= msgs.msgs[i], msgs.msgs[i]=NULL, send(msg),
* where send(msg) will always send and free it.
*/
SrsSharedPtrMessage** msgs;
int size;
public:
SrsSharedPtrMessageArray(int _size);
virtual ~SrsSharedPtrMessageArray();
};
#endif

View file

@ -437,13 +437,12 @@ int SrsProtocol::decode_message(SrsMessage* msg, SrsPacket** ppacket)
return ret; return ret;
} }
int SrsProtocol::do_send_and_free_message(SrsMessage* msg, SrsPacket* packet) int SrsProtocol::do_send_message(SrsMessage* msg, SrsPacket* packet)
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
// always free msg. // always not NULL msg.
srs_assert(msg); srs_assert(msg);
SrsAutoFree(SrsMessage, msg);
// we donot use the complex basic header, // we donot use the complex basic header,
// ensure the basic header is 1bytes. // ensure the basic header is 1bytes.
@ -497,7 +496,7 @@ int SrsProtocol::do_send_and_free_message(SrsMessage* msg, SrsPacket* packet)
*pheader++ = pp[3]; *pheader++ = pp[3];
// chunk extended timestamp header, 0 or 4 bytes, big-endian // chunk extended timestamp header, 0 or 4 bytes, big-endian
if(timestamp >= RTMP_EXTENDED_TIMESTAMP){ if(timestamp >= RTMP_EXTENDED_TIMESTAMP) {
pp = (char*)&timestamp; pp = (char*)&timestamp;
*pheader++ = pp[3]; *pheader++ = pp[3];
*pheader++ = pp[2]; *pheader++ = pp[2];
@ -522,7 +521,7 @@ int SrsProtocol::do_send_and_free_message(SrsMessage* msg, SrsPacket* packet)
// @see: ngx_rtmp_prepare_message // @see: ngx_rtmp_prepare_message
// @see: http://blog.csdn.net/win_lin/article/details/13363699 // @see: http://blog.csdn.net/win_lin/article/details/13363699
u_int32_t timestamp = (u_int32_t)msg->header.timestamp; u_int32_t timestamp = (u_int32_t)msg->header.timestamp;
if(timestamp >= RTMP_EXTENDED_TIMESTAMP){ if (timestamp >= RTMP_EXTENDED_TIMESTAMP) {
pp = (char*)&timestamp; pp = (char*)&timestamp;
*pheader++ = pp[3]; *pheader++ = pp[3];
*pheader++ = pp[2]; *pheader++ = pp[2];
@ -733,7 +732,12 @@ int SrsProtocol::send_and_free_message(SrsMessage* msg, int stream_id)
if (msg) { if (msg) {
msg->header.stream_id = stream_id; msg->header.stream_id = stream_id;
} }
return do_send_and_free_message(msg, NULL);
// donot use the auto free to free the msg,
// for performance issue.
int ret = do_send_message(msg, NULL);
srs_freep(msg);
return ret;
} }
int SrsProtocol::send_and_free_packet(SrsPacket* packet, int stream_id) int SrsProtocol::send_and_free_packet(SrsPacket* packet, int stream_id)
@ -767,9 +771,10 @@ int SrsProtocol::send_and_free_packet(SrsPacket* packet, int stream_id)
msg->header.stream_id = stream_id; msg->header.stream_id = stream_id;
msg->header.perfer_cid = packet->get_perfer_cid(); msg->header.perfer_cid = packet->get_perfer_cid();
if ((ret = do_send_and_free_message(msg, packet)) != ERROR_SUCCESS) { // donot use the auto free to free the msg,
return ret; // for performance issue.
} ret = do_send_message(msg, packet);
srs_freep(msg);
return ret; return ret;
} }

View file

@ -174,10 +174,10 @@ public:
virtual int send_and_free_packet(SrsPacket* packet, int stream_id); virtual int send_and_free_packet(SrsPacket* packet, int stream_id);
private: private:
/** /**
* imp for send_and_free_message * send out the message, donot free it, the caller must free the param msg.
* @param packet the packet of message, NULL for raw message. * @param packet the packet of message, NULL for raw message.
*/ */
virtual int do_send_and_free_message(SrsMessage* msg, SrsPacket* packet); virtual int do_send_message(SrsMessage* msg, SrsPacket* packet);
/** /**
* imp for decode_message * imp for decode_message
*/ */

View file

@ -36,6 +36,8 @@ file
..\rtmp\srs_protocol_handshake.cpp, ..\rtmp\srs_protocol_handshake.cpp,
..\rtmp\srs_protocol_io.hpp, ..\rtmp\srs_protocol_io.hpp,
..\rtmp\srs_protocol_io.cpp, ..\rtmp\srs_protocol_io.cpp,
..\rtmp\srs_protocol_msg_array.hpp,
..\rtmp\srs_protocol_msg_array.cpp,
..\rtmp\srs_protocol_rtmp.hpp, ..\rtmp\srs_protocol_rtmp.hpp,
..\rtmp\srs_protocol_rtmp.cpp, ..\rtmp\srs_protocol_rtmp.cpp,
..\rtmp\srs_protocol_rtmp_stack.hpp, ..\rtmp\srs_protocol_rtmp_stack.hpp,