diff --git a/README.md b/README.md index 2f5deb507..80aba9748 100755 --- a/README.md +++ b/README.md @@ -719,6 +719,7 @@ The play HTTP FLV benchmark by [SB](https://github.com/simple-rtmp-server/srs-be | 2014-05-24 | 2.0.168 | 2.3k(2300) | players | 92% | 276MB | [code][p17] | | 2014-05-24 | 2.0.169 | 3.0k(3000) | players | 94% | 188MB | [code][p18] | | 2014-05-24 | 2.0.170 | 3.0k(3000) | players | 89% | 96MB | [code][p19] | +| 2014-05-25 | 2.0.171 | 6.0k(6000) | players | 84% | 297MB | [code][p20] | ### Latency benchmark diff --git a/trunk/src/app/srs_app_http_conn.cpp b/trunk/src/app/srs_app_http_conn.cpp index d334b86d8..e7e259456 100644 --- a/trunk/src/app/srs_app_http_conn.cpp +++ b/trunk/src/app/srs_app_http_conn.cpp @@ -175,7 +175,7 @@ int SrsHttpResponseWriter::writev(iovec* iov, int iovcnt, ssize_t* pnwrite) } // send in chunked encoding. - int nb_iovss = iovcnt * 4; + int nb_iovss = 3 + iovcnt; iovec* iovss = iovss_cache; if (nb_iovss_cache < nb_iovss) { srs_freep(iovss_cache); @@ -183,29 +183,44 @@ int SrsHttpResponseWriter::writev(iovec* iov, int iovcnt, ssize_t* pnwrite) iovss = iovss_cache = new iovec[nb_iovss]; } - char* pheader_cache = header_cache; + // send in chunked encoding. + + // chunk size. + int size = 0; for (int i = 0; i < iovcnt; i++) { - int left = SRS_HTTP_HEADER_CACHE_SIZE - (int)(pheader_cache - header_cache); - srs_assert(left > 0); - iovec* data_iov = iov + i; - int nb_size = snprintf(pheader_cache, left, "%x", (int)data_iov->iov_len); - - iovec* iovs = iovss + (i * 4); - iovs[0].iov_base = (char*)pheader_cache; - iovs[0].iov_len = (int)nb_size; - iovs[1].iov_base = (char*)SRS_HTTP_CRLF; - iovs[1].iov_len = 2; - iovs[2].iov_base = (char*)data_iov->iov_base; - iovs[2].iov_len = (int)data_iov->iov_len; - iovs[3].iov_base = (char*)SRS_HTTP_CRLF; - iovs[3].iov_len = 2; - - pheader_cache += nb_size; + size += data_iov->iov_len; + } + written += size; + + // chunk header + int nb_size = snprintf(header_cache, SRS_HTTP_HEADER_CACHE_SIZE, "%x", size); + iovec* iovs = iovss; + iovs[0].iov_base = (char*)header_cache; + iovs[0].iov_len = (int)nb_size; + iovs++; + + // chunk header eof. + iovs[0].iov_base = (char*)SRS_HTTP_CRLF; + iovs[0].iov_len = 2; + iovs++; + + // chunk body. + for (int i = 0; i < iovcnt; i++) { + iovec* data_iov = iov + i; + iovs[0].iov_base = (char*)data_iov->iov_base; + iovs[0].iov_len = (int)data_iov->iov_len; + iovs++; } + // chunk body eof. + iovs[0].iov_base = (char*)SRS_HTTP_CRLF; + iovs[0].iov_len = 2; + iovs++; + + // sendout all ioves. ssize_t nwrite; - if ((ret = skt->writev(iovss, nb_iovss, &nwrite)) != ERROR_SUCCESS) { + if ((ret = srs_write_large_iovs(skt, iovss, nb_iovss, &nwrite)) != ERROR_SUCCESS) { return ret; } @@ -1442,6 +1457,21 @@ int SrsFlvStreamEncoder::dump_cache(SrsConsumer* /*consumer*/) return ERROR_SUCCESS; } +#ifdef SRS_PERF_FAST_FLV_ENCODER +SrsFastFlvStreamEncoder::SrsFastFlvStreamEncoder() +{ +} + +SrsFastFlvStreamEncoder::~SrsFastFlvStreamEncoder() +{ +} + +int SrsFastFlvStreamEncoder::write_tags(SrsSharedPtrMessage** msgs, int count) +{ + return enc->write_tags(msgs, count); +} +#endif + SrsAacStreamEncoder::SrsAacStreamEncoder() { enc = new SrsAacEncoder(); @@ -1612,7 +1642,11 @@ int SrsLiveStream::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) srs_assert(entry); if (srs_string_ends_with(entry->pattern, ".flv")) { w->header()->set_content_type("video/x-flv"); +#ifdef SRS_PERF_FAST_FLV_ENCODER + enc = new SrsFastFlvStreamEncoder(); +#else enc = new SrsFlvStreamEncoder(); +#endif } else if (srs_string_ends_with(entry->pattern, ".aac")) { w->header()->set_content_type("audio/x-aac"); enc = new SrsAacStreamEncoder(); @@ -1658,6 +1692,10 @@ int SrsLiveStream::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) } } +#ifdef SRS_PERF_FAST_FLV_ENCODER + SrsFastFlvStreamEncoder* ffe = dynamic_cast(enc); +#endif + while (true) { pprint->elapse(); @@ -1684,7 +1722,15 @@ int SrsLiveStream::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) } // sendout all messages. +#ifdef SRS_PERF_FAST_FLV_ENCODER + if (ffe) { + ret = ffe->write_tags(msgs.msgs, count); + } else { + ret = streaming_send_messages(enc, msgs.msgs, count); + } +#else ret = streaming_send_messages(enc, msgs.msgs, count); +#endif // free the messages. for (int i = 0; i < count; i++) { diff --git a/trunk/src/app/srs_app_http_conn.hpp b/trunk/src/app/srs_app_http_conn.hpp index 0a093afa4..f1d55d06d 100644 --- a/trunk/src/app/srs_app_http_conn.hpp +++ b/trunk/src/app/srs_app_http_conn.hpp @@ -71,11 +71,9 @@ class SrsHttpMessage; #ifdef SRS_AUTO_HTTP_PARSER -// for HTTP FLV, each video/audio packet is send by 3 iovs, -// while each iov is send by 4 sub iovs, that is needs 3 chunk header, -// suppose each header is 16 length, 3*16=48 is ok. -// that is, 512 can used for 16 iovs to send. -#define SRS_HTTP_HEADER_CACHE_SIZE 512 +// the http chunked header size, +// for writev, there always one chunk to send it. +#define SRS_HTTP_HEADER_CACHE_SIZE 64 /** * response writer use st socket @@ -214,8 +212,8 @@ public: * set the original messages, then update the message. */ virtual int update(std::string url, http_parser* header, - SrsFastBuffer* body, std::vector& headers - ); + SrsFastBuffer* body, std::vector& headers + ); private: virtual SrsConnection* connection(); public: @@ -454,7 +452,7 @@ public: */ class SrsFlvStreamEncoder : public ISrsStreamEncoder { -private: +protected: SrsFlvEncoder* enc; public: SrsFlvStreamEncoder(); @@ -469,6 +467,24 @@ public: virtual int dump_cache(SrsConsumer* consumer); }; +#ifdef SRS_PERF_FAST_FLV_ENCODER +/** + * the fast flv stream encoder. + * @see https://github.com/simple-rtmp-server/srs/issues/405 + */ +class SrsFastFlvStreamEncoder : public SrsFlvStreamEncoder +{ +public: + SrsFastFlvStreamEncoder(); + virtual ~SrsFastFlvStreamEncoder(); +public: + /** + * write the tags in a time. + */ + virtual int write_tags(SrsSharedPtrMessage** msgs, int count); +}; +#endif + /** * the ts stream encoder, remux rtmp stream to ts stream. */ diff --git a/trunk/src/core/srs_core.hpp b/trunk/src/core/srs_core.hpp index 572399be6..aaf72ac9c 100644 --- a/trunk/src/core/srs_core.hpp +++ b/trunk/src/core/srs_core.hpp @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. // current release version #define VERSION_MAJOR 2 #define VERSION_MINOR 0 -#define VERSION_REVISION 170 +#define VERSION_REVISION 171 // server info. #define RTMP_SIG_SRS_KEY "SRS" diff --git a/trunk/src/core/srs_core_performance.hpp b/trunk/src/core/srs_core_performance.hpp index b6aff9f0a..407ca0fef 100644 --- a/trunk/src/core/srs_core_performance.hpp +++ b/trunk/src/core/srs_core_performance.hpp @@ -181,5 +181,12 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #undef SRS_PERF_SO_SNDBUF_SIZE #endif +/** + * define the following macro to enable the fast flv encoder. + * @see https://github.com/simple-rtmp-server/srs/issues/405 + */ +#undef SRS_PERF_FAST_FLV_ENCODER +#define SRS_PERF_FAST_FLV_ENCODER + #endif diff --git a/trunk/src/kernel/srs_kernel_flv.cpp b/trunk/src/kernel/srs_kernel_flv.cpp index b0b4bd04f..745b5b083 100644 --- a/trunk/src/kernel/srs_kernel_flv.cpp +++ b/trunk/src/kernel/srs_kernel_flv.cpp @@ -37,16 +37,304 @@ using namespace std; #include #include #include +#include + +SrsMessageHeader::SrsMessageHeader() +{ + message_type = 0; + payload_length = 0; + timestamp_delta = 0; + stream_id = 0; + + timestamp = 0; + // we always use the connection chunk-id + perfer_cid = RTMP_CID_OverConnection; +} + +SrsMessageHeader::~SrsMessageHeader() +{ +} + +bool SrsMessageHeader::is_audio() +{ + return message_type == RTMP_MSG_AudioMessage; +} + +bool SrsMessageHeader::is_video() +{ + return message_type == RTMP_MSG_VideoMessage; +} + +bool SrsMessageHeader::is_amf0_command() +{ + return message_type == RTMP_MSG_AMF0CommandMessage; +} + +bool SrsMessageHeader::is_amf0_data() +{ + return message_type == RTMP_MSG_AMF0DataMessage; +} + +bool SrsMessageHeader::is_amf3_command() +{ + return message_type == RTMP_MSG_AMF3CommandMessage; +} + +bool SrsMessageHeader::is_amf3_data() +{ + return message_type == RTMP_MSG_AMF3DataMessage; +} + +bool SrsMessageHeader::is_window_ackledgement_size() +{ + return message_type == RTMP_MSG_WindowAcknowledgementSize; +} + +bool SrsMessageHeader::is_ackledgement() +{ + return message_type == RTMP_MSG_Acknowledgement; +} + +bool SrsMessageHeader::is_set_chunk_size() +{ + return message_type == RTMP_MSG_SetChunkSize; +} + +bool SrsMessageHeader::is_user_control_message() +{ + return message_type == RTMP_MSG_UserControlMessage; +} + +bool SrsMessageHeader::is_set_peer_bandwidth() +{ + return message_type == RTMP_MSG_SetPeerBandwidth; +} + +bool SrsMessageHeader::is_aggregate() +{ + return message_type == RTMP_MSG_AggregateMessage; +} + +void SrsMessageHeader::initialize_amf0_script(int size, int stream) +{ + message_type = RTMP_MSG_AMF0DataMessage; + payload_length = (int32_t)size; + timestamp_delta = (int32_t)0; + timestamp = (int64_t)0; + stream_id = (int32_t)stream; + + // amf0 script use connection2 chunk-id + perfer_cid = RTMP_CID_OverConnection2; +} + +void SrsMessageHeader::initialize_audio(int size, u_int32_t time, int stream) +{ + message_type = RTMP_MSG_AudioMessage; + payload_length = (int32_t)size; + timestamp_delta = (int32_t)time; + timestamp = (int64_t)time; + stream_id = (int32_t)stream; + + // audio chunk-id + perfer_cid = RTMP_CID_Audio; +} + +void SrsMessageHeader::initialize_video(int size, u_int32_t time, int stream) +{ + message_type = RTMP_MSG_VideoMessage; + payload_length = (int32_t)size; + timestamp_delta = (int32_t)time; + timestamp = (int64_t)time; + stream_id = (int32_t)stream; + + // video chunk-id + perfer_cid = RTMP_CID_Video; +} + +SrsCommonMessage::SrsCommonMessage() +{ + payload = NULL; + size = 0; +} + +SrsCommonMessage::~SrsCommonMessage() +{ + srs_freep(payload); +} + +SrsSharedPtrMessage::SrsSharedPtrPayload::SrsSharedPtrPayload() +{ + payload = NULL; + size = 0; + shared_count = 0; +} + +SrsSharedPtrMessage::SrsSharedPtrPayload::~SrsSharedPtrPayload() +{ + srs_freep(payload); +} + +SrsSharedPtrMessage::SrsSharedPtrMessage() +{ + ptr = NULL; +} + +SrsSharedPtrMessage::~SrsSharedPtrMessage() +{ + if (ptr) { + if (ptr->shared_count == 0) { + srs_freep(ptr); + } else { + ptr->shared_count--; + } + } +} + +int SrsSharedPtrMessage::create(SrsCommonMessage* msg) +{ + int ret = ERROR_SUCCESS; + + if ((ret = create(&msg->header, msg->payload, msg->size)) != ERROR_SUCCESS) { + return ret; + } + + // to prevent double free of payload: + // initialize already attach the payload of msg, + // detach the payload to transfer the owner to shared ptr. + msg->payload = NULL; + msg->size = 0; + + return ret; +} + +int SrsSharedPtrMessage::create(SrsMessageHeader* pheader, char* payload, int size) +{ + int ret = ERROR_SUCCESS; + + if (ptr) { + ret = ERROR_SYSTEM_ASSERT_FAILED; + srs_error("should not set the payload twice. ret=%d", ret); + srs_assert(false); + + return ret; + } + + ptr = new SrsSharedPtrPayload(); + + // direct attach the data. + if (pheader) { + ptr->header.message_type = pheader->message_type; + ptr->header.payload_length = size; + ptr->header.perfer_cid = pheader->perfer_cid; + this->timestamp = pheader->timestamp; + this->stream_id = pheader->stream_id; + } + ptr->payload = payload; + ptr->size = size; + + // message can access it. + this->payload = ptr->payload; + this->size = ptr->size; + + return ret; +} + +int SrsSharedPtrMessage::count() +{ + srs_assert(ptr); + return ptr->shared_count; +} + +bool SrsSharedPtrMessage::check(int stream_id) +{ + // we donot use the complex basic header, + // ensure the basic header is 1bytes. + if (ptr->header.perfer_cid < 2) { + srs_info("change the chunk_id=%d to default=%d", + ptr->header.perfer_cid, RTMP_CID_ProtocolControl); + ptr->header.perfer_cid = RTMP_CID_ProtocolControl; + } + + // we assume that the stream_id in a group must be the same. + if (this->stream_id == stream_id) { + return true; + } + this->stream_id = stream_id; + + return false; +} + +bool SrsSharedPtrMessage::is_av() +{ + return ptr->header.message_type == RTMP_MSG_AudioMessage + || ptr->header.message_type == RTMP_MSG_VideoMessage; +} + +bool SrsSharedPtrMessage::is_audio() +{ + return ptr->header.message_type == RTMP_MSG_AudioMessage; +} + +bool SrsSharedPtrMessage::is_video() +{ + return ptr->header.message_type == RTMP_MSG_VideoMessage; +} + +int SrsSharedPtrMessage::chunk_header(char* cache, int nb_cache, bool c0) +{ + if (c0) { + return srs_chunk_header_c0( + ptr->header.perfer_cid, timestamp, ptr->header.payload_length, + ptr->header.message_type, stream_id, + cache, nb_cache); + } else { + return srs_chunk_header_c3( + ptr->header.perfer_cid, timestamp, + cache, nb_cache); + } +} + +SrsSharedPtrMessage* SrsSharedPtrMessage::copy() +{ + srs_assert(ptr); + + SrsSharedPtrMessage* copy = new SrsSharedPtrMessage(); + + copy->ptr = ptr; + ptr->shared_count++; + + copy->timestamp = timestamp; + copy->stream_id = stream_id; + copy->payload = ptr->payload; + copy->size = ptr->size; + + return copy; +} SrsFlvEncoder::SrsFlvEncoder() { _fs = NULL; tag_stream = new SrsStream(); + +#ifdef SRS_PERF_FAST_FLV_ENCODER + nb_tag_headers = 0; + tag_headers = NULL; + nb_iovss_cache = 0; + iovss_cache = NULL; + nb_ppts = 0; + ppts = NULL; +#endif } SrsFlvEncoder::~SrsFlvEncoder() { srs_freep(tag_stream); + +#ifdef SRS_PERF_FAST_FLV_ENCODER + srs_freep(tag_headers); + srs_freep(iovss_cache); + srs_freep(ppts); +#endif } int SrsFlvEncoder::initialize(SrsFileWriter* fs) @@ -115,23 +403,9 @@ int SrsFlvEncoder::write_metadata(char type, char* data, int size) srs_assert(data); - // 11 bytes tag header - /*char tag_header[] = { - (char)type, // TagType UB [5], 18 = script data - (char)0x00, (char)0x00, (char)0x00, // DataSize UI24 Length of the message. - (char)0x00, (char)0x00, (char)0x00, // Timestamp UI24 Time in milliseconds at which the data in this tag applies. - (char)0x00, // TimestampExtended UI8 - (char)0x00, (char)0x00, (char)0x00, // StreamID UI24 Always 0. - };*/ - - // write data size. - if ((ret = tag_stream->initialize(tag_header, 8)) != ERROR_SUCCESS) { + if ((ret = write_metadata_to_cache(type, data, size, tag_header)) != ERROR_SUCCESS) { return ret; } - tag_stream->write_1bytes(type); - tag_stream->write_3bytes(size); - tag_stream->write_3bytes(0x00); - tag_stream->write_1bytes(0x00); if ((ret = write_tag(tag_header, sizeof(tag_header), data, size)) != ERROR_SUCCESS) { if (!srs_is_client_gracefully_close(ret)) { @@ -149,26 +423,9 @@ int SrsFlvEncoder::write_audio(int64_t timestamp, char* data, int size) srs_assert(data); - timestamp &= 0x7fffffff; - - // 11bytes tag header - /*char tag_header[] = { - (char)SrsCodecFlvTagAudio, // TagType UB [5], 8 = audio - (char)0x00, (char)0x00, (char)0x00, // DataSize UI24 Length of the message. - (char)0x00, (char)0x00, (char)0x00, // Timestamp UI24 Time in milliseconds at which the data in this tag applies. - (char)0x00, // TimestampExtended UI8 - (char)0x00, (char)0x00, (char)0x00, // StreamID UI24 Always 0. - };*/ - - // write data size. - if ((ret = tag_stream->initialize(tag_header, 8)) != ERROR_SUCCESS) { + if ((ret = write_audio_to_cache(timestamp, data, size, tag_header)) != ERROR_SUCCESS) { return ret; } - tag_stream->write_1bytes(SrsCodecFlvTagAudio); - tag_stream->write_3bytes(size); - tag_stream->write_3bytes((int32_t)timestamp); - // default to little-endian - tag_stream->write_1bytes((timestamp >> 24) & 0xFF); if ((ret = write_tag(tag_header, sizeof(tag_header), data, size)) != ERROR_SUCCESS) { if (!srs_is_client_gracefully_close(ret)) { @@ -186,26 +443,9 @@ int SrsFlvEncoder::write_video(int64_t timestamp, char* data, int size) srs_assert(data); - timestamp &= 0x7fffffff; - - // 11bytes tag header - /*char tag_header[] = { - (char)SrsCodecFlvTagVideo, // TagType UB [5], 9 = video - (char)0x00, (char)0x00, (char)0x00, // DataSize UI24 Length of the message. - (char)0x00, (char)0x00, (char)0x00, // Timestamp UI24 Time in milliseconds at which the data in this tag applies. - (char)0x00, // TimestampExtended UI8 - (char)0x00, (char)0x00, (char)0x00, // StreamID UI24 Always 0. - };*/ - - // write data size. - if ((ret = tag_stream->initialize(tag_header, 8)) != ERROR_SUCCESS) { + if ((ret = write_video_to_cache(timestamp, data, size, tag_header)) != ERROR_SUCCESS) { return ret; } - tag_stream->write_1bytes(SrsCodecFlvTagVideo); - tag_stream->write_3bytes(size); - tag_stream->write_3bytes((int32_t)timestamp); - // default to little-endian - tag_stream->write_1bytes((timestamp >> 24) & 0xFF); if ((ret = write_tag(tag_header, sizeof(tag_header), data, size)) != ERROR_SUCCESS) { srs_error("write flv video tag failed. ret=%d", ret); @@ -221,16 +461,200 @@ int SrsFlvEncoder::size_tag(int data_size) return SRS_FLV_TAG_HEADER_SIZE + data_size + SRS_FLV_PREVIOUS_TAG_SIZE; } +#ifdef SRS_PERF_FAST_FLV_ENCODER +int SrsFlvEncoder::write_tags(SrsSharedPtrMessage** msgs, int count) +{ + int ret = ERROR_SUCCESS; + + // realloc the iovss. + int nb_iovss = 3 * count; + iovec* iovss = iovss_cache; + if (nb_iovss_cache < nb_iovss) { + srs_freep(iovss_cache); + + nb_iovss_cache = nb_iovss; + iovss = iovss_cache = new iovec[nb_iovss]; + } + + // realloc the tag headers. + char* cache = tag_headers; + if (nb_tag_headers < count) { + srs_freep(tag_headers); + + nb_tag_headers = count; + cache = tag_headers = new char[SRS_FLV_TAG_HEADER_SIZE * count]; + } + + // realloc the pts. + char* pts = ppts; + if (nb_ppts < count) { + srs_freep(ppts); + + nb_ppts = count; + pts = ppts = new char[SRS_FLV_PREVIOUS_TAG_SIZE * count]; + } + + // the cache is ok, write each messages. + iovec* iovs = iovss; + for (int i = 0; i < count; i++) { + SrsSharedPtrMessage* msg = msgs[i]; + + // cache all flv header. + if (msg->is_audio()) { + if ((ret = write_audio_to_cache(msg->timestamp, msg->payload, msg->size, cache)) != ERROR_SUCCESS) { + return ret; + } + } else if (msg->is_video()) { + if ((ret = write_video_to_cache(msg->timestamp, msg->payload, msg->size, cache)) != ERROR_SUCCESS) { + return ret; + } + } else { + if ((ret = write_metadata_to_cache(SrsCodecFlvTagScript, msg->payload, msg->size, cache)) != ERROR_SUCCESS) { + return ret; + } + } + + // cache all pts. + if ((ret = write_pts_to_cache(SRS_FLV_TAG_HEADER_SIZE + msg->size, pts)) != ERROR_SUCCESS) { + return ret; + } + + // all ioves. + iovs[0].iov_base = cache; + iovs[0].iov_len = SRS_FLV_TAG_HEADER_SIZE; + iovs[1].iov_base = msg->payload; + iovs[1].iov_len = msg->size; + iovs[2].iov_base = pts; + iovs[2].iov_len = SRS_FLV_PREVIOUS_TAG_SIZE; + + // move next. + cache += SRS_FLV_TAG_HEADER_SIZE; + pts += SRS_FLV_PREVIOUS_TAG_SIZE; + iovs += 3; + } + + if ((ret = _fs->writev(iovss, nb_iovss, NULL)) != ERROR_SUCCESS) { + if (!srs_is_client_gracefully_close(ret)) { + srs_error("write flv tags failed. ret=%d", ret); + } + return ret; + } + + return ret; +} +#endif + +int SrsFlvEncoder::write_metadata_to_cache(char type, char* data, int size, char* cache) +{ + int ret = ERROR_SUCCESS; + + srs_assert(data); + + // 11 bytes tag header + /*char tag_header[] = { + (char)type, // TagType UB [5], 18 = script data + (char)0x00, (char)0x00, (char)0x00, // DataSize UI24 Length of the message. + (char)0x00, (char)0x00, (char)0x00, // Timestamp UI24 Time in milliseconds at which the data in this tag applies. + (char)0x00, // TimestampExtended UI8 + (char)0x00, (char)0x00, (char)0x00, // StreamID UI24 Always 0. + };*/ + + // write data size. + if ((ret = tag_stream->initialize(cache, 11)) != ERROR_SUCCESS) { + return ret; + } + tag_stream->write_1bytes(type); + tag_stream->write_3bytes(size); + tag_stream->write_3bytes(0x00); + tag_stream->write_1bytes(0x00); + tag_stream->write_3bytes(0x00); + + return ret; +} + +int SrsFlvEncoder::write_audio_to_cache(int64_t timestamp, char* data, int size, char* cache) +{ + int ret = ERROR_SUCCESS; + + srs_assert(data); + + timestamp &= 0x7fffffff; + + // 11bytes tag header + /*char tag_header[] = { + (char)SrsCodecFlvTagAudio, // TagType UB [5], 8 = audio + (char)0x00, (char)0x00, (char)0x00, // DataSize UI24 Length of the message. + (char)0x00, (char)0x00, (char)0x00, // Timestamp UI24 Time in milliseconds at which the data in this tag applies. + (char)0x00, // TimestampExtended UI8 + (char)0x00, (char)0x00, (char)0x00, // StreamID UI24 Always 0. + };*/ + + // write data size. + if ((ret = tag_stream->initialize(cache, 11)) != ERROR_SUCCESS) { + return ret; + } + tag_stream->write_1bytes(SrsCodecFlvTagAudio); + tag_stream->write_3bytes(size); + tag_stream->write_3bytes((int32_t)timestamp); + // default to little-endian + tag_stream->write_1bytes((timestamp >> 24) & 0xFF); + tag_stream->write_3bytes(0x00); + + return ret; +} + +int SrsFlvEncoder::write_video_to_cache(int64_t timestamp, char* data, int size, char* cache) +{ + int ret = ERROR_SUCCESS; + + srs_assert(data); + + timestamp &= 0x7fffffff; + + // 11bytes tag header + /*char tag_header[] = { + (char)SrsCodecFlvTagVideo, // TagType UB [5], 9 = video + (char)0x00, (char)0x00, (char)0x00, // DataSize UI24 Length of the message. + (char)0x00, (char)0x00, (char)0x00, // Timestamp UI24 Time in milliseconds at which the data in this tag applies. + (char)0x00, // TimestampExtended UI8 + (char)0x00, (char)0x00, (char)0x00, // StreamID UI24 Always 0. + };*/ + + // write data size. + if ((ret = tag_stream->initialize(cache, 11)) != ERROR_SUCCESS) { + return ret; + } + tag_stream->write_1bytes(SrsCodecFlvTagVideo); + tag_stream->write_3bytes(size); + tag_stream->write_3bytes((int32_t)timestamp); + // default to little-endian + tag_stream->write_1bytes((timestamp >> 24) & 0xFF); + tag_stream->write_3bytes(0x00); + + return ret; +} + +int SrsFlvEncoder::write_pts_to_cache(int size, char* cache) +{ + int ret = ERROR_SUCCESS; + + if ((ret = tag_stream->initialize(cache, SRS_FLV_PREVIOUS_TAG_SIZE)) != ERROR_SUCCESS) { + return ret; + } + tag_stream->write_4bytes(size); + + return ret; +} + int SrsFlvEncoder::write_tag(char* header, int header_size, char* tag, int tag_size) { int ret = ERROR_SUCCESS; // PreviousTagSizeN UI32 Size of last tag, including its header, in bytes. char pre_size[SRS_FLV_PREVIOUS_TAG_SIZE]; - if ((ret = tag_stream->initialize(pre_size, SRS_FLV_PREVIOUS_TAG_SIZE)) != ERROR_SUCCESS) { + if ((ret = write_pts_to_cache(tag_size + header_size, pre_size)) != ERROR_SUCCESS) { return ret; } - tag_stream->write_4bytes(tag_size + header_size); iovec iovs[3]; iovs[0].iov_base = header; @@ -238,7 +662,7 @@ int SrsFlvEncoder::write_tag(char* header, int header_size, char* tag, int tag_s iovs[1].iov_base = tag; iovs[1].iov_len = tag_size; iovs[2].iov_base = pre_size; - iovs[2].iov_len = sizeof(SRS_FLV_PREVIOUS_TAG_SIZE); + iovs[2].iov_len = SRS_FLV_PREVIOUS_TAG_SIZE; if ((ret = _fs->writev(iovs, 3, NULL)) != ERROR_SUCCESS) { if (!srs_is_client_gracefully_close(ret)) { diff --git a/trunk/src/kernel/srs_kernel_flv.hpp b/trunk/src/kernel/srs_kernel_flv.hpp index 848d247c7..a33e1a320 100644 --- a/trunk/src/kernel/srs_kernel_flv.hpp +++ b/trunk/src/kernel/srs_kernel_flv.hpp @@ -31,6 +31,11 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include +// for srs-librtmp, @see https://github.com/simple-rtmp-server/srs/issues/213 +#ifndef _WIN32 +#include +#endif + class SrsStream; class SrsFileWriter; class SrsFileReader; @@ -38,6 +43,387 @@ class SrsFileReader; #define SRS_FLV_TAG_HEADER_SIZE 11 #define SRS_FLV_PREVIOUS_TAG_SIZE 4 +/**************************************************************************** + ***************************************************************************** + ****************************************************************************/ +/** + 5. Protocol Control Messages + RTMP reserves message type IDs 1-7 for protocol control messages. + These messages contain information needed by the RTM Chunk Stream + protocol or RTMP itself. Protocol messages with IDs 1 & 2 are + reserved for usage with RTM Chunk Stream protocol. Protocol messages + with IDs 3-6 are reserved for usage of RTMP. Protocol message with ID + 7 is used between edge server and origin server. + */ +#define RTMP_MSG_SetChunkSize 0x01 +#define RTMP_MSG_AbortMessage 0x02 +#define RTMP_MSG_Acknowledgement 0x03 +#define RTMP_MSG_UserControlMessage 0x04 +#define RTMP_MSG_WindowAcknowledgementSize 0x05 +#define RTMP_MSG_SetPeerBandwidth 0x06 +#define RTMP_MSG_EdgeAndOriginServerCommand 0x07 +/** + 3. Types of messages + The server and the client send messages over the network to + communicate with each other. The messages can be of any type which + includes audio messages, video messages, command messages, shared + object messages, data messages, and user control messages. + 3.1. Command message + Command messages carry the AMF-encoded commands between the client + and the server. These messages have been assigned message type value + of 20 for AMF0 encoding and message type value of 17 for AMF3 + encoding. These messages are sent to perform some operations like + connect, createStream, publish, play, pause on the peer. Command + messages like onstatus, result etc. are used to inform the sender + about the status of the requested commands. A command message + consists of command name, transaction ID, and command object that + contains related parameters. A client or a server can request Remote + Procedure Calls (RPC) over streams that are communicated using the + command messages to the peer. + */ +#define RTMP_MSG_AMF3CommandMessage 17 // 0x11 +#define RTMP_MSG_AMF0CommandMessage 20 // 0x14 +/** + 3.2. Data message + The client or the server sends this message to send Metadata or any + user data to the peer. Metadata includes details about the + data(audio, video etc.) like creation time, duration, theme and so + on. These messages have been assigned message type value of 18 for + AMF0 and message type value of 15 for AMF3. + */ +#define RTMP_MSG_AMF0DataMessage 18 // 0x12 +#define RTMP_MSG_AMF3DataMessage 15 // 0x0F +/** + 3.3. Shared object message + A shared object is a Flash object (a collection of name value pairs) + that are in synchronization across multiple clients, instances, and + so on. The message types kMsgContainer=19 for AMF0 and + kMsgContainerEx=16 for AMF3 are reserved for shared object events. + Each message can contain multiple events. + */ +#define RTMP_MSG_AMF3SharedObject 16 // 0x10 +#define RTMP_MSG_AMF0SharedObject 19 // 0x13 +/** + 3.4. Audio message + The client or the server sends this message to send audio data to the + peer. The message type value of 8 is reserved for audio messages. + */ +#define RTMP_MSG_AudioMessage 8 // 0x08 +/* * + 3.5. Video message + The client or the server sends this message to send video data to the + peer. The message type value of 9 is reserved for video messages. + These messages are large and can delay the sending of other type of + messages. To avoid such a situation, the video message is assigned + the lowest priority. + */ +#define RTMP_MSG_VideoMessage 9 // 0x09 +/** + 3.6. Aggregate message + An aggregate message is a single message that contains a list of submessages. + The message type value of 22 is reserved for aggregate + messages. + */ +#define RTMP_MSG_AggregateMessage 22 // 0x16 + +/**************************************************************************** + ***************************************************************************** + ****************************************************************************/ +/** + * the chunk stream id used for some under-layer message, + * for example, the PC(protocol control) message. + */ +#define RTMP_CID_ProtocolControl 0x02 +/** + * the AMF0/AMF3 command message, invoke method and return the result, over NetConnection. + * generally use 0x03. + */ +#define RTMP_CID_OverConnection 0x03 +/** + * the AMF0/AMF3 command message, invoke method and return the result, over NetConnection, + * the midst state(we guess). + * rarely used, e.g. onStatus(NetStream.Play.Reset). + */ +#define RTMP_CID_OverConnection2 0x04 +/** + * the stream message(amf0/amf3), over NetStream. + * generally use 0x05. + */ +#define RTMP_CID_OverStream 0x05 +/** + * the stream message(amf0/amf3), over NetStream, the midst state(we guess). + * rarely used, e.g. play("mp4:mystram.f4v") + */ +#define RTMP_CID_OverStream2 0x08 +/** + * the stream message(video), over NetStream + * generally use 0x06. + */ +#define RTMP_CID_Video 0x06 +/** + * the stream message(audio), over NetStream. + * generally use 0x07. + */ +#define RTMP_CID_Audio 0x07 + +/** + * 6.1. Chunk Format + * Extended timestamp: 0 or 4 bytes + * This field MUST be sent when the normal timsestamp is set to + * 0xffffff, it MUST NOT be sent if the normal timestamp is set to + * anything else. So for values less than 0xffffff the normal + * timestamp field SHOULD be used in which case the extended timestamp + * MUST NOT be present. For values greater than or equal to 0xffffff + * the normal timestamp field MUST NOT be used and MUST be set to + * 0xffffff and the extended timestamp MUST be sent. + */ +#define RTMP_EXTENDED_TIMESTAMP 0xFFFFFF + +/** + * 4.1. Message Header + */ +class SrsMessageHeader +{ +public: + /** + * 3bytes. + * Three-byte field that contains a timestamp delta of the message. + * @remark, only used for decoding message from chunk stream. + */ + int32_t timestamp_delta; + /** + * 3bytes. + * Three-byte field that represents the size of the payload in bytes. + * It is set in big-endian format. + */ + int32_t payload_length; + /** + * 1byte. + * One byte field to represent the message type. A range of type IDs + * (1-7) are reserved for protocol control messages. + */ + int8_t message_type; + /** + * 4bytes. + * Four-byte field that identifies the stream of the message. These + * bytes are set in little-endian format. + */ + int32_t stream_id; + + /** + * Four-byte field that contains a timestamp of the message. + * The 4 bytes are packed in the big-endian order. + * @remark, used as calc timestamp when decode and encode time. + * @remark, we use 64bits for large time for jitter detect and hls. + */ + int64_t timestamp; +public: + /** + * get the perfered cid(chunk stream id) which sendout over. + * set at decoding, and canbe used for directly send message, + * for example, dispatch to all connections. + */ + int perfer_cid; +public: + SrsMessageHeader(); + virtual ~SrsMessageHeader(); +public: + bool is_audio(); + bool is_video(); + bool is_amf0_command(); + bool is_amf0_data(); + bool is_amf3_command(); + bool is_amf3_data(); + bool is_window_ackledgement_size(); + bool is_ackledgement(); + bool is_set_chunk_size(); + bool is_user_control_message(); + bool is_set_peer_bandwidth(); + bool is_aggregate(); +public: + /** + * create a amf0 script header, set the size and stream_id. + */ + void initialize_amf0_script(int size, int stream); + /** + * create a audio header, set the size, timestamp and stream_id. + */ + void initialize_audio(int size, u_int32_t time, int stream); + /** + * create a video header, set the size, timestamp and stream_id. + */ + void initialize_video(int size, u_int32_t time, int stream); +}; + +/** + * message is raw data RTMP message, bytes oriented, + * protcol always recv RTMP message, and can send RTMP message or RTMP packet. + * the common message is read from underlay protocol sdk. + * while the shared ptr message used to copy and send. + */ +class SrsCommonMessage +{ + // 4.1. Message Header +public: + SrsMessageHeader header; + // 4.2. Message Payload +public: + /** + * current message parsed size, + * size <= header.payload_length + * for the payload maybe sent in multiple chunks. + */ + int size; + /** + * the payload of message, the SrsCommonMessage never know about the detail of payload, + * user must use SrsProtocol.decode_message to get concrete packet. + * @remark, not all message payload can be decoded to packet. for example, + * video/audio packet use raw bytes, no video/audio packet. + */ + char* payload; +public: + SrsCommonMessage(); +public: + virtual ~SrsCommonMessage(); +}; + +/** + * the message header for shared ptr message. + * only the message for all msgs are same. + */ +struct SrsSharedMessageHeader +{ + /** + * 3bytes. + * Three-byte field that represents the size of the payload in bytes. + * It is set in big-endian format. + */ + int32_t payload_length; + /** + * 1byte. + * One byte field to represent the message type. A range of type IDs + * (1-7) are reserved for protocol control messages. + */ + int8_t message_type; + /** + * get the perfered cid(chunk stream id) which sendout over. + * set at decoding, and canbe used for directly send message, + * for example, dispatch to all connections. + */ + int perfer_cid; +}; + +/** + * shared ptr message. + * for audio/video/data message that need less memory copy. + * and only for output. + * + * create first object by constructor and create(), + * use copy if need reference count message. + * + */ +class SrsSharedPtrMessage +{ + // 4.1. Message Header +public: + // the header can shared, only set the timestamp and stream id. + // @see https://github.com/simple-rtmp-server/srs/issues/251 + //SrsSharedMessageHeader header; + /** + * Four-byte field that contains a timestamp of the message. + * The 4 bytes are packed in the big-endian order. + * @remark, used as calc timestamp when decode and encode time. + * @remark, we use 64bits for large time for jitter detect and hls. + */ + int64_t timestamp; + /** + * 4bytes. + * Four-byte field that identifies the stream of the message. These + * bytes are set in big-endian format. + */ + int32_t stream_id; + // 4.2. Message Payload +public: + /** + * current message parsed size, + * size <= header.payload_length + * for the payload maybe sent in multiple chunks. + */ + int size; + /** + * the payload of message, the SrsCommonMessage never know about the detail of payload, + * user must use SrsProtocol.decode_message to get concrete packet. + * @remark, not all message payload can be decoded to packet. for example, + * video/audio packet use raw bytes, no video/audio packet. + */ + char* payload; +private: + class SrsSharedPtrPayload + { + public: + // shared message header. + // @see https://github.com/simple-rtmp-server/srs/issues/251 + SrsSharedMessageHeader header; + // actual shared payload. + char* payload; + // size of payload. + int size; + // the reference count + int shared_count; + public: + SrsSharedPtrPayload(); + virtual ~SrsSharedPtrPayload(); + }; + SrsSharedPtrPayload* ptr; +public: + SrsSharedPtrMessage(); + virtual ~SrsSharedPtrMessage(); +public: + /** + * create shared ptr message, + * copy header, manage the payload of msg, + * set the payload to NULL to prevent double free. + * @remark payload of msg set to NULL if success. + */ + virtual int create(SrsCommonMessage* msg); + /** + * create shared ptr message, + * from the header and payload. + * @remark user should never free the payload. + * @param pheader, the header to copy to the message. NULL to ignore. + */ + virtual int create(SrsMessageHeader* pheader, char* payload, int size); + /** + * get current reference count. + * when this object created, count set to 0. + * if copy() this object, count increase 1. + * if this or copy deleted, free payload when count is 0, or count--. + * @remark, assert object is created. + */ + virtual int count(); + /** + * check perfer cid and stream id. + * @return whether stream id already set. + */ + virtual bool check(int stream_id); +public: + virtual bool is_av(); + virtual bool is_audio(); + virtual bool is_video(); +public: + /** + * generate the chunk header to cache. + * @return the size of header. + */ + virtual int chunk_header(char* cache, int nb_cache, bool c0); +public: + /** + * copy current shared ptr message, use ref-count. + * @remark, assert object is created. + */ + virtual SrsSharedPtrMessage* copy(); +}; + /** * encode data to flv file. */ @@ -91,7 +477,28 @@ public: * @remark assert data_size is not negative. */ static int size_tag(int data_size); +#ifdef SRS_PERF_FAST_FLV_ENCODER private: + // cache tag header. + int nb_tag_headers; + char* tag_headers; + // cache pps(previous tag size) + int nb_ppts; + char* ppts; + // cache iovss. + int nb_iovss_cache; + iovec* iovss_cache; +public: + /** + * write the tags in a time. + */ + virtual int write_tags(SrsSharedPtrMessage** msgs, int count); +#endif +private: + virtual int write_metadata_to_cache(char type, char* data, int size, char* cache); + virtual int write_audio_to_cache(int64_t timestamp, char* data, int size, char* cache); + virtual int write_video_to_cache(int64_t timestamp, char* data, int size, char* cache); + virtual int write_pts_to_cache(int size, char* cache); virtual int write_tag(char* header, int header_size, char* tag, int tag_size); }; diff --git a/trunk/src/kernel/srs_kernel_utility.cpp b/trunk/src/kernel/srs_kernel_utility.cpp index 58fb3fa6a..f469e694e 100644 --- a/trunk/src/kernel/srs_kernel_utility.cpp +++ b/trunk/src/kernel/srs_kernel_utility.cpp @@ -40,6 +40,7 @@ using namespace std; #include #include #include +#include // this value must: // equals to (SRS_SYS_CYCLE_INTERVAL*SRS_SYS_TIME_RESOLUTION_MS_TIMES)*1000 @@ -759,3 +760,131 @@ int ff_hex_to_data(u_int8_t* data, const char* p) return len; } +int srs_chunk_header_c0( + int perfer_cid, u_int32_t timestamp, int32_t payload_length, + int8_t message_type, int32_t stream_id, + char* cache, int nb_cache +) { + // to directly set the field. + char* pp = NULL; + + // generate the header. + char* p = cache; + + // no header. + if (nb_cache < SRS_CONSTS_RTMP_MAX_FMT0_HEADER_SIZE) { + return 0; + } + + // write new chunk stream header, fmt is 0 + *p++ = 0x00 | (perfer_cid & 0x3F); + + // chunk message header, 11 bytes + // timestamp, 3bytes, big-endian + if (timestamp < RTMP_EXTENDED_TIMESTAMP) { + pp = (char*)×tamp; + *p++ = pp[2]; + *p++ = pp[1]; + *p++ = pp[0]; + } else { + *p++ = 0xFF; + *p++ = 0xFF; + *p++ = 0xFF; + } + + // message_length, 3bytes, big-endian + pp = (char*)&payload_length; + *p++ = pp[2]; + *p++ = pp[1]; + *p++ = pp[0]; + + // message_type, 1bytes + *p++ = message_type; + + // stream_id, 4bytes, little-endian + pp = (char*)&stream_id; + *p++ = pp[0]; + *p++ = pp[1]; + *p++ = pp[2]; + *p++ = pp[3]; + + // for c0 + // chunk extended timestamp header, 0 or 4 bytes, big-endian + // + // for c3: + // chunk extended timestamp header, 0 or 4 bytes, big-endian + // 6.1.3. Extended Timestamp + // This field is transmitted only when the normal time stamp in the + // chunk message header is set to 0x00ffffff. If normal time stamp is + // set to any value less than 0x00ffffff, this field MUST NOT be + // present. This field MUST NOT be present if the timestamp field is not + // present. Type 3 chunks MUST NOT have this field. + // adobe changed for Type3 chunk: + // FMLE always sendout the extended-timestamp, + // must send the extended-timestamp to FMS, + // must send the extended-timestamp to flash-player. + // @see: ngx_rtmp_prepare_message + // @see: http://blog.csdn.net/win_lin/article/details/13363699 + // TODO: FIXME: extract to outer. + if (timestamp >= RTMP_EXTENDED_TIMESTAMP) { + pp = (char*)×tamp; + *p++ = pp[3]; + *p++ = pp[2]; + *p++ = pp[1]; + *p++ = pp[0]; + } + + // always has header + return p - cache; +} + +int srs_chunk_header_c3( + int perfer_cid, u_int32_t timestamp, + char* cache, int nb_cache +) { + // to directly set the field. + char* pp = NULL; + + // generate the header. + char* p = cache; + + // no header. + if (nb_cache < SRS_CONSTS_RTMP_MAX_FMT3_HEADER_SIZE) { + return 0; + } + + // write no message header chunk stream, fmt is 3 + // @remark, if perfer_cid > 0x3F, that is, use 2B/3B chunk header, + // SRS will rollback to 1B chunk header. + *p++ = 0xC0 | (perfer_cid & 0x3F); + + // for c0 + // chunk extended timestamp header, 0 or 4 bytes, big-endian + // + // for c3: + // chunk extended timestamp header, 0 or 4 bytes, big-endian + // 6.1.3. Extended Timestamp + // This field is transmitted only when the normal time stamp in the + // chunk message header is set to 0x00ffffff. If normal time stamp is + // set to any value less than 0x00ffffff, this field MUST NOT be + // present. This field MUST NOT be present if the timestamp field is not + // present. Type 3 chunks MUST NOT have this field. + // adobe changed for Type3 chunk: + // FMLE always sendout the extended-timestamp, + // must send the extended-timestamp to FMS, + // must send the extended-timestamp to flash-player. + // @see: ngx_rtmp_prepare_message + // @see: http://blog.csdn.net/win_lin/article/details/13363699 + // TODO: FIXME: extract to outer. + if (timestamp >= RTMP_EXTENDED_TIMESTAMP) { + pp = (char*)×tamp; + *p++ = pp[3]; + *p++ = pp[2]; + *p++ = pp[1]; + *p++ = pp[0]; + } + + // always has header + return p - cache; +} + diff --git a/trunk/src/kernel/srs_kernel_utility.hpp b/trunk/src/kernel/srs_kernel_utility.hpp index 678cd6d94..d6406e33d 100644 --- a/trunk/src/kernel/srs_kernel_utility.hpp +++ b/trunk/src/kernel/srs_kernel_utility.hpp @@ -138,5 +138,28 @@ extern char* srs_av_base64_encode(char* out, int out_size, const u_int8_t* in, i */ extern int ff_hex_to_data(u_int8_t* data, const char* p); +/** + * generate the c0 chunk header for msg. + * @param cache, the cache to write header. + * @param nb_cache, the size of cache. + * @return the size of header. 0 if cache not enough. + */ +extern int srs_chunk_header_c0( + int perfer_cid, u_int32_t timestamp, int32_t payload_length, + int8_t message_type, int32_t stream_id, + char* cache, int nb_cache + ); + +/** + * generate the c3 chunk header for msg. + * @param cache, the cache to write header. + * @param nb_cache, the size of cache. + * @return the size of header. 0 if cache not enough. + */ +extern int srs_chunk_header_c3( + int perfer_cid, u_int32_t timestamp, + char* cache, int nb_cache + ); + #endif diff --git a/trunk/src/protocol/srs_rtmp_stack.cpp b/trunk/src/protocol/srs_rtmp_stack.cpp index d107f59b8..c0ca5e9b5 100644 --- a/trunk/src/protocol/srs_rtmp_stack.cpp +++ b/trunk/src/protocol/srs_rtmp_stack.cpp @@ -120,318 +120,6 @@ using namespace std; /**************************************************************************** ***************************************************************************** ****************************************************************************/ -/** -* the chunk stream id used for some under-layer message, -* for example, the PC(protocol control) message. -*/ -#define RTMP_CID_ProtocolControl 0x02 -/** -* the AMF0/AMF3 command message, invoke method and return the result, over NetConnection. -* generally use 0x03. -*/ -#define RTMP_CID_OverConnection 0x03 -/** -* the AMF0/AMF3 command message, invoke method and return the result, over NetConnection, -* the midst state(we guess). -* rarely used, e.g. onStatus(NetStream.Play.Reset). -*/ -#define RTMP_CID_OverConnection2 0x04 -/** -* the stream message(amf0/amf3), over NetStream. -* generally use 0x05. -*/ -#define RTMP_CID_OverStream 0x05 -/** -* the stream message(amf0/amf3), over NetStream, the midst state(we guess). -* rarely used, e.g. play("mp4:mystram.f4v") -*/ -#define RTMP_CID_OverStream2 0x08 -/** -* the stream message(video), over NetStream -* generally use 0x06. -*/ -#define RTMP_CID_Video 0x06 -/** -* the stream message(audio), over NetStream. -* generally use 0x07. -*/ -#define RTMP_CID_Audio 0x07 - -/**************************************************************************** -***************************************************************************** -****************************************************************************/ - -SrsMessageHeader::SrsMessageHeader() -{ - message_type = 0; - payload_length = 0; - timestamp_delta = 0; - stream_id = 0; - - timestamp = 0; - // we always use the connection chunk-id - perfer_cid = RTMP_CID_OverConnection; -} - -SrsMessageHeader::~SrsMessageHeader() -{ -} - -bool SrsMessageHeader::is_audio() -{ - return message_type == RTMP_MSG_AudioMessage; -} - -bool SrsMessageHeader::is_video() -{ - return message_type == RTMP_MSG_VideoMessage; -} - -bool SrsMessageHeader::is_amf0_command() -{ - return message_type == RTMP_MSG_AMF0CommandMessage; -} - -bool SrsMessageHeader::is_amf0_data() -{ - return message_type == RTMP_MSG_AMF0DataMessage; -} - -bool SrsMessageHeader::is_amf3_command() -{ - return message_type == RTMP_MSG_AMF3CommandMessage; -} - -bool SrsMessageHeader::is_amf3_data() -{ - return message_type == RTMP_MSG_AMF3DataMessage; -} - -bool SrsMessageHeader::is_window_ackledgement_size() -{ - return message_type == RTMP_MSG_WindowAcknowledgementSize; -} - -bool SrsMessageHeader::is_ackledgement() -{ - return message_type == RTMP_MSG_Acknowledgement; -} - -bool SrsMessageHeader::is_set_chunk_size() -{ - return message_type == RTMP_MSG_SetChunkSize; -} - -bool SrsMessageHeader::is_user_control_message() -{ - return message_type == RTMP_MSG_UserControlMessage; -} - -bool SrsMessageHeader::is_set_peer_bandwidth() -{ - return message_type == RTMP_MSG_SetPeerBandwidth; -} - -bool SrsMessageHeader::is_aggregate() -{ - return message_type == RTMP_MSG_AggregateMessage; -} - -void SrsMessageHeader::initialize_amf0_script(int size, int stream) -{ - message_type = RTMP_MSG_AMF0DataMessage; - payload_length = (int32_t)size; - timestamp_delta = (int32_t)0; - timestamp = (int64_t)0; - stream_id = (int32_t)stream; - - // amf0 script use connection2 chunk-id - perfer_cid = RTMP_CID_OverConnection2; -} - -void SrsMessageHeader::initialize_audio(int size, u_int32_t time, int stream) -{ - message_type = RTMP_MSG_AudioMessage; - payload_length = (int32_t)size; - timestamp_delta = (int32_t)time; - timestamp = (int64_t)time; - stream_id = (int32_t)stream; - - // audio chunk-id - perfer_cid = RTMP_CID_Audio; -} - -void SrsMessageHeader::initialize_video(int size, u_int32_t time, int stream) -{ - message_type = RTMP_MSG_VideoMessage; - payload_length = (int32_t)size; - timestamp_delta = (int32_t)time; - timestamp = (int64_t)time; - stream_id = (int32_t)stream; - - // video chunk-id - perfer_cid = RTMP_CID_Video; -} - -SrsCommonMessage::SrsCommonMessage() -{ - payload = NULL; - size = 0; -} - -SrsCommonMessage::~SrsCommonMessage() -{ - srs_freep(payload); -} - -SrsSharedPtrMessage::SrsSharedPtrPayload::SrsSharedPtrPayload() -{ - payload = NULL; - size = 0; - shared_count = 0; -} - -SrsSharedPtrMessage::SrsSharedPtrPayload::~SrsSharedPtrPayload() -{ - srs_freep(payload); -} - -SrsSharedPtrMessage::SrsSharedPtrMessage() -{ - ptr = NULL; -} - -SrsSharedPtrMessage::~SrsSharedPtrMessage() -{ - if (ptr) { - if (ptr->shared_count == 0) { - srs_freep(ptr); - } else { - ptr->shared_count--; - } - } -} - -int SrsSharedPtrMessage::create(SrsCommonMessage* msg) -{ - int ret = ERROR_SUCCESS; - - if ((ret = create(&msg->header, msg->payload, msg->size)) != ERROR_SUCCESS) { - return ret; - } - - // to prevent double free of payload: - // initialize already attach the payload of msg, - // detach the payload to transfer the owner to shared ptr. - msg->payload = NULL; - msg->size = 0; - - return ret; -} - -int SrsSharedPtrMessage::create(SrsMessageHeader* pheader, char* payload, int size) -{ - int ret = ERROR_SUCCESS; - - if (ptr) { - ret = ERROR_SYSTEM_ASSERT_FAILED; - srs_error("should not set the payload twice. ret=%d", ret); - srs_assert(false); - - return ret; - } - - ptr = new SrsSharedPtrPayload(); - - // direct attach the data. - if (pheader) { - ptr->header.message_type = pheader->message_type; - ptr->header.payload_length = size; - ptr->header.perfer_cid = pheader->perfer_cid; - this->timestamp = pheader->timestamp; - this->stream_id = pheader->stream_id; - } - ptr->payload = payload; - ptr->size = size; - - // message can access it. - this->payload = ptr->payload; - this->size = ptr->size; - - return ret; -} - -int SrsSharedPtrMessage::count() -{ - srs_assert(ptr); - return ptr->shared_count; -} - -bool SrsSharedPtrMessage::check(int stream_id) -{ - // we donot use the complex basic header, - // ensure the basic header is 1bytes. - if (ptr->header.perfer_cid < 2) { - srs_info("change the chunk_id=%d to default=%d", - ptr->header.perfer_cid, RTMP_CID_ProtocolControl); - ptr->header.perfer_cid = RTMP_CID_ProtocolControl; - } - - // we assume that the stream_id in a group must be the same. - if (this->stream_id == stream_id) { - return true; - } - this->stream_id = stream_id; - - return false; -} - -bool SrsSharedPtrMessage::is_av() -{ - return ptr->header.message_type == RTMP_MSG_AudioMessage - || ptr->header.message_type == RTMP_MSG_VideoMessage; -} - -bool SrsSharedPtrMessage::is_audio() -{ - return ptr->header.message_type == RTMP_MSG_AudioMessage; -} - -bool SrsSharedPtrMessage::is_video() -{ - return ptr->header.message_type == RTMP_MSG_VideoMessage; -} - -int SrsSharedPtrMessage::chunk_header(char* cache, int nb_cache, bool c0) -{ - if (c0) { - return srs_chunk_header_c0( - ptr->header.perfer_cid, timestamp, ptr->header.payload_length, - ptr->header.message_type, stream_id, - cache, nb_cache); - } else { - return srs_chunk_header_c3( - ptr->header.perfer_cid, timestamp, - cache, nb_cache); - } -} - -SrsSharedPtrMessage* SrsSharedPtrMessage::copy() -{ - srs_assert(ptr); - - SrsSharedPtrMessage* copy = new SrsSharedPtrMessage(); - - copy->ptr = ptr; - ptr->shared_count++; - - copy->timestamp = timestamp; - copy->stream_id = stream_id; - copy->payload = ptr->payload; - copy->size = ptr->size; - - return copy; -} SrsPacket::SrsPacket() { @@ -904,41 +592,7 @@ int SrsProtocol::do_send_messages(SrsSharedPtrMessage** msgs, int nb_msgs) int SrsProtocol::do_iovs_send(iovec* iovs, int size) { - int ret = ERROR_SUCCESS; - - // the limits of writev iovs. - // for srs-librtmp, @see https://github.com/simple-rtmp-server/srs/issues/213 -#ifndef _WIN32 - static int limits = sysconf(_SC_IOV_MAX); -#else - static int limits = 1024; -#endif - - // send in a time. - if (size < limits) { - if ((ret = skt->writev(iovs, size, NULL)) != ERROR_SUCCESS) { - if (!srs_is_client_gracefully_close(ret)) { - srs_error("send with writev failed. ret=%d", ret); - } - return ret; - } - return ret; - } - - // send in multiple times. - int cur_iov = 0; - while (cur_iov < size) { - int cur_count = srs_min(limits, size - cur_iov); - if ((ret = skt->writev(iovs + cur_iov, cur_count, NULL)) != ERROR_SUCCESS) { - if (!srs_is_client_gracefully_close(ret)) { - srs_error("send with writev failed. ret=%d", ret); - } - return ret; - } - cur_iov += cur_count; - } - - return ret; + return srs_write_large_iovs(skt, iovs, size); } int SrsProtocol::do_send_and_free_packet(SrsPacket* packet, int stream_id) diff --git a/trunk/src/protocol/srs_rtmp_stack.hpp b/trunk/src/protocol/srs_rtmp_stack.hpp index c0eb1dfec..31b45ce95 100644 --- a/trunk/src/protocol/srs_rtmp_stack.hpp +++ b/trunk/src/protocol/srs_rtmp_stack.hpp @@ -43,6 +43,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include #include +#include class ISrsProtocolReaderWriter; class SrsFastBuffer; @@ -56,89 +57,6 @@ class SrsChunkStream; class SrsSharedPtrMessage; class IMergeReadHandler; -/**************************************************************************** - ***************************************************************************** - ****************************************************************************/ -/** - 5. Protocol Control Messages - RTMP reserves message type IDs 1-7 for protocol control messages. - These messages contain information needed by the RTM Chunk Stream - protocol or RTMP itself. Protocol messages with IDs 1 & 2 are - reserved for usage with RTM Chunk Stream protocol. Protocol messages - with IDs 3-6 are reserved for usage of RTMP. Protocol message with ID - 7 is used between edge server and origin server. - */ -#define RTMP_MSG_SetChunkSize 0x01 -#define RTMP_MSG_AbortMessage 0x02 -#define RTMP_MSG_Acknowledgement 0x03 -#define RTMP_MSG_UserControlMessage 0x04 -#define RTMP_MSG_WindowAcknowledgementSize 0x05 -#define RTMP_MSG_SetPeerBandwidth 0x06 -#define RTMP_MSG_EdgeAndOriginServerCommand 0x07 -/** - 3. Types of messages - The server and the client send messages over the network to - communicate with each other. The messages can be of any type which - includes audio messages, video messages, command messages, shared - object messages, data messages, and user control messages. - 3.1. Command message - Command messages carry the AMF-encoded commands between the client - and the server. These messages have been assigned message type value - of 20 for AMF0 encoding and message type value of 17 for AMF3 - encoding. These messages are sent to perform some operations like - connect, createStream, publish, play, pause on the peer. Command - messages like onstatus, result etc. are used to inform the sender - about the status of the requested commands. A command message - consists of command name, transaction ID, and command object that - contains related parameters. A client or a server can request Remote - Procedure Calls (RPC) over streams that are communicated using the - command messages to the peer. - */ -#define RTMP_MSG_AMF3CommandMessage 17 // 0x11 -#define RTMP_MSG_AMF0CommandMessage 20 // 0x14 -/** - 3.2. Data message - The client or the server sends this message to send Metadata or any - user data to the peer. Metadata includes details about the - data(audio, video etc.) like creation time, duration, theme and so - on. These messages have been assigned message type value of 18 for - AMF0 and message type value of 15 for AMF3. - */ -#define RTMP_MSG_AMF0DataMessage 18 // 0x12 -#define RTMP_MSG_AMF3DataMessage 15 // 0x0F -/** - 3.3. Shared object message - A shared object is a Flash object (a collection of name value pairs) - that are in synchronization across multiple clients, instances, and - so on. The message types kMsgContainer=19 for AMF0 and - kMsgContainerEx=16 for AMF3 are reserved for shared object events. - Each message can contain multiple events. - */ -#define RTMP_MSG_AMF3SharedObject 16 // 0x10 -#define RTMP_MSG_AMF0SharedObject 19 // 0x13 -/** - 3.4. Audio message - The client or the server sends this message to send audio data to the - peer. The message type value of 8 is reserved for audio messages. - */ -#define RTMP_MSG_AudioMessage 8 // 0x08 -/* * - 3.5. Video message - The client or the server sends this message to send video data to the - peer. The message type value of 9 is reserved for video messages. - These messages are large and can delay the sending of other type of - messages. To avoid such a situation, the video message is assigned - the lowest priority. - */ -#define RTMP_MSG_VideoMessage 9 // 0x09 -/** - 3.6. Aggregate message - An aggregate message is a single message that contains a list of submessages. - The message type value of 22 is reserved for aggregate - messages. - */ -#define RTMP_MSG_AggregateMessage 22 // 0x16 - /**************************************************************************** ***************************************************************************** ****************************************************************************/ @@ -163,263 +81,6 @@ class IMergeReadHandler; /**************************************************************************** ***************************************************************************** ****************************************************************************/ -/** -* 6.1. Chunk Format -* Extended timestamp: 0 or 4 bytes -* This field MUST be sent when the normal timsestamp is set to -* 0xffffff, it MUST NOT be sent if the normal timestamp is set to -* anything else. So for values less than 0xffffff the normal -* timestamp field SHOULD be used in which case the extended timestamp -* MUST NOT be present. For values greater than or equal to 0xffffff -* the normal timestamp field MUST NOT be used and MUST be set to -* 0xffffff and the extended timestamp MUST be sent. -*/ -#define RTMP_EXTENDED_TIMESTAMP 0xFFFFFF - -/** -* 4.1. Message Header -*/ -class SrsMessageHeader -{ -public: - /** - * 3bytes. - * Three-byte field that contains a timestamp delta of the message. - * @remark, only used for decoding message from chunk stream. - */ - int32_t timestamp_delta; - /** - * 3bytes. - * Three-byte field that represents the size of the payload in bytes. - * It is set in big-endian format. - */ - int32_t payload_length; - /** - * 1byte. - * One byte field to represent the message type. A range of type IDs - * (1-7) are reserved for protocol control messages. - */ - int8_t message_type; - /** - * 4bytes. - * Four-byte field that identifies the stream of the message. These - * bytes are set in little-endian format. - */ - int32_t stream_id; - - /** - * Four-byte field that contains a timestamp of the message. - * The 4 bytes are packed in the big-endian order. - * @remark, used as calc timestamp when decode and encode time. - * @remark, we use 64bits for large time for jitter detect and hls. - */ - int64_t timestamp; -public: - /** - * get the perfered cid(chunk stream id) which sendout over. - * set at decoding, and canbe used for directly send message, - * for example, dispatch to all connections. - */ - int perfer_cid; -public: - SrsMessageHeader(); - virtual ~SrsMessageHeader(); -public: - bool is_audio(); - bool is_video(); - bool is_amf0_command(); - bool is_amf0_data(); - bool is_amf3_command(); - bool is_amf3_data(); - bool is_window_ackledgement_size(); - bool is_ackledgement(); - bool is_set_chunk_size(); - bool is_user_control_message(); - bool is_set_peer_bandwidth(); - bool is_aggregate(); -public: - /** - * create a amf0 script header, set the size and stream_id. - */ - void initialize_amf0_script(int size, int stream); - /** - * create a audio header, set the size, timestamp and stream_id. - */ - void initialize_audio(int size, u_int32_t time, int stream); - /** - * create a video header, set the size, timestamp and stream_id. - */ - void initialize_video(int size, u_int32_t time, int stream); -}; - -/** -* message is raw data RTMP message, bytes oriented, -* protcol always recv RTMP message, and can send RTMP message or RTMP packet. -* the common message is read from underlay protocol sdk. -* while the shared ptr message used to copy and send. -*/ -class SrsCommonMessage -{ -// 4.1. Message Header -public: - SrsMessageHeader header; -// 4.2. Message Payload -public: - /** - * current message parsed size, - * size <= header.payload_length - * for the payload maybe sent in multiple chunks. - */ - int size; - /** - * the payload of message, the SrsCommonMessage never know about the detail of payload, - * user must use SrsProtocol.decode_message to get concrete packet. - * @remark, not all message payload can be decoded to packet. for example, - * video/audio packet use raw bytes, no video/audio packet. - */ - char* payload; -public: - SrsCommonMessage(); -public: - virtual ~SrsCommonMessage(); -}; - -/** -* the message header for shared ptr message. -* only the message for all msgs are same. -*/ -struct SrsSharedMessageHeader -{ - /** - * 3bytes. - * Three-byte field that represents the size of the payload in bytes. - * It is set in big-endian format. - */ - int32_t payload_length; - /** - * 1byte. - * One byte field to represent the message type. A range of type IDs - * (1-7) are reserved for protocol control messages. - */ - int8_t message_type; - /** - * get the perfered cid(chunk stream id) which sendout over. - * set at decoding, and canbe used for directly send message, - * for example, dispatch to all connections. - */ - int perfer_cid; -}; - -/** -* shared ptr message. -* for audio/video/data message that need less memory copy. -* and only for output. -* -* create first object by constructor and create(), -* use copy if need reference count message. -* -*/ -class SrsSharedPtrMessage -{ -// 4.1. Message Header -public: - // the header can shared, only set the timestamp and stream id. - // @see https://github.com/simple-rtmp-server/srs/issues/251 - //SrsSharedMessageHeader header; - /** - * Four-byte field that contains a timestamp of the message. - * The 4 bytes are packed in the big-endian order. - * @remark, used as calc timestamp when decode and encode time. - * @remark, we use 64bits for large time for jitter detect and hls. - */ - int64_t timestamp; - /** - * 4bytes. - * Four-byte field that identifies the stream of the message. These - * bytes are set in big-endian format. - */ - int32_t stream_id; -// 4.2. Message Payload -public: - /** - * current message parsed size, - * size <= header.payload_length - * for the payload maybe sent in multiple chunks. - */ - int size; - /** - * the payload of message, the SrsCommonMessage never know about the detail of payload, - * user must use SrsProtocol.decode_message to get concrete packet. - * @remark, not all message payload can be decoded to packet. for example, - * video/audio packet use raw bytes, no video/audio packet. - */ - char* payload; -private: - class SrsSharedPtrPayload - { - public: - // shared message header. - // @see https://github.com/simple-rtmp-server/srs/issues/251 - SrsSharedMessageHeader header; - // actual shared payload. - char* payload; - // size of payload. - int size; - // the reference count - int shared_count; - public: - SrsSharedPtrPayload(); - virtual ~SrsSharedPtrPayload(); - }; - SrsSharedPtrPayload* ptr; -public: - SrsSharedPtrMessage(); - virtual ~SrsSharedPtrMessage(); -public: - /** - * create shared ptr message, - * copy header, manage the payload of msg, - * set the payload to NULL to prevent double free. - * @remark payload of msg set to NULL if success. - */ - virtual int create(SrsCommonMessage* msg); - /** - * create shared ptr message, - * from the header and payload. - * @remark user should never free the payload. - * @param pheader, the header to copy to the message. NULL to ignore. - */ - virtual int create(SrsMessageHeader* pheader, char* payload, int size); - /** - * get current reference count. - * when this object created, count set to 0. - * if copy() this object, count increase 1. - * if this or copy deleted, free payload when count is 0, or count--. - * @remark, assert object is created. - */ - virtual int count(); - /** - * check perfer cid and stream id. - * @return whether stream id already set. - */ - virtual bool check(int stream_id); -public: - virtual bool is_av(); - virtual bool is_audio(); - virtual bool is_video(); -public: - /** - * generate the chunk header to cache. - * @return the size of header. - */ - virtual int chunk_header(char* cache, int nb_cache, bool c0); -public: - /** - * copy current shared ptr message, use ref-count. - * @remark, assert object is created. - */ - virtual SrsSharedPtrMessage* copy(); -}; /** * the decoded message payload. diff --git a/trunk/src/protocol/srs_rtmp_utility.cpp b/trunk/src/protocol/srs_rtmp_utility.cpp index 7c892ad36..4d346ed0c 100644 --- a/trunk/src/protocol/srs_rtmp_utility.cpp +++ b/trunk/src/protocol/srs_rtmp_utility.cpp @@ -23,6 +23,11 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include +// for srs-librtmp, @see https://github.com/simple-rtmp-server/srs/issues/213 +#ifndef _WIN32 +#include +#endif + #include using namespace std; @@ -32,6 +37,8 @@ using namespace std; #include #include #include +#include +#include void srs_discovery_tc_url( string tcUrl, @@ -159,136 +166,6 @@ bool srs_bytes_equals(void* pa, void* pb, int size) return true; } -int srs_chunk_header_c0( - int perfer_cid, u_int32_t timestamp, int32_t payload_length, - int8_t message_type, int32_t stream_id, - char* cache, int nb_cache -) -{ - // to directly set the field. - char* pp = NULL; - - // generate the header. - char* p = cache; - - // no header. - if (nb_cache < SRS_CONSTS_RTMP_MAX_FMT0_HEADER_SIZE) { - return 0; - } - - // write new chunk stream header, fmt is 0 - *p++ = 0x00 | (perfer_cid & 0x3F); - - // chunk message header, 11 bytes - // timestamp, 3bytes, big-endian - if (timestamp < RTMP_EXTENDED_TIMESTAMP) { - pp = (char*)×tamp; - *p++ = pp[2]; - *p++ = pp[1]; - *p++ = pp[0]; - } else { - *p++ = 0xFF; - *p++ = 0xFF; - *p++ = 0xFF; - } - - // message_length, 3bytes, big-endian - pp = (char*)&payload_length; - *p++ = pp[2]; - *p++ = pp[1]; - *p++ = pp[0]; - - // message_type, 1bytes - *p++ = message_type; - - // stream_id, 4bytes, little-endian - pp = (char*)&stream_id; - *p++ = pp[0]; - *p++ = pp[1]; - *p++ = pp[2]; - *p++ = pp[3]; - - // for c0 - // chunk extended timestamp header, 0 or 4 bytes, big-endian - // - // for c3: - // chunk extended timestamp header, 0 or 4 bytes, big-endian - // 6.1.3. Extended Timestamp - // This field is transmitted only when the normal time stamp in the - // chunk message header is set to 0x00ffffff. If normal time stamp is - // set to any value less than 0x00ffffff, this field MUST NOT be - // present. This field MUST NOT be present if the timestamp field is not - // present. Type 3 chunks MUST NOT have this field. - // adobe changed for Type3 chunk: - // FMLE always sendout the extended-timestamp, - // must send the extended-timestamp to FMS, - // must send the extended-timestamp to flash-player. - // @see: ngx_rtmp_prepare_message - // @see: http://blog.csdn.net/win_lin/article/details/13363699 - // TODO: FIXME: extract to outer. - if (timestamp >= RTMP_EXTENDED_TIMESTAMP) { - pp = (char*)×tamp; - *p++ = pp[3]; - *p++ = pp[2]; - *p++ = pp[1]; - *p++ = pp[0]; - } - - // always has header - return p - cache; -} - -int srs_chunk_header_c3( - int perfer_cid, u_int32_t timestamp, - char* cache, int nb_cache -) -{ - // to directly set the field. - char* pp = NULL; - - // generate the header. - char* p = cache; - - // no header. - if (nb_cache < SRS_CONSTS_RTMP_MAX_FMT3_HEADER_SIZE) { - return 0; - } - - // write no message header chunk stream, fmt is 3 - // @remark, if perfer_cid > 0x3F, that is, use 2B/3B chunk header, - // SRS will rollback to 1B chunk header. - *p++ = 0xC0 | (perfer_cid & 0x3F); - - // for c0 - // chunk extended timestamp header, 0 or 4 bytes, big-endian - // - // for c3: - // chunk extended timestamp header, 0 or 4 bytes, big-endian - // 6.1.3. Extended Timestamp - // This field is transmitted only when the normal time stamp in the - // chunk message header is set to 0x00ffffff. If normal time stamp is - // set to any value less than 0x00ffffff, this field MUST NOT be - // present. This field MUST NOT be present if the timestamp field is not - // present. Type 3 chunks MUST NOT have this field. - // adobe changed for Type3 chunk: - // FMLE always sendout the extended-timestamp, - // must send the extended-timestamp to FMS, - // must send the extended-timestamp to flash-player. - // @see: ngx_rtmp_prepare_message - // @see: http://blog.csdn.net/win_lin/article/details/13363699 - // TODO: FIXME: extract to outer. - if (timestamp >= RTMP_EXTENDED_TIMESTAMP) { - pp = (char*)×tamp; - *p++ = pp[3]; - *p++ = pp[2]; - *p++ = pp[1]; - *p++ = pp[0]; - } - - // always has header - return p - cache; -} - int srs_do_rtmp_create_msg(char type, u_int32_t timestamp, char* data, int size, int stream_id, SrsSharedPtrMessage** ppmsg) { int ret = ERROR_SUCCESS; @@ -362,3 +239,43 @@ std::string srs_generate_stream_url(std::string vhost, std::string app, std::str return url; } +int srs_write_large_iovs(ISrsProtocolReaderWriter* skt, iovec* iovs, int size, ssize_t* pnwrite) +{ + int ret = ERROR_SUCCESS; + + // the limits of writev iovs. + // for srs-librtmp, @see https://github.com/simple-rtmp-server/srs/issues/213 +#ifndef _WIN32 + // for linux, generally it's 1024. + static int limits = sysconf(_SC_IOV_MAX); +#else + static int limits = 1024; +#endif + + // send in a time. + if (size < limits) { + if ((ret = skt->writev(iovs, size, pnwrite)) != ERROR_SUCCESS) { + if (!srs_is_client_gracefully_close(ret)) { + srs_error("send with writev failed. ret=%d", ret); + } + return ret; + } + return ret; + } + + // send in multiple times. + int cur_iov = 0; + while (cur_iov < size) { + int cur_count = srs_min(limits, size - cur_iov); + if ((ret = skt->writev(iovs + cur_iov, cur_count, pnwrite)) != ERROR_SUCCESS) { + if (!srs_is_client_gracefully_close(ret)) { + srs_error("send with writev failed. ret=%d", ret); + } + return ret; + } + cur_iov += cur_count; + } + + return ret; +} + diff --git a/trunk/src/protocol/srs_rtmp_utility.hpp b/trunk/src/protocol/srs_rtmp_utility.hpp index 09e834d09..9b4c3b35a 100644 --- a/trunk/src/protocol/srs_rtmp_utility.hpp +++ b/trunk/src/protocol/srs_rtmp_utility.hpp @@ -29,12 +29,18 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ #include +// for srs-librtmp, @see https://github.com/simple-rtmp-server/srs/issues/213 +#ifndef _WIN32 +#include +#endif + #include #include class SrsMessageHeader; class SrsSharedPtrMessage; +class ISrsProtocolReaderWriter; /** * parse the tcUrl, output the schema, host, vhost, app and port. @@ -90,29 +96,6 @@ extern std::string srs_generate_tc_url( */ extern bool srs_bytes_equals(void* pa, void* pb, int size); -/** -* generate the c0 chunk header for msg. -* @param cache, the cache to write header. -* @param nb_cache, the size of cache. -* @return the size of header. 0 if cache not enough. -*/ -extern int srs_chunk_header_c0( - int perfer_cid, u_int32_t timestamp, int32_t payload_length, - int8_t message_type, int32_t stream_id, - char* cache, int nb_cache -); - -/** -* generate the c3 chunk header for msg. -* @param cache, the cache to write header. -* @param nb_cache, the size of cache. -* @return the size of header. 0 if cache not enough. -*/ -extern int srs_chunk_header_c3( - int perfer_cid, u_int32_t timestamp, - char* cache, int nb_cache -); - /** * create shared ptr message from bytes. * @param data the packet bytes. user should never free it. @@ -123,5 +106,8 @@ extern int srs_rtmp_create_msg(char type, u_int32_t timestamp, char* data, int s // get the stream identify, vhost/app/stream. extern std::string srs_generate_stream_url(std::string vhost, std::string app, std::string stream); +// write large numbers of iovs. +extern int srs_write_large_iovs(ISrsProtocolReaderWriter* skt, iovec* iovs, int size, ssize_t* pnwrite = NULL); + #endif