diff --git a/trunk/src/core/srs_core_protocol.cpp b/trunk/src/core/srs_core_protocol.cpp index 233655be0..6f546d93d 100755 --- a/trunk/src/core/srs_core_protocol.cpp +++ b/trunk/src/core/srs_core_protocol.cpp @@ -311,6 +311,12 @@ int SrsProtocol::recv_message(SrsMessage** pmsg) continue; } + if ((ret = on_recv_message(msg)) != ERROR_SUCCESS) { + srs_error("update context when received msg. ret=%d", ret); + delete msg; + return ret; + } + srs_verbose("get a msg with raw/undecoded payload"); *pmsg = msg; break; @@ -431,6 +437,35 @@ int SrsProtocol::send_message(SrsMessage* msg) return ret; } +int SrsProtocol::on_recv_message(SrsMessage* msg) +{ + int ret = ERROR_SUCCESS; + + srs_assert(msg != NULL); + + switch (msg->header.message_type) { + case RTMP_MSG_WindowAcknowledgementSize: + if ((ret = msg->decode_packet()) != ERROR_SUCCESS) { + srs_error("decode packet from message payload failed. ret=%d", ret); + return ret; + } + srs_verbose("decode packet from message payload success."); + break; + } + + switch (msg->header.message_type) { + case RTMP_MSG_WindowAcknowledgementSize: { + SrsSetWindowAckSizePacket* pkt = dynamic_cast(msg->get_packet()); + srs_assert(pkt != NULL); + // TODO: take effect. + srs_trace("set ack window size to %d", pkt->ackowledgement_window_size); + break; + } + } + + return ret; +} + int SrsProtocol::recv_interlaced_message(SrsMessage** pmsg) { int ret = ERROR_SUCCESS; @@ -816,22 +851,30 @@ int SrsMessage::decode_packet() srs_assert(payload != NULL); srs_assert(size > 0); + if (packet) { + srs_verbose("msg already decoded"); + return ret; + } + if (!stream) { srs_verbose("create decode stream for message."); stream = new SrsStream(); } + // initialize the decode stream for all message, + // it's ok for the initialize if fast and without memory copy. + if ((ret = stream->initialize((char*)payload, size)) != ERROR_SUCCESS) { + srs_error("initialize stream failed. ret=%d", ret); + return ret; + } + srs_verbose("decode stream initialized success"); + + // decode specified packet type if (header.message_type == RTMP_MSG_AMF0CommandMessage) { srs_verbose("start to decode AMF0 command message."); // amf0 command message. // need to read the command name. - if ((ret = stream->initialize((char*)payload, size)) != ERROR_SUCCESS) { - srs_error("initialize stream failed. ret=%d", ret); - return ret; - } - srs_verbose("decode stream initialized success"); - std::string command; if ((ret = srs_amf0_read_string(stream, command)) != ERROR_SUCCESS) { srs_error("decode AMF0 command name failed. ret=%d", ret); @@ -850,12 +893,16 @@ int SrsMessage::decode_packet() srs_trace("drop the AMF0 command message, command_name=%s", command.c_str()); packet = new SrsPacket(); return ret; + } else if(header.message_type == RTMP_MSG_WindowAcknowledgementSize) { + srs_verbose("start to decode set ack window size message."); + packet = new SrsSetWindowAckSizePacket(); + return packet->decode(stream); + } else { + // default packet to drop message. + srs_trace("drop the unknown message, type=%d", header.message_type); + packet = new SrsPacket(); } - // default packet to drop message. - srs_trace("drop the unknown message, type=%d", header.message_type); - packet = new SrsPacket(); - return ret; } @@ -1137,6 +1184,22 @@ SrsSetWindowAckSizePacket::~SrsSetWindowAckSizePacket() { } +int SrsSetWindowAckSizePacket::decode(SrsStream* stream) +{ + int ret = ERROR_SUCCESS; + + if (!stream->require(4)) { + ret = ERROR_RTMP_MESSAGE_DECODE; + srs_error("decode ack window size failed. ret=%d", ret); + return ret; + } + + ackowledgement_window_size = stream->read_4bytes(); + srs_info("decode ack window size success"); + + return ret; +} + int SrsSetWindowAckSizePacket::get_perfer_cid() { return RTMP_CID_ProtocolControl; diff --git a/trunk/src/core/srs_core_protocol.hpp b/trunk/src/core/srs_core_protocol.hpp index 1a08644f8..eca407d90 100755 --- a/trunk/src/core/srs_core_protocol.hpp +++ b/trunk/src/core/srs_core_protocol.hpp @@ -107,6 +107,10 @@ public: */ virtual int send_message(SrsMessage* msg); private: + /** + * when recv message, update the context. + */ + virtual int on_recv_message(SrsMessage* msg); /** * try to recv interlaced message from peer, * return error if error occur and nerver set the pmsg, @@ -372,6 +376,8 @@ public: public: SrsSetWindowAckSizePacket(); virtual ~SrsSetWindowAckSizePacket(); +public: + virtual int decode(SrsStream* stream); public: virtual int get_perfer_cid(); public: