diff --git a/trunk/auto/depends.sh b/trunk/auto/depends.sh
index ca2fb034a..ca71172be 100755
--- a/trunk/auto/depends.sh
+++ b/trunk/auto/depends.sh
@@ -690,4 +690,4 @@ echo "" >> $SRS_AUTO_HEADERS_H
#####################################################################################
# generated the test script
#####################################################################################
-rm -rf ${SRS_OBJS}/srs.test && ln -sf `pwd`/scripts/test_configure.sh objs/srs.test
+rm -rf ${SRS_OBJS}/srs.test && ln -sf `pwd`/scripts/srs.test objs/srs.test
diff --git a/trunk/research/code-statistic/csr.py b/trunk/research/code-statistic/csr.py
new file mode 100755
index 000000000..e1877a69f
--- /dev/null
+++ b/trunk/research/code-statistic/csr.py
@@ -0,0 +1,93 @@
+#!/usr/bin/python
+'''
+The MIT License (MIT)
+
+Copyright (c) 2013-2014 winlin
+
+Permission is hereby granted, free of charge, to any person obtaining a copy of
+this software and associated documentation files (the "Software"), to deal in
+the Software without restriction, including without limitation the rights to
+use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
+the Software, and to permit persons to whom the Software is furnished to do so,
+subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
+FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
+COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
+IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+'''
+
+#################################################################################
+# to stat the code and comments lines
+#################################################################################
+import sys, os, cs
+from cs import info, trace
+
+if __name__ != "__main__":
+ print "donot support lib"
+ sys.exit(-1)
+
+filters="*.*pp,*.h,*.c,*.cc"
+except_filters="utest,doc"
+if len(sys.argv) <= 1:
+ print "to stat the code and comments lines"
+ print "Usage: python %s
[filters] [except_filters]"%(sys.argv[0])
+ print " dir: the dir contains the files to stat"
+ print " filters: the file filters, default: *.*pp,*.h,*.c,*.cc"
+ print " filters: the except file filters, default: utest,doc"
+ print "Example:"
+ print " python %s src"%(sys.argv[0])
+ print " python %s src *.*pp,*.cc utest"%(sys.argv[0])
+ sys.exit(-1)
+
+dir = sys.argv[1]
+if len(sys.argv) > 2:
+ filters = sys.argv[2]
+if len(sys.argv) > 3:
+ except_filters = sys.argv[3]
+info("stat dir:%s, filters:%s, except_filters:%s"%(dir, filters, except_filters))
+
+# filters to array
+filters = filters.split(",")
+except_filters = except_filters.split(",")
+
+# find src -name "*.*pp"|grep -v utest
+(totals, stat_codes, commentss, stat_block_commentss, stat_line_commentss) = (0, 0, 0, 0, 0)
+for filter in filters:
+ cmd = 'find %s -name "%s"'%(dir, filter)
+ for ef in except_filters:
+ cmd = '%s|%s'%(cmd, 'grep -v "%s"'%(ef))
+ cmd = "%s 2>&1"%(cmd)
+ info("scan dir, cmd:%s"%cmd)
+
+ pipe = os.popen(cmd)
+ files = pipe.read()
+ info("scan dir, files:%s"%files)
+ pipe.close()
+
+ files = files.split("\n")
+ for file in files:
+ file = file.strip()
+ if len(file) == 0:
+ continue;
+ info("start stat file:%s"%file)
+ (code, total, stat_code, comments, stat_block_comments, stat_line_comments, code_file) = cs.do_stat(file)
+ if code != 0:
+ continue;
+ totals += total
+ stat_codes += stat_code
+ commentss += comments
+ stat_block_commentss += stat_block_comments
+ stat_line_commentss += stat_line_comments
+
+if totals == 0:
+ trace("no code or comments found.")
+else:
+ trace("total:%s code:%s comments:%s(%.2f%%) block:%s line:%s"%(
+ totals, stat_codes, commentss, commentss * 100.0 / totals, stat_block_commentss, stat_line_commentss
+ ))
diff --git a/trunk/scripts/srs.test b/trunk/scripts/srs.test
new file mode 100755
index 000000000..eda09c6cf
--- /dev/null
+++ b/trunk/scripts/srs.test
@@ -0,0 +1,5 @@
+#!/bin/bash
+
+bash scripts/test_configure.sh && ./objs/srs_utest
+ret=$?; if [[ 0 -ne $ret ]]; then echo "configure test and utest failed."; exit $ret; fi
+echo "configure test and utest success";
diff --git a/trunk/src/app/srs_app_edge.cpp b/trunk/src/app/srs_app_edge.cpp
index b6c56bef4..42303bbc7 100644
--- a/trunk/src/app/srs_app_edge.cpp
+++ b/trunk/src/app/srs_app_edge.cpp
@@ -214,6 +214,15 @@ int SrsEdgeIngester::process_publish_message(SrsMessage* msg)
return ret;
}
}
+
+ // process aggregate packet
+ if (msg->header.is_aggregate()) {
+ if ((ret = source->on_aggregate(msg)) != ERROR_SUCCESS) {
+ srs_error("source process aggregate message failed. ret=%d", ret);
+ return ret;
+ }
+ return ret;
+ }
// process onMetaData
if (msg->header.is_amf0_data() || msg->header.is_amf3_data()) {
diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp
index 78313fc87..1059eae06 100644
--- a/trunk/src/app/srs_app_rtmp_conn.cpp
+++ b/trunk/src/app/srs_app_rtmp_conn.cpp
@@ -724,6 +724,15 @@ int SrsRtmpConn::process_publish_message(SrsSource* source, SrsMessage* msg, boo
return ret;
}
+ // process aggregate packet
+ if (msg->header.is_aggregate()) {
+ if ((ret = source->on_aggregate(msg)) != ERROR_SUCCESS) {
+ srs_error("source process aggregate message failed. ret=%d", ret);
+ return ret;
+ }
+ return ret;
+ }
+
// process onMetaData
if (msg->header.is_amf0_data() || msg->header.is_amf3_data()) {
SrsPacket* pkt = NULL;
diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp
index 5c9aeca1e..57236b501 100644
--- a/trunk/src/app/srs_app_source.cpp
+++ b/trunk/src/app/srs_app_source.cpp
@@ -469,6 +469,7 @@ SrsSource::SrsSource(SrsRequest* req)
play_edge = new SrsPlayEdge();
publish_edge = new SrsPublishEdge();
gop_cache = new SrsGopCache();
+ aggregate_stream = new SrsStream();
_srs_config->subscribe(this);
atc = _srs_config->get_atc(_req->vhost);
@@ -498,6 +499,7 @@ SrsSource::~SrsSource()
srs_freep(play_edge);
srs_freep(publish_edge);
srs_freep(gop_cache);
+ srs_freep(aggregate_stream);
#ifdef SRS_AUTO_HLS
srs_freep(hls);
@@ -1069,6 +1071,105 @@ int SrsSource::on_video(SrsMessage* video)
return ret;
}
+int SrsSource::on_aggregate(SrsMessage* msg)
+{
+ int ret = ERROR_SUCCESS;
+
+ SrsStream* stream = aggregate_stream;
+ if ((ret = stream->initialize((char*)msg->payload, msg->size)) != ERROR_SUCCESS) {
+ return ret;
+ }
+
+ while (!stream->empty()) {
+ if (!stream->require(1)) {
+ ret = ERROR_RTMP_AGGREGATE;
+ srs_error("invalid aggregate message type. ret=%d", ret);
+ return ret;
+ }
+ int8_t type = stream->read_1bytes();
+
+ if (!stream->require(3)) {
+ ret = ERROR_RTMP_AGGREGATE;
+ srs_error("invalid aggregate message size. ret=%d", ret);
+ return ret;
+ }
+ int32_t data_size = stream->read_3bytes();
+
+ if (data_size < 0) {
+ ret = ERROR_RTMP_AGGREGATE;
+ srs_error("invalid aggregate message size(negative). ret=%d", ret);
+ return ret;
+ }
+
+ if (!stream->require(3)) {
+ ret = ERROR_RTMP_AGGREGATE;
+ srs_error("invalid aggregate message time. ret=%d", ret);
+ return ret;
+ }
+ int32_t timestamp = stream->read_3bytes();
+
+ if (!stream->require(1)) {
+ ret = ERROR_RTMP_AGGREGATE;
+ srs_error("invalid aggregate message time(high). ret=%d", ret);
+ return ret;
+ }
+ int32_t time_h = stream->read_1bytes();
+
+ timestamp |= time_h<<24;
+ timestamp &= 0x7FFFFFFF;
+
+ if (!stream->require(3)) {
+ ret = ERROR_RTMP_AGGREGATE;
+ srs_error("invalid aggregate message stream_id. ret=%d", ret);
+ return ret;
+ }
+ int32_t stream_id = stream->read_3bytes();
+
+ if (data_size > 0 && !stream->require(data_size)) {
+ ret = ERROR_RTMP_AGGREGATE;
+ srs_error("invalid aggregate message data. ret=%d", ret);
+ return ret;
+ }
+
+ // to common message.
+ SrsCommonMessage __o;
+ SrsMessage& o = __o;
+
+ o.header.message_type = type;
+ o.header.payload_length = data_size;
+ o.header.timestamp_delta = timestamp;
+ o.header.timestamp = timestamp;
+ o.header.stream_id = stream_id;
+ o.header.perfer_cid = msg->header.perfer_cid;
+
+ if (data_size > 0) {
+ o.size = data_size;
+ o.payload = new int8_t[o.size];
+ stream->read_bytes((char*)o.payload, o.size);
+ }
+
+ if (!stream->require(4)) {
+ ret = ERROR_RTMP_AGGREGATE;
+ srs_error("invalid aggregate message previous tag size. ret=%d", ret);
+ return ret;
+ }
+ stream->read_4bytes();
+
+ // process parsed message
+ if (o.header.is_audio()) {
+ if ((ret = on_audio(&o)) != ERROR_SUCCESS) {
+ return ret;
+ }
+ } else if (o.header.is_video()) {
+ if ((ret = on_video(&o)) != ERROR_SUCCESS) {
+ return ret;
+ }
+ }
+ }
+
+ return ret;
+}
+
int SrsSource::on_publish()
{
int ret = ERROR_SUCCESS;
diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp
index 914c4335f..b9bfa248f 100644
--- a/trunk/src/app/srs_app_source.hpp
+++ b/trunk/src/app/srs_app_source.hpp
@@ -57,6 +57,7 @@ class SrsDvr;
#ifdef SRS_AUTO_TRANSCODE
class SrsEncoder;
#endif
+class SrsStream;
/**
* time jitter detect and correct,
@@ -251,6 +252,8 @@ private:
SrsGopCache* gop_cache;
// to forward stream to other servers
std::vector forwarders;
+ // for aggregate message
+ SrsStream* aggregate_stream;
private:
/**
* the sample rate of audio in metadata.
@@ -307,6 +310,7 @@ public:
virtual int on_meta_data(SrsMessage* msg, SrsOnMetaDataPacket* metadata);
virtual int on_audio(SrsMessage* audio);
virtual int on_video(SrsMessage* video);
+ virtual int on_aggregate(SrsMessage* msg);
/**
* publish stream event notify.
* @param _req the request from client, the source will deep copy it,
diff --git a/trunk/src/kernel/srs_kernel_error.hpp b/trunk/src/kernel/srs_kernel_error.hpp
index fd8c5dbe9..4279b6bc8 100644
--- a/trunk/src/kernel/srs_kernel_error.hpp
+++ b/trunk/src/kernel/srs_kernel_error.hpp
@@ -83,6 +83,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#define ERROR_RTMP_EDGE_PUBLISH_STATE 321
#define ERROR_RTMP_EDGE_PROXY_PULL 322
#define ERROR_RTMP_EDGE_RELOAD 323
+// aggregate message parse failed.
+#define ERROR_RTMP_AGGREGATE 324
#define ERROR_SYSTEM_STREAM_INIT 400
#define ERROR_SYSTEM_PACKET_INVALID 401
diff --git a/trunk/src/kernel/srs_kernel_stream.cpp b/trunk/src/kernel/srs_kernel_stream.cpp
index d32f3c9a6..a58619528 100644
--- a/trunk/src/kernel/srs_kernel_stream.cpp
+++ b/trunk/src/kernel/srs_kernel_stream.cpp
@@ -23,6 +23,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include
+using namespace std;
+
#include
#include
@@ -160,7 +162,7 @@ int64_t SrsStream::read_8bytes()
return value;
}
-std::string SrsStream::read_string(int len)
+string SrsStream::read_string(int len)
{
srs_assert(require(len));
@@ -172,6 +174,15 @@ std::string SrsStream::read_string(int len)
return value;
}
+void SrsStream::read_bytes(char* data, int size)
+{
+ srs_assert(require(size));
+
+ memcpy(data, p, size);
+
+ p += size;
+}
+
void SrsStream::write_1bytes(int8_t value)
{
srs_assert(require(1));
@@ -224,7 +235,7 @@ void SrsStream::write_8bytes(int64_t value)
*p++ = pp[0];
}
-void SrsStream::write_string(std::string value)
+void SrsStream::write_string(string value)
{
srs_assert(require(value.length()));
diff --git a/trunk/src/kernel/srs_kernel_stream.hpp b/trunk/src/kernel/srs_kernel_stream.hpp
index aace59d7b..4f36b8f31 100644
--- a/trunk/src/kernel/srs_kernel_stream.hpp
+++ b/trunk/src/kernel/srs_kernel_stream.hpp
@@ -104,6 +104,10 @@ public:
* get string from stream, length specifies by param len.
*/
virtual std::string read_string(int len);
+ /**
+ * get bytes from stream, length specifies by param len.
+ */
+ virtual void read_bytes(char* data, int size);
public:
/**
* write 1bytes char to stream.
diff --git a/trunk/src/rtmp/srs_protocol_rtmp_stack.cpp b/trunk/src/rtmp/srs_protocol_rtmp_stack.cpp
index 5a47b6336..4ed78a435 100644
--- a/trunk/src/rtmp/srs_protocol_rtmp_stack.cpp
+++ b/trunk/src/rtmp/srs_protocol_rtmp_stack.cpp
@@ -1477,6 +1477,11 @@ bool SrsMessageHeader::is_user_control_message()
return message_type == RTMP_MSG_UserControlMessage;
}
+bool SrsMessageHeader::is_aggregate()
+{
+ return message_type == RTMP_MSG_AggregateMessage;
+}
+
void SrsMessageHeader::initialize_amf0_script(int size, int stream)
{
message_type = RTMP_MSG_AMF0DataMessage;
diff --git a/trunk/src/rtmp/srs_protocol_rtmp_stack.hpp b/trunk/src/rtmp/srs_protocol_rtmp_stack.hpp
index 710198683..8f688faa7 100644
--- a/trunk/src/rtmp/srs_protocol_rtmp_stack.hpp
+++ b/trunk/src/rtmp/srs_protocol_rtmp_stack.hpp
@@ -277,6 +277,7 @@ public:
bool is_ackledgement();
bool is_set_chunk_size();
bool is_user_control_message();
+ bool is_aggregate();
void initialize_amf0_script(int size, int stream);
void initialize_audio(int size, u_int32_t time, int stream);