diff --git a/README.md b/README.md
index 57434e97c..ada1c932b 100755
--- a/README.md
+++ b/README.md
@@ -241,6 +241,7 @@ Supported operating systems and hardware:
* 2013-10-17, Created.
## History
+* v1.0, 2014-06-22, performance refine, support 3k+ connections(270kbps). 0.9.130
* v1.0, 2014-06-21, support edge [token traverse](https://github.com/winlinvip/simple-rtmp-server/wiki/DRM#tokentraverse), fix [#104](https://github.com/winlinvip/simple-rtmp-server/issues/104). 0.9.129
* v1.0, 2014-06-19, add connections count to api summaries. 0.9.127
* v1.0, 2014-06-19, add srs bytes and kbps to api summaries. 0.9.126
diff --git a/trunk/conf/full.conf b/trunk/conf/full.conf
index 15b401484..e2cd62c5c 100644
--- a/trunk/conf/full.conf
+++ b/trunk/conf/full.conf
@@ -36,7 +36,7 @@ srs_log_level trace;
srs_log_file ./objs/srs.log;
# the max connections.
# if exceed the max connections, server will drop the new connection.
-# default: 2000
+# default: 12345
max_connections 1000;
# whether start as deamon
# @remark: donot support reload.
diff --git a/trunk/configure b/trunk/configure
index a680ad704..79d03d3b0 100755
--- a/trunk/configure
+++ b/trunk/configure
@@ -456,7 +456,7 @@ MODULE_ID="RTMP"
MODULE_DEPENDS=("CORE" "KERNEL")
ModuleLibIncs=(${SRS_OBJS} ${LibSSLRoot})
MODULE_FILES=("srs_protocol_amf0" "srs_protocol_io" "srs_protocol_rtmp_stack" "srs_protocol_rtmp"
- "srs_protocol_handshake" "srs_protocol_utility")
+ "srs_protocol_handshake" "srs_protocol_utility" "srs_protocol_msg_array")
RTMP_INCS="src/rtmp"; MODULE_DIR=${RTMP_INCS} . auto/modules.sh
RTMP_OBJS="${MODULE_OBJS[@]}"
#
diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp
index 307eb4531..c18626d22 100644
--- a/trunk/src/app/srs_app_config.cpp
+++ b/trunk/src/app/srs_app_config.cpp
@@ -1403,7 +1403,7 @@ int SrsConfig::get_max_connections()
SrsConfDirective* conf = root->get("max_connections");
if (!conf || conf->arg0().empty()) {
- return 2000;
+ return SRS_CONF_DEFAULT_MAX_CONNECTIONS;
}
return ::atoi(conf->arg0().c_str());
diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp
index a2d99dd68..b1c74589f 100644
--- a/trunk/src/app/srs_app_config.hpp
+++ b/trunk/src/app/srs_app_config.hpp
@@ -38,6 +38,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#define SRS_CONF_DEFAULT_PID_FILE "./objs/srs.pid"
#define SRS_DEFAULT_CONF "conf/srs.conf"
+#define SRS_CONF_DEFAULT_MAX_CONNECTIONS 12345
#define SRS_CONF_DEFAULT_HLS_PATH "./objs/nginx/html"
#define SRS_CONF_DEFAULT_HLS_FRAGMENT 10
#define SRS_CONF_DEFAULT_HLS_WINDOW 60
diff --git a/trunk/src/app/srs_app_edge.cpp b/trunk/src/app/srs_app_edge.cpp
index 501f097ab..d934019dc 100644
--- a/trunk/src/app/srs_app_edge.cpp
+++ b/trunk/src/app/srs_app_edge.cpp
@@ -44,6 +44,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include
#include
#include
+#include
// when error, edge ingester sleep for a while and retry.
#define SRS_EDGE_INGESTER_SLEEP_US (int64_t)(1*1000*1000LL)
@@ -431,6 +432,7 @@ void SrsEdgeForwarder::stop()
kbps->set_io(NULL, NULL);
}
+#define SYS_MAX_EDGE_SEND_MSGS 128
int SrsEdgeForwarder::cycle()
{
int ret = ERROR_SUCCESS;
@@ -438,6 +440,8 @@ int SrsEdgeForwarder::cycle()
client->set_recv_timeout(SRS_PULSE_TIMEOUT_US);
SrsPithyPrint pithy_print(SRS_STAGE_EDGE);
+
+ SrsSharedPtrMessageArray msgs(SYS_MAX_EDGE_SEND_MSGS);
while (pthread->can_loop()) {
// switch to other st-threads.
@@ -465,8 +469,7 @@ int SrsEdgeForwarder::cycle()
// forward all messages.
int count = 0;
- SrsSharedPtrMessage** msgs = NULL;
- if ((ret = queue->get_packets(0, msgs, count)) != ERROR_SUCCESS) {
+ if ((ret = queue->dump_packets(msgs.size, msgs.msgs, count)) != ERROR_SUCCESS) {
srs_error("get message to forward to origin failed. ret=%d", ret);
return ret;
}
@@ -488,16 +491,15 @@ int SrsEdgeForwarder::cycle()
srs_verbose("no packets to forward.");
continue;
}
- SrsAutoFreeArray(SrsSharedPtrMessage, msgs, count);
// all msgs to forward to origin.
// @remark, becareful, all msgs must be free explicitly,
// free by send_and_free_message or srs_freep.
for (int i = 0; i < count; i++) {
- SrsSharedPtrMessage* msg = msgs[i];
+ SrsSharedPtrMessage* msg = msgs.msgs[i];
srs_assert(msg);
- msgs[i] = NULL;
+ msgs.msgs[i] = NULL;
if ((ret = client->send_and_free_message(msg, stream_id)) != ERROR_SUCCESS) {
srs_error("edge publish forwarder send message to server failed. ret=%d", ret);
diff --git a/trunk/src/app/srs_app_forward.cpp b/trunk/src/app/srs_app_forward.cpp
index a9f71dea5..d4db40e38 100644
--- a/trunk/src/app/srs_app_forward.cpp
+++ b/trunk/src/app/srs_app_forward.cpp
@@ -41,6 +41,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include
#include
#include
+#include
// when error, forwarder sleep for a while and retry.
#define SRS_FORWARDER_SLEEP_US (int64_t)(3*1000*1000LL)
@@ -309,6 +310,7 @@ int SrsForwarder::connect_server()
return ret;
}
+#define SYS_MAX_FORWARD_SEND_MSGS 128
int SrsForwarder::forward()
{
int ret = ERROR_SUCCESS;
@@ -317,6 +319,8 @@ int SrsForwarder::forward()
SrsPithyPrint pithy_print(SRS_STAGE_FORWARDER);
+ SrsSharedPtrMessageArray msgs(SYS_MAX_FORWARD_SEND_MSGS);
+
while (pthread->can_loop()) {
// switch to other st-threads.
st_usleep(0);
@@ -339,8 +343,7 @@ int SrsForwarder::forward()
// forward all messages.
int count = 0;
- SrsSharedPtrMessage** msgs = NULL;
- if ((ret = queue->get_packets(0, msgs, count)) != ERROR_SUCCESS) {
+ if ((ret = queue->dump_packets(msgs.size, msgs.msgs, count)) != ERROR_SUCCESS) {
srs_error("get message to forward failed. ret=%d", ret);
return ret;
}
@@ -360,16 +363,15 @@ int SrsForwarder::forward()
srs_verbose("no packets to forward.");
continue;
}
- SrsAutoFreeArray(SrsSharedPtrMessage, msgs, count);
// all msgs to forward.
// @remark, becareful, all msgs must be free explicitly,
// free by send_and_free_message or srs_freep.
for (int i = 0; i < count; i++) {
- SrsSharedPtrMessage* msg = msgs[i];
+ SrsSharedPtrMessage* msg = msgs.msgs[i];
srs_assert(msg);
- msgs[i] = NULL;
+ msgs.msgs[i] = NULL;
if ((ret = client->send_and_free_message(msg, stream_id)) != ERROR_SUCCESS) {
srs_error("forwarder send message to server failed. ret=%d", ret);
diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp
index dbb5f867a..324243d6c 100644
--- a/trunk/src/app/srs_app_rtmp_conn.cpp
+++ b/trunk/src/app/srs_app_rtmp_conn.cpp
@@ -50,6 +50,7 @@ using namespace std;
#include
#include
#include
+#include
// when stream is busy, for example, streaming is already
// publishing, when a new client to request to publish,
@@ -382,7 +383,7 @@ int SrsRtmpConn::stream_service_cycle()
}
srs_info("start to publish stream %s success", req->stream.c_str());
- ret = fmle_publish(source);
+ ret = fmle_publishing(source);
// when edge, notice edge to change state.
// when origin, notice all service to unpublish.
@@ -416,7 +417,7 @@ int SrsRtmpConn::stream_service_cycle()
}
srs_info("flash start to publish stream %s success", req->stream.c_str());
- ret = flash_publish(source);
+ ret = flash_publishing(source);
// when edge, notice edge to change state.
// when origin, notice all service to unpublish.
@@ -476,6 +477,8 @@ int SrsRtmpConn::check_vhost()
return ret;
}
+#define SYS_MAX_PLAY_SEND_MSGS 128
+
int SrsRtmpConn::playing(SrsSource* source)
{
int ret = ERROR_SUCCESS;
@@ -499,38 +502,43 @@ int SrsRtmpConn::playing(SrsSource* source)
rtmp->set_recv_timeout(SRS_PULSE_TIMEOUT_US);
SrsPithyPrint pithy_print(SRS_STAGE_PLAY_USER);
+
+ SrsSharedPtrMessageArray msgs(SYS_MAX_PLAY_SEND_MSGS);
+ bool user_specified_duration_to_stop = (req->duration > 0);
int64_t starttime = -1;
+
while (true) {
- // switch to other st-threads.
- st_usleep(0);
-
+ // collect elapse for pithy print.
pithy_print.elapse();
// read from client.
if (true) {
SrsMessage* msg = NULL;
ret = rtmp->recv_message(&msg);
-
srs_verbose("play loop recv message. ret=%d", ret);
- if (ret != ERROR_SUCCESS && ret != ERROR_SOCKET_TIMEOUT) {
- if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {
+
+ if (ret == ERROR_SOCKET_TIMEOUT) {
+ // it's ok, do nothing.
+ ret = ERROR_SUCCESS;
+ } else if (ret != ERROR_SUCCESS) {
+ if (!srs_is_client_gracefully_close(ret)) {
srs_error("recv client control message failed. ret=%d", ret);
}
return ret;
- }
- if ((ret = process_play_control_msg(consumer, msg)) != ERROR_SUCCESS) {
- if (!srs_is_system_control_error(ret)) {
- srs_error("process play control message failed. ret=%d", ret);
+ } else {
+ if ((ret = process_play_control_msg(consumer, msg)) != ERROR_SUCCESS) {
+ if (!srs_is_system_control_error(ret)) {
+ srs_error("process play control message failed. ret=%d", ret);
+ }
+ return ret;
}
- return ret;
}
}
// get messages from consumer.
- SrsSharedPtrMessage** msgs = NULL;
int count = 0;
- if ((ret = consumer->get_packets(0, msgs, count)) != ERROR_SUCCESS) {
+ if ((ret = consumer->dump_packets(msgs.size, msgs.msgs, count)) != ERROR_SUCCESS) {
srs_error("get messages from consumer failed. ret=%d", ret);
return ret;
}
@@ -545,32 +553,29 @@ int SrsRtmpConn::playing(SrsSource* source)
kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m());
}
- if (count <= 0) {
- srs_verbose("no packets in queue.");
- continue;
- }
- SrsAutoFreeArray(SrsSharedPtrMessage, msgs, count);
-
// sendout messages
// @remark, becareful, all msgs must be free explicitly,
// free by send_and_free_message or srs_freep.
for (int i = 0; i < count; i++) {
- SrsSharedPtrMessage* msg = msgs[i];
+ SrsSharedPtrMessage* msg = msgs.msgs[i];
// the send_message will free the msg,
// so set the msgs[i] to NULL.
- msgs[i] = NULL;
+ msgs.msgs[i] = NULL;
- srs_assert(msg);
-
- // foreach msg, collect the duration.
- // @remark: never use msg when sent it, for the protocol sdk will free it.
- if (starttime < 0 || starttime > msg->header.timestamp) {
+ // only when user specifies the duration,
+ // we start to collect the durations for each message.
+ if (user_specified_duration_to_stop) {
+ // foreach msg, collect the duration.
+ // @remark: never use msg when sent it, for the protocol sdk will free it.
+ if (starttime < 0 || starttime > msg->header.timestamp) {
+ starttime = msg->header.timestamp;
+ }
+ duration += msg->header.timestamp - starttime;
starttime = msg->header.timestamp;
}
- duration += msg->header.timestamp - starttime;
- starttime = msg->header.timestamp;
+ // no need to assert msg, for the rtmp will assert it.
if ((ret = rtmp->send_and_free_message(msg, res->stream_id)) != ERROR_SUCCESS) {
srs_error("send message to client failed. ret=%d", ret);
return ret;
@@ -579,17 +584,22 @@ int SrsRtmpConn::playing(SrsSource* source)
// if duration specified, and exceed it, stop play live.
// @see: https://github.com/winlinvip/simple-rtmp-server/issues/45
- if (req->duration > 0 && duration >= (int64_t)req->duration) {
- ret = ERROR_RTMP_DURATION_EXCEED;
- srs_trace("stop live for duration exceed. ret=%d", ret);
- return ret;
+ if (user_specified_duration_to_stop) {
+ if (duration >= (int64_t)req->duration) {
+ ret = ERROR_RTMP_DURATION_EXCEED;
+ srs_trace("stop live for duration exceed. ret=%d", ret);
+ return ret;
+ }
}
+
+ // switch to other threads, to anti dead loop.
+ st_usleep(0);
}
return ret;
}
-int SrsRtmpConn::fmle_publish(SrsSource* source)
+int SrsRtmpConn::fmle_publishing(SrsSource* source)
{
int ret = ERROR_SUCCESS;
@@ -668,7 +678,7 @@ int SrsRtmpConn::fmle_publish(SrsSource* source)
return ret;
}
-int SrsRtmpConn::flash_publish(SrsSource* source)
+int SrsRtmpConn::flash_publishing(SrsSource* source)
{
int ret = ERROR_SUCCESS;
diff --git a/trunk/src/app/srs_app_rtmp_conn.hpp b/trunk/src/app/srs_app_rtmp_conn.hpp
index 3fd77a047..fde77f068 100644
--- a/trunk/src/app/srs_app_rtmp_conn.hpp
+++ b/trunk/src/app/srs_app_rtmp_conn.hpp
@@ -48,6 +48,7 @@ class SrsHttpHooks;
class SrsBandwidth;
class SrsKbps;
class SrsRtmpClient;
+class SrsSharedPtrMessage;
/**
* the client provides the main logic control for RTMP clients.
@@ -87,8 +88,8 @@ private:
virtual int stream_service_cycle();
virtual int check_vhost();
virtual int playing(SrsSource* source);
- virtual int fmle_publish(SrsSource* source);
- virtual int flash_publish(SrsSource* source);
+ virtual int fmle_publishing(SrsSource* source);
+ virtual int flash_publishing(SrsSource* source);
virtual int process_publish_message(SrsSource* source, SrsMessage* msg, bool vhost_is_edge);
virtual int process_play_control_msg(SrsConsumer* consumer, SrsMessage* msg);
private:
diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp
index dbcd64aaa..a1b2403cf 100644
--- a/trunk/src/app/srs_app_source.cpp
+++ b/trunk/src/app/srs_app_source.cpp
@@ -136,7 +136,7 @@ int SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg)
{
int ret = ERROR_SUCCESS;
- if (msg->header.is_video() || msg->header.is_audio()) {
+ if (msg->header.is_audio() || msg->header.is_video()) {
if (av_start_time == -1) {
av_start_time = msg->header.timestamp;
}
@@ -153,7 +153,7 @@ int SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg)
return ret;
}
-int SrsMessageQueue::get_packets(int max_count, SrsSharedPtrMessage**& pmsgs, int& count)
+int SrsMessageQueue::dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& count)
{
int ret = ERROR_SUCCESS;
@@ -161,17 +161,8 @@ int SrsMessageQueue::get_packets(int max_count, SrsSharedPtrMessage**& pmsgs, in
return ret;
}
- if (max_count == 0) {
- count = (int)msgs.size();
- } else {
- count = srs_min(max_count, (int)msgs.size());
- }
-
- if (count <= 0) {
- return ret;
- }
-
- pmsgs = new SrsSharedPtrMessage*[count];
+ srs_assert(max_count > 0);
+ count = srs_min(max_count, (int)msgs.size());
for (int i = 0; i < count; i++) {
pmsgs[i] = msgs[i];
@@ -275,11 +266,11 @@ int SrsConsumer::get_time()
return jitter->get_time();
}
-int SrsConsumer::enqueue(SrsSharedPtrMessage* msg, int tba, int tbv)
+int SrsConsumer::enqueue(SrsSharedPtrMessage* msg, bool atc, int tba, int tbv)
{
int ret = ERROR_SUCCESS;
- if (!source->is_atc()) {
+ if (!atc) {
if ((ret = jitter->correct(msg, tba, tbv)) != ERROR_SUCCESS) {
srs_freep(msg);
return ret;
@@ -293,8 +284,10 @@ int SrsConsumer::enqueue(SrsSharedPtrMessage* msg, int tba, int tbv)
return ret;
}
-int SrsConsumer::get_packets(int max_count, SrsSharedPtrMessage**& pmsgs, int& count)
+int SrsConsumer::dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& count)
{
+ srs_assert(max_count > 0);
+
if (should_update_source_id) {
srs_trace("update source_id=%d", source->source_id());
should_update_source_id = false;
@@ -305,7 +298,7 @@ int SrsConsumer::get_packets(int max_count, SrsSharedPtrMessage**& pmsgs, int& c
return ERROR_SUCCESS;
}
- return queue->get_packets(max_count, pmsgs, count);
+ return queue->dump_packets(max_count, pmsgs, count);
}
int SrsConsumer::on_play_client_pause(bool is_pause)
@@ -391,14 +384,15 @@ void SrsGopCache::clear()
cached_video_count = 0;
}
-int SrsGopCache::dump(SrsConsumer* consumer, int tba, int tbv)
+int SrsGopCache::dump(SrsConsumer* consumer, bool atc, int tba, int tbv)
{
int ret = ERROR_SUCCESS;
std::vector::iterator it;
for (it = gop_cache.begin(); it != gop_cache.end(); ++it) {
SrsSharedPtrMessage* msg = *it;
- if ((ret = consumer->enqueue(msg->copy(), tba, tbv)) != ERROR_SUCCESS) {
+ SrsSharedPtrMessage* copy = msg->copy();
+ if ((ret = consumer->enqueue(copy, atc, tba, tbv)) != ERROR_SUCCESS) {
srs_error("dispatch cached gop failed. ret=%d", ret);
return ret;
}
@@ -926,7 +920,8 @@ int SrsSource::on_meta_data(SrsMessage* msg, SrsOnMetaDataPacket* metadata)
std::vector::iterator it;
for (it = consumers.begin(); it != consumers.end(); ++it) {
SrsConsumer* consumer = *it;
- if ((ret = consumer->enqueue(cache_metadata->copy(), sample_rate, frame_rate)) != ERROR_SUCCESS) {
+ SrsSharedPtrMessage* copy = cache_metadata->copy();
+ if ((ret = consumer->enqueue(copy, atc, sample_rate, frame_rate)) != ERROR_SUCCESS) {
srs_error("dispatch the metadata failed. ret=%d", ret);
return ret;
}
@@ -987,17 +982,17 @@ int SrsSource::on_audio(SrsMessage* audio)
// copy to all consumer
if (true) {
- std::vector::iterator it;
- for (it = consumers.begin(); it != consumers.end(); ++it) {
- SrsConsumer* consumer = *it;
- if ((ret = consumer->enqueue(msg->copy(), sample_rate, frame_rate)) != ERROR_SUCCESS) {
+ for (int i = 0; i < (int)consumers.size(); i++) {
+ SrsConsumer* consumer = consumers.at(i);
+ SrsSharedPtrMessage* copy = msg->copy();
+ if ((ret = consumer->enqueue(copy, atc, sample_rate, frame_rate)) != ERROR_SUCCESS) {
srs_error("dispatch the audio failed. ret=%d", ret);
return ret;
}
}
srs_info("dispatch audio success.");
}
-
+
// copy to all forwarders.
if (true) {
std::vector::iterator it;
@@ -1077,10 +1072,10 @@ int SrsSource::on_video(SrsMessage* video)
// copy to all consumer
if (true) {
- std::vector::iterator it;
- for (it = consumers.begin(); it != consumers.end(); ++it) {
- SrsConsumer* consumer = *it;
- if ((ret = consumer->enqueue(msg->copy(), sample_rate, frame_rate)) != ERROR_SUCCESS) {
+ for (int i = 0; i < (int)consumers.size(); i++) {
+ SrsConsumer* consumer = consumers.at(i);
+ SrsSharedPtrMessage* copy = msg->copy();
+ if ((ret = consumer->enqueue(copy, atc, sample_rate, frame_rate)) != ERROR_SUCCESS) {
srs_error("dispatch the video failed. ret=%d", ret);
return ret;
}
@@ -1327,27 +1322,27 @@ void SrsSource::on_unpublish()
}
// copy metadata.
- if (cache_metadata && (ret = consumer->enqueue(cache_metadata->copy(), sample_rate, frame_rate)) != ERROR_SUCCESS) {
+ if (cache_metadata && (ret = consumer->enqueue(cache_metadata->copy(), atc, sample_rate, frame_rate)) != ERROR_SUCCESS) {
srs_error("dispatch metadata failed. ret=%d", ret);
return ret;
}
srs_info("dispatch metadata success");
// copy sequence header
- if (cache_sh_video && (ret = consumer->enqueue(cache_sh_video->copy(), sample_rate, frame_rate)) != ERROR_SUCCESS) {
+ if (cache_sh_video && (ret = consumer->enqueue(cache_sh_video->copy(), atc, sample_rate, frame_rate)) != ERROR_SUCCESS) {
srs_error("dispatch video sequence header failed. ret=%d", ret);
return ret;
}
srs_info("dispatch video sequence header success");
- if (cache_sh_audio && (ret = consumer->enqueue(cache_sh_audio->copy(), sample_rate, frame_rate)) != ERROR_SUCCESS) {
+ if (cache_sh_audio && (ret = consumer->enqueue(cache_sh_audio->copy(), atc, sample_rate, frame_rate)) != ERROR_SUCCESS) {
srs_error("dispatch audio sequence header failed. ret=%d", ret);
return ret;
}
srs_info("dispatch audio sequence header success");
// copy gop cache to client.
- if ((ret = gop_cache->dump(consumer, sample_rate, frame_rate)) != ERROR_SUCCESS) {
+ if ((ret = gop_cache->dump(consumer, atc, sample_rate, frame_rate)) != ERROR_SUCCESS) {
return ret;
}
@@ -1375,11 +1370,6 @@ void SrsSource::set_cache(bool enabled)
gop_cache->set(enabled);
}
-bool SrsSource::is_atc()
-{
- return atc;
-}
-
int SrsSource::on_edge_start_play()
{
return play_edge->on_client_play();
diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp
index fce1c7bc8..e35296d06 100644
--- a/trunk/src/app/srs_app_source.hpp
+++ b/trunk/src/app/srs_app_source.hpp
@@ -110,11 +110,11 @@ public:
virtual int enqueue(SrsSharedPtrMessage* msg);
/**
* get packets in consumer queue.
- * @pmsgs SrsMessages*[], output the prt array.
- * @count the count in array.
- * @max_count the max count to dequeue, 0 to dequeue all.
+ * @pmsgs SrsMessages*[], used to store the msgs, user must alloc it.
+ * @count the count in array, output param.
+ * @max_count the max count to dequeue, must be positive.
*/
- virtual int get_packets(int max_count, SrsSharedPtrMessage**& pmsgs, int& count);
+ virtual int dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& count);
private:
/**
* remove a gop from the front.
@@ -155,19 +155,20 @@ public:
virtual int get_time();
/**
* enqueue an shared ptr message.
+ * @param whether atc, donot use jitter correct if true.
* @param tba timebase of audio.
* used to calc the audio time delta if time-jitter detected.
* @param tbv timebase of video.
* used to calc the video time delta if time-jitter detected.
*/
- virtual int enqueue(SrsSharedPtrMessage* msg, int tba, int tbv);
+ virtual int enqueue(SrsSharedPtrMessage* msg, bool atc, int tba, int tbv);
/**
* get packets in consumer queue.
- * @pmsgs SrsMessages*[], output the prt array.
- * @count the count in array.
- * @max_count the max count to dequeue, 0 to dequeue all.
+ * @pmsgs SrsMessages*[], used to store the msgs, user must alloc it.
+ * @count the count in array, output param.
+ * @max_count the max count to dequeue, must be positive.
*/
- virtual int get_packets(int max_count, SrsSharedPtrMessage**& pmsgs, int& count);
+ virtual int dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& count);
/**
* when client send the pause message.
*/
@@ -208,7 +209,7 @@ public:
*/
virtual int cache(SrsSharedPtrMessage* msg);
virtual void clear();
- virtual int dump(SrsConsumer* consumer, int tba, int tbv);
+ virtual int dump(SrsConsumer* consumer, bool atc, int tba, int tbv);
/**
* used for atc to get the time of gop cache,
* the atc will adjust the sequence header timestamp to gop cache.
@@ -346,8 +347,6 @@ public:
virtual void set_cache(bool enabled);
// internal
public:
- // for consumer, atc feature.
- virtual bool is_atc();
// for edge, when play edge stream, check the state
virtual int on_edge_start_play();
// for edge, when publish edge stream, check the state
diff --git a/trunk/src/core/srs_core.hpp b/trunk/src/core/srs_core.hpp
index 7870166f6..62fcb68df 100644
--- a/trunk/src/core/srs_core.hpp
+++ b/trunk/src/core/srs_core.hpp
@@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
// current release version
#define VERSION_MAJOR "0"
#define VERSION_MINOR "9"
-#define VERSION_REVISION "129"
+#define VERSION_REVISION "130"
#define RTMP_SIG_SRS_VERSION VERSION_MAJOR"."VERSION_MINOR"."VERSION_REVISION
// server info.
#define RTMP_SIG_SRS_KEY "SRS"
diff --git a/trunk/src/core/srs_core_autofree.hpp b/trunk/src/core/srs_core_autofree.hpp
index 2de455fd1..d021e0735 100644
--- a/trunk/src/core/srs_core_autofree.hpp
+++ b/trunk/src/core/srs_core_autofree.hpp
@@ -66,53 +66,4 @@ public:
}
};
-/**
-* auto free the array ptrs, for example, MyClass* msgs[10],
-* which stores 10 MyClass* objects, this class will:
-* 1. free each MyClass* in array.
-* 2. free the msgs itself.
-* 3. set msgs to NULL.
-* @remark, MyClass* msgs[] equals to MyClass**, the ptr array equals ptr to ptr.
-* Usage:
-* MyClass* msgs[10];
-* // ...... use msgs.
-* SrsAutoFreeArray(MyClass, msgs, 10);
-*/
-#define SrsAutoFreeArray(className, instance, size) \
- __SrsAutoFreeArray _auto_free_array_##instance(&instance, size)
-template
-class __SrsAutoFreeArray
-{
-private:
- T*** ptr;
- int size;
-public:
- /**
- * auto delete the ptr array.
- */
- __SrsAutoFreeArray(T*** _ptr, int _size) {
- ptr = _ptr;
- size = _size;
- }
-
- virtual ~__SrsAutoFreeArray() {
- if (ptr == NULL || *ptr == NULL) {
- return;
- }
-
- T** arr = *ptr;
- for (int i = 0; i < size; i++) {
- T* pobj = arr[i];
- if (pobj) {
- delete pobj;
- arr[i] = NULL;
- }
- }
-
- delete arr;
-
- *ptr = NULL;
- }
-};
-
#endif
\ No newline at end of file
diff --git a/trunk/src/rtmp/srs_protocol_msg_array.cpp b/trunk/src/rtmp/srs_protocol_msg_array.cpp
new file mode 100644
index 000000000..e9dc40241
--- /dev/null
+++ b/trunk/src/rtmp/srs_protocol_msg_array.cpp
@@ -0,0 +1,51 @@
+/*
+The MIT License (MIT)
+
+Copyright (c) 2013-2014 winlin
+
+Permission is hereby granted, free of charge, to any person obtaining a copy of
+this software and associated documentation files (the "Software"), to deal in
+the Software without restriction, including without limitation the rights to
+use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
+the Software, and to permit persons to whom the Software is furnished to do so,
+subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
+FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
+COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
+IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+*/
+
+#include
+
+#include
+
+SrsSharedPtrMessageArray::SrsSharedPtrMessageArray(int _size)
+{
+ srs_assert(_size > 0);
+
+ msgs = new SrsSharedPtrMessage*[_size];
+ size = _size;
+
+ // initialize
+ for (int i = 0; i < _size; i++) {
+ msgs[i] = NULL;
+ }
+}
+
+SrsSharedPtrMessageArray::~SrsSharedPtrMessageArray()
+{
+ // cleanup
+ for (int i = 0; i < size; i++) {
+ SrsSharedPtrMessage* msg = msgs[i];
+ srs_freep(msg);
+ }
+
+ srs_freep(msgs);
+}
+
diff --git a/trunk/src/rtmp/srs_protocol_msg_array.hpp b/trunk/src/rtmp/srs_protocol_msg_array.hpp
new file mode 100644
index 000000000..5366bdbe5
--- /dev/null
+++ b/trunk/src/rtmp/srs_protocol_msg_array.hpp
@@ -0,0 +1,53 @@
+/*
+The MIT License (MIT)
+
+Copyright (c) 2013-2014 winlin
+
+Permission is hereby granted, free of charge, to any person obtaining a copy of
+this software and associated documentation files (the "Software"), to deal in
+the Software without restriction, including without limitation the rights to
+use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
+the Software, and to permit persons to whom the Software is furnished to do so,
+subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
+FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
+COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
+IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+*/
+
+#ifndef SRS_RTMP_PROTOCOL_MSG_ARRAY_HPP
+#define SRS_RTMP_PROTOCOL_MSG_ARRAY_HPP
+
+/*
+#include
+*/
+
+#include
+
+class SrsSharedPtrMessage;
+
+/**
+* the class to auto free the shared ptr message array.
+*/
+class SrsSharedPtrMessageArray
+{
+public:
+ /**
+ * when user already send the msg in msgs, please set to NULL,
+ * for instance, msg= msgs.msgs[i], msgs.msgs[i]=NULL, send(msg),
+ * where send(msg) will always send and free it.
+ */
+ SrsSharedPtrMessage** msgs;
+ int size;
+public:
+ SrsSharedPtrMessageArray(int _size);
+ virtual ~SrsSharedPtrMessageArray();
+};
+
+#endif
diff --git a/trunk/src/rtmp/srs_protocol_rtmp_stack.cpp b/trunk/src/rtmp/srs_protocol_rtmp_stack.cpp
index ebba78217..4cb503771 100644
--- a/trunk/src/rtmp/srs_protocol_rtmp_stack.cpp
+++ b/trunk/src/rtmp/srs_protocol_rtmp_stack.cpp
@@ -437,13 +437,12 @@ int SrsProtocol::decode_message(SrsMessage* msg, SrsPacket** ppacket)
return ret;
}
-int SrsProtocol::do_send_and_free_message(SrsMessage* msg, SrsPacket* packet)
+int SrsProtocol::do_send_message(SrsMessage* msg, SrsPacket* packet)
{
int ret = ERROR_SUCCESS;
- // always free msg.
+ // always not NULL msg.
srs_assert(msg);
- SrsAutoFree(SrsMessage, msg);
// we donot use the complex basic header,
// ensure the basic header is 1bytes.
@@ -497,7 +496,7 @@ int SrsProtocol::do_send_and_free_message(SrsMessage* msg, SrsPacket* packet)
*pheader++ = pp[3];
// chunk extended timestamp header, 0 or 4 bytes, big-endian
- if(timestamp >= RTMP_EXTENDED_TIMESTAMP){
+ if(timestamp >= RTMP_EXTENDED_TIMESTAMP) {
pp = (char*)×tamp;
*pheader++ = pp[3];
*pheader++ = pp[2];
@@ -522,7 +521,7 @@ int SrsProtocol::do_send_and_free_message(SrsMessage* msg, SrsPacket* packet)
// @see: ngx_rtmp_prepare_message
// @see: http://blog.csdn.net/win_lin/article/details/13363699
u_int32_t timestamp = (u_int32_t)msg->header.timestamp;
- if(timestamp >= RTMP_EXTENDED_TIMESTAMP){
+ if (timestamp >= RTMP_EXTENDED_TIMESTAMP) {
pp = (char*)×tamp;
*pheader++ = pp[3];
*pheader++ = pp[2];
@@ -733,7 +732,12 @@ int SrsProtocol::send_and_free_message(SrsMessage* msg, int stream_id)
if (msg) {
msg->header.stream_id = stream_id;
}
- return do_send_and_free_message(msg, NULL);
+
+ // donot use the auto free to free the msg,
+ // for performance issue.
+ int ret = do_send_message(msg, NULL);
+ srs_freep(msg);
+ return ret;
}
int SrsProtocol::send_and_free_packet(SrsPacket* packet, int stream_id)
@@ -767,9 +771,10 @@ int SrsProtocol::send_and_free_packet(SrsPacket* packet, int stream_id)
msg->header.stream_id = stream_id;
msg->header.perfer_cid = packet->get_perfer_cid();
- if ((ret = do_send_and_free_message(msg, packet)) != ERROR_SUCCESS) {
- return ret;
- }
+ // donot use the auto free to free the msg,
+ // for performance issue.
+ ret = do_send_message(msg, packet);
+ srs_freep(msg);
return ret;
}
diff --git a/trunk/src/rtmp/srs_protocol_rtmp_stack.hpp b/trunk/src/rtmp/srs_protocol_rtmp_stack.hpp
index ebdc57581..928660207 100644
--- a/trunk/src/rtmp/srs_protocol_rtmp_stack.hpp
+++ b/trunk/src/rtmp/srs_protocol_rtmp_stack.hpp
@@ -174,10 +174,10 @@ public:
virtual int send_and_free_packet(SrsPacket* packet, int stream_id);
private:
/**
- * imp for send_and_free_message
+ * send out the message, donot free it, the caller must free the param msg.
* @param packet the packet of message, NULL for raw message.
*/
- virtual int do_send_and_free_message(SrsMessage* msg, SrsPacket* packet);
+ virtual int do_send_message(SrsMessage* msg, SrsPacket* packet);
/**
* imp for decode_message
*/
diff --git a/trunk/src/srs/srs.upp b/trunk/src/srs/srs.upp
index e30f5a8fb..de1f2c440 100755
--- a/trunk/src/srs/srs.upp
+++ b/trunk/src/srs/srs.upp
@@ -36,6 +36,8 @@ file
..\rtmp\srs_protocol_handshake.cpp,
..\rtmp\srs_protocol_io.hpp,
..\rtmp\srs_protocol_io.cpp,
+ ..\rtmp\srs_protocol_msg_array.hpp,
+ ..\rtmp\srs_protocol_msg_array.cpp,
..\rtmp\srs_protocol_rtmp.hpp,
..\rtmp\srs_protocol_rtmp.cpp,
..\rtmp\srs_protocol_rtmp_stack.hpp,