1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

add miss files. fix bug #60: support aggregate message

This commit is contained in:
winlin 2014-05-08 14:33:25 +08:00
parent f6dd1371bf
commit 8388da6325
12 changed files with 247 additions and 3 deletions

View file

@ -214,6 +214,15 @@ int SrsEdgeIngester::process_publish_message(SrsMessage* msg)
return ret;
}
}
// process aggregate packet
if (msg->header.is_aggregate()) {
if ((ret = source->on_aggregate(msg)) != ERROR_SUCCESS) {
srs_error("source process aggregate message failed. ret=%d", ret);
return ret;
}
return ret;
}
// process onMetaData
if (msg->header.is_amf0_data() || msg->header.is_amf3_data()) {

View file

@ -724,6 +724,15 @@ int SrsRtmpConn::process_publish_message(SrsSource* source, SrsMessage* msg, boo
return ret;
}
// process aggregate packet
if (msg->header.is_aggregate()) {
if ((ret = source->on_aggregate(msg)) != ERROR_SUCCESS) {
srs_error("source process aggregate message failed. ret=%d", ret);
return ret;
}
return ret;
}
// process onMetaData
if (msg->header.is_amf0_data() || msg->header.is_amf3_data()) {
SrsPacket* pkt = NULL;

View file

@ -469,6 +469,7 @@ SrsSource::SrsSource(SrsRequest* req)
play_edge = new SrsPlayEdge();
publish_edge = new SrsPublishEdge();
gop_cache = new SrsGopCache();
aggregate_stream = new SrsStream();
_srs_config->subscribe(this);
atc = _srs_config->get_atc(_req->vhost);
@ -498,6 +499,7 @@ SrsSource::~SrsSource()
srs_freep(play_edge);
srs_freep(publish_edge);
srs_freep(gop_cache);
srs_freep(aggregate_stream);
#ifdef SRS_AUTO_HLS
srs_freep(hls);
@ -1069,6 +1071,105 @@ int SrsSource::on_video(SrsMessage* video)
return ret;
}
int SrsSource::on_aggregate(SrsMessage* msg)
{
int ret = ERROR_SUCCESS;
SrsStream* stream = aggregate_stream;
if ((ret = stream->initialize((char*)msg->payload, msg->size)) != ERROR_SUCCESS) {
return ret;
}
while (!stream->empty()) {
if (!stream->require(1)) {
ret = ERROR_RTMP_AGGREGATE;
srs_error("invalid aggregate message type. ret=%d", ret);
return ret;
}
int8_t type = stream->read_1bytes();
if (!stream->require(3)) {
ret = ERROR_RTMP_AGGREGATE;
srs_error("invalid aggregate message size. ret=%d", ret);
return ret;
}
int32_t data_size = stream->read_3bytes();
if (data_size < 0) {
ret = ERROR_RTMP_AGGREGATE;
srs_error("invalid aggregate message size(negative). ret=%d", ret);
return ret;
}
if (!stream->require(3)) {
ret = ERROR_RTMP_AGGREGATE;
srs_error("invalid aggregate message time. ret=%d", ret);
return ret;
}
int32_t timestamp = stream->read_3bytes();
if (!stream->require(1)) {
ret = ERROR_RTMP_AGGREGATE;
srs_error("invalid aggregate message time(high). ret=%d", ret);
return ret;
}
int32_t time_h = stream->read_1bytes();
timestamp |= time_h<<24;
timestamp &= 0x7FFFFFFF;
if (!stream->require(3)) {
ret = ERROR_RTMP_AGGREGATE;
srs_error("invalid aggregate message stream_id. ret=%d", ret);
return ret;
}
int32_t stream_id = stream->read_3bytes();
if (data_size > 0 && !stream->require(data_size)) {
ret = ERROR_RTMP_AGGREGATE;
srs_error("invalid aggregate message data. ret=%d", ret);
return ret;
}
// to common message.
SrsCommonMessage __o;
SrsMessage& o = __o;
o.header.message_type = type;
o.header.payload_length = data_size;
o.header.timestamp_delta = timestamp;
o.header.timestamp = timestamp;
o.header.stream_id = stream_id;
o.header.perfer_cid = msg->header.perfer_cid;
if (data_size > 0) {
o.size = data_size;
o.payload = new int8_t[o.size];
stream->read_bytes((char*)o.payload, o.size);
}
if (!stream->require(4)) {
ret = ERROR_RTMP_AGGREGATE;
srs_error("invalid aggregate message previous tag size. ret=%d", ret);
return ret;
}
stream->read_4bytes();
// process parsed message
if (o.header.is_audio()) {
if ((ret = on_audio(&o)) != ERROR_SUCCESS) {
return ret;
}
} else if (o.header.is_video()) {
if ((ret = on_video(&o)) != ERROR_SUCCESS) {
return ret;
}
}
}
return ret;
}
int SrsSource::on_publish()
{
int ret = ERROR_SUCCESS;

View file

@ -57,6 +57,7 @@ class SrsDvr;
#ifdef SRS_AUTO_TRANSCODE
class SrsEncoder;
#endif
class SrsStream;
/**
* time jitter detect and correct,
@ -251,6 +252,8 @@ private:
SrsGopCache* gop_cache;
// to forward stream to other servers
std::vector<SrsForwarder*> forwarders;
// for aggregate message
SrsStream* aggregate_stream;
private:
/**
* the sample rate of audio in metadata.
@ -307,6 +310,7 @@ public:
virtual int on_meta_data(SrsMessage* msg, SrsOnMetaDataPacket* metadata);
virtual int on_audio(SrsMessage* audio);
virtual int on_video(SrsMessage* video);
virtual int on_aggregate(SrsMessage* msg);
/**
* publish stream event notify.
* @param _req the request from client, the source will deep copy it,

View file

@ -83,6 +83,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#define ERROR_RTMP_EDGE_PUBLISH_STATE 321
#define ERROR_RTMP_EDGE_PROXY_PULL 322
#define ERROR_RTMP_EDGE_RELOAD 323
// aggregate message parse failed.
#define ERROR_RTMP_AGGREGATE 324
#define ERROR_SYSTEM_STREAM_INIT 400
#define ERROR_SYSTEM_PACKET_INVALID 401

View file

@ -23,6 +23,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_kernel_stream.hpp>
using namespace std;
#include <srs_kernel_log.hpp>
#include <srs_kernel_error.hpp>
@ -160,7 +162,7 @@ int64_t SrsStream::read_8bytes()
return value;
}
std::string SrsStream::read_string(int len)
string SrsStream::read_string(int len)
{
srs_assert(require(len));
@ -172,6 +174,15 @@ std::string SrsStream::read_string(int len)
return value;
}
void SrsStream::read_bytes(char* data, int size)
{
srs_assert(require(size));
memcpy(data, p, size);
p += size;
}
void SrsStream::write_1bytes(int8_t value)
{
srs_assert(require(1));
@ -224,7 +235,7 @@ void SrsStream::write_8bytes(int64_t value)
*p++ = pp[0];
}
void SrsStream::write_string(std::string value)
void SrsStream::write_string(string value)
{
srs_assert(require(value.length()));

View file

@ -104,6 +104,10 @@ public:
* get string from stream, length specifies by param len.
*/
virtual std::string read_string(int len);
/**
* get bytes from stream, length specifies by param len.
*/
virtual void read_bytes(char* data, int size);
public:
/**
* write 1bytes char to stream.

View file

@ -1477,6 +1477,11 @@ bool SrsMessageHeader::is_user_control_message()
return message_type == RTMP_MSG_UserControlMessage;
}
bool SrsMessageHeader::is_aggregate()
{
return message_type == RTMP_MSG_AggregateMessage;
}
void SrsMessageHeader::initialize_amf0_script(int size, int stream)
{
message_type = RTMP_MSG_AMF0DataMessage;

View file

@ -277,6 +277,7 @@ public:
bool is_ackledgement();
bool is_set_chunk_size();
bool is_user_control_message();
bool is_aggregate();
void initialize_amf0_script(int size, int stream);
void initialize_audio(int size, u_int32_t time, int stream);