diff --git a/trunk/src/core/srs_core_buffer.cpp b/trunk/src/core/srs_core_buffer.cpp index 809d151f3..fea4c68c5 100755 --- a/trunk/src/core/srs_core_buffer.cpp +++ b/trunk/src/core/srs_core_buffer.cpp @@ -46,6 +46,11 @@ char* SrsBuffer::bytes() return &data.at(0); } +void SrsBuffer::erase(int size) +{ + data.erase(data.begin(), data.begin() + size); +} + void SrsBuffer::append(char* bytes, int size) { std::vector vec(bytes, bytes + size); diff --git a/trunk/src/core/srs_core_buffer.hpp b/trunk/src/core/srs_core_buffer.hpp index 9cea863e9..53585e290 100755 --- a/trunk/src/core/srs_core_buffer.hpp +++ b/trunk/src/core/srs_core_buffer.hpp @@ -47,9 +47,10 @@ public: SrsBuffer(); virtual ~SrsBuffer(); public: - virtual char* bytes(); -private: virtual int size(); + virtual char* bytes(); + virtual void erase(int size); +private: virtual void append(char* bytes, int size); public: virtual int ensure_buffer_bytes(SrsSocket* skt, int required_size); diff --git a/trunk/src/core/srs_core_error.hpp b/trunk/src/core/srs_core_error.hpp index c0e118475..351c8529b 100755 --- a/trunk/src/core/srs_core_error.hpp +++ b/trunk/src/core/srs_core_error.hpp @@ -51,5 +51,6 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #define ERROR_RTMP_PLAIN_REQUIRED 300 #define ERROR_RTMP_CHUNK_START 301 +#define ERROR_RTMP_MSG_INVLIAD_SIZE 302 #endif \ No newline at end of file diff --git a/trunk/src/core/srs_core_protocol.cpp b/trunk/src/core/srs_core_protocol.cpp index d03d7f8d5..631bde829 100755 --- a/trunk/src/core/srs_core_protocol.cpp +++ b/trunk/src/core/srs_core_protocol.cpp @@ -98,6 +98,8 @@ SrsProtocol::SrsProtocol(st_netfd_t client_stfd) stfd = client_stfd; buffer = new SrsBuffer(); skt = new SrsSocket(stfd); + + in_chunk_size = out_chunk_size = RTMP_DEFAULT_CHUNK_SIZE; } SrsProtocol::~SrsProtocol() @@ -130,50 +132,13 @@ int SrsProtocol::recv_message(SrsMessage** pmsg) int ret = ERROR_SUCCESS; while (true) { - // chunk stream basic header. - char fmt = 0; - int cid = 0; - int bh_size = 0; - if ((ret = read_basic_header(fmt, cid, bh_size)) != ERROR_SUCCESS) { - srs_error("read basic header failed. ret=%d", ret); - return ret; - } - srs_info("read basic header success. fmt=%d, cid=%d, bh_size=%d", fmt, cid, bh_size); - - // get the cached chunk stream. - SrsChunkStream* chunk = NULL; - - if (chunk_streams.find(cid) == chunk_streams.end()) { - chunk = chunk_streams[cid] = new SrsChunkStream(cid); - srs_info("cache new chunk stream: fmt=%d, cid=%d", fmt, cid); - } else { - chunk = chunk_streams[cid]; - srs_info("cached chunk stream: fmt=%d, cid=%d, message(type=%d, size=%d, time=%d, sid=%d)", - chunk->fmt, chunk->cid, chunk->header.message_type, chunk->header.payload_length, - chunk->header.timestamp, chunk->header.stream_id); - } - - // chunk stream message header - int mh_size = 0; - if ((ret = read_message_header(chunk, fmt, bh_size, mh_size)) != ERROR_SUCCESS) { - srs_error("read message header failed. ret=%d", ret); - return ret; - } - srs_info("read message header success. " - "fmt=%d, mh_size=%d, ext_time=%d, message(type=%d, size=%d, time=%d, sid=%d)", - fmt, mh_size, chunk->extended_timestamp, chunk->header.message_type, - chunk->header.payload_length, chunk->header.timestamp, chunk->header.stream_id); - - // read msg payload from chunk stream. SrsMessage* msg = NULL; - /*int payload_size = 0; - if ((ret = read_message_payload(chunk, bh_size, mh_size, payload_size, &msg)) != ERROR_SUCCESS) { - srs_error("read message payload failed. ret=%d", ret); + + if ((ret = recv_interlaced_message(&msg)) != ERROR_SUCCESS) { + srs_error("recv interlaced message failed. ret=%d", ret); return ret; } - srs_info("read message payload success. payload_size=%d", payload_size);*/ - // not got an entire RTMP message, try next chunk. if (!msg) { continue; } @@ -184,12 +149,75 @@ int SrsProtocol::recv_message(SrsMessage** pmsg) return ret; } +int SrsProtocol::recv_interlaced_message(SrsMessage** pmsg) +{ + int ret = ERROR_SUCCESS; + + // chunk stream basic header. + char fmt = 0; + int cid = 0; + int bh_size = 0; + if ((ret = read_basic_header(fmt, cid, bh_size)) != ERROR_SUCCESS) { + srs_error("read basic header failed. ret=%d", ret); + return ret; + } + srs_info("read basic header success. fmt=%d, cid=%d, bh_size=%d", fmt, cid, bh_size); + + // get the cached chunk stream. + SrsChunkStream* chunk = NULL; + + if (chunk_streams.find(cid) == chunk_streams.end()) { + chunk = chunk_streams[cid] = new SrsChunkStream(cid); + srs_info("cache new chunk stream: fmt=%d, cid=%d", fmt, cid); + } else { + chunk = chunk_streams[cid]; + srs_info("cached chunk stream: fmt=%d, cid=%d, size=%d, message(type=%d, size=%d, time=%d, sid=%d)", + chunk->fmt, chunk->cid, (chunk->msg? chunk->msg->size : 0), chunk->header.message_type, chunk->header.payload_length, + chunk->header.timestamp, chunk->header.stream_id); + } + + // chunk stream message header + int mh_size = 0; + if ((ret = read_message_header(chunk, fmt, bh_size, mh_size)) != ERROR_SUCCESS) { + srs_error("read message header failed. ret=%d", ret); + return ret; + } + srs_info("read message header success. " + "fmt=%d, mh_size=%d, ext_time=%d, size=%d, message(type=%d, size=%d, time=%d, sid=%d)", + fmt, mh_size, chunk->extended_timestamp, (chunk->msg? chunk->msg->size : 0), chunk->header.message_type, + chunk->header.payload_length, chunk->header.timestamp, chunk->header.stream_id); + + // read msg payload from chunk stream. + SrsMessage* msg = NULL; + int payload_size = 0; + if ((ret = read_message_payload(chunk, bh_size, mh_size, payload_size, &msg)) != ERROR_SUCCESS) { + srs_error("read message payload failed. ret=%d", ret); + return ret; + } + + // not got an entire RTMP message, try next chunk. + if (!msg) { + srs_info("get partial message success. chunk_payload_size=%d, size=%d, message(type=%d, size=%d, time=%d, sid=%d)", + payload_size, (msg? msg->size : (chunk->msg? chunk->msg->size : 0)), chunk->header.message_type, chunk->header.payload_length, + chunk->header.timestamp, chunk->header.stream_id); + return ret; + } + + *pmsg = msg; + srs_info("get entire message success. chunk_payload_size=%d, size=%d, message(type=%d, size=%d, time=%d, sid=%d)", + payload_size, (msg? msg->size : (chunk->msg? chunk->msg->size : 0)), chunk->header.message_type, chunk->header.payload_length, + chunk->header.timestamp, chunk->header.stream_id); + + return ret; +} + int SrsProtocol::read_basic_header(char& fmt, int& cid, int& size) { int ret = ERROR_SUCCESS; - if ((ret = buffer->ensure_buffer_bytes(skt, 1)) != ERROR_SUCCESS) { - srs_error("read 1bytes basic header failed. ret=%d", ret); + int required_size = 1; + if ((ret = buffer->ensure_buffer_bytes(skt, required_size)) != ERROR_SUCCESS) { + srs_error("read 1bytes basic header failed. required_size=%d, ret=%d", required_size, ret); return ret; } @@ -205,8 +233,9 @@ int SrsProtocol::read_basic_header(char& fmt, int& cid, int& size) } if (cid == 0) { - if ((ret = buffer->ensure_buffer_bytes(skt, 2)) != ERROR_SUCCESS) { - srs_error("read 2bytes basic header failed. ret=%d", ret); + required_size = 2; + if ((ret = buffer->ensure_buffer_bytes(skt, required_size)) != ERROR_SUCCESS) { + srs_error("read 2bytes basic header failed. required_size=%d, ret=%d", required_size, ret); return ret; } @@ -215,8 +244,9 @@ int SrsProtocol::read_basic_header(char& fmt, int& cid, int& size) size = 2; srs_verbose("%dbytes basic header parsed. fmt=%d, cid=%d", size, fmt, cid); } else if (cid == 1) { + required_size = 3; if ((ret = buffer->ensure_buffer_bytes(skt, 3)) != ERROR_SUCCESS) { - srs_error("read 3bytes basic header failed. ret=%d", ret); + srs_error("read 3bytes basic header failed. required_size=%d, ret=%d", required_size, ret); return ret; } @@ -267,24 +297,55 @@ int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_siz mh_size = mh_sizes[(int)fmt]; srs_verbose("calc chunk message header size. fmt=%d, mh_size=%d", fmt, mh_size); - if ((ret = buffer->ensure_buffer_bytes(skt, bh_size + mh_size)) != ERROR_SUCCESS) { - srs_error("read %dbytes message header failed. ret=%d", mh_size, ret); + int required_size = bh_size + mh_size; + if ((ret = buffer->ensure_buffer_bytes(skt, required_size)) != ERROR_SUCCESS) { + srs_error("read %dbytes message header failed. required_size=%d, ret=%d", mh_size, required_size, ret); return ret; } char* p = buffer->bytes() + bh_size; // parse the message header. // see also: ngx_rtmp_recv - if (fmt <= 2) { - char* pp = (char*)&chunk->header.timestamp; + if (fmt <= RTMP_FMT_TYPE2) { + int32_t timestamp_delta; + char* pp = (char*)×tamp_delta; pp[2] = *p++; pp[1] = *p++; pp[0] = *p++; pp[3] = 0; - chunk->extended_timestamp = (chunk->header.timestamp == RTMP_EXTENDED_TIMESTAMP); + if (fmt == RTMP_FMT_TYPE0) { + // 6.1.2.1. Type 0 + // For a type-0 chunk, the absolute timestamp of the message is sent + // here. + chunk->header.timestamp = timestamp_delta; + } else { + // 6.1.2.2. Type 1 + // 6.1.2.3. Type 2 + // For a type-1 or type-2 chunk, the difference between the previous + // chunk's timestamp and the current chunk's timestamp is sent here. + chunk->header.timestamp += timestamp_delta; + } - if (fmt <= 1) { + // fmt: 0 + // timestamp: 3 bytes + // If the timestamp is greater than or equal to 16777215 + // (hexadecimal 0x00ffffff), this value MUST be 16777215, and the + // ‘extended timestamp header’ MUST be present. Otherwise, this value + // SHOULD be the entire timestamp. + // + // fmt: 1 or 2 + // timestamp delta: 3 bytes + // If the delta is greater than or equal to 16777215 (hexadecimal + // 0x00ffffff), this value MUST be 16777215, and the ‘extended + // timestamp header’ MUST be present. Otherwise, this value SHOULD be + // the entire delta. + chunk->extended_timestamp = (timestamp_delta >= RTMP_EXTENDED_TIMESTAMP); + if (chunk->extended_timestamp) { + chunk->header.timestamp = RTMP_EXTENDED_TIMESTAMP; + } + + if (fmt <= RTMP_FMT_TYPE1) { pp = (char*)&chunk->header.payload_length; pp[2] = *p++; pp[1] = *p++; @@ -318,9 +379,10 @@ int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_siz if (chunk->extended_timestamp) { mh_size += 4; + required_size = bh_size + mh_size; srs_verbose("read header ext time. fmt=%d, ext_time=%d, mh_size=%d", fmt, chunk->extended_timestamp, mh_size); - if ((ret = buffer->ensure_buffer_bytes(skt, bh_size + mh_size)) != ERROR_SUCCESS) { - srs_error("read %dbytes message header failed. ret=%d", mh_size, ret); + if ((ret = buffer->ensure_buffer_bytes(skt, required_size)) != ERROR_SUCCESS) { + srs_error("read %dbytes message header failed. required_size=%d, ret=%d", mh_size, required_size, ret); return ret; } @@ -332,6 +394,90 @@ int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_siz srs_verbose("header read ext_time completed. time=%d", chunk->header.timestamp); } + // valid message + if (chunk->header.payload_length < 0) { + ret = ERROR_RTMP_MSG_INVLIAD_SIZE; + srs_error("RTMP message size must not be negative. size=%d, ret=%d", + chunk->header.payload_length, ret); + return ret; + } + + // copy header to msg + chunk->msg->header = chunk->header; + + return ret; +} + +int SrsProtocol::read_message_payload(SrsChunkStream* chunk, int bh_size, int mh_size, int& payload_size, SrsMessage** pmsg) +{ + int ret = ERROR_SUCCESS; + + // empty message + if (chunk->header.payload_length == 0) { + // need erase the header in buffer. + buffer->erase(bh_size + mh_size); + + srs_warn("get an empty RTMP " + "message(type=%d, size=%d, time=%d, sid=%d)", chunk->header.message_type, + chunk->header.payload_length, chunk->header.timestamp, chunk->header.stream_id); + + return ret; + } + srs_assert(chunk->header.payload_length > 0); + + // the chunk payload size. + payload_size = chunk->header.payload_length - chunk->msg->size; + if (payload_size > in_chunk_size) { + payload_size = in_chunk_size; + } + srs_verbose("chunk payload size is %d, message_size=%d, received_size=%d, in_chunk_size=%d", + payload_size, chunk->header.payload_length, chunk->msg->size, in_chunk_size); + + // create msg payload if not initialized + if (!chunk->msg->payload) { + chunk->msg->payload = new int8_t[chunk->header.payload_length]; + memset(chunk->msg->payload, 0, chunk->header.payload_length); + srs_verbose("create empty payload for RTMP message. size=%d", chunk->header.payload_length); + } + + // copy payload from buffer. + int copy_size = buffer->size() - bh_size - mh_size; + if (copy_size > payload_size) { + copy_size = payload_size; + } + memcpy(chunk->msg->payload + chunk->msg->size, buffer->bytes() + bh_size + mh_size, copy_size); + buffer->erase(bh_size + mh_size + copy_size); + chunk->msg->size += copy_size; + + // when empty, read the left bytes from socket. + int left_size = payload_size - copy_size; + if (left_size > 0) { + ssize_t nread; + if ((ret = skt->read_fully(chunk->msg->payload + chunk->msg->size, left_size, &nread)) != ERROR_SUCCESS) { + srs_error("read chunk payload from socket error. " + "payload_size=%d, copy_size=%d, left_size=%d, size=%d, msg_size=%d, ret=%d", + payload_size, copy_size, left_size, chunk->msg->size, chunk->header.payload_length, ret); + return ret; + } + } + + srs_verbose("chunk payload read complted. bh_size=%d, mh_size=%d, payload_size=%d", bh_size, mh_size, payload_size); + + // got entire RTMP message? + if (chunk->header.payload_length == chunk->msg->size) { + *pmsg = chunk->msg; + chunk->msg = NULL; + srs_verbose("get entire RTMP message(type=%d, size=%d, time=%d, sid=%d)", + chunk->header.message_type, chunk->header.payload_length, + chunk->header.timestamp, chunk->header.stream_id); + return ret; + } + + srs_verbose("get partial RTMP message(type=%d, size=%d, time=%d, sid=%d), partial size=%d", + chunk->header.message_type, chunk->header.payload_length, + chunk->header.timestamp, chunk->header.stream_id, + chunk->msg->size); + return ret; } @@ -365,6 +511,7 @@ SrsChunkStream::~SrsChunkStream() SrsMessage::SrsMessage() { + size = 0; payload = NULL; } diff --git a/trunk/src/core/srs_core_protocol.hpp b/trunk/src/core/srs_core_protocol.hpp index d89537b7a..c949bd553 100755 --- a/trunk/src/core/srs_core_protocol.hpp +++ b/trunk/src/core/srs_core_protocol.hpp @@ -51,14 +51,18 @@ private: st_netfd_t stfd; SrsBuffer* buffer; SrsSocket* skt; + int32_t in_chunk_size; + int32_t out_chunk_size; public: SrsProtocol(st_netfd_t client_stfd); virtual ~SrsProtocol(); public: virtual int recv_message(SrsMessage** pmsg); private: + virtual int recv_interlaced_message(SrsMessage** pmsg); virtual int read_basic_header(char& fmt, int& cid, int& size); virtual int read_message_header(SrsChunkStream* chunk, char fmt, int bh_size, int& mh_size); + virtual int read_message_payload(SrsChunkStream* chunk, int bh_size, int mh_size, int& payload_size, SrsMessage** pmsg); }; /** @@ -142,6 +146,7 @@ public: * or compressed video data. The payload format and interpretation are * beyond the scope of this document. */ + int32_t size; int8_t* payload; public: SrsMessage();