1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

fix #405, improve the HTTP FLV performance to 6k. 2.0.171

This commit is contained in:
winlin 2015-05-25 01:02:06 +08:00
parent 4df19ba99a
commit d12fc7fcc5
13 changed files with 1193 additions and 922 deletions

View file

@ -37,16 +37,304 @@ using namespace std;
#include <srs_kernel_stream.hpp>
#include <srs_kernel_file.hpp>
#include <srs_kernel_codec.hpp>
#include <srs_kernel_utility.hpp>
SrsMessageHeader::SrsMessageHeader()
{
message_type = 0;
payload_length = 0;
timestamp_delta = 0;
stream_id = 0;
timestamp = 0;
// we always use the connection chunk-id
perfer_cid = RTMP_CID_OverConnection;
}
SrsMessageHeader::~SrsMessageHeader()
{
}
bool SrsMessageHeader::is_audio()
{
return message_type == RTMP_MSG_AudioMessage;
}
bool SrsMessageHeader::is_video()
{
return message_type == RTMP_MSG_VideoMessage;
}
bool SrsMessageHeader::is_amf0_command()
{
return message_type == RTMP_MSG_AMF0CommandMessage;
}
bool SrsMessageHeader::is_amf0_data()
{
return message_type == RTMP_MSG_AMF0DataMessage;
}
bool SrsMessageHeader::is_amf3_command()
{
return message_type == RTMP_MSG_AMF3CommandMessage;
}
bool SrsMessageHeader::is_amf3_data()
{
return message_type == RTMP_MSG_AMF3DataMessage;
}
bool SrsMessageHeader::is_window_ackledgement_size()
{
return message_type == RTMP_MSG_WindowAcknowledgementSize;
}
bool SrsMessageHeader::is_ackledgement()
{
return message_type == RTMP_MSG_Acknowledgement;
}
bool SrsMessageHeader::is_set_chunk_size()
{
return message_type == RTMP_MSG_SetChunkSize;
}
bool SrsMessageHeader::is_user_control_message()
{
return message_type == RTMP_MSG_UserControlMessage;
}
bool SrsMessageHeader::is_set_peer_bandwidth()
{
return message_type == RTMP_MSG_SetPeerBandwidth;
}
bool SrsMessageHeader::is_aggregate()
{
return message_type == RTMP_MSG_AggregateMessage;
}
void SrsMessageHeader::initialize_amf0_script(int size, int stream)
{
message_type = RTMP_MSG_AMF0DataMessage;
payload_length = (int32_t)size;
timestamp_delta = (int32_t)0;
timestamp = (int64_t)0;
stream_id = (int32_t)stream;
// amf0 script use connection2 chunk-id
perfer_cid = RTMP_CID_OverConnection2;
}
void SrsMessageHeader::initialize_audio(int size, u_int32_t time, int stream)
{
message_type = RTMP_MSG_AudioMessage;
payload_length = (int32_t)size;
timestamp_delta = (int32_t)time;
timestamp = (int64_t)time;
stream_id = (int32_t)stream;
// audio chunk-id
perfer_cid = RTMP_CID_Audio;
}
void SrsMessageHeader::initialize_video(int size, u_int32_t time, int stream)
{
message_type = RTMP_MSG_VideoMessage;
payload_length = (int32_t)size;
timestamp_delta = (int32_t)time;
timestamp = (int64_t)time;
stream_id = (int32_t)stream;
// video chunk-id
perfer_cid = RTMP_CID_Video;
}
SrsCommonMessage::SrsCommonMessage()
{
payload = NULL;
size = 0;
}
SrsCommonMessage::~SrsCommonMessage()
{
srs_freep(payload);
}
SrsSharedPtrMessage::SrsSharedPtrPayload::SrsSharedPtrPayload()
{
payload = NULL;
size = 0;
shared_count = 0;
}
SrsSharedPtrMessage::SrsSharedPtrPayload::~SrsSharedPtrPayload()
{
srs_freep(payload);
}
SrsSharedPtrMessage::SrsSharedPtrMessage()
{
ptr = NULL;
}
SrsSharedPtrMessage::~SrsSharedPtrMessage()
{
if (ptr) {
if (ptr->shared_count == 0) {
srs_freep(ptr);
} else {
ptr->shared_count--;
}
}
}
int SrsSharedPtrMessage::create(SrsCommonMessage* msg)
{
int ret = ERROR_SUCCESS;
if ((ret = create(&msg->header, msg->payload, msg->size)) != ERROR_SUCCESS) {
return ret;
}
// to prevent double free of payload:
// initialize already attach the payload of msg,
// detach the payload to transfer the owner to shared ptr.
msg->payload = NULL;
msg->size = 0;
return ret;
}
int SrsSharedPtrMessage::create(SrsMessageHeader* pheader, char* payload, int size)
{
int ret = ERROR_SUCCESS;
if (ptr) {
ret = ERROR_SYSTEM_ASSERT_FAILED;
srs_error("should not set the payload twice. ret=%d", ret);
srs_assert(false);
return ret;
}
ptr = new SrsSharedPtrPayload();
// direct attach the data.
if (pheader) {
ptr->header.message_type = pheader->message_type;
ptr->header.payload_length = size;
ptr->header.perfer_cid = pheader->perfer_cid;
this->timestamp = pheader->timestamp;
this->stream_id = pheader->stream_id;
}
ptr->payload = payload;
ptr->size = size;
// message can access it.
this->payload = ptr->payload;
this->size = ptr->size;
return ret;
}
int SrsSharedPtrMessage::count()
{
srs_assert(ptr);
return ptr->shared_count;
}
bool SrsSharedPtrMessage::check(int stream_id)
{
// we donot use the complex basic header,
// ensure the basic header is 1bytes.
if (ptr->header.perfer_cid < 2) {
srs_info("change the chunk_id=%d to default=%d",
ptr->header.perfer_cid, RTMP_CID_ProtocolControl);
ptr->header.perfer_cid = RTMP_CID_ProtocolControl;
}
// we assume that the stream_id in a group must be the same.
if (this->stream_id == stream_id) {
return true;
}
this->stream_id = stream_id;
return false;
}
bool SrsSharedPtrMessage::is_av()
{
return ptr->header.message_type == RTMP_MSG_AudioMessage
|| ptr->header.message_type == RTMP_MSG_VideoMessage;
}
bool SrsSharedPtrMessage::is_audio()
{
return ptr->header.message_type == RTMP_MSG_AudioMessage;
}
bool SrsSharedPtrMessage::is_video()
{
return ptr->header.message_type == RTMP_MSG_VideoMessage;
}
int SrsSharedPtrMessage::chunk_header(char* cache, int nb_cache, bool c0)
{
if (c0) {
return srs_chunk_header_c0(
ptr->header.perfer_cid, timestamp, ptr->header.payload_length,
ptr->header.message_type, stream_id,
cache, nb_cache);
} else {
return srs_chunk_header_c3(
ptr->header.perfer_cid, timestamp,
cache, nb_cache);
}
}
SrsSharedPtrMessage* SrsSharedPtrMessage::copy()
{
srs_assert(ptr);
SrsSharedPtrMessage* copy = new SrsSharedPtrMessage();
copy->ptr = ptr;
ptr->shared_count++;
copy->timestamp = timestamp;
copy->stream_id = stream_id;
copy->payload = ptr->payload;
copy->size = ptr->size;
return copy;
}
SrsFlvEncoder::SrsFlvEncoder()
{
_fs = NULL;
tag_stream = new SrsStream();
#ifdef SRS_PERF_FAST_FLV_ENCODER
nb_tag_headers = 0;
tag_headers = NULL;
nb_iovss_cache = 0;
iovss_cache = NULL;
nb_ppts = 0;
ppts = NULL;
#endif
}
SrsFlvEncoder::~SrsFlvEncoder()
{
srs_freep(tag_stream);
#ifdef SRS_PERF_FAST_FLV_ENCODER
srs_freep(tag_headers);
srs_freep(iovss_cache);
srs_freep(ppts);
#endif
}
int SrsFlvEncoder::initialize(SrsFileWriter* fs)
@ -115,23 +403,9 @@ int SrsFlvEncoder::write_metadata(char type, char* data, int size)
srs_assert(data);
// 11 bytes tag header
/*char tag_header[] = {
(char)type, // TagType UB [5], 18 = script data
(char)0x00, (char)0x00, (char)0x00, // DataSize UI24 Length of the message.
(char)0x00, (char)0x00, (char)0x00, // Timestamp UI24 Time in milliseconds at which the data in this tag applies.
(char)0x00, // TimestampExtended UI8
(char)0x00, (char)0x00, (char)0x00, // StreamID UI24 Always 0.
};*/
// write data size.
if ((ret = tag_stream->initialize(tag_header, 8)) != ERROR_SUCCESS) {
if ((ret = write_metadata_to_cache(type, data, size, tag_header)) != ERROR_SUCCESS) {
return ret;
}
tag_stream->write_1bytes(type);
tag_stream->write_3bytes(size);
tag_stream->write_3bytes(0x00);
tag_stream->write_1bytes(0x00);
if ((ret = write_tag(tag_header, sizeof(tag_header), data, size)) != ERROR_SUCCESS) {
if (!srs_is_client_gracefully_close(ret)) {
@ -149,26 +423,9 @@ int SrsFlvEncoder::write_audio(int64_t timestamp, char* data, int size)
srs_assert(data);
timestamp &= 0x7fffffff;
// 11bytes tag header
/*char tag_header[] = {
(char)SrsCodecFlvTagAudio, // TagType UB [5], 8 = audio
(char)0x00, (char)0x00, (char)0x00, // DataSize UI24 Length of the message.
(char)0x00, (char)0x00, (char)0x00, // Timestamp UI24 Time in milliseconds at which the data in this tag applies.
(char)0x00, // TimestampExtended UI8
(char)0x00, (char)0x00, (char)0x00, // StreamID UI24 Always 0.
};*/
// write data size.
if ((ret = tag_stream->initialize(tag_header, 8)) != ERROR_SUCCESS) {
if ((ret = write_audio_to_cache(timestamp, data, size, tag_header)) != ERROR_SUCCESS) {
return ret;
}
tag_stream->write_1bytes(SrsCodecFlvTagAudio);
tag_stream->write_3bytes(size);
tag_stream->write_3bytes((int32_t)timestamp);
// default to little-endian
tag_stream->write_1bytes((timestamp >> 24) & 0xFF);
if ((ret = write_tag(tag_header, sizeof(tag_header), data, size)) != ERROR_SUCCESS) {
if (!srs_is_client_gracefully_close(ret)) {
@ -186,26 +443,9 @@ int SrsFlvEncoder::write_video(int64_t timestamp, char* data, int size)
srs_assert(data);
timestamp &= 0x7fffffff;
// 11bytes tag header
/*char tag_header[] = {
(char)SrsCodecFlvTagVideo, // TagType UB [5], 9 = video
(char)0x00, (char)0x00, (char)0x00, // DataSize UI24 Length of the message.
(char)0x00, (char)0x00, (char)0x00, // Timestamp UI24 Time in milliseconds at which the data in this tag applies.
(char)0x00, // TimestampExtended UI8
(char)0x00, (char)0x00, (char)0x00, // StreamID UI24 Always 0.
};*/
// write data size.
if ((ret = tag_stream->initialize(tag_header, 8)) != ERROR_SUCCESS) {
if ((ret = write_video_to_cache(timestamp, data, size, tag_header)) != ERROR_SUCCESS) {
return ret;
}
tag_stream->write_1bytes(SrsCodecFlvTagVideo);
tag_stream->write_3bytes(size);
tag_stream->write_3bytes((int32_t)timestamp);
// default to little-endian
tag_stream->write_1bytes((timestamp >> 24) & 0xFF);
if ((ret = write_tag(tag_header, sizeof(tag_header), data, size)) != ERROR_SUCCESS) {
srs_error("write flv video tag failed. ret=%d", ret);
@ -221,16 +461,200 @@ int SrsFlvEncoder::size_tag(int data_size)
return SRS_FLV_TAG_HEADER_SIZE + data_size + SRS_FLV_PREVIOUS_TAG_SIZE;
}
#ifdef SRS_PERF_FAST_FLV_ENCODER
int SrsFlvEncoder::write_tags(SrsSharedPtrMessage** msgs, int count)
{
int ret = ERROR_SUCCESS;
// realloc the iovss.
int nb_iovss = 3 * count;
iovec* iovss = iovss_cache;
if (nb_iovss_cache < nb_iovss) {
srs_freep(iovss_cache);
nb_iovss_cache = nb_iovss;
iovss = iovss_cache = new iovec[nb_iovss];
}
// realloc the tag headers.
char* cache = tag_headers;
if (nb_tag_headers < count) {
srs_freep(tag_headers);
nb_tag_headers = count;
cache = tag_headers = new char[SRS_FLV_TAG_HEADER_SIZE * count];
}
// realloc the pts.
char* pts = ppts;
if (nb_ppts < count) {
srs_freep(ppts);
nb_ppts = count;
pts = ppts = new char[SRS_FLV_PREVIOUS_TAG_SIZE * count];
}
// the cache is ok, write each messages.
iovec* iovs = iovss;
for (int i = 0; i < count; i++) {
SrsSharedPtrMessage* msg = msgs[i];
// cache all flv header.
if (msg->is_audio()) {
if ((ret = write_audio_to_cache(msg->timestamp, msg->payload, msg->size, cache)) != ERROR_SUCCESS) {
return ret;
}
} else if (msg->is_video()) {
if ((ret = write_video_to_cache(msg->timestamp, msg->payload, msg->size, cache)) != ERROR_SUCCESS) {
return ret;
}
} else {
if ((ret = write_metadata_to_cache(SrsCodecFlvTagScript, msg->payload, msg->size, cache)) != ERROR_SUCCESS) {
return ret;
}
}
// cache all pts.
if ((ret = write_pts_to_cache(SRS_FLV_TAG_HEADER_SIZE + msg->size, pts)) != ERROR_SUCCESS) {
return ret;
}
// all ioves.
iovs[0].iov_base = cache;
iovs[0].iov_len = SRS_FLV_TAG_HEADER_SIZE;
iovs[1].iov_base = msg->payload;
iovs[1].iov_len = msg->size;
iovs[2].iov_base = pts;
iovs[2].iov_len = SRS_FLV_PREVIOUS_TAG_SIZE;
// move next.
cache += SRS_FLV_TAG_HEADER_SIZE;
pts += SRS_FLV_PREVIOUS_TAG_SIZE;
iovs += 3;
}
if ((ret = _fs->writev(iovss, nb_iovss, NULL)) != ERROR_SUCCESS) {
if (!srs_is_client_gracefully_close(ret)) {
srs_error("write flv tags failed. ret=%d", ret);
}
return ret;
}
return ret;
}
#endif
int SrsFlvEncoder::write_metadata_to_cache(char type, char* data, int size, char* cache)
{
int ret = ERROR_SUCCESS;
srs_assert(data);
// 11 bytes tag header
/*char tag_header[] = {
(char)type, // TagType UB [5], 18 = script data
(char)0x00, (char)0x00, (char)0x00, // DataSize UI24 Length of the message.
(char)0x00, (char)0x00, (char)0x00, // Timestamp UI24 Time in milliseconds at which the data in this tag applies.
(char)0x00, // TimestampExtended UI8
(char)0x00, (char)0x00, (char)0x00, // StreamID UI24 Always 0.
};*/
// write data size.
if ((ret = tag_stream->initialize(cache, 11)) != ERROR_SUCCESS) {
return ret;
}
tag_stream->write_1bytes(type);
tag_stream->write_3bytes(size);
tag_stream->write_3bytes(0x00);
tag_stream->write_1bytes(0x00);
tag_stream->write_3bytes(0x00);
return ret;
}
int SrsFlvEncoder::write_audio_to_cache(int64_t timestamp, char* data, int size, char* cache)
{
int ret = ERROR_SUCCESS;
srs_assert(data);
timestamp &= 0x7fffffff;
// 11bytes tag header
/*char tag_header[] = {
(char)SrsCodecFlvTagAudio, // TagType UB [5], 8 = audio
(char)0x00, (char)0x00, (char)0x00, // DataSize UI24 Length of the message.
(char)0x00, (char)0x00, (char)0x00, // Timestamp UI24 Time in milliseconds at which the data in this tag applies.
(char)0x00, // TimestampExtended UI8
(char)0x00, (char)0x00, (char)0x00, // StreamID UI24 Always 0.
};*/
// write data size.
if ((ret = tag_stream->initialize(cache, 11)) != ERROR_SUCCESS) {
return ret;
}
tag_stream->write_1bytes(SrsCodecFlvTagAudio);
tag_stream->write_3bytes(size);
tag_stream->write_3bytes((int32_t)timestamp);
// default to little-endian
tag_stream->write_1bytes((timestamp >> 24) & 0xFF);
tag_stream->write_3bytes(0x00);
return ret;
}
int SrsFlvEncoder::write_video_to_cache(int64_t timestamp, char* data, int size, char* cache)
{
int ret = ERROR_SUCCESS;
srs_assert(data);
timestamp &= 0x7fffffff;
// 11bytes tag header
/*char tag_header[] = {
(char)SrsCodecFlvTagVideo, // TagType UB [5], 9 = video
(char)0x00, (char)0x00, (char)0x00, // DataSize UI24 Length of the message.
(char)0x00, (char)0x00, (char)0x00, // Timestamp UI24 Time in milliseconds at which the data in this tag applies.
(char)0x00, // TimestampExtended UI8
(char)0x00, (char)0x00, (char)0x00, // StreamID UI24 Always 0.
};*/
// write data size.
if ((ret = tag_stream->initialize(cache, 11)) != ERROR_SUCCESS) {
return ret;
}
tag_stream->write_1bytes(SrsCodecFlvTagVideo);
tag_stream->write_3bytes(size);
tag_stream->write_3bytes((int32_t)timestamp);
// default to little-endian
tag_stream->write_1bytes((timestamp >> 24) & 0xFF);
tag_stream->write_3bytes(0x00);
return ret;
}
int SrsFlvEncoder::write_pts_to_cache(int size, char* cache)
{
int ret = ERROR_SUCCESS;
if ((ret = tag_stream->initialize(cache, SRS_FLV_PREVIOUS_TAG_SIZE)) != ERROR_SUCCESS) {
return ret;
}
tag_stream->write_4bytes(size);
return ret;
}
int SrsFlvEncoder::write_tag(char* header, int header_size, char* tag, int tag_size)
{
int ret = ERROR_SUCCESS;
// PreviousTagSizeN UI32 Size of last tag, including its header, in bytes.
char pre_size[SRS_FLV_PREVIOUS_TAG_SIZE];
if ((ret = tag_stream->initialize(pre_size, SRS_FLV_PREVIOUS_TAG_SIZE)) != ERROR_SUCCESS) {
if ((ret = write_pts_to_cache(tag_size + header_size, pre_size)) != ERROR_SUCCESS) {
return ret;
}
tag_stream->write_4bytes(tag_size + header_size);
iovec iovs[3];
iovs[0].iov_base = header;
@ -238,7 +662,7 @@ int SrsFlvEncoder::write_tag(char* header, int header_size, char* tag, int tag_s
iovs[1].iov_base = tag;
iovs[1].iov_len = tag_size;
iovs[2].iov_base = pre_size;
iovs[2].iov_len = sizeof(SRS_FLV_PREVIOUS_TAG_SIZE);
iovs[2].iov_len = SRS_FLV_PREVIOUS_TAG_SIZE;
if ((ret = _fs->writev(iovs, 3, NULL)) != ERROR_SUCCESS) {
if (!srs_is_client_gracefully_close(ret)) {