diff --git a/README.md b/README.md index 73c9a94e3..4e10b14e5 100755 --- a/README.md +++ b/README.md @@ -212,6 +212,7 @@ usr sys idl wai hiq siq| read writ| recv send| in out | int csw * nginx v1.5.0: 139524 lines
### History +* v0.9, 2013-12-15, refine protocol, use int64_t timestamp for ts and jitter. * v0.9, 2013-12-15, support set the live queue length(in seconds), drop when full. * v0.9, 2013-12-15, fix the forwarder reconnect bug, feed it the sequence header. * v0.9, 2013-12-15, support reload the hls/forwarder/transcoder. diff --git a/trunk/src/core/srs_core_hls.cpp b/trunk/src/core/srs_core_hls.cpp index 76b6df87c..6bc4dfd23 100644 --- a/trunk/src/core/srs_core_hls.cpp +++ b/trunk/src/core/srs_core_hls.cpp @@ -1272,14 +1272,13 @@ int SrsHls::on_audio(SrsSharedPtrMessage* audio) return ret; } - int64_t corrected_time = 0; - if ((ret = jitter->correct(audio, 0, 0, &corrected_time)) != ERROR_SUCCESS) { + if ((ret = jitter->correct(audio, 0, 0)) != ERROR_SUCCESS) { srs_error("rtmp jitter correct audio failed. ret=%d", ret); return ret; } // the pts calc from rtmp/flv header. - int64_t pts = corrected_time * 90; + int64_t pts = audio->header.timestamp * 90; if ((ret = ts_cache->write_audio(codec, muxer, pts, sample)) != ERROR_SUCCESS) { srs_error("ts cache write audio failed. ret=%d", ret); @@ -1315,13 +1314,12 @@ int SrsHls::on_video(SrsSharedPtrMessage* video) return ret; } - int64_t corrected_time = 0; - if ((ret = jitter->correct(video, 0, 0, &corrected_time)) != ERROR_SUCCESS) { + if ((ret = jitter->correct(video, 0, 0)) != ERROR_SUCCESS) { srs_error("rtmp jitter correct video failed. ret=%d", ret); return ret; } - int64_t dts = corrected_time * 90; + int64_t dts = video->header.timestamp * 90; if ((ret = ts_cache->write_video(codec, muxer, dts, sample)) != ERROR_SUCCESS) { srs_error("ts cache write video failed. ret=%d", ret); return ret; diff --git a/trunk/src/core/srs_core_protocol.cpp b/trunk/src/core/srs_core_protocol.cpp index 1b50d18e2..a1e95547b 100644 --- a/trunk/src/core/srs_core_protocol.cpp +++ b/trunk/src/core/srs_core_protocol.cpp @@ -354,7 +354,7 @@ int SrsProtocol::recv_message(SrsCommonMessage** pmsg) } if (msg->size <= 0 || msg->header.payload_length <= 0) { - srs_trace("ignore empty message(type=%d, size=%d, time=%d, sid=%d).", + srs_trace("ignore empty message(type=%d, size=%d, time=%"PRId64", sid=%d).", msg->header.message_type, msg->header.payload_length, msg->header.timestamp, msg->header.stream_id); srs_freep(msg); @@ -405,12 +405,13 @@ int SrsProtocol::send_message(ISrsMessage* msg) // chunk message header, 11 bytes // timestamp, 3bytes, big-endian - if (msg->header.timestamp >= RTMP_EXTENDED_TIMESTAMP) { + u_int32_t timestamp = (u_int32_t)msg->header.timestamp; + if (timestamp >= RTMP_EXTENDED_TIMESTAMP) { *pheader++ = 0xFF; *pheader++ = 0xFF; *pheader++ = 0xFF; } else { - pp = (char*)&msg->header.timestamp; + pp = (char*)×tamp; *pheader++ = pp[2]; *pheader++ = pp[1]; *pheader++ = pp[0]; @@ -433,8 +434,8 @@ int SrsProtocol::send_message(ISrsMessage* msg) *pheader++ = pp[3]; // chunk extended timestamp header, 0 or 4 bytes, big-endian - if(msg->header.timestamp >= RTMP_EXTENDED_TIMESTAMP){ - pp = (char*)&msg->header.timestamp; + if(timestamp >= RTMP_EXTENDED_TIMESTAMP){ + pp = (char*)×tamp; *pheader++ = pp[3]; *pheader++ = pp[2]; *pheader++ = pp[1]; @@ -461,8 +462,9 @@ int SrsProtocol::send_message(ISrsMessage* msg) // must send the extended-timestamp to flash-player. // @see: ngx_rtmp_prepare_message // @see: http://blog.csdn.net/win_lin/article/details/13363699 - if(msg->header.timestamp >= RTMP_EXTENDED_TIMESTAMP){ - pp = (char*)&msg->header.timestamp; + u_int32_t timestamp = (u_int32_t)msg->header.timestamp; + if(timestamp >= RTMP_EXTENDED_TIMESTAMP){ + pp = (char*)×tamp; *pheader++ = pp[3]; *pheader++ = pp[2]; *pheader++ = pp[1]; @@ -702,7 +704,7 @@ int SrsProtocol::recv_interlaced_message(SrsCommonMessage** pmsg) srs_verbose("cache new chunk stream: fmt=%d, cid=%d", fmt, cid); } else { chunk = chunk_streams[cid]; - srs_verbose("cached chunk stream: fmt=%d, cid=%d, size=%d, message(type=%d, size=%d, time=%d, sid=%d)", + srs_verbose("cached chunk stream: fmt=%d, cid=%d, size=%d, message(type=%d, size=%d, time=%"PRId64", sid=%d)", chunk->fmt, chunk->cid, (chunk->msg? chunk->msg->size : 0), chunk->header.message_type, chunk->header.payload_length, chunk->header.timestamp, chunk->header.stream_id); } @@ -716,7 +718,7 @@ int SrsProtocol::recv_interlaced_message(SrsCommonMessage** pmsg) return ret; } srs_verbose("read message header success. " - "fmt=%d, mh_size=%d, ext_time=%d, size=%d, message(type=%d, size=%d, time=%d, sid=%d)", + "fmt=%d, mh_size=%d, ext_time=%d, size=%d, message(type=%d, size=%d, time=%"PRId64", sid=%d)", fmt, mh_size, chunk->extended_timestamp, (chunk->msg? chunk->msg->size : 0), chunk->header.message_type, chunk->header.payload_length, chunk->header.timestamp, chunk->header.stream_id); @@ -738,14 +740,14 @@ int SrsProtocol::recv_interlaced_message(SrsCommonMessage** pmsg) // not got an entire RTMP message, try next chunk. if (!msg) { - srs_verbose("get partial message success. chunk_payload_size=%d, size=%d, message(type=%d, size=%d, time=%d, sid=%d)", + srs_verbose("get partial message success. chunk_payload_size=%d, size=%d, message(type=%d, size=%d, time=%"PRId64", sid=%d)", payload_size, (msg? msg->size : (chunk->msg? chunk->msg->size : 0)), chunk->header.message_type, chunk->header.payload_length, chunk->header.timestamp, chunk->header.stream_id); return ret; } *pmsg = msg; - srs_info("get entire message success. chunk_payload_size=%d, size=%d, message(type=%d, size=%d, time=%d, sid=%d)", + srs_info("get entire message success. chunk_payload_size=%d, size=%d, message(type=%d, size=%d, time=%"PRId64", sid=%d)", payload_size, (msg? msg->size : (chunk->msg? chunk->msg->size : 0)), chunk->header.message_type, chunk->header.payload_length, chunk->header.timestamp, chunk->header.stream_id); @@ -952,16 +954,16 @@ int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_siz pp[1] = *p++; pp[2] = *p++; pp[3] = *p++; - srs_verbose("header read completed. fmt=%d, mh_size=%d, ext_time=%d, time=%d, payload=%d, type=%d, sid=%d", + srs_verbose("header read completed. fmt=%d, mh_size=%d, ext_time=%d, time=%"PRId64", payload=%d, type=%d, sid=%d", fmt, mh_size, chunk->extended_timestamp, chunk->header.timestamp, chunk->header.payload_length, chunk->header.message_type, chunk->header.stream_id); } else { - srs_verbose("header read completed. fmt=%d, mh_size=%d, ext_time=%d, time=%d, payload=%d, type=%d", + srs_verbose("header read completed. fmt=%d, mh_size=%d, ext_time=%d, time=%"PRId64", payload=%d, type=%d", fmt, mh_size, chunk->extended_timestamp, chunk->header.timestamp, chunk->header.payload_length, chunk->header.message_type); } } else { - srs_verbose("header read completed. fmt=%d, mh_size=%d, ext_time=%d, time=%d", + srs_verbose("header read completed. fmt=%d, mh_size=%d, ext_time=%d, time=%"PRId64"", fmt, mh_size, chunk->extended_timestamp, chunk->header.timestamp); } } else { @@ -986,7 +988,7 @@ int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_siz // ffmpeg/librtmp may donot send this filed, need to detect the value. // @see also: http://blog.csdn.net/win_lin/article/details/13363699 - int32_t timestamp = 0x00; + u_int32_t timestamp = 0x00; char* pp = (char*)×tamp; pp[3] = *p++; pp[2] = *p++; @@ -995,14 +997,14 @@ int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_siz // compare to the chunk timestamp, which is set by chunk message header // type 0,1 or 2. - int32_t chunk_timestamp = chunk->header.timestamp; + u_int32_t chunk_timestamp = chunk->header.timestamp; if (chunk_timestamp > RTMP_EXTENDED_TIMESTAMP && chunk_timestamp != timestamp) { mh_size -= 4; srs_verbose("ignore the 4bytes extended timestamp. mh_size=%d", mh_size); } else { chunk->header.timestamp = timestamp; } - srs_verbose("header read ext_time completed. time=%d", chunk->header.timestamp); + srs_verbose("header read ext_time completed. time=%"PRId64"", chunk->header.timestamp); } // valid message @@ -1032,7 +1034,7 @@ int SrsProtocol::read_message_payload(SrsChunkStream* chunk, int bh_size, int mh buffer->erase(bh_size + mh_size); srs_trace("get an empty RTMP " - "message(type=%d, size=%d, time=%d, sid=%d)", chunk->header.message_type, + "message(type=%d, size=%d, time=%"PRId64", sid=%d)", chunk->header.message_type, chunk->header.payload_length, chunk->header.timestamp, chunk->header.stream_id); *pmsg = chunk->msg; @@ -1073,13 +1075,13 @@ int SrsProtocol::read_message_payload(SrsChunkStream* chunk, int bh_size, int mh if (chunk->header.payload_length == chunk->msg->size) { *pmsg = chunk->msg; chunk->msg = NULL; - srs_verbose("get entire RTMP message(type=%d, size=%d, time=%d, sid=%d)", + srs_verbose("get entire RTMP message(type=%d, size=%d, time=%"PRId64", sid=%d)", chunk->header.message_type, chunk->header.payload_length, chunk->header.timestamp, chunk->header.stream_id); return ret; } - srs_verbose("get partial RTMP message(type=%d, size=%d, time=%d, sid=%d), partial size=%d", + srs_verbose("get partial RTMP message(type=%d, size=%d, time=%"PRId64", sid=%d), partial size=%d", chunk->header.message_type, chunk->header.payload_length, chunk->header.timestamp, chunk->header.stream_id, chunk->msg->size); diff --git a/trunk/src/core/srs_core_protocol.hpp b/trunk/src/core/srs_core_protocol.hpp index f9c5db5fe..733b82b31 100644 --- a/trunk/src/core/srs_core_protocol.hpp +++ b/trunk/src/core/srs_core_protocol.hpp @@ -204,8 +204,9 @@ struct SrsMessageHeader * Four-byte field that contains a timestamp of the message. * The 4 bytes are packed in the big-endian order. * @remark, used as calc timestamp when decode and encode time. + * @remark, we use 64bits for large time for jitter detect and hls. */ - u_int32_t timestamp; + int64_t timestamp; SrsMessageHeader(); virtual ~SrsMessageHeader(); @@ -1126,7 +1127,7 @@ int srs_rtmp_expect_message(SrsProtocol* protocol, SrsCommonMessage** pmsg, T** T* pkt = dynamic_cast(msg->get_packet()); if (!pkt) { delete msg; - srs_trace("drop message(type=%d, size=%d, time=%d, sid=%d).", + srs_trace("drop message(type=%d, size=%d, time=%"PRId64", sid=%d).", msg->header.message_type, msg->header.payload_length, msg->header.timestamp, msg->header.stream_id); continue; diff --git a/trunk/src/core/srs_core_source.cpp b/trunk/src/core/srs_core_source.cpp index 3a0f0ce58..bd79e7921 100644 --- a/trunk/src/core/srs_core_source.cpp +++ b/trunk/src/core/srs_core_source.cpp @@ -38,7 +38,7 @@ using namespace std; #include #define CONST_MAX_JITTER_MS 500 -#define DEFAULT_FRAME_TIME_MS 10 +#define DEFAULT_FRAME_TIME_MS 40 SrsRtmpJitter::SrsRtmpJitter() { @@ -49,19 +49,13 @@ SrsRtmpJitter::~SrsRtmpJitter() { } -// TODO: FIXME: remove the 64bits time, change the timestamp in heaer to 64bits. -int SrsRtmpJitter::correct(SrsSharedPtrMessage* msg, int tba, int tbv, int64_t* corrected_time) +int SrsRtmpJitter::correct(SrsSharedPtrMessage* msg, int tba, int tbv) { int ret = ERROR_SUCCESS; // set to 0 for metadata. if (!msg->header.is_video() && !msg->header.is_audio()) { - if (corrected_time) { - *corrected_time = 0; - } - msg->header.timestamp = 0; - return ret; } @@ -78,16 +72,16 @@ int SrsRtmpJitter::correct(SrsSharedPtrMessage* msg, int tba, int tbv, int64_t* * 3. last_pkt_correct_time: simply add the positive delta, * and enforce the time monotonically. */ - u_int32_t time = msg->header.timestamp; - int32_t delta = time - last_pkt_time; + int64_t time = msg->header.timestamp; + int64_t delta = time - last_pkt_time; // if jitter detected, reset the delta. if (delta < 0 || delta > CONST_MAX_JITTER_MS) { // calc the right diff by audio sample rate if (msg->header.is_audio() && sample_rate > 0) { - delta = (int32_t)(delta * 1000.0 / sample_rate); + delta = (int64_t)(delta * 1000.0 / sample_rate); } else if (msg->header.is_video() && frame_rate > 0) { - delta = (int32_t)(delta * 1.0 / frame_rate); + delta = (int64_t)(delta * 1.0 / frame_rate); } else { delta = DEFAULT_FRAME_TIME_MS; } @@ -97,20 +91,16 @@ int SrsRtmpJitter::correct(SrsSharedPtrMessage* msg, int tba, int tbv, int64_t* delta = DEFAULT_FRAME_TIME_MS; } - srs_info("jitter detected, last_pts=%d, pts=%d, diff=%d, last_time=%d, time=%d, diff=%d", + srs_info("jitter detected, last_pts=%"PRId64", pts=%"PRId64", diff=%"PRId64", last_time=%"PRId64", time=%"PRId64", diff=%"PRId64"", last_pkt_time, time, time - last_pkt_time, last_pkt_correct_time, last_pkt_correct_time + delta, delta); } else { - srs_verbose("timestamp no jitter. time=%d, last_pkt=%d, correct_to=%d", + srs_verbose("timestamp no jitter. time=%"PRId64", last_pkt=%"PRId64", correct_to=%"PRId64"", time, last_pkt_time, last_pkt_correct_time + delta); } last_pkt_correct_time = srs_max(0, last_pkt_correct_time + delta); - if (corrected_time) { - *corrected_time = last_pkt_correct_time; - } msg->header.timestamp = last_pkt_correct_time; - last_pkt_time = time; return ret; diff --git a/trunk/src/core/srs_core_source.hpp b/trunk/src/core/srs_core_source.hpp index 298b1e65f..2c008f703 100644 --- a/trunk/src/core/srs_core_source.hpp +++ b/trunk/src/core/srs_core_source.hpp @@ -56,18 +56,16 @@ class SrsEncoder; class SrsRtmpJitter { private: - u_int32_t last_pkt_time; - u_int32_t last_pkt_correct_time; + int64_t last_pkt_time; + int64_t last_pkt_correct_time; public: SrsRtmpJitter(); virtual ~SrsRtmpJitter(); public: /** * detect the time jitter and correct it. - * @param corrected_time output the 64bits time. - * ignore if NULL. */ - virtual int correct(SrsSharedPtrMessage* msg, int tba, int tbv, int64_t* corrected_time = NULL); + virtual int correct(SrsSharedPtrMessage* msg, int tba, int tbv); /** * get current client time, the last packet time. */