From 5aa31568baa04c07f4075c1256e9433d79323a6c Mon Sep 17 00:00:00 2001 From: winlin Date: Wed, 7 Jan 2015 15:35:01 +0800 Subject: [PATCH] fix the rtmp dump, parse the aggregate msg. --- trunk/research/librtmp/srs_rtmp_dump.c | 14 +- trunk/src/libs/srs_librtmp.cpp | 210 +++++++++++++++++++++---- 2 files changed, 193 insertions(+), 31 deletions(-) diff --git a/trunk/research/librtmp/srs_rtmp_dump.c b/trunk/research/librtmp/srs_rtmp_dump.c index ea8c8f24f..55504a09a 100644 --- a/trunk/research/librtmp/srs_rtmp_dump.c +++ b/trunk/research/librtmp/srs_rtmp_dump.c @@ -272,10 +272,18 @@ int main(int argc, char** argv) goto rtmp_destroy; } + // we only write some types of messages to flv file. + int is_flv_msg = type == SRS_RTMP_TYPE_AUDIO + || type == SRS_RTMP_TYPE_VIDEO || type == SRS_RTMP_TYPE_SCRIPT; + if (flv) { - if (srs_flv_write_tag(flv, type, timestamp, data, size) != 0) { - srs_human_trace("dump rtmp packet failed."); - goto rtmp_destroy; + if (is_flv_msg) { + if (srs_flv_write_tag(flv, type, timestamp, data, size) != 0) { + srs_human_trace("dump rtmp packet failed."); + goto rtmp_destroy; + } + } else { + srs_human_trace("drop message size=%dB", size); } } diff --git a/trunk/src/libs/srs_librtmp.cpp b/trunk/src/libs/srs_librtmp.cpp index 0a85bd170..5a05f8c99 100644 --- a/trunk/src/libs/srs_librtmp.cpp +++ b/trunk/src/libs/srs_librtmp.cpp @@ -70,6 +70,12 @@ struct Context // extra request object for connect to server, NULL to ignore. SrsRequest* req; + // the message received cache, + // for example, when got aggregate message, + // the context will parse to videos/audios, + // and return one by one. + std::vector msgs; + SrsRtmpClient* rtmp; SimpleSocketStream* skt; int stream_id; @@ -106,6 +112,13 @@ struct Context srs_freep(req); srs_freep(rtmp); srs_freep(skt); + + std::vector::iterator it; + for (it = msgs.begin(); it != msgs.end(); ++it) { + SrsCommonMessage* msg = *it; + srs_freep(msg); + } + msgs.clear(); } }; @@ -799,6 +812,156 @@ int srs_rtmp_bandwidth_check(srs_rtmp_t rtmp, return ret; } + +int __srs_rtmp_on_aggregate(Context* context, SrsCommonMessage* msg) +{ + int ret = ERROR_SUCCESS; + + SrsStream aggregate_stream; + SrsStream* stream = &aggregate_stream; + if ((ret = stream->initialize(msg->payload, msg->size)) != ERROR_SUCCESS) { + return ret; + } + + int delta = -1; + while (!stream->empty()) { + if (!stream->require(1)) { + ret = ERROR_RTMP_AGGREGATE; + srs_error("invalid aggregate message type. ret=%d", ret); + return ret; + } + int8_t type = stream->read_1bytes(); + + if (!stream->require(3)) { + ret = ERROR_RTMP_AGGREGATE; + srs_error("invalid aggregate message size. ret=%d", ret); + return ret; + } + int32_t data_size = stream->read_3bytes(); + + if (data_size < 0) { + ret = ERROR_RTMP_AGGREGATE; + srs_error("invalid aggregate message size(negative). ret=%d", ret); + return ret; + } + + if (!stream->require(3)) { + ret = ERROR_RTMP_AGGREGATE; + srs_error("invalid aggregate message time. ret=%d", ret); + return ret; + } + int32_t timestamp = stream->read_3bytes(); + + if (!stream->require(1)) { + ret = ERROR_RTMP_AGGREGATE; + srs_error("invalid aggregate message time(high). ret=%d", ret); + return ret; + } + int32_t time_h = stream->read_1bytes(); + + timestamp |= time_h<<24; + timestamp &= 0x7FFFFFFF; + + // adjust timestamp. + if (delta < 0) { + delta = (int)msg->header.timestamp - (int)timestamp; + } + timestamp += delta; + + if (!stream->require(3)) { + ret = ERROR_RTMP_AGGREGATE; + srs_error("invalid aggregate message stream_id. ret=%d", ret); + return ret; + } + int32_t stream_id = stream->read_3bytes(); + + if (data_size > 0 && !stream->require(data_size)) { + ret = ERROR_RTMP_AGGREGATE; + srs_error("invalid aggregate message data. ret=%d", ret); + return ret; + } + + // to common message. + SrsCommonMessage __o; + SrsCommonMessage& o = __o; + + o.header.message_type = type; + o.header.payload_length = data_size; + o.header.timestamp_delta = timestamp; + o.header.timestamp = timestamp; + o.header.stream_id = stream_id; + o.header.perfer_cid = msg->header.perfer_cid; + + if (data_size > 0) { + o.size = data_size; + o.payload = new char[o.size]; + stream->read_bytes(o.payload, o.size); + } + + if (!stream->require(4)) { + ret = ERROR_RTMP_AGGREGATE; + srs_error("invalid aggregate message previous tag size. ret=%d", ret); + return ret; + } + stream->read_4bytes(); + + // process parsed message + SrsCommonMessage* parsed_msg = new SrsCommonMessage(); + parsed_msg->header = o.header; + parsed_msg->payload = o.payload; + parsed_msg->size = o.size; + o.payload = NULL; + context->msgs.push_back(parsed_msg); + } + + return ret; +} + +int __srs_rtmp_go_packet(Context* context, SrsCommonMessage* msg, + char* type, u_int32_t* timestamp, char** data, int* size, + bool* got_msg +) { + int ret = ERROR_SUCCESS; + + // generally we got a message. + *got_msg = true; + + if (msg->header.is_audio()) { + *type = SRS_RTMP_TYPE_AUDIO; + *timestamp = (u_int32_t)msg->header.timestamp; + *data = (char*)msg->payload; + *size = (int)msg->size; + // detach bytes from packet. + msg->payload = NULL; + } else if (msg->header.is_video()) { + *type = SRS_RTMP_TYPE_VIDEO; + *timestamp = (u_int32_t)msg->header.timestamp; + *data = (char*)msg->payload; + *size = (int)msg->size; + // detach bytes from packet. + msg->payload = NULL; + } else if (msg->header.is_amf0_data() || msg->header.is_amf3_data()) { + *type = SRS_RTMP_TYPE_SCRIPT; + *data = (char*)msg->payload; + *size = (int)msg->size; + // detach bytes from packet. + msg->payload = NULL; + } else if (msg->header.is_aggregate()) { + if ((ret = __srs_rtmp_on_aggregate(context, msg)) != ERROR_SUCCESS) { + return ret; + } + *got_msg = false; + } else { + *type = msg->header.message_type; + *data = (char*)msg->payload; + *size = (int)msg->size; + // detach bytes from packet. + msg->payload = NULL; + } + + return ret; +} + int srs_rtmp_read_packet(srs_rtmp_t rtmp, char* type, u_int32_t* timestamp, char** data, int* size) { *type = 0; @@ -813,45 +976,36 @@ int srs_rtmp_read_packet(srs_rtmp_t rtmp, char* type, u_int32_t* timestamp, char for (;;) { SrsCommonMessage* msg = NULL; - if ((ret = context->rtmp->recv_message(&msg)) != ERROR_SUCCESS) { + + // read from cache first. + if (!context->msgs.empty()) { + std::vector::iterator it = context->msgs.begin(); + msg = *it; + context->msgs.erase(it); + } + + // read from protocol sdk. + if (!msg && (ret = context->rtmp->recv_message(&msg)) != ERROR_SUCCESS) { return ret; } + + // no msg, try again. if (!msg) { continue; } SrsAutoFree(SrsCommonMessage, msg); - if (msg->header.is_audio()) { - *type = SRS_RTMP_TYPE_AUDIO; - *timestamp = (u_int32_t)msg->header.timestamp; - *data = (char*)msg->payload; - *size = (int)msg->size; - // detach bytes from packet. - msg->payload = NULL; - } else if (msg->header.is_video()) { - *type = SRS_RTMP_TYPE_VIDEO; - *timestamp = (u_int32_t)msg->header.timestamp; - *data = (char*)msg->payload; - *size = (int)msg->size; - // detach bytes from packet. - msg->payload = NULL; - } else if (msg->header.is_amf0_data() || msg->header.is_amf3_data()) { - *type = SRS_RTMP_TYPE_SCRIPT; - *data = (char*)msg->payload; - *size = (int)msg->size; - // detach bytes from packet. - msg->payload = NULL; - } else { - *type = msg->header.message_type; - *data = (char*)msg->payload; - *size = (int)msg->size; - // detach bytes from packet. - msg->payload = NULL; + // process the got packet, if nothing, try again. + bool got_msg; + if ((ret = __srs_rtmp_go_packet(context, msg, type, timestamp, data, size, &got_msg)) != ERROR_SUCCESS) { + return ret; } // got expected message. - break; + if (got_msg) { + break; + } } return ret;