1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

refine protocol, use int64_t timestamp for ts and jitter.

This commit is contained in:
winlin 2013-12-15 19:59:32 +08:00
parent ce4928cef6
commit 27255a3e7a
6 changed files with 41 additions and 51 deletions

View file

@ -1272,14 +1272,13 @@ int SrsHls::on_audio(SrsSharedPtrMessage* audio)
return ret;
}
int64_t corrected_time = 0;
if ((ret = jitter->correct(audio, 0, 0, &corrected_time)) != ERROR_SUCCESS) {
if ((ret = jitter->correct(audio, 0, 0)) != ERROR_SUCCESS) {
srs_error("rtmp jitter correct audio failed. ret=%d", ret);
return ret;
}
// the pts calc from rtmp/flv header.
int64_t pts = corrected_time * 90;
int64_t pts = audio->header.timestamp * 90;
if ((ret = ts_cache->write_audio(codec, muxer, pts, sample)) != ERROR_SUCCESS) {
srs_error("ts cache write audio failed. ret=%d", ret);
@ -1315,13 +1314,12 @@ int SrsHls::on_video(SrsSharedPtrMessage* video)
return ret;
}
int64_t corrected_time = 0;
if ((ret = jitter->correct(video, 0, 0, &corrected_time)) != ERROR_SUCCESS) {
if ((ret = jitter->correct(video, 0, 0)) != ERROR_SUCCESS) {
srs_error("rtmp jitter correct video failed. ret=%d", ret);
return ret;
}
int64_t dts = corrected_time * 90;
int64_t dts = video->header.timestamp * 90;
if ((ret = ts_cache->write_video(codec, muxer, dts, sample)) != ERROR_SUCCESS) {
srs_error("ts cache write video failed. ret=%d", ret);
return ret;

View file

@ -354,7 +354,7 @@ int SrsProtocol::recv_message(SrsCommonMessage** pmsg)
}
if (msg->size <= 0 || msg->header.payload_length <= 0) {
srs_trace("ignore empty message(type=%d, size=%d, time=%d, sid=%d).",
srs_trace("ignore empty message(type=%d, size=%d, time=%"PRId64", sid=%d).",
msg->header.message_type, msg->header.payload_length,
msg->header.timestamp, msg->header.stream_id);
srs_freep(msg);
@ -405,12 +405,13 @@ int SrsProtocol::send_message(ISrsMessage* msg)
// chunk message header, 11 bytes
// timestamp, 3bytes, big-endian
if (msg->header.timestamp >= RTMP_EXTENDED_TIMESTAMP) {
u_int32_t timestamp = (u_int32_t)msg->header.timestamp;
if (timestamp >= RTMP_EXTENDED_TIMESTAMP) {
*pheader++ = 0xFF;
*pheader++ = 0xFF;
*pheader++ = 0xFF;
} else {
pp = (char*)&msg->header.timestamp;
pp = (char*)&timestamp;
*pheader++ = pp[2];
*pheader++ = pp[1];
*pheader++ = pp[0];
@ -433,8 +434,8 @@ int SrsProtocol::send_message(ISrsMessage* msg)
*pheader++ = pp[3];
// chunk extended timestamp header, 0 or 4 bytes, big-endian
if(msg->header.timestamp >= RTMP_EXTENDED_TIMESTAMP){
pp = (char*)&msg->header.timestamp;
if(timestamp >= RTMP_EXTENDED_TIMESTAMP){
pp = (char*)&timestamp;
*pheader++ = pp[3];
*pheader++ = pp[2];
*pheader++ = pp[1];
@ -461,8 +462,9 @@ int SrsProtocol::send_message(ISrsMessage* msg)
// must send the extended-timestamp to flash-player.
// @see: ngx_rtmp_prepare_message
// @see: http://blog.csdn.net/win_lin/article/details/13363699
if(msg->header.timestamp >= RTMP_EXTENDED_TIMESTAMP){
pp = (char*)&msg->header.timestamp;
u_int32_t timestamp = (u_int32_t)msg->header.timestamp;
if(timestamp >= RTMP_EXTENDED_TIMESTAMP){
pp = (char*)&timestamp;
*pheader++ = pp[3];
*pheader++ = pp[2];
*pheader++ = pp[1];
@ -702,7 +704,7 @@ int SrsProtocol::recv_interlaced_message(SrsCommonMessage** pmsg)
srs_verbose("cache new chunk stream: fmt=%d, cid=%d", fmt, cid);
} else {
chunk = chunk_streams[cid];
srs_verbose("cached chunk stream: fmt=%d, cid=%d, size=%d, message(type=%d, size=%d, time=%d, sid=%d)",
srs_verbose("cached chunk stream: fmt=%d, cid=%d, size=%d, message(type=%d, size=%d, time=%"PRId64", sid=%d)",
chunk->fmt, chunk->cid, (chunk->msg? chunk->msg->size : 0), chunk->header.message_type, chunk->header.payload_length,
chunk->header.timestamp, chunk->header.stream_id);
}
@ -716,7 +718,7 @@ int SrsProtocol::recv_interlaced_message(SrsCommonMessage** pmsg)
return ret;
}
srs_verbose("read message header success. "
"fmt=%d, mh_size=%d, ext_time=%d, size=%d, message(type=%d, size=%d, time=%d, sid=%d)",
"fmt=%d, mh_size=%d, ext_time=%d, size=%d, message(type=%d, size=%d, time=%"PRId64", sid=%d)",
fmt, mh_size, chunk->extended_timestamp, (chunk->msg? chunk->msg->size : 0), chunk->header.message_type,
chunk->header.payload_length, chunk->header.timestamp, chunk->header.stream_id);
@ -738,14 +740,14 @@ int SrsProtocol::recv_interlaced_message(SrsCommonMessage** pmsg)
// not got an entire RTMP message, try next chunk.
if (!msg) {
srs_verbose("get partial message success. chunk_payload_size=%d, size=%d, message(type=%d, size=%d, time=%d, sid=%d)",
srs_verbose("get partial message success. chunk_payload_size=%d, size=%d, message(type=%d, size=%d, time=%"PRId64", sid=%d)",
payload_size, (msg? msg->size : (chunk->msg? chunk->msg->size : 0)), chunk->header.message_type, chunk->header.payload_length,
chunk->header.timestamp, chunk->header.stream_id);
return ret;
}
*pmsg = msg;
srs_info("get entire message success. chunk_payload_size=%d, size=%d, message(type=%d, size=%d, time=%d, sid=%d)",
srs_info("get entire message success. chunk_payload_size=%d, size=%d, message(type=%d, size=%d, time=%"PRId64", sid=%d)",
payload_size, (msg? msg->size : (chunk->msg? chunk->msg->size : 0)), chunk->header.message_type, chunk->header.payload_length,
chunk->header.timestamp, chunk->header.stream_id);
@ -952,16 +954,16 @@ int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_siz
pp[1] = *p++;
pp[2] = *p++;
pp[3] = *p++;
srs_verbose("header read completed. fmt=%d, mh_size=%d, ext_time=%d, time=%d, payload=%d, type=%d, sid=%d",
srs_verbose("header read completed. fmt=%d, mh_size=%d, ext_time=%d, time=%"PRId64", payload=%d, type=%d, sid=%d",
fmt, mh_size, chunk->extended_timestamp, chunk->header.timestamp, chunk->header.payload_length,
chunk->header.message_type, chunk->header.stream_id);
} else {
srs_verbose("header read completed. fmt=%d, mh_size=%d, ext_time=%d, time=%d, payload=%d, type=%d",
srs_verbose("header read completed. fmt=%d, mh_size=%d, ext_time=%d, time=%"PRId64", payload=%d, type=%d",
fmt, mh_size, chunk->extended_timestamp, chunk->header.timestamp, chunk->header.payload_length,
chunk->header.message_type);
}
} else {
srs_verbose("header read completed. fmt=%d, mh_size=%d, ext_time=%d, time=%d",
srs_verbose("header read completed. fmt=%d, mh_size=%d, ext_time=%d, time=%"PRId64"",
fmt, mh_size, chunk->extended_timestamp, chunk->header.timestamp);
}
} else {
@ -986,7 +988,7 @@ int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_siz
// ffmpeg/librtmp may donot send this filed, need to detect the value.
// @see also: http://blog.csdn.net/win_lin/article/details/13363699
int32_t timestamp = 0x00;
u_int32_t timestamp = 0x00;
char* pp = (char*)&timestamp;
pp[3] = *p++;
pp[2] = *p++;
@ -995,14 +997,14 @@ int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_siz
// compare to the chunk timestamp, which is set by chunk message header
// type 0,1 or 2.
int32_t chunk_timestamp = chunk->header.timestamp;
u_int32_t chunk_timestamp = chunk->header.timestamp;
if (chunk_timestamp > RTMP_EXTENDED_TIMESTAMP && chunk_timestamp != timestamp) {
mh_size -= 4;
srs_verbose("ignore the 4bytes extended timestamp. mh_size=%d", mh_size);
} else {
chunk->header.timestamp = timestamp;
}
srs_verbose("header read ext_time completed. time=%d", chunk->header.timestamp);
srs_verbose("header read ext_time completed. time=%"PRId64"", chunk->header.timestamp);
}
// valid message
@ -1032,7 +1034,7 @@ int SrsProtocol::read_message_payload(SrsChunkStream* chunk, int bh_size, int mh
buffer->erase(bh_size + mh_size);
srs_trace("get an empty RTMP "
"message(type=%d, size=%d, time=%d, sid=%d)", chunk->header.message_type,
"message(type=%d, size=%d, time=%"PRId64", sid=%d)", chunk->header.message_type,
chunk->header.payload_length, chunk->header.timestamp, chunk->header.stream_id);
*pmsg = chunk->msg;
@ -1073,13 +1075,13 @@ int SrsProtocol::read_message_payload(SrsChunkStream* chunk, int bh_size, int mh
if (chunk->header.payload_length == chunk->msg->size) {
*pmsg = chunk->msg;
chunk->msg = NULL;
srs_verbose("get entire RTMP message(type=%d, size=%d, time=%d, sid=%d)",
srs_verbose("get entire RTMP message(type=%d, size=%d, time=%"PRId64", sid=%d)",
chunk->header.message_type, chunk->header.payload_length,
chunk->header.timestamp, chunk->header.stream_id);
return ret;
}
srs_verbose("get partial RTMP message(type=%d, size=%d, time=%d, sid=%d), partial size=%d",
srs_verbose("get partial RTMP message(type=%d, size=%d, time=%"PRId64", sid=%d), partial size=%d",
chunk->header.message_type, chunk->header.payload_length,
chunk->header.timestamp, chunk->header.stream_id,
chunk->msg->size);

View file

@ -204,8 +204,9 @@ struct SrsMessageHeader
* Four-byte field that contains a timestamp of the message.
* The 4 bytes are packed in the big-endian order.
* @remark, used as calc timestamp when decode and encode time.
* @remark, we use 64bits for large time for jitter detect and hls.
*/
u_int32_t timestamp;
int64_t timestamp;
SrsMessageHeader();
virtual ~SrsMessageHeader();
@ -1126,7 +1127,7 @@ int srs_rtmp_expect_message(SrsProtocol* protocol, SrsCommonMessage** pmsg, T**
T* pkt = dynamic_cast<T*>(msg->get_packet());
if (!pkt) {
delete msg;
srs_trace("drop message(type=%d, size=%d, time=%d, sid=%d).",
srs_trace("drop message(type=%d, size=%d, time=%"PRId64", sid=%d).",
msg->header.message_type, msg->header.payload_length,
msg->header.timestamp, msg->header.stream_id);
continue;

View file

@ -38,7 +38,7 @@ using namespace std;
#include <srs_core_rtmp.hpp>
#define CONST_MAX_JITTER_MS 500
#define DEFAULT_FRAME_TIME_MS 10
#define DEFAULT_FRAME_TIME_MS 40
SrsRtmpJitter::SrsRtmpJitter()
{
@ -49,19 +49,13 @@ SrsRtmpJitter::~SrsRtmpJitter()
{
}
// TODO: FIXME: remove the 64bits time, change the timestamp in heaer to 64bits.
int SrsRtmpJitter::correct(SrsSharedPtrMessage* msg, int tba, int tbv, int64_t* corrected_time)
int SrsRtmpJitter::correct(SrsSharedPtrMessage* msg, int tba, int tbv)
{
int ret = ERROR_SUCCESS;
// set to 0 for metadata.
if (!msg->header.is_video() && !msg->header.is_audio()) {
if (corrected_time) {
*corrected_time = 0;
}
msg->header.timestamp = 0;
return ret;
}
@ -78,16 +72,16 @@ int SrsRtmpJitter::correct(SrsSharedPtrMessage* msg, int tba, int tbv, int64_t*
* 3. last_pkt_correct_time: simply add the positive delta,
* and enforce the time monotonically.
*/
u_int32_t time = msg->header.timestamp;
int32_t delta = time - last_pkt_time;
int64_t time = msg->header.timestamp;
int64_t delta = time - last_pkt_time;
// if jitter detected, reset the delta.
if (delta < 0 || delta > CONST_MAX_JITTER_MS) {
// calc the right diff by audio sample rate
if (msg->header.is_audio() && sample_rate > 0) {
delta = (int32_t)(delta * 1000.0 / sample_rate);
delta = (int64_t)(delta * 1000.0 / sample_rate);
} else if (msg->header.is_video() && frame_rate > 0) {
delta = (int32_t)(delta * 1.0 / frame_rate);
delta = (int64_t)(delta * 1.0 / frame_rate);
} else {
delta = DEFAULT_FRAME_TIME_MS;
}
@ -97,20 +91,16 @@ int SrsRtmpJitter::correct(SrsSharedPtrMessage* msg, int tba, int tbv, int64_t*
delta = DEFAULT_FRAME_TIME_MS;
}
srs_info("jitter detected, last_pts=%d, pts=%d, diff=%d, last_time=%d, time=%d, diff=%d",
srs_info("jitter detected, last_pts=%"PRId64", pts=%"PRId64", diff=%"PRId64", last_time=%"PRId64", time=%"PRId64", diff=%"PRId64"",
last_pkt_time, time, time - last_pkt_time, last_pkt_correct_time, last_pkt_correct_time + delta, delta);
} else {
srs_verbose("timestamp no jitter. time=%d, last_pkt=%d, correct_to=%d",
srs_verbose("timestamp no jitter. time=%"PRId64", last_pkt=%"PRId64", correct_to=%"PRId64"",
time, last_pkt_time, last_pkt_correct_time + delta);
}
last_pkt_correct_time = srs_max(0, last_pkt_correct_time + delta);
if (corrected_time) {
*corrected_time = last_pkt_correct_time;
}
msg->header.timestamp = last_pkt_correct_time;
last_pkt_time = time;
return ret;

View file

@ -56,18 +56,16 @@ class SrsEncoder;
class SrsRtmpJitter
{
private:
u_int32_t last_pkt_time;
u_int32_t last_pkt_correct_time;
int64_t last_pkt_time;
int64_t last_pkt_correct_time;
public:
SrsRtmpJitter();
virtual ~SrsRtmpJitter();
public:
/**
* detect the time jitter and correct it.
* @param corrected_time output the 64bits time.
* ignore if NULL.
*/
virtual int correct(SrsSharedPtrMessage* msg, int tba, int tbv, int64_t* corrected_time = NULL);
virtual int correct(SrsSharedPtrMessage* msg, int tba, int tbv);
/**
* get current client time, the last packet time.
*/