1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

for bug #251, the shared ptr message share the header. 2.0.64

This commit is contained in:
winlin 2014-12-07 11:25:05 +08:00
parent 330819fb74
commit bba6063492
11 changed files with 313 additions and 125 deletions

View file

@ -213,7 +213,7 @@ int SrsDvrPlan::on_audio(SrsSharedPtrMessage* __audio)
char* payload = audio->payload; char* payload = audio->payload;
int size = audio->size; int size = audio->size;
int64_t timestamp = filter_timestamp(audio->header.timestamp); int64_t timestamp = filter_timestamp(audio->timestamp);
if ((ret = enc->write_audio(timestamp, payload, size)) != ERROR_SUCCESS) { if ((ret = enc->write_audio(timestamp, payload, size)) != ERROR_SUCCESS) {
return ret; return ret;
} }
@ -262,7 +262,7 @@ int SrsDvrPlan::on_video(SrsSharedPtrMessage* __video)
return ret; return ret;
} }
int32_t timestamp = filter_timestamp(video->header.timestamp); int32_t timestamp = filter_timestamp(video->timestamp);
if ((ret = enc->write_video(timestamp, payload, size)) != ERROR_SUCCESS) { if ((ret = enc->write_video(timestamp, payload, size)) != ERROR_SUCCESS) {
return ret; return ret;
} }
@ -332,20 +332,20 @@ int SrsDvrPlan::update_duration(SrsSharedPtrMessage* msg)
// set the segment starttime at first time // set the segment starttime at first time
if (segment->starttime < 0) { if (segment->starttime < 0) {
segment->starttime = msg->header.timestamp; segment->starttime = msg->timestamp;
} }
// no previous packet or timestamp overflow. // no previous packet or timestamp overflow.
if (segment->stream_previous_pkt_time < 0 || segment->stream_previous_pkt_time > msg->header.timestamp) { if (segment->stream_previous_pkt_time < 0 || segment->stream_previous_pkt_time > msg->timestamp) {
segment->stream_previous_pkt_time = msg->header.timestamp; segment->stream_previous_pkt_time = msg->timestamp;
} }
// collect segment and stream duration, timestamp overflow is ok. // collect segment and stream duration, timestamp overflow is ok.
segment->duration += msg->header.timestamp - segment->stream_previous_pkt_time; segment->duration += msg->timestamp - segment->stream_previous_pkt_time;
segment->stream_duration += msg->header.timestamp - segment->stream_previous_pkt_time; segment->stream_duration += msg->timestamp - segment->stream_previous_pkt_time;
// update previous packet time // update previous packet time
segment->stream_previous_pkt_time = msg->header.timestamp; segment->stream_previous_pkt_time = msg->timestamp;
return ret; return ret;
} }
@ -488,7 +488,7 @@ int SrsDvrSegmentPlan::update_duration(SrsSharedPtrMessage* msg)
// when wait keyframe, ignore if no frame arrived. // when wait keyframe, ignore if no frame arrived.
// @see https://github.com/winlinvip/simple-rtmp-server/issues/177 // @see https://github.com/winlinvip/simple-rtmp-server/issues/177
if (_srs_config->get_dvr_wait_keyframe(_req->vhost)) { if (_srs_config->get_dvr_wait_keyframe(_req->vhost)) {
if (!msg->header.is_video()) { if (!msg->is_video()) {
return ret; return ret;
} }

View file

@ -560,7 +560,7 @@ int SrsEdgeForwarder::proxy(SrsCommonMessage* msg)
} }
srs_verbose("initialize shared ptr msg success."); srs_verbose("initialize shared ptr msg success.");
copy.header.stream_id = stream_id; copy.stream_id = stream_id;
if ((ret = queue->enqueue(copy.copy())) != ERROR_SUCCESS) { if ((ret = queue->enqueue(copy.copy())) != ERROR_SUCCESS) {
srs_error("enqueue edge publish msg failed. ret=%d", ret); srs_error("enqueue edge publish msg failed. ret=%d", ret);
} }

View file

@ -1458,7 +1458,7 @@ int SrsHls::on_audio(SrsSharedPtrMessage* __audio)
} }
// the pts calc from rtmp/flv header. // the pts calc from rtmp/flv header.
int64_t pts = audio->header.timestamp * 90; int64_t pts = audio->timestamp * 90;
// for pure audio, we need to update the stream dts also. // for pure audio, we need to update the stream dts also.
stream_dts = pts; stream_dts = pts;
@ -1503,7 +1503,7 @@ int SrsHls::on_video(SrsSharedPtrMessage* __video)
return ret; return ret;
} }
int64_t dts = video->header.timestamp * 90; int64_t dts = video->timestamp * 90;
stream_dts = dts; stream_dts = dts;
if ((ret = hls_cache->write_video(codec, muxer, dts, sample)) != ERROR_SUCCESS) { if ((ret = hls_cache->write_video(codec, muxer, dts, sample)) != ERROR_SUCCESS) {
srs_error("hls cache write video failed. ret=%d", ret); srs_error("hls cache write video failed. ret=%d", ret);

View file

@ -642,11 +642,11 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsQueueRecvThread* trd)
// foreach msg, collect the duration. // foreach msg, collect the duration.
// @remark: never use msg when sent it, for the protocol sdk will free it. // @remark: never use msg when sent it, for the protocol sdk will free it.
if (starttime < 0 || starttime > msg->header.timestamp) { if (starttime < 0 || starttime > msg->timestamp) {
starttime = msg->header.timestamp; starttime = msg->timestamp;
} }
duration += msg->header.timestamp - starttime; duration += msg->timestamp - starttime;
starttime = msg->header.timestamp; starttime = msg->timestamp;
} }
} }

View file

@ -85,10 +85,10 @@ int SrsRtmpJitter::correct(SrsSharedPtrMessage* msg, int tba, int tbv, SrsRtmpJi
if (ag == SrsRtmpJitterAlgorithmZERO) { if (ag == SrsRtmpJitterAlgorithmZERO) {
// for the first time, last_pkt_correct_time is zero. // for the first time, last_pkt_correct_time is zero.
// while when timestamp overflow, the timestamp become smaller, reset the last_pkt_correct_time. // while when timestamp overflow, the timestamp become smaller, reset the last_pkt_correct_time.
if (last_pkt_correct_time <= 0 || last_pkt_correct_time > msg->header.timestamp) { if (last_pkt_correct_time <= 0 || last_pkt_correct_time > msg->timestamp) {
last_pkt_correct_time = msg->header.timestamp; last_pkt_correct_time = msg->timestamp;
} }
msg->header.timestamp -= last_pkt_correct_time; msg->timestamp -= last_pkt_correct_time;
return ret; return ret;
} }
@ -99,8 +99,8 @@ int SrsRtmpJitter::correct(SrsSharedPtrMessage* msg, int tba, int tbv, SrsRtmpJi
// full jitter algorithm, do jitter correct. // full jitter algorithm, do jitter correct.
// set to 0 for metadata. // set to 0 for metadata.
if (!msg->header.is_audio() && !msg->header.is_video()) { if (!msg->is_av()) {
msg->header.timestamp = 0; msg->timestamp = 0;
return ret; return ret;
} }
@ -117,15 +117,15 @@ int SrsRtmpJitter::correct(SrsSharedPtrMessage* msg, int tba, int tbv, SrsRtmpJi
* 3. last_pkt_correct_time: simply add the positive delta, * 3. last_pkt_correct_time: simply add the positive delta,
* and enforce the time monotonically. * and enforce the time monotonically.
*/ */
int64_t time = msg->header.timestamp; int64_t time = msg->timestamp;
int64_t delta = time - last_pkt_time; int64_t delta = time - last_pkt_time;
// if jitter detected, reset the delta. // if jitter detected, reset the delta.
if (delta < 0 || delta > CONST_MAX_JITTER_MS) { if (delta < 0 || delta > CONST_MAX_JITTER_MS) {
// calc the right diff by audio sample rate // calc the right diff by audio sample rate
if (msg->header.is_audio() && sample_rate > 0) { if (msg->is_audio() && sample_rate > 0) {
delta = (int64_t)(delta * 1000.0 / sample_rate); delta = (int64_t)(delta * 1000.0 / sample_rate);
} else if (msg->header.is_video() && frame_rate > 0) { } else if (msg->is_video() && frame_rate > 0) {
delta = (int64_t)(delta * 1.0 / frame_rate); delta = (int64_t)(delta * 1.0 / frame_rate);
} else { } else {
delta = DEFAULT_FRAME_TIME_MS; delta = DEFAULT_FRAME_TIME_MS;
@ -145,7 +145,7 @@ int SrsRtmpJitter::correct(SrsSharedPtrMessage* msg, int tba, int tbv, SrsRtmpJi
last_pkt_correct_time = srs_max(0, last_pkt_correct_time + delta); last_pkt_correct_time = srs_max(0, last_pkt_correct_time + delta);
msg->header.timestamp = last_pkt_correct_time; msg->timestamp = last_pkt_correct_time;
last_pkt_time = time; last_pkt_time = time;
return ret; return ret;
@ -186,12 +186,12 @@ int SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg)
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
if (msg->header.is_audio() || msg->header.is_video()) { if (msg->is_av()) {
if (av_start_time == -1) { if (av_start_time == -1) {
av_start_time = msg->header.timestamp; av_start_time = msg->timestamp;
} }
av_end_time = msg->header.timestamp; av_end_time = msg->timestamp;
} }
msgs.push_back(msg); msgs.push_back(msg);
@ -221,7 +221,7 @@ int SrsMessageQueue::dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, in
} }
SrsSharedPtrMessage* last = omsgs[count - 1]; SrsSharedPtrMessage* last = omsgs[count - 1];
av_start_time = last->header.timestamp; av_start_time = last->timestamp;
if (count >= nb_msgs) { if (count >= nb_msgs) {
// the pmsgs is big enough and clear msgs at most time. // the pmsgs is big enough and clear msgs at most time.
@ -248,13 +248,13 @@ void SrsMessageQueue::shrink()
for (int i = 1; i < (int)msgs.size(); i++) { for (int i = 1; i < (int)msgs.size(); i++) {
SrsSharedPtrMessage* msg = msgs[i]; SrsSharedPtrMessage* msg = msgs[i];
if (msg->header.is_video()) { if (msg->is_video()) {
if (SrsFlvCodec::video_is_keyframe(msg->payload, msg->size)) { if (SrsFlvCodec::video_is_keyframe(msg->payload, msg->size)) {
// the max frame index to remove. // the max frame index to remove.
iframe_index = i; iframe_index = i;
// set the start time, we will remove until this frame. // set the start time, we will remove until this frame.
av_start_time = msg->header.timestamp; av_start_time = msg->timestamp;
break; break;
} }
@ -471,7 +471,7 @@ int SrsGopCache::cache(SrsSharedPtrMessage* __msg)
} }
// got video, update the video count if acceptable // got video, update the video count if acceptable
if (msg->header.is_video()) { if (msg->is_video()) {
cached_video_count++; cached_video_count++;
audio_after_last_video_count = 0; audio_after_last_video_count = 0;
} }
@ -483,7 +483,7 @@ int SrsGopCache::cache(SrsSharedPtrMessage* __msg)
} }
// ok, gop cache enabled, and got an audio. // ok, gop cache enabled, and got an audio.
if (msg->header.is_audio()) { if (msg->is_audio()) {
audio_after_last_video_count++; audio_after_last_video_count++;
} }
@ -495,7 +495,7 @@ int SrsGopCache::cache(SrsSharedPtrMessage* __msg)
} }
// clear gop cache when got key frame // clear gop cache when got key frame
if (msg->header.is_video() && SrsFlvCodec::video_is_keyframe(msg->payload, msg->size)) { if (msg->is_video() && SrsFlvCodec::video_is_keyframe(msg->payload, msg->size)) {
srs_info("clear gop cache when got keyframe. vcount=%d, count=%d", srs_info("clear gop cache when got keyframe. vcount=%d, count=%d",
cached_video_count, (int)gop_cache.size()); cached_video_count, (int)gop_cache.size());
@ -556,7 +556,7 @@ int64_t SrsGopCache::start_time()
SrsSharedPtrMessage* msg = gop_cache[0]; SrsSharedPtrMessage* msg = gop_cache[0];
srs_assert(msg); srs_assert(msg);
return msg->header.timestamp; return msg->timestamp;
} }
bool SrsGopCache::pure_audio() bool SrsGopCache::pure_audio()
@ -1239,7 +1239,7 @@ int SrsSource::on_audio(SrsCommonMessage* __audio)
srs_trace("%dB audio sh, " srs_trace("%dB audio sh, "
"codec(%d, profile=%d, %dchannels, %dkbps, %dHZ), " "codec(%d, profile=%d, %dchannels, %dkbps, %dHZ), "
"flv(%dbits, %dchannels, %dHZ)", "flv(%dbits, %dchannels, %dHZ)",
msg.header.payload_length, codec.audio_codec_id, msg.size, codec.audio_codec_id,
codec.aac_profile, codec.aac_channels, codec.aac_profile, codec.aac_channels,
codec.audio_data_rate / 1000, aac_sample_rates[codec.aac_sample_rate], codec.audio_data_rate / 1000, aac_sample_rates[codec.aac_sample_rate],
flv_sample_sizes[sample.sound_size], flv_sound_types[sample.sound_type], flv_sample_sizes[sample.sound_size], flv_sound_types[sample.sound_type],
@ -1257,10 +1257,10 @@ int SrsSource::on_audio(SrsCommonMessage* __audio)
// if atc, update the sequence header to abs time. // if atc, update the sequence header to abs time.
if (atc) { if (atc) {
if (cache_sh_audio) { if (cache_sh_audio) {
cache_sh_audio->header.timestamp = msg.header.timestamp; cache_sh_audio->timestamp = msg.timestamp;
} }
if (cache_metadata) { if (cache_metadata) {
cache_metadata->header.timestamp = msg.header.timestamp; cache_metadata->timestamp = msg.timestamp;
} }
} }
@ -1352,7 +1352,7 @@ int SrsSource::on_video(SrsCommonMessage* __video)
srs_trace("%dB video sh, " srs_trace("%dB video sh, "
"codec(%d, profile=%d, level=%d, %dx%d, %dkbps, %dfps, %ds)", "codec(%d, profile=%d, level=%d, %dx%d, %dkbps, %dfps, %ds)",
msg.header.payload_length, codec.video_codec_id, msg.size, codec.video_codec_id,
codec.avc_profile, codec.avc_level, codec.width, codec.height, codec.avc_profile, codec.avc_level, codec.width, codec.height,
codec.video_data_rate / 1000, codec.frame_rate, codec.duration); codec.video_data_rate / 1000, codec.frame_rate, codec.duration);
return ret; return ret;
@ -1368,10 +1368,10 @@ int SrsSource::on_video(SrsCommonMessage* __video)
// if atc, update the sequence header to abs time. // if atc, update the sequence header to abs time.
if (atc) { if (atc) {
if (cache_sh_video) { if (cache_sh_video) {
cache_sh_video->header.timestamp = msg.header.timestamp; cache_sh_video->timestamp = msg.timestamp;
} }
if (cache_metadata) { if (cache_metadata) {
cache_metadata->header.timestamp = msg.header.timestamp; cache_metadata->timestamp = msg.timestamp;
} }
} }
@ -1593,13 +1593,13 @@ void SrsSource::on_unpublish()
// if atc, update the sequence header to gop cache time. // if atc, update the sequence header to gop cache time.
if (atc && !gop_cache->empty()) { if (atc && !gop_cache->empty()) {
if (cache_metadata) { if (cache_metadata) {
cache_metadata->header.timestamp = gop_cache->start_time(); cache_metadata->timestamp = gop_cache->start_time();
} }
if (cache_sh_video) { if (cache_sh_video) {
cache_sh_video->header.timestamp = gop_cache->start_time(); cache_sh_video->timestamp = gop_cache->start_time();
} }
if (cache_sh_audio) { if (cache_sh_audio) {
cache_sh_audio->header.timestamp = gop_cache->start_time(); cache_sh_audio->timestamp = gop_cache->start_time();
} }
} }

View file

@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
// current release version // current release version
#define VERSION_MAJOR 2 #define VERSION_MAJOR 2
#define VERSION_MINOR 0 #define VERSION_MINOR 0
#define VERSION_REVISION 63 #define VERSION_REVISION 64
// server info. // server info.
#define RTMP_SIG_SRS_KEY "SRS" #define RTMP_SIG_SRS_KEY "SRS"
#define RTMP_SIG_SRS_ROLE "origin/edge server" #define RTMP_SIG_SRS_ROLE "origin/edge server"

View file

@ -96,7 +96,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
* that is, 1+4=5bytes. * that is, 1+4=5bytes.
*/ */
// always use fmt0 as cache. // always use fmt0 as cache.
//#define SRS_CONSTS_RTMP_MAX_FMT3_HEADER_SIZE 5 #define SRS_CONSTS_RTMP_MAX_FMT3_HEADER_SIZE 5
/** /**
* for performance issue, * for performance issue,

View file

@ -405,21 +405,25 @@ SrsSharedPtrMessage::__SrsSharedPtr::~__SrsSharedPtr()
} }
#ifdef SRS_PERF_MW_MSG_IOVS_CACHE #ifdef SRS_PERF_MW_MSG_IOVS_CACHE
int SrsSharedPtrMessage::__SrsSharedPtr::mic_evaluate( int SrsSharedPtrMessage::__SrsSharedPtr::mic_evaluate(int chunk_size)
SrsMessageHeader* mh, int chunk_size {
) {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
// use the chunk size, shuold not be changed. // use the chunk size, shuold not be changed.
this->chunk_size = chunk_size; this->chunk_size = chunk_size;
// ignore size // c0 header
srs_chunk_header(mic_c0, mh, true); int nbh = srs_chunk_header_c0(
mic_c3 = 0xC0 | (mh->perfer_cid & 0x3F); header.perfer_cid, 0, header.payload_length,
header.message_type, 0,
mic_c0, sizeof(mic_c0));
srs_assert(nbh > 0);;
// c3 header
mic_c3 = 0xC0 | (header.perfer_cid & 0x3F);
// calc number of iovs // calc number of iovs
nb_chunks = mh->payload_length / chunk_size; nb_chunks = header.payload_length / chunk_size;
if (mh->payload_length % chunk_size) { if (header.payload_length % chunk_size) {
nb_chunks++; nb_chunks++;
} }
nb_iovs = 1/*cid*/ + 1/*size*//*type*/+ 1/*chunk*/; nb_iovs = 1/*cid*/ + 1/*size*//*type*/+ 1/*chunk*/;
@ -529,12 +533,14 @@ int SrsSharedPtrMessage::create(SrsMessageHeader* pheader, char* payload, int si
return ret; return ret;
} }
header = *pheader;
header.payload_length = size;
ptr = new __SrsSharedPtr(); ptr = new __SrsSharedPtr();
// direct attach the data. // direct attach the data.
ptr->header.message_type = pheader->message_type;
ptr->header.payload_length = size;
ptr->header.perfer_cid = pheader->perfer_cid;
this->timestamp = pheader->timestamp;
this->stream_id = pheader->stream_id;
ptr->payload = payload; ptr->payload = payload;
ptr->size = size; ptr->size = size;
@ -551,17 +557,68 @@ int SrsSharedPtrMessage::count()
return ptr->shared_count; return ptr->shared_count;
} }
bool SrsSharedPtrMessage::check(int stream_id)
{
// we donot use the complex basic header,
// ensure the basic header is 1bytes.
if (ptr->header.perfer_cid < 2) {
srs_info("change the chunk_id=%d to default=%d",
ptr->header.perfer_cid, RTMP_CID_ProtocolControl);
ptr->header.perfer_cid = RTMP_CID_ProtocolControl;
}
// we assume that the stream_id in a group must be the same.
if (this->stream_id == stream_id) {
return true;
}
this->stream_id = stream_id;
return false;
}
bool SrsSharedPtrMessage::is_av()
{
return ptr->header.message_type == RTMP_MSG_AudioMessage
|| ptr->header.message_type == RTMP_MSG_VideoMessage;
}
bool SrsSharedPtrMessage::is_audio()
{
return ptr->header.message_type == RTMP_MSG_AudioMessage;
}
bool SrsSharedPtrMessage::is_video()
{
return ptr->header.message_type == RTMP_MSG_VideoMessage;
}
#ifndef SRS_PERF_MW_MSG_IOVS_CACHE
int SrsSharedPtrMessage::chunk_header(char* cache, int nb_cache, bool c0)
{
if (c0) {
return srs_chunk_header_c0(
ptr->header.perfer_cid, timestamp, ptr->header.payload_length,
ptr->header.message_type, stream_id,
cache, nb_cache);
} else {
return srs_chunk_header_c3(
ptr->header.perfer_cid, timestamp,
cache, nb_cache);
}
}
#endif
SrsSharedPtrMessage* SrsSharedPtrMessage::copy() SrsSharedPtrMessage* SrsSharedPtrMessage::copy()
{ {
srs_assert(ptr); srs_assert(ptr);
SrsSharedPtrMessage* copy = new SrsSharedPtrMessage(); SrsSharedPtrMessage* copy = new SrsSharedPtrMessage();
copy->header = header;
copy->ptr = ptr; copy->ptr = ptr;
ptr->shared_count++; ptr->shared_count++;
copy->timestamp = timestamp;
copy->stream_id = stream_id;
copy->payload = ptr->payload; copy->payload = ptr->payload;
copy->size = ptr->size; copy->size = ptr->size;
@ -583,7 +640,7 @@ int SrsSharedPtrMessage::mic_evaluate(int chunk_size)
// calc the shared ptr iovs at the first time. // calc the shared ptr iovs at the first time.
if (ptr->chunk_size <= 0) { if (ptr->chunk_size <= 0) {
if ((ret = ptr->mic_evaluate(&header, chunk_size)) != ERROR_SUCCESS) { if ((ret = ptr->mic_evaluate(chunk_size)) != ERROR_SUCCESS) {
srs_warn("mic evaluate source iovs failed. ret=%d", ret); srs_warn("mic evaluate source iovs failed. ret=%d", ret);
return ret; return ret;
} }
@ -610,7 +667,7 @@ int SrsSharedPtrMessage::mic_iovs_dump(iovec* iovs, int max_nb_iovs)
} }
// timestamp for c0/c3 // timestamp for c0/c3
u_int32_t timestamp = (u_int32_t)header.timestamp; u_int32_t timestamp = (u_int32_t)this->timestamp;
mic_etime_present = timestamp >= RTMP_EXTENDED_TIMESTAMP; mic_etime_present = timestamp >= RTMP_EXTENDED_TIMESTAMP;
// chunk message header, 11 bytes // chunk message header, 11 bytes
@ -629,7 +686,7 @@ int SrsSharedPtrMessage::mic_iovs_dump(iovec* iovs, int max_nb_iovs)
// stream_id, 4bytes, little-endian // stream_id, 4bytes, little-endian
p = mic_c0_sid; p = mic_c0_sid;
pp = (char*)&header.stream_id; pp = (char*)&stream_id;
*p++ = pp[0]; *p++ = pp[0];
*p++ = pp[1]; *p++ = pp[1];
*p++ = pp[2]; *p++ = pp[2];
@ -964,14 +1021,6 @@ int SrsProtocol::do_send_messages(SrsSharedPtrMessage** msgs, int nb_msgs)
srs_info("ignore empty message."); srs_info("ignore empty message.");
continue; continue;
} }
// we donot use the complex basic header,
// ensure the basic header is 1bytes.
if (msg->header.perfer_cid < 2) {
srs_info("change the chunk_id=%d to default=%d",
msg->header.perfer_cid, RTMP_CID_ProtocolControl);
msg->header.perfer_cid = RTMP_CID_ProtocolControl;
}
// p set to current write position, // p set to current write position,
// it's ok when payload is NULL and size is 0. // it's ok when payload is NULL and size is 0.
@ -981,7 +1030,8 @@ int SrsProtocol::do_send_messages(SrsSharedPtrMessage** msgs, int nb_msgs)
// always write the header event payload is empty. // always write the header event payload is empty.
while (p < pend) { while (p < pend) {
// always has header // always has header
int nbh = srs_chunk_header(c0c3_cache, &msg->header, p == msg->payload); int nb_cache = SRS_CONSTS_C0C3_HEADERS_MAX - c0c3_cache_index;
int nbh = msg->chunk_header(c0c3_cache, nb_cache, p == msg->payload);
srs_assert(nbh > 0); srs_assert(nbh > 0);
// header iov // header iov
@ -1066,8 +1116,8 @@ int SrsProtocol::do_send_messages(SrsSharedPtrMessage** msgs, int nb_msgs)
for (int i = msg_sent; i < nb_msgs; i++) { for (int i = msg_sent; i < nb_msgs; i++) {
SrsSharedPtrMessage* msg = msgs[i]; SrsSharedPtrMessage* msg = msgs[i];
// evaluate // evaluate the first
if ((ret = msg->mic_evaluate(out_chunk_size)) != ERROR_SUCCESS) { if (i == 0 && (ret = msg->mic_evaluate(out_chunk_size)) != ERROR_SUCCESS) {
return ret; return ret;
} }
@ -1185,7 +1235,18 @@ int SrsProtocol::do_simple_send(SrsMessageHeader* mh, char* payload, int size)
char* end = p + size; char* end = p + size;
char c0c3[SRS_CONSTS_RTMP_MAX_FMT0_HEADER_SIZE]; char c0c3[SRS_CONSTS_RTMP_MAX_FMT0_HEADER_SIZE];
while (p < end) { while (p < end) {
int nbh = srs_chunk_header(c0c3, mh, p == payload); int nbh = 0;
if (p == payload) {
nbh = srs_chunk_header_c0(
mh->perfer_cid, mh->timestamp, mh->payload_length,
mh->message_type, mh->stream_id,
c0c3, sizeof(c0c3));
} else {
nbh = srs_chunk_header_c3(
mh->perfer_cid, mh->timestamp,
c0c3, sizeof(c0c3));
}
srs_assert(nbh > 0);;
iovec iovs[2]; iovec iovs[2];
iovs[0].iov_base = c0c3; iovs[0].iov_base = c0c3;
@ -1388,11 +1449,12 @@ int SrsProtocol::send_and_free_messages(SrsSharedPtrMessage** msgs, int nb_msgs,
// update the stream id in header. // update the stream id in header.
for (int i = 0; i < nb_msgs; i++) { for (int i = 0; i < nb_msgs; i++) {
SrsSharedPtrMessage* msg = msgs[i]; SrsSharedPtrMessage* msg = msgs[i];
// we assume that the stream_id in a group must be the same.
if (msg->header.stream_id == stream_id) { // check perfer cid and stream,
// when one msg stream id is ok, ignore left.
if (msg->check(stream_id)) {
break; break;
} }
msg->header.stream_id = stream_id;
} }
// donot use the auto free to free the msg, // donot use the auto free to free the msg,

View file

@ -181,6 +181,32 @@ public:
virtual ~SrsCommonMessage(); virtual ~SrsCommonMessage();
}; };
/**
* the message header for shared ptr message.
* only the message for all msgs are same.
*/
struct SrsSharedMessageHeader
{
/**
* 3bytes.
* Three-byte field that represents the size of the payload in bytes.
* It is set in big-endian format.
*/
int32_t payload_length;
/**
* 1byte.
* One byte field to represent the message type. A range of type IDs
* (1-7) are reserved for protocol control messages.
*/
int8_t message_type;
/**
* get the perfered cid(chunk stream id) which sendout over.
* set at decoding, and canbe used for directly send message,
* for example, dispatch to all connections.
*/
int perfer_cid;
};
/** /**
* shared ptr message. * shared ptr message.
* for audio/video/data message that need less memory copy. * for audio/video/data message that need less memory copy.
@ -194,7 +220,22 @@ class SrsSharedPtrMessage
{ {
// 4.1. Message Header // 4.1. Message Header
public: public:
SrsMessageHeader header; // the header can shared, only set the timestamp and stream id.
// @see https://github.com/winlinvip/simple-rtmp-server/issues/251
//SrsSharedMessageHeader header;
/**
* Four-byte field that contains a timestamp of the message.
* The 4 bytes are packed in the big-endian order.
* @remark, used as calc timestamp when decode and encode time.
* @remark, we use 64bits for large time for jitter detect and hls.
*/
int64_t timestamp;
/**
* 4bytes.
* Four-byte field that identifies the stream of the message. These
* bytes are set in big-endian format.
*/
int32_t stream_id;
// 4.2. Message Payload // 4.2. Message Payload
public: public:
/** /**
@ -214,6 +255,9 @@ private:
class __SrsSharedPtr class __SrsSharedPtr
{ {
public: public:
// shared message header.
// @see https://github.com/winlinvip/simple-rtmp-server/issues/251
SrsSharedMessageHeader header;
// actual shared payload. // actual shared payload.
char* payload; char* payload;
// size of payload. // size of payload.
@ -269,7 +313,7 @@ private:
* for iovs msg cache, calc the iovs. * for iovs msg cache, calc the iovs.
* @param chunk_size use the specified chunk size to evaluate the iovs. * @param chunk_size use the specified chunk size to evaluate the iovs.
*/ */
virtual int mic_evaluate(SrsMessageHeader* mh, int chunk_size); virtual int mic_evaluate(int chunk_size);
#endif #endif
}; };
__SrsSharedPtr* ptr; __SrsSharedPtr* ptr;
@ -312,6 +356,23 @@ public:
* @remark, assert object is created. * @remark, assert object is created.
*/ */
virtual int count(); virtual int count();
/**
* check perfer cid and stream id.
* @return whether stream id already set.
*/
virtual bool check(int stream_id);
public:
virtual bool is_av();
virtual bool is_audio();
virtual bool is_video();
public:
#ifndef SRS_PERF_MW_MSG_IOVS_CACHE
/**
* generate the chunk header to cache.
* @return the size of header.
*/
virtual int chunk_header(char* cache, int nb_cache, bool c0);
#endif
public: public:
/** /**
* copy current shared ptr message, use ref-count. * copy current shared ptr message, use ref-count.

View file

@ -204,7 +204,11 @@ bool srs_aac_startswith_adts(SrsStream* stream)
return true; return true;
} }
int srs_chunk_header(char* cache, SrsMessageHeader* mh, bool c0) int srs_chunk_header_c0(
int perfer_cid, u_int32_t timestamp, int32_t payload_length,
int8_t message_type, int32_t stream_id,
char* cache, int nb_cache
)
{ {
// to directly set the field. // to directly set the field.
char* pp = NULL; char* pp = NULL;
@ -212,48 +216,94 @@ int srs_chunk_header(char* cache, SrsMessageHeader* mh, bool c0)
// generate the header. // generate the header.
char* p = cache; char* p = cache;
// timestamp for c0/c3 // no header.
u_int32_t timestamp = (u_int32_t)mh->timestamp; if (nb_cache < SRS_CONSTS_RTMP_MAX_FMT0_HEADER_SIZE) {
return 0;
if (c0) {
// write new chunk stream header, fmt is 0
*p++ = 0x00 | (mh->perfer_cid & 0x3F);
// chunk message header, 11 bytes
// timestamp, 3bytes, big-endian
if (timestamp < RTMP_EXTENDED_TIMESTAMP) {
pp = (char*)&timestamp;
*p++ = pp[2];
*p++ = pp[1];
*p++ = pp[0];
} else {
*p++ = 0xFF;
*p++ = 0xFF;
*p++ = 0xFF;
}
// message_length, 3bytes, big-endian
pp = (char*)&mh->payload_length;
*p++ = pp[2];
*p++ = pp[1];
*p++ = pp[0];
// message_type, 1bytes
*p++ = mh->message_type;
// stream_id, 4bytes, little-endian
pp = (char*)&mh->stream_id;
*p++ = pp[0];
*p++ = pp[1];
*p++ = pp[2];
*p++ = pp[3];
} else {
// write no message header chunk stream, fmt is 3
// @remark, if perfer_cid > 0x3F, that is, use 2B/3B chunk header,
// SRS will rollback to 1B chunk header.
*p++ = 0xC0 | (mh->perfer_cid & 0x3F);
} }
// write new chunk stream header, fmt is 0
*p++ = 0x00 | (perfer_cid & 0x3F);
// chunk message header, 11 bytes
// timestamp, 3bytes, big-endian
if (timestamp < RTMP_EXTENDED_TIMESTAMP) {
pp = (char*)&timestamp;
*p++ = pp[2];
*p++ = pp[1];
*p++ = pp[0];
} else {
*p++ = 0xFF;
*p++ = 0xFF;
*p++ = 0xFF;
}
// message_length, 3bytes, big-endian
pp = (char*)&payload_length;
*p++ = pp[2];
*p++ = pp[1];
*p++ = pp[0];
// message_type, 1bytes
*p++ = message_type;
// stream_id, 4bytes, little-endian
pp = (char*)&stream_id;
*p++ = pp[0];
*p++ = pp[1];
*p++ = pp[2];
*p++ = pp[3];
// for c0
// chunk extended timestamp header, 0 or 4 bytes, big-endian
//
// for c3:
// chunk extended timestamp header, 0 or 4 bytes, big-endian
// 6.1.3. Extended Timestamp
// This field is transmitted only when the normal time stamp in the
// chunk message header is set to 0x00ffffff. If normal time stamp is
// set to any value less than 0x00ffffff, this field MUST NOT be
// present. This field MUST NOT be present if the timestamp field is not
// present. Type 3 chunks MUST NOT have this field.
// adobe changed for Type3 chunk:
// FMLE always sendout the extended-timestamp,
// must send the extended-timestamp to FMS,
// must send the extended-timestamp to flash-player.
// @see: ngx_rtmp_prepare_message
// @see: http://blog.csdn.net/win_lin/article/details/13363699
// TODO: FIXME: extract to outer.
if (timestamp >= RTMP_EXTENDED_TIMESTAMP) {
pp = (char*)&timestamp;
*p++ = pp[3];
*p++ = pp[2];
*p++ = pp[1];
*p++ = pp[0];
}
// always has header
return p - cache;
}
int srs_chunk_header_c3(
int perfer_cid, u_int32_t timestamp,
char* cache, int nb_cache
)
{
// to directly set the field.
char* pp = NULL;
// generate the header.
char* p = cache;
// no header.
if (nb_cache < SRS_CONSTS_RTMP_MAX_FMT3_HEADER_SIZE) {
return 0;
}
// write no message header chunk stream, fmt is 3
// @remark, if perfer_cid > 0x3F, that is, use 2B/3B chunk header,
// SRS will rollback to 1B chunk header.
*p++ = 0xC0 | (perfer_cid & 0x3F);
// for c0 // for c0
// chunk extended timestamp header, 0 or 4 bytes, big-endian // chunk extended timestamp header, 0 or 4 bytes, big-endian
// //

View file

@ -105,12 +105,27 @@ extern bool srs_avc_startswith_annexb(SrsStream* stream, int* pnb_start_code = N
extern bool srs_aac_startswith_adts(SrsStream* stream); extern bool srs_aac_startswith_adts(SrsStream* stream);
/** /**
* generate the chunk header for msg. * generate the c0 chunk header for msg.
* @param mh, the header of msg to send. * @param cache, the cache to write header.
* @param c0, whether the first chunk, the c0 chunk. * @param nb_cache, the size of cache.
* @return the size of header. * @return the size of header. 0 if cache not enough.
*/ */
extern int srs_chunk_header(char* cache, SrsMessageHeader* mh, bool c0); extern int srs_chunk_header_c0(
int perfer_cid, u_int32_t timestamp, int32_t payload_length,
int8_t message_type, int32_t stream_id,
char* cache, int nb_cache
);
/**
* generate the c3 chunk header for msg.
* @param cache, the cache to write header.
* @param nb_cache, the size of cache.
* @return the size of header. 0 if cache not enough.
*/
extern int srs_chunk_header_c3(
int perfer_cid, u_int32_t timestamp,
char* cache, int nb_cache
);
#endif #endif