diff --git a/README.md b/README.md index 216ee5c66..547a9a291 100755 --- a/README.md +++ b/README.md @@ -159,6 +159,7 @@ For previous versions, please read: ## V4 changes +* v4.0, 2020-08-18, RTC: DTLS support ARQ, covered with utest. 4.0.39 * v4.0, 2020-08-06, RTC: Refine error check. 4.0.37 * v4.0, 2020-07-25, RTC: Support multiple address for client. 4.0.36 * v4.0, 2020-07-11, Refine log context with random string. 4.0.35 diff --git a/trunk/auto/options.sh b/trunk/auto/options.sh index 359bf35a8..d3a6111c6 100755 --- a/trunk/auto/options.sh +++ b/trunk/auto/options.sh @@ -178,7 +178,7 @@ Performance: @see https://blog.csdn.net/win_lin/article/details/5 --nasm=on|off Whether build FFMPEG for RTC with nasm support. --srtp-nasm=on|off Whether build SRTP with ASM(openssl-asm) support, requires RTC and openssl-1.0.*. - --sendmmsg=on|off Whether enable UDP sendmmsg support. @see http://man7.org/linux/man-pages/man2/sendmmsg.2.html + --sendmmsg=on|off Whether enable UDP sendmmsg support. Default: off. @see http://man7.org/linux/man-pages/man2/sendmmsg.2.html Toolchain options: @see https://github.com/ossrs/srs/issues/1547#issuecomment-576078411 --static Whether add '-static' to link options. @@ -204,9 +204,7 @@ Experts: --use-shared-srt Use link shared libraries for SRT which uses MPL license. --build-tag= Set the build object directory suffix. --clean=on|off Whether do 'make clean' when configure. - --detect-sendmmsg=on|off Whether detect the sendmmsg API. - --has-sendmmsg=on|off Whether OS supports sendmmsg API. - --simulator=on|off Whether enable RTC network simulator. + --simulator=on|off Whether enable RTC network simulator. Default: off Workflow: 1. Apply "Presets". if not specified, use default preset. diff --git a/trunk/src/app/srs_app_bandwidth.cpp b/trunk/src/app/srs_app_bandwidth.cpp index f89cabe03..6dc156ddf 100644 --- a/trunk/src/app/srs_app_bandwidth.cpp +++ b/trunk/src/app/srs_app_bandwidth.cpp @@ -151,7 +151,7 @@ srs_error_t SrsBandwidth::bandwidth_check(SrsRtmpServer* rtmp, ISrsProtocolStati // reject the connection in the interval window. if (last_check_time > 0 && time_now - last_check_time < interval) { _rtmp->response_connect_reject(_req, "bandcheck rejected"); - return srs_error_new(ERROR_SYSTEM_BANDWIDTH_DENIED, "reject, last_check=%" PRId64 ", now=%" PRId64 ", interval=%d", last_check_time, time_now, interval); + return srs_error_new(ERROR_SYSTEM_BANDWIDTH_DENIED, "reject, last_check=%" PRId64 ", now=%" PRId64 ", interval=%" PRId64 "", last_check_time, time_now, interval); } // accept and do bandwidth check. diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index 2f44f120e..625a3690c 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -130,7 +130,7 @@ namespace _srs_internal // read total content from file. ssize_t nread = 0; if ((err = reader.read(start, filesize, &nread)) != srs_success) { - return srs_error_wrap(err, "read %d only %d bytes", filesize, nread); + return srs_error_wrap(err, "read %d only %d bytes", filesize, (int)nread); } return err; @@ -620,6 +620,7 @@ srs_error_t srs_config_dumps_engine(SrsConfDirective* dir, SrsJsonObject* engine SrsConfDirective::SrsConfDirective() { + conf_line = 0; } SrsConfDirective::~SrsConfDirective() @@ -1140,6 +1141,7 @@ SrsConfig::SrsConfig() show_help = false; show_version = false; test_conf = false; + show_signature = false; root = new SrsConfDirective(); root->conf_line = 0; @@ -3661,7 +3663,7 @@ srs_error_t SrsConfig::check_normal_config() for (int i = 0; i < (int)listens.size(); i++) { string port = listens[i]; if (port.empty() || ::atoi(port.c_str()) <= 0) { - return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "listen.port=%d is invalid", port.c_str()); + return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "listen.port=%s is invalid", port.c_str()); } } } diff --git a/trunk/src/app/srs_app_dash.cpp b/trunk/src/app/srs_app_dash.cpp index 40e9a9c53..bf67014be 100644 --- a/trunk/src/app/srs_app_dash.cpp +++ b/trunk/src/app/srs_app_dash.cpp @@ -307,6 +307,7 @@ SrsDashController::SrsDashController() vfragments = new SrsFragmentWindow(); afragments = new SrsFragmentWindow(); audio_dts = video_dts = 0; + fragment = 0; } SrsDashController::~SrsDashController() diff --git a/trunk/src/app/srs_app_dvr.cpp b/trunk/src/app/srs_app_dvr.cpp index 2665210a1..2dc7b57dc 100644 --- a/trunk/src/app/srs_app_dvr.cpp +++ b/trunk/src/app/srs_app_dvr.cpp @@ -48,6 +48,7 @@ SrsDvrSegmenter::SrsDvrSegmenter() req = NULL; jitter = NULL; plan = NULL; + wait_keyframe = true; fragment = new SrsFragment(); fs = new SrsFileWriter(); @@ -585,6 +586,7 @@ string SrsDvrAsyncCallOnDvr::to_string() SrsDvrPlan::SrsDvrPlan() { req = NULL; + hub = NULL; dvr_enabled = false; segment = NULL; diff --git a/trunk/src/app/srs_app_edge.cpp b/trunk/src/app/srs_app_edge.cpp index d59393daf..98bb6152f 100644 --- a/trunk/src/app/srs_app_edge.cpp +++ b/trunk/src/app/srs_app_edge.cpp @@ -65,6 +65,7 @@ SrsEdgeRtmpUpstream::SrsEdgeRtmpUpstream(string r) { redirect = r; sdk = NULL; + selected_port = 0; } SrsEdgeRtmpUpstream::~SrsEdgeRtmpUpstream() @@ -440,6 +441,7 @@ SrsEdgeForwarder::SrsEdgeForwarder() edge = NULL; req = NULL; send_error_code = ERROR_SUCCESS; + source = NULL; sdk = NULL; lb = new SrsLbRoundRobin(); diff --git a/trunk/src/app/srs_app_hls.cpp b/trunk/src/app/srs_app_hls.cpp index e5cdfbfc6..81c7d7a79 100644 --- a/trunk/src/app/srs_app_hls.cpp +++ b/trunk/src/app/srs_app_hls.cpp @@ -200,6 +200,7 @@ SrsHlsMuxer::SrsHlsMuxer() accept_floor_ts = 0; hls_ts_floor = false; max_td = 0; + writer = NULL; _sequence_no = 0; current = NULL; hls_keys = false; @@ -214,6 +215,7 @@ SrsHlsMuxer::SrsHlsMuxer() SrsHlsMuxer::~SrsHlsMuxer() { + srs_freep(segments); srs_freep(current); srs_freep(req); srs_freep(async); @@ -757,6 +759,10 @@ srs_error_t SrsHlsMuxer::_refresh_m3u8(string m3u8_file) // #EXT-X-MEDIA-SEQUENCE:4294967295\n SrsHlsSegment* first = dynamic_cast(segments->first()); + if (first == NULL) { + return srs_error_new(ERROR_HLS_WRITE_FAILED, "segments cast"); + } + ss << "#EXT-X-MEDIA-SEQUENCE:" << first->sequence_no << SRS_CONSTS_LF; // #EXT-X-TARGETDURATION:4294967295\n diff --git a/trunk/src/app/srs_app_http_hooks.cpp b/trunk/src/app/srs_app_http_hooks.cpp index 5fd1d8cc3..61dd72348 100644 --- a/trunk/src/app/srs_app_http_hooks.cpp +++ b/trunk/src/app/srs_app_http_hooks.cpp @@ -532,7 +532,7 @@ srs_error_t SrsHttpHooks::do_post(SrsHttpClient* hc, std::string url, std::strin } if ((res_code->to_integer()) != ERROR_SUCCESS) { - return srs_error_new(ERROR_RESPONSE_CODE, "http: response object code %d %s", res_code->to_integer(), res.c_str()); + return srs_error_new(ERROR_RESPONSE_CODE, "http: response object code %" PRId64 " %s", res_code->to_integer(), res.c_str()); } return err; diff --git a/trunk/src/app/srs_app_http_static.cpp b/trunk/src/app/srs_app_http_static.cpp index 04724bdab..a20fea72f 100644 --- a/trunk/src/app/srs_app_http_static.cpp +++ b/trunk/src/app/srs_app_http_static.cpp @@ -134,7 +134,7 @@ srs_error_t SrsVodStream::serve_flv_stream(ISrsHttpResponseWriter* w, ISrsHttpMe // send data if ((err = copy(w, fs, r, (int)left)) != srs_success) { - return srs_error_wrap(err, "read flv=%s size=%d", fullpath.c_str(), left); + return srs_error_wrap(err, "read flv=%s size=%d", fullpath.c_str(), (int)left); } return err; @@ -184,7 +184,7 @@ srs_error_t SrsVodStream::serve_mp4_stream(ISrsHttpResponseWriter* w, ISrsHttpMe // send data if ((err = copy(w, fs, r, (int)left)) != srs_success) { - return srs_error_wrap(err, "read mp4=%s size=%d", fullpath.c_str(), left); + return srs_error_wrap(err, "read mp4=%s size=%d", fullpath.c_str(), (int)left); } return err; diff --git a/trunk/src/app/srs_app_ingest.cpp b/trunk/src/app/srs_app_ingest.cpp index b255d6be7..a8c7130e2 100644 --- a/trunk/src/app/srs_app_ingest.cpp +++ b/trunk/src/app/srs_app_ingest.cpp @@ -38,6 +38,7 @@ using namespace std; SrsIngesterFFMPEG::SrsIngesterFFMPEG() { ffmpeg = NULL; + starttime = 0; } SrsIngesterFFMPEG::~SrsIngesterFFMPEG() diff --git a/trunk/src/app/srs_app_listener.cpp b/trunk/src/app/srs_app_listener.cpp index cd847fa36..f2525057b 100755 --- a/trunk/src/app/srs_app_listener.cpp +++ b/trunk/src/app/srs_app_listener.cpp @@ -125,6 +125,7 @@ void SrsUdpListener::set_socket_buffer() int r0_sndbuf = 0; if (true) { socklen_t opt_len = sizeof(default_sndbuf); + // TODO: FIXME: check err getsockopt(fd(), SOL_SOCKET, SO_SNDBUF, (void*)&default_sndbuf, &opt_len); if ((r0_sndbuf = setsockopt(fd(), SOL_SOCKET, SO_SNDBUF, (void*)&actual_sndbuf, sizeof(actual_sndbuf))) < 0) { @@ -132,6 +133,7 @@ void SrsUdpListener::set_socket_buffer() } opt_len = sizeof(actual_sndbuf); + // TODO: FIXME: check err getsockopt(fd(), SOL_SOCKET, SO_SNDBUF, (void*)&actual_sndbuf, &opt_len); } @@ -142,6 +144,7 @@ void SrsUdpListener::set_socket_buffer() int r0_rcvbuf = 0; if (true) { socklen_t opt_len = sizeof(default_rcvbuf); + // TODO: FIXME: check err getsockopt(fd(), SOL_SOCKET, SO_RCVBUF, (void*)&default_rcvbuf, &opt_len); if ((r0_rcvbuf = setsockopt(fd(), SOL_SOCKET, SO_RCVBUF, (void*)&actual_rcvbuf, sizeof(actual_rcvbuf))) < 0) { @@ -149,6 +152,7 @@ void SrsUdpListener::set_socket_buffer() } opt_len = sizeof(actual_rcvbuf); + // TODO: FIXME: check err getsockopt(fd(), SOL_SOCKET, SO_RCVBUF, (void*)&actual_rcvbuf, &opt_len); } @@ -288,6 +292,7 @@ SrsUdpMuxSocket::SrsUdpMuxSocket(srs_netfd_t fd) lfd = fd; fromlen = 0; + peer_port = 0; } SrsUdpMuxSocket::~SrsUdpMuxSocket() diff --git a/trunk/src/app/srs_app_process.cpp b/trunk/src/app/srs_app_process.cpp index a36fc2b19..0ef31197a 100644 --- a/trunk/src/app/srs_app_process.cpp +++ b/trunk/src/app/srs_app_process.cpp @@ -158,12 +158,14 @@ srs_error_t srs_redirect_output(string from_file, int to_fd) if ((fd = ::open(from_file.c_str(), flags, mode)) < 0) { return srs_error_new(ERROR_FORK_OPEN_LOG, "open process %d %s failed", to_fd, from_file.c_str()); } - - if (dup2(fd, to_fd) < 0) { - return srs_error_new(ERROR_FORK_DUP2_LOG, "dup2 process %d failed", to_fd); - } - + + int r0 = dup2(fd, to_fd); ::close(fd); + + if (r0 < 0) { + return srs_error_new(ERROR_FORK_DUP2_LOG, "dup2 fd=%d, to=%d, file=%s failed, r0=%d", + fd, to_fd, from_file.c_str(), r0); + } return err; } diff --git a/trunk/src/app/srs_app_recv_thread.cpp b/trunk/src/app/srs_app_recv_thread.cpp index 78ee964f0..38101ba89 100644 --- a/trunk/src/app/srs_app_recv_thread.cpp +++ b/trunk/src/app/srs_app_recv_thread.cpp @@ -340,7 +340,7 @@ srs_error_t SrsPublishRecvThread::error_code() return srs_error_copy(recv_error); } -void SrsPublishRecvThread::set_cid(std::string v) +void SrsPublishRecvThread::set_cid(SrsContextId v) { ncid = v; } diff --git a/trunk/src/app/srs_app_recv_thread.hpp b/trunk/src/app/srs_app_recv_thread.hpp index 1d46f58d3..98546f091 100644 --- a/trunk/src/app/srs_app_recv_thread.hpp +++ b/trunk/src/app/srs_app_recv_thread.hpp @@ -180,7 +180,7 @@ public: virtual int64_t nb_msgs(); virtual uint64_t nb_video_frames(); virtual srs_error_t error_code(); - virtual void set_cid(std::string v); + virtual void set_cid(SrsContextId v); virtual SrsContextId get_cid(); public: virtual srs_error_t start(); diff --git a/trunk/src/app/srs_app_rtc_codec.cpp b/trunk/src/app/srs_app_rtc_codec.cpp index 7c0db4454..85e8efa44 100644 --- a/trunk/src/app/srs_app_rtc_codec.cpp +++ b/trunk/src/app/srs_app_rtc_codec.cpp @@ -170,6 +170,7 @@ srs_error_t SrsAudioEncoder::initialize() case 24000: opus_encoder_ctl(opus_, OPUS_SET_MAX_BANDWIDTH(OPUS_BANDWIDTH_SUPERWIDEBAND)); + break; case 16000: opus_encoder_ctl(opus_, OPUS_SET_MAX_BANDWIDTH(OPUS_BANDWIDTH_WIDEBAND)); @@ -343,6 +344,10 @@ SrsAudioRecode::SrsAudioRecode(int channels, int samplerate) { size_ = 0; data_ = new char[kPcmBufMax]; + + dec_ = NULL; + enc_ = NULL; + resample_ = NULL; } SrsAudioRecode::~SrsAudioRecode() @@ -395,7 +400,7 @@ srs_error_t SrsAudioRecode::transcode(SrsSample *pkt, char **buf, int *buf_len, int decode_len = kPacketBufMax; static char decode_buffer[kPacketBufMax]; if ((err = dec_->decode(pkt, decode_buffer, decode_len)) != srs_success) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "decode error"); + return srs_error_wrap(err, "decode error"); } if (!resample_) { @@ -419,7 +424,7 @@ srs_error_t SrsAudioRecode::transcode(SrsSample *pkt, char **buf, int *buf_len, int resample_len = kFrameBufMax; static char resample_buffer[kFrameBufMax]; if ((err = resample_->resample(&pcm, resample_buffer, resample_len)) != srs_success) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "resample error"); + return srs_error_wrap(err, "resample error"); } n = 0; @@ -446,7 +451,7 @@ srs_error_t SrsAudioRecode::transcode(SrsSample *pkt, char **buf, int *buf_len, pcm.size = size_; static char encode_buffer[kPacketBufMax]; if ((err = enc_->encode(&pcm, encode_buffer, encode_len)) != srs_success) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "encode error"); + return srs_error_wrap(err, "encode error"); } memcpy(buf[n], encode_buffer, encode_len); diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index f7b72048d..4c93c9adb 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -313,16 +313,17 @@ SrsRtcPlayStream::SrsRtcPlayStream(SrsRtcConnection* s, SrsContextId parent_cid) _srs_config->subscribe(this); timer_ = new SrsHourGlass(this, 1000 * SRS_UTIME_MILLISECONDS); + + nack_epp = new SrsErrorPithyPrint(); } SrsRtcPlayStream::~SrsRtcPlayStream() { _srs_config->unsubscribe(this); - srs_freep(req_); - srs_freep(trd); srs_freep(timer_); + srs_freep(req_); if (true) { std::map::iterator it; @@ -337,6 +338,8 @@ SrsRtcPlayStream::~SrsRtcPlayStream() srs_freep(it->second); } } + + srs_freep(nack_epp); } srs_error_t SrsRtcPlayStream::initialize(SrsRequest* req, std::map sub_relations) @@ -724,7 +727,7 @@ srs_error_t SrsRtcPlayStream::on_rtcp_nack(SrsRtcpNack* rtcp) vector resend_pkts; vector sns = rtcp->get_lost_sns(); - for(int i = 0; i < sns.size(); ++i) { + for(int i = 0; i < (int)sns.size(); ++i) { uint16_t seq = sns.at(i); nack_fetch(resend_pkts, rtcp->get_media_ssrc(), seq); } @@ -732,8 +735,12 @@ srs_error_t SrsRtcPlayStream::on_rtcp_nack(SrsRtcpNack* rtcp) for (int i = 0; i < (int)resend_pkts.size(); ++i) { SrsRtpPacket2* pkt = resend_pkts[i]; info.nn_bytes += pkt->nb_bytes(); - srs_trace("RTC NACK ARQ seq=%u, ssrc=%u, ts=%u, %d bytes", pkt->header.get_sequence(), - pkt->header.get_ssrc(), pkt->header.get_timestamp(), pkt->nb_bytes()); + + uint32_t nn = 0; + if (nack_epp->can_print(pkt->header.get_ssrc(), &nn)) { + srs_trace("RTC NACK ARQ seq=%u, ssrc=%u, ts=%u, count=%u/%u, %d bytes", pkt->header.get_sequence(), + pkt->header.get_ssrc(), pkt->header.get_timestamp(), nn, nack_epp->nn_count, pkt->nb_bytes()); + } } // By default, we send packets by sendmmsg. @@ -797,10 +804,11 @@ uint32_t SrsRtcPlayStream::get_video_publish_ssrc(uint32_t play_ssrc) return 0; } -SrsRtcPublishStream::SrsRtcPublishStream(SrsRtcConnection* session) +SrsRtcPublishStream::SrsRtcPublishStream(SrsRtcConnection* session, SrsContextId parent_cid) { timer_ = new SrsHourGlass(this, 200 * SRS_UTIME_MILLISECONDS); + parent_cid_ = parent_cid; is_started = false; session_ = session; request_keyframe_ = false; @@ -869,6 +877,12 @@ srs_error_t SrsRtcPublishStream::initialize(SrsRequest* r, SrsRtcStreamDescripti session_->stat_->nn_publishers++; + // Setup the publish stream in source to enable PLI as such. + if ((err = _srs_rtc_sources->fetch_or_create(req, &source)) != srs_success) { + return srs_error_wrap(err, "create source"); + } + source->set_publish_stream(this); + return err; } @@ -888,16 +902,10 @@ srs_error_t SrsRtcPublishStream::start() return srs_error_wrap(err, "start timer"); } - if ((err = _srs_rtc_sources->fetch_or_create(req, &source)) != srs_success) { - return srs_error_wrap(err, "create source"); - } - if ((err = source->on_publish()) != srs_success) { return srs_error_wrap(err, "on publish"); } - source->set_publish_stream(this); - if (_srs_rtc_hijacker) { if ((err = _srs_rtc_hijacker->on_start_publish(session_, this, req)) != srs_success) { return srs_error_wrap(err, "on start publish"); @@ -1003,7 +1011,8 @@ srs_error_t SrsRtcPublishStream::on_rtp(char* data, int nb_data) // For NACK simulator, drop packet. if (nn_simulate_nack_drop) { - SrsBuffer b(data, nb_data); SrsRtpHeader h; h.ignore_padding(true); h.decode(&b); + SrsBuffer b(data, nb_data); SrsRtpHeader h; h.ignore_padding(true); + err = h.decode(&b); srs_freep(err); // Ignore any error for simluate drop. simulate_drop_packet(&h, nb_data); return err; } @@ -1043,7 +1052,8 @@ srs_error_t SrsRtcPublishStream::on_rtp(char* data, int nb_data) char* unprotected_buf = new char[kRtpPacketSize]; if ((err = session_->transport_->unprotect_rtp(data, unprotected_buf, nb_unprotected_buf)) != srs_success) { // We try to decode the RTP header for more detail error informations. - SrsBuffer b(data, nb_data); SrsRtpHeader h; h.ignore_padding(true); h.decode(&b); + SrsBuffer b(data, nb_data); SrsRtpHeader h; h.ignore_padding(true); + srs_error_t r0 = h.decode(&b); srs_freep(r0); // Ignore any error for header decoding. err = srs_error_wrap(err, "marker=%u, pt=%u, seq=%u, ts=%u, ssrc=%u, pad=%u, payload=%uB", h.get_marker(), h.get_payload_type(), h.get_sequence(), h.get_timestamp(), h.get_ssrc(), h.get_padding(), nb_data - b.pos()); @@ -1449,19 +1459,7 @@ SrsRtcConnection::SrsRtcConnection(SrsRtcServer* s, SrsContextId context_id) SrsRtcConnection::~SrsRtcConnection() { srs_freep(timer_); - srs_freep(transport_); - srs_freep(req); - srs_freep(stat_); - srs_freep(pp_address_change); - - // Note that we should never delete the sendonly_skt, - // it's just point to the object in peer_addresses_. - map::iterator it; - for (it = peer_addresses_.begin(); it != peer_addresses_.end(); ++it) { - SrsUdpMuxSocket* addr = it->second; - srs_freep(addr); - } - + // Cleanup publishers. for(map::iterator it = publishers_.begin(); it != publishers_.end(); ++it) { SrsRtcPublishStream* publisher = it->second; @@ -1477,6 +1475,19 @@ SrsRtcConnection::~SrsRtcConnection() } players_.clear(); players_ssrc_map_.clear(); + + // Note that we should never delete the sendonly_skt, + // it's just point to the object in peer_addresses_. + map::iterator it; + for (it = peer_addresses_.begin(); it != peer_addresses_.end(); ++it) { + SrsUdpMuxSocket* addr = it->second; + srs_freep(addr); + } + + srs_freep(transport_); + srs_freep(req); + srs_freep(stat_); + srs_freep(pp_address_change); } SrsSdp* SrsRtcConnection::get_local_sdp() @@ -1548,7 +1559,7 @@ srs_error_t SrsRtcConnection::add_publisher(SrsRequest* req, const SrsSdp& remot return srs_error_wrap(err, "publish negotiate"); } - if ((err = generate_publish_local_sdp(req, local_sdp, stream_desc)) != srs_success) { + if ((err = generate_publish_local_sdp(req, local_sdp, stream_desc, remote_sdp.is_unified())) != srs_success) { return srs_error_wrap(err, "generate local sdp"); } @@ -1611,7 +1622,7 @@ srs_error_t SrsRtcConnection::add_player(SrsRequest* req, const SrsSdp& remote_s ++it; } - if ((err = generate_play_local_sdp(req, local_sdp, stream_desc)) != srs_success) { + if ((err = generate_play_local_sdp(req, local_sdp, stream_desc, remote_sdp.is_unified())) != srs_success) { return srs_error_wrap(err, "generate local sdp"); } @@ -1622,7 +1633,7 @@ srs_error_t SrsRtcConnection::add_player(SrsRequest* req, const SrsSdp& remote_s return err; } -srs_error_t SrsRtcConnection::add_player2(SrsRequest* req, SrsSdp& local_sdp) +srs_error_t SrsRtcConnection::add_player2(SrsRequest* req, bool unified_plan, SrsSdp& local_sdp) { srs_error_t err = srs_success; @@ -1658,7 +1669,7 @@ srs_error_t SrsRtcConnection::add_player2(SrsRequest* req, SrsSdp& local_sdp) ++it; } - if ((err = generate_play_local_sdp(req, local_sdp, stream_desc)) != srs_success) { + if ((err = generate_play_local_sdp(req, local_sdp, stream_desc, unified_plan)) != srs_success) { return srs_error_wrap(err, "generate local sdp"); } @@ -2091,7 +2102,7 @@ void SrsRtcConnection::check_send_nacks(SrsRtpNackForReceiver* nack, uint32_t ss int nb_protected_buf = stream.pos(); // FIXME: Merge nack rtcp into one packets. - if (transport_->protect_rtcp(protected_buf, stream.data(), nb_protected_buf) == srs_success) { + if (transport_->protect_rtcp(stream.data(), protected_buf, nb_protected_buf) == srs_success) { // TODO: FIXME: Check error. sendonly_skt->sendto(protected_buf, nb_protected_buf, 0); } @@ -2569,7 +2580,7 @@ srs_error_t SrsRtcConnection::negotiate_publish_capability(SrsRequest* req, cons return err; } -srs_error_t SrsRtcConnection::generate_publish_local_sdp(SrsRequest* req, SrsSdp& local_sdp, SrsRtcStreamDescription* stream_desc) +srs_error_t SrsRtcConnection::generate_publish_local_sdp(SrsRequest* req, SrsSdp& local_sdp, SrsRtcStreamDescription* stream_desc, bool unified_plan) { srs_error_t err = srs_success; @@ -2665,8 +2676,10 @@ srs_error_t SrsRtcConnection::generate_publish_local_sdp(SrsRequest* req, SrsSdp local_media_desc.payload_types_.push_back(payload->generate_media_payload_type()); } - // only need media desc info, not ssrc info; - break; + if(!unified_plan) { + // For PlanB, only need media desc info, not ssrc info; + break; + } } return err; @@ -2789,9 +2802,9 @@ srs_error_t SrsRtcConnection::negotiate_play_capability(SrsRequest* req, SrsRtcS //negotiate video media std::vector req_video_tracks = req_stream_desc->video_track_descs_; src_track_descs = source->get_track_desc("video", "h264"); - for(int i = 0; i < req_video_tracks.size(); ++i) { + for(int i = 0; i < (int)req_video_tracks.size(); ++i) { SrsRtcTrackDescription* req_video = req_video_tracks.at(i); - for(int j = 0; j < src_track_descs.size(); ++j) { + for(int j = 0; j < (int)src_track_descs.size(); ++j) { SrsRtcTrackDescription* src_video = src_track_descs.at(j); if(req_video->id_ == src_video->id_) { // FIXME: use source sdp or subscribe sdp? native subscribe may have no sdp @@ -2859,7 +2872,42 @@ srs_error_t SrsRtcConnection::fetch_source_capability(SrsRequest* req, std::map< return err; } -srs_error_t SrsRtcConnection::generate_play_local_sdp(SrsRequest* req, SrsSdp& local_sdp, SrsRtcStreamDescription* stream_desc) +void video_track_generate_play_offer(SrsRtcTrackDescription* track, SrsSdp& local_sdp) +{ + local_sdp.media_descs_.push_back(SrsMediaDesc("video")); + SrsMediaDesc& local_media_desc = local_sdp.media_descs_.back(); + + local_media_desc.port_ = 9; + local_media_desc.protos_ = "UDP/TLS/RTP/SAVPF"; + local_media_desc.rtcp_mux_ = true; + local_media_desc.rtcp_rsize_ = true; + + local_media_desc.extmaps_ = track->extmaps_; + + local_media_desc.mid_ = track->mid_; + local_sdp.groups_.push_back(local_media_desc.mid_); + + if (track->direction_ == "recvonly") { + local_media_desc.recvonly_ = true; + } else if (track->direction_ == "sendonly") { + local_media_desc.sendonly_ = true; + } else if (track->direction_ == "sendrecv") { + local_media_desc.sendrecv_ = true; + } else if (track->direction_ == "inactive_") { + local_media_desc.inactive_ = true; + } + + SrsVideoPayload* payload = (SrsVideoPayload*)track->media_; + + local_media_desc.payload_types_.push_back(payload->generate_media_payload_type()); + + if (track->red_) { + SrsRedPayload* red_payload = (SrsRedPayload*)track->red_; + local_media_desc.payload_types_.push_back(red_payload->generate_media_payload_type()); + } +} + +srs_error_t SrsRtcConnection::generate_play_local_sdp(SrsRequest* req, SrsSdp& local_sdp, SrsRtcStreamDescription* stream_desc, bool unified_plan) { srs_error_t err = srs_success; @@ -2948,39 +2996,14 @@ srs_error_t SrsRtcConnection::generate_play_local_sdp(SrsRequest* req, SrsSdp& l for (int i = 0; i < (int)stream_desc->video_track_descs_.size(); ++i) { SrsRtcTrackDescription* track = stream_desc->video_track_descs_[i]; - // for plan b, we only add one m= - if (i == 0) { - local_sdp.media_descs_.push_back(SrsMediaDesc("video")); - SrsMediaDesc& local_media_desc = local_sdp.media_descs_.back(); - - local_media_desc.port_ = 9; - local_media_desc.protos_ = "UDP/TLS/RTP/SAVPF"; - local_media_desc.rtcp_mux_ = true; - local_media_desc.rtcp_rsize_ = true; - - local_media_desc.extmaps_ = track->extmaps_; - - local_media_desc.mid_ = track->mid_; - local_sdp.groups_.push_back(local_media_desc.mid_); - - if (track->direction_ == "recvonly") { - local_media_desc.recvonly_ = true; - } else if (track->direction_ == "sendonly") { - local_media_desc.sendonly_ = true; - } else if (track->direction_ == "sendrecv") { - local_media_desc.sendrecv_ = true; - } else if (track->direction_ == "inactive_") { - local_media_desc.inactive_ = true; - } - - SrsVideoPayload* payload = (SrsVideoPayload*)track->media_; - - local_media_desc.payload_types_.push_back(payload->generate_media_payload_type()); - - if (track->red_) { - SrsRedPayload* red_payload = (SrsRedPayload*)track->red_; - local_media_desc.payload_types_.push_back(red_payload->generate_media_payload_type()); + if (!unified_plan) { + // for plan b, we only add one m= for video track. + if (i == 0) { + video_track_generate_play_offer(track, local_sdp); } + } else { + // unified plan SDP, generate a m= for each video track. + video_track_generate_play_offer(track, local_sdp); } SrsMediaDesc& local_media_desc = local_sdp.media_descs_.back(); @@ -3089,7 +3112,7 @@ srs_error_t SrsRtcConnection::create_publisher(SrsRequest* req, SrsRtcStreamDesc return err; } - SrsRtcPublishStream* publisher = new SrsRtcPublishStream(this); + SrsRtcPublishStream* publisher = new SrsRtcPublishStream(this, _srs_context->get_id()); if ((err = publisher->initialize(req, stream_desc)) != srs_success) { srs_freep(publisher); return srs_error_wrap(err, "rtc publisher init"); @@ -3122,7 +3145,7 @@ srs_error_t SrsRtcConnection::create_publisher(SrsRequest* req, SrsRtcStreamDesc } } - for(int i = 0; i < stream_desc->video_track_descs_.size(); ++i) { + for(int i = 0; i < (int)stream_desc->video_track_descs_.size(); ++i) { SrsRtcTrackDescription* track_desc = stream_desc->video_track_descs_.at(i); if(publishers_ssrc_map_.end() != publishers_ssrc_map_.find(track_desc->ssrc_)) { return srs_error_new(ERROR_RTC_DUPLICATED_SSRC, " duplicate ssrc %d, track id: %s", diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index c2a037a4b..0e75baf93 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -221,6 +221,8 @@ private: // key: publish_ssrc, value: send track to process rtp/rtcp std::map audio_tracks_; std::map video_tracks_; + // The pithy print for special stage. + SrsErrorPithyPrint* nack_epp; private: // For merged-write messages. int mw_msgs; @@ -271,6 +273,7 @@ private: class SrsRtcPublishStream : virtual public ISrsHourGlass, virtual public ISrsRtpPacketDecodeHandler, virtual public ISrsRtcPublishStream { private: + SrsContextId parent_cid_; SrsHourGlass* timer_; uint64_t nn_audio_frames; private: @@ -296,7 +299,7 @@ private: SrsRtpExtensionTypes extension_types_; bool is_started; public: - SrsRtcPublishStream(SrsRtcConnection* session); + SrsRtcPublishStream(SrsRtcConnection* session, SrsContextId parent_cid); virtual ~SrsRtcPublishStream(); public: srs_error_t initialize(SrsRequest* req, SrsRtcStreamDescription* stream_desc); @@ -439,7 +442,7 @@ public: srs_error_t add_publisher(SrsRequest* request, const SrsSdp& remote_sdp, SrsSdp& local_sdp); srs_error_t add_player(SrsRequest* request, const SrsSdp& remote_sdp, SrsSdp& local_sdp); // server send offer sdp to client, local sdp derivate from source stream desc. - srs_error_t add_player2(SrsRequest* request, SrsSdp& local_sdp); + srs_error_t add_player2(SrsRequest* request, bool unified_plan, SrsSdp& local_sdp); public: // Before initialize, user must set the local SDP, which is used to inititlize DTLS. srs_error_t initialize(SrsRequest* r, bool dtls, bool srtp, std::string username); @@ -483,13 +486,13 @@ private: srs_error_t on_binding_request(SrsStunPacket* r); // publish media capabilitiy negotiate srs_error_t negotiate_publish_capability(SrsRequest* req, const SrsSdp& remote_sdp, SrsRtcStreamDescription* stream_desc); - srs_error_t generate_publish_local_sdp(SrsRequest* req, SrsSdp& local_sdp, SrsRtcStreamDescription* stream_desc); + srs_error_t generate_publish_local_sdp(SrsRequest* req, SrsSdp& local_sdp, SrsRtcStreamDescription* stream_desc, bool unified_plan); // play media capabilitiy negotiate //TODO: Use StreamDescription to negotiate and remove first negotiate_play_capability function srs_error_t negotiate_play_capability(SrsRequest* req, const SrsSdp& remote_sdp, std::map& sub_relations); srs_error_t negotiate_play_capability(SrsRequest* req, SrsRtcStreamDescription* req_stream_desc, std::map& sub_relations); srs_error_t fetch_source_capability(SrsRequest* req, std::map& sub_relations); - srs_error_t generate_play_local_sdp(SrsRequest* req, SrsSdp& local_sdp, SrsRtcStreamDescription* stream_desc); + srs_error_t generate_play_local_sdp(SrsRequest* req, SrsSdp& local_sdp, SrsRtcStreamDescription* stream_desc, bool unified_plan); srs_error_t create_player(SrsRequest* request, std::map sub_relations); srs_error_t create_publisher(SrsRequest* request, SrsRtcStreamDescription* stream_desc); }; diff --git a/trunk/src/app/srs_app_rtc_dtls.cpp b/trunk/src/app/srs_app_rtc_dtls.cpp index a54d8ddc6..c0c7a486c 100644 --- a/trunk/src/app/srs_app_rtc_dtls.cpp +++ b/trunk/src/app/srs_app_rtc_dtls.cpp @@ -312,10 +312,12 @@ ISrsDtlsCallback::~ISrsDtlsCallback() { } -SrsDtls::SrsDtls(ISrsDtlsCallback* callback) +SrsDtlsImpl::SrsDtlsImpl(ISrsDtlsCallback* callback) { dtls_ctx = NULL; dtls = NULL; + bio_in = NULL; + bio_out = NULL; callback_ = callback; handshake_done_for_us = false; @@ -323,17 +325,11 @@ SrsDtls::SrsDtls(ISrsDtlsCallback* callback) last_outgoing_packet_cache = new uint8_t[kRtpPacketSize]; nn_last_outgoing_packet = 0; - role_ = SrsDtlsRoleServer; version_ = SrsDtlsVersionAuto; - - trd = NULL; - state_ = SrsDtlsStateInit; } -SrsDtls::~SrsDtls() +SrsDtlsImpl::~SrsDtlsImpl() { - srs_freep(trd); - if (dtls_ctx) { SSL_CTX_free(dtls_ctx); dtls_ctx = NULL; @@ -348,15 +344,10 @@ SrsDtls::~SrsDtls() srs_freepa(last_outgoing_packet_cache); } -srs_error_t SrsDtls::initialize(std::string role, std::string version) +srs_error_t SrsDtlsImpl::initialize(std::string version) { srs_error_t err = srs_success; - role_ = SrsDtlsRoleServer; - if (role == "active") { - role_ = SrsDtlsRoleClient; - } - if (version == "dtls1.0") { version_ = SrsDtlsVersion1_0; } else if (version == "dtls1.2") { @@ -371,15 +362,6 @@ srs_error_t SrsDtls::initialize(std::string role, std::string version) return srs_error_new(ERROR_OpenSslCreateSSL, "SSL_new dtls"); } - if (role_ == SrsDtlsRoleClient) { - // Dtls setup active, as client role. - SSL_set_connect_state(dtls); - SSL_set_max_send_fragment(dtls, kRtpPacketSize); - } else { - // Dtls setup passive, as server role. - SSL_set_accept_state(dtls); - } - if ((bio_in = BIO_new(BIO_s_mem())) == NULL) { return srs_error_new(ERROR_OpenSslBIONew, "BIO_new in"); } @@ -394,29 +376,10 @@ srs_error_t SrsDtls::initialize(std::string role, std::string version) return err; } -srs_error_t SrsDtls::start_active_handshake() +srs_error_t SrsDtlsImpl::on_dtls(char* data, int nb_data) { srs_error_t err = srs_success; - if (role_ == SrsDtlsRoleClient) { - return do_handshake(); - } - - return err; -} - -srs_error_t SrsDtls::on_dtls(char* data, int nb_data) -{ - srs_error_t err = srs_success; - - // When got packet, stop the ARQ if server in the first ARQ state SrsDtlsStateServerHello. - // @note But for ARQ state, we should never stop the ARQ, for example, we are in the second ARQ sate - // SrsDtlsStateServerDone, but we got previous late wrong packet ServeHello, which is not the expect - // packet SessionNewTicket, we should never stop the ARQ thread. - if (role_ == SrsDtlsRoleClient && state_ == SrsDtlsStateServerHello) { - stop_arq(); - } - if ((err = do_on_dtls(data, nb_data)) != srs_success) { return srs_error_wrap(err, "on_dtls size=%u, data=[%s]", nb_data, srs_string_dumps_hex(data, nb_data, 32).c_str()); @@ -425,7 +388,7 @@ srs_error_t SrsDtls::on_dtls(char* data, int nb_data) return err; } -srs_error_t SrsDtls::do_on_dtls(char* data, int nb_data) +srs_error_t SrsDtlsImpl::do_on_dtls(char* data, int nb_data) { srs_error_t err = srs_success; @@ -468,7 +431,7 @@ srs_error_t SrsDtls::do_on_dtls(char* data, int nb_data) return err; } -srs_error_t SrsDtls::do_handshake() +srs_error_t SrsDtlsImpl::do_handshake() { srs_error_t err = srs_success; @@ -490,16 +453,9 @@ srs_error_t SrsDtls::do_handshake() uint8_t* data = NULL; int size = BIO_get_mem_data(bio_out, &data); - // If outgoing packet is empty, we use the last cache. - // @remark Only for DTLS server, because DTLS client use ARQ thread to send cached packet. + // Callback when got SSL original data. bool cache = false; - if (role_ != SrsDtlsRoleClient && size <= 0 && nn_last_outgoing_packet) { - size = nn_last_outgoing_packet; - data = last_outgoing_packet_cache; - cache = true; - } - - // Trace the detail of DTLS packet. + on_ssl_out_data(data, size, cache); state_trace((uint8_t*)data, size, false, r0, r1, cache, false); // Update the packet cache. @@ -508,29 +464,9 @@ srs_error_t SrsDtls::do_handshake() nn_last_outgoing_packet = size; } - // Driven ARQ and state for DTLS client. - if (role_ == SrsDtlsRoleClient) { - // If we are sending client hello, change from init to new state. - if (state_ == SrsDtlsStateInit && size > 14 && data[13] == 1) { - state_ = SrsDtlsStateClientHello; - } - // If we are sending certificate, change from SrsDtlsStateServerHello to new state. - if (state_ == SrsDtlsStateServerHello && size > 14 && data[13] == 11) { - state_ = SrsDtlsStateClientCertificate; - } - - // Try to start the ARQ for client. - if ((state_ == SrsDtlsStateClientHello || state_ == SrsDtlsStateClientCertificate)) { - if (state_ == SrsDtlsStateClientHello) { - state_ = SrsDtlsStateServerHello; - } else if (state_ == SrsDtlsStateClientCertificate) { - state_ = SrsDtlsStateServerDone; - } - - if ((err = start_arq()) != srs_success) { - return srs_error_wrap(err, "start arq"); - } - } + // Callback for the final output data, before send-out. + if ((err = on_final_out_data(data, size)) != srs_success) { + return srs_error_wrap(err, "handle"); } if (size > 0 && (err = callback_->write_dtls_data(data, size)) != srs_success) { @@ -539,30 +475,213 @@ srs_error_t SrsDtls::do_handshake() } if (handshake_done_for_us) { - // When handshake done, stop the ARQ. - if (role_ == SrsDtlsRoleClient) { - state_ = SrsDtlsStateClientDone; - stop_arq(); - } - - // Notify connection the DTLS is done. - if (((err = callback_->on_dtls_handshake_done()) != srs_success)) { - return srs_error_wrap(err, "dtls done"); + if (((err = on_handshake_done()) != srs_success)) { + return srs_error_wrap(err, "done"); } } return err; } -srs_error_t SrsDtls::cycle() +void SrsDtlsImpl::state_trace(uint8_t* data, int length, bool incoming, int r0, int r1, bool cache, bool arq) +{ + uint8_t content_type = 0; + if (length >= 1) { + content_type = (uint8_t)data[0]; + } + + uint16_t size = 0; + if (length >= 13) { + size = uint16_t(data[11])<<8 | uint16_t(data[12]); + } + + uint8_t handshake_type = 0; + if (length >= 14) { + handshake_type = (uint8_t)data[13]; + } + + srs_trace("DTLS: %s %s, done=%u, cache=%u, arq=%u, r0=%d, r1=%d, len=%u, cnt=%u, size=%u, hs=%u", + (is_dtls_client()? "Active":"Passive"), (incoming? "RECV":"SEND"), handshake_done_for_us, cache, arq, + r0, r1, length, content_type, size, handshake_type); +} + +const int SRTP_MASTER_KEY_KEY_LEN = 16; +const int SRTP_MASTER_KEY_SALT_LEN = 14; +srs_error_t SrsDtlsImpl::get_srtp_key(std::string& recv_key, std::string& send_key) +{ + srs_error_t err = srs_success; + + unsigned char material[SRTP_MASTER_KEY_LEN * 2] = {0}; // client(SRTP_MASTER_KEY_KEY_LEN + SRTP_MASTER_KEY_SALT_LEN) + server + static const string dtls_srtp_lable = "EXTRACTOR-dtls_srtp"; + if (!SSL_export_keying_material(dtls, material, sizeof(material), dtls_srtp_lable.c_str(), dtls_srtp_lable.size(), NULL, 0, 0)) { + return srs_error_new(ERROR_RTC_SRTP_INIT, "SSL export key r0=%lu", ERR_get_error()); + } + + size_t offset = 0; + + std::string client_master_key(reinterpret_cast(material), SRTP_MASTER_KEY_KEY_LEN); + offset += SRTP_MASTER_KEY_KEY_LEN; + std::string server_master_key(reinterpret_cast(material + offset), SRTP_MASTER_KEY_KEY_LEN); + offset += SRTP_MASTER_KEY_KEY_LEN; + std::string client_master_salt(reinterpret_cast(material + offset), SRTP_MASTER_KEY_SALT_LEN); + offset += SRTP_MASTER_KEY_SALT_LEN; + std::string server_master_salt(reinterpret_cast(material + offset), SRTP_MASTER_KEY_SALT_LEN); + + if (is_dtls_client()) { + recv_key = server_master_key + server_master_salt; + send_key = client_master_key + client_master_salt; + } else { + recv_key = client_master_key + client_master_salt; + send_key = server_master_key + server_master_salt; + } + + return err; +} + +SrsDtlsClientImpl::SrsDtlsClientImpl(ISrsDtlsCallback* callback) : SrsDtlsImpl(callback) +{ + trd = NULL; + state_ = SrsDtlsStateInit; + arq_first = 50 * SRS_UTIME_MILLISECONDS; + arq_interval = 100 * SRS_UTIME_MILLISECONDS; +} + +SrsDtlsClientImpl::~SrsDtlsClientImpl() +{ + srs_freep(trd); +} + +srs_error_t SrsDtlsClientImpl::initialize(std::string version) +{ + srs_error_t err = srs_success; + + if ((err = SrsDtlsImpl::initialize(version)) != srs_success) { + return err; + } + + // Dtls setup active, as client role. + SSL_set_connect_state(dtls); + SSL_set_max_send_fragment(dtls, kRtpPacketSize); + + return err; +} + +srs_error_t SrsDtlsClientImpl::start_active_handshake() +{ + return do_handshake(); +} + +srs_error_t SrsDtlsClientImpl::on_dtls(char* data, int nb_data) +{ + srs_error_t err = srs_success; + + // When got packet, stop the ARQ if server in the first ARQ state SrsDtlsStateServerHello. + // @note But for ARQ state, we should never stop the ARQ, for example, we are in the second ARQ sate + // SrsDtlsStateServerDone, but we got previous late wrong packet ServeHello, which is not the expect + // packet SessionNewTicket, we should never stop the ARQ thread. + if (state_ == SrsDtlsStateServerHello) { + stop_arq(); + } + + if ((err = SrsDtlsImpl::on_dtls(data, nb_data)) != srs_success) { + return err; + } + + return err; +} + +void SrsDtlsClientImpl::on_ssl_out_data(uint8_t*& data, int& size, bool& cached) +{ + // DTLS client use ARQ thread to send cached packet. + cached = false; +} + +srs_error_t SrsDtlsClientImpl::on_final_out_data(uint8_t* data, int size) +{ + srs_error_t err = srs_success; + + // Driven ARQ and state for DTLS client. + // If we are sending client hello, change from init to new state. + if (state_ == SrsDtlsStateInit && size > 14 && data[13] == 1) { + state_ = SrsDtlsStateClientHello; + } + // If we are sending certificate, change from SrsDtlsStateServerHello to new state. + if (state_ == SrsDtlsStateServerHello && size > 14 && data[13] == 11) { + state_ = SrsDtlsStateClientCertificate; + } + + // Try to start the ARQ for client. + if ((state_ == SrsDtlsStateClientHello || state_ == SrsDtlsStateClientCertificate)) { + if (state_ == SrsDtlsStateClientHello) { + state_ = SrsDtlsStateServerHello; + } else if (state_ == SrsDtlsStateClientCertificate) { + state_ = SrsDtlsStateServerDone; + } + + if ((err = start_arq()) != srs_success) { + return srs_error_wrap(err, "start arq"); + } + } + + return err; +} + +srs_error_t SrsDtlsClientImpl::on_handshake_done() +{ + srs_error_t err = srs_success; + + // When handshake done, stop the ARQ. + state_ = SrsDtlsStateClientDone; + stop_arq(); + + // Notify connection the DTLS is done. + if (((err = callback_->on_dtls_handshake_done()) != srs_success)) { + return srs_error_wrap(err, "dtls done"); + } + + return err; +} + +bool SrsDtlsClientImpl::is_dtls_client() +{ + return true; +} + +srs_error_t SrsDtlsClientImpl::start_arq() +{ + srs_error_t err = srs_success; + + srs_info("start arq, state=%u", state_); + + // Dispose the previous ARQ thread. + srs_freep(trd); + trd = new SrsSTCoroutine("dtls", this, _srs_context->get_id()); + + // We should start the ARQ thread for DTLS client. + if ((err = trd->start()) != srs_success) { + return srs_error_wrap(err, "arq start"); + } + + return err; +} + +void SrsDtlsClientImpl::stop_arq() +{ + srs_info("stop arq, state=%u", state_); + srs_freep(trd); + srs_info("stop arq, done"); +} + +srs_error_t SrsDtlsClientImpl::cycle() { srs_error_t err = srs_success; // The first ARQ delay. - srs_usleep(50 * SRS_UTIME_MILLISECONDS); + srs_usleep(arq_first); - while (true) { - srs_info("arq cycle, state=%u", state_); + // Limit the max retry for ARQ. + for (int arq_retry_left = 7; arq_retry_left > 0; arq_retry_left--) { + srs_info("arq cycle, state=%u, retry=%d", state_, arq_retry_left); // We ignore any error for ARQ thread. if ((err = trd->pull()) != srs_success) { @@ -595,96 +714,110 @@ srs_error_t SrsDtls::cycle() } // TODO: Use ARQ step timeouts. - srs_usleep(100 * SRS_UTIME_MILLISECONDS); + srs_usleep(arq_interval); } return err; } -void SrsDtls::state_trace(uint8_t* data, int length, bool incoming, int r0, int r1, bool cache, bool arq) +SrsDtlsServerImpl::SrsDtlsServerImpl(ISrsDtlsCallback* callback) : SrsDtlsImpl(callback) { - uint8_t content_type = 0; - if (length >= 1) { - content_type = (uint8_t)data[0]; - } - - uint16_t size = 0; - if (length >= 13) { - size = uint16_t(data[11])<<8 | uint16_t(data[12]); - } - - uint8_t handshake_type = 0; - if (length >= 14) { - handshake_type = (uint8_t)data[13]; - } - - srs_trace("DTLS: %s %s, done=%u, cache=%u, arq=%u, state=%u, r0=%d, r1=%d, len=%u, cnt=%u, size=%u, hs=%u", - (role_ == SrsDtlsRoleClient? "Active":"Passive"), (incoming? "RECV":"SEND"), handshake_done_for_us, cache, arq, - state_, r0, r1, length, content_type, size, handshake_type); } -srs_error_t SrsDtls::start_arq() +SrsDtlsServerImpl::~SrsDtlsServerImpl() +{ +} + +srs_error_t SrsDtlsServerImpl::initialize(std::string version) { srs_error_t err = srs_success; - if (role_ != SrsDtlsRoleClient) { + if ((err = SrsDtlsImpl::initialize(version)) != srs_success) { return err; } - srs_info("start arq, state=%u", state_); - - // Dispose the previous ARQ thread. - srs_freep(trd); - trd = new SrsSTCoroutine("dtls", this, _srs_context->get_id()); - - // We should start the ARQ thread for DTLS client. - if ((err = trd->start()) != srs_success) { - return srs_error_wrap(err, "arq start"); - } + // Dtls setup passive, as server role. + SSL_set_accept_state(dtls); return err; } -void SrsDtls::stop_arq() +srs_error_t SrsDtlsServerImpl::start_active_handshake() { - srs_info("stop arq, state=%u", state_); - srs_freep(trd); - srs_info("stop arq, done"); + return srs_success; } -const int SRTP_MASTER_KEY_KEY_LEN = 16; -const int SRTP_MASTER_KEY_SALT_LEN = 14; -srs_error_t SrsDtls::get_srtp_key(std::string& recv_key, std::string& send_key) +void SrsDtlsServerImpl::on_ssl_out_data(uint8_t*& data, int& size, bool& cached) +{ + // If outgoing packet is empty, we use the last cache. + // @remark Only for DTLS server, because DTLS client use ARQ thread to send cached packet. + if (size <= 0 && nn_last_outgoing_packet) { + size = nn_last_outgoing_packet; + data = last_outgoing_packet_cache; + cached = true; + } +} + +srs_error_t SrsDtlsServerImpl::on_final_out_data(uint8_t* data, int size) +{ + return srs_success; +} + +srs_error_t SrsDtlsServerImpl::on_handshake_done() { srs_error_t err = srs_success; - unsigned char material[SRTP_MASTER_KEY_LEN * 2] = {0}; // client(SRTP_MASTER_KEY_KEY_LEN + SRTP_MASTER_KEY_SALT_LEN) + server - static const string dtls_srtp_lable = "EXTRACTOR-dtls_srtp"; - if (!SSL_export_keying_material(dtls, material, sizeof(material), dtls_srtp_lable.c_str(), dtls_srtp_lable.size(), NULL, 0, 0)) { - return srs_error_new(ERROR_RTC_SRTP_INIT, "SSL export key r0=%u", ERR_get_error()); - } - - size_t offset = 0; - - std::string client_master_key(reinterpret_cast(material), SRTP_MASTER_KEY_KEY_LEN); - offset += SRTP_MASTER_KEY_KEY_LEN; - std::string server_master_key(reinterpret_cast(material + offset), SRTP_MASTER_KEY_KEY_LEN); - offset += SRTP_MASTER_KEY_KEY_LEN; - std::string client_master_salt(reinterpret_cast(material + offset), SRTP_MASTER_KEY_SALT_LEN); - offset += SRTP_MASTER_KEY_SALT_LEN; - std::string server_master_salt(reinterpret_cast(material + offset), SRTP_MASTER_KEY_SALT_LEN); - - if (role_ == SrsDtlsRoleClient) { - recv_key = server_master_key + server_master_salt; - send_key = client_master_key + client_master_salt; - } else { - recv_key = client_master_key + client_master_salt; - send_key = server_master_key + server_master_salt; + // Notify connection the DTLS is done. + if (((err = callback_->on_dtls_handshake_done()) != srs_success)) { + return srs_error_wrap(err, "dtls done"); } return err; } +bool SrsDtlsServerImpl::is_dtls_client() +{ + return false; +} + +SrsDtls::SrsDtls(ISrsDtlsCallback* callback) +{ + callback_ = callback; + impl = new SrsDtlsServerImpl(callback); +} + +SrsDtls::~SrsDtls() +{ + srs_freep(impl); +} + +srs_error_t SrsDtls::initialize(std::string role, std::string version) +{ + srs_freep(impl); + if (role == "active") { + impl = new SrsDtlsClientImpl(callback_); + } else { + impl = new SrsDtlsServerImpl(callback_); + } + + return impl->initialize(version); +} + +srs_error_t SrsDtls::start_active_handshake() +{ + return impl->start_active_handshake(); +} + +srs_error_t SrsDtls::on_dtls(char* data, int nb_data) +{ + return impl->on_dtls(data, nb_data); +} + +srs_error_t SrsDtls::get_srtp_key(std::string& recv_key, std::string& send_key) +{ + return impl->get_srtp_key(recv_key, send_key); +} + SrsSRTP::SrsSRTP() { recv_ctx_ = NULL; diff --git a/trunk/src/app/srs_app_rtc_dtls.hpp b/trunk/src/app/srs_app_rtc_dtls.hpp index b15da73c1..019c360ec 100644 --- a/trunk/src/app/srs_app_rtc_dtls.hpp +++ b/trunk/src/app/srs_app_rtc_dtls.hpp @@ -105,31 +105,93 @@ enum SrsDtlsState { SrsDtlsStateClientDone, // Done. }; -class SrsDtls : public ISrsCoroutineHandler +class SrsDtlsImpl { -private: +protected: SSL_CTX* dtls_ctx; SSL* dtls; BIO* bio_in; BIO* bio_out; ISrsDtlsCallback* callback_; -private: + // @remark: dtls_version_ default value is SrsDtlsVersionAuto. + SrsDtlsVersion version_; +protected: // Whether the handhshake is done, for us only. // @remark For us only, means peer maybe not done, we also need to handle the DTLS packet. bool handshake_done_for_us; // DTLS packet cache, only last out-going packet. uint8_t* last_outgoing_packet_cache; int nn_last_outgoing_packet; +public: + SrsDtlsImpl(ISrsDtlsCallback* callback); + virtual ~SrsDtlsImpl(); +public: + virtual srs_error_t initialize(std::string version); + virtual srs_error_t start_active_handshake() = 0; + virtual srs_error_t on_dtls(char* data, int nb_data); +protected: + srs_error_t do_on_dtls(char* data, int nb_data); + srs_error_t do_handshake(); + void state_trace(uint8_t* data, int length, bool incoming, int r0, int r1, bool cache, bool arq); +public: + srs_error_t get_srtp_key(std::string& recv_key, std::string& send_key); +protected: + virtual void on_ssl_out_data(uint8_t*& data, int& size, bool& cached) = 0; + virtual srs_error_t on_final_out_data(uint8_t* data, int size) = 0; + virtual srs_error_t on_handshake_done() = 0; + virtual bool is_dtls_client() = 0; +}; + +class SrsDtlsClientImpl : virtual public SrsDtlsImpl, virtual public ISrsCoroutineHandler +{ +private: // ARQ thread, for role active(DTLS client). // @note If passive(DTLS server), the ARQ is driven by DTLS client. SrsCoroutine* trd; // The DTLS-client state to drive the ARQ thread. SrsDtlsState state_; + // The timeout for ARQ. + srs_utime_t arq_first; + srs_utime_t arq_interval; +public: + SrsDtlsClientImpl(ISrsDtlsCallback* callback); + virtual ~SrsDtlsClientImpl(); +public: + virtual srs_error_t initialize(std::string version); + virtual srs_error_t start_active_handshake(); + virtual srs_error_t on_dtls(char* data, int nb_data); +protected: + virtual void on_ssl_out_data(uint8_t*& data, int& size, bool& cached); + virtual srs_error_t on_final_out_data(uint8_t* data, int size); + virtual srs_error_t on_handshake_done(); + virtual bool is_dtls_client(); private: - // @remark: dtls_role_ default value is DTLS_SERVER. - SrsDtlsRole role_; - // @remark: dtls_version_ default value is SrsDtlsVersionAuto. - SrsDtlsVersion version_; + srs_error_t start_arq(); + void stop_arq(); +public: + virtual srs_error_t cycle(); +}; + +class SrsDtlsServerImpl : public SrsDtlsImpl +{ +public: + SrsDtlsServerImpl(ISrsDtlsCallback* callback); + virtual ~SrsDtlsServerImpl(); +public: + virtual srs_error_t initialize(std::string version); + virtual srs_error_t start_active_handshake(); +protected: + virtual void on_ssl_out_data(uint8_t*& data, int& size, bool& cached); + virtual srs_error_t on_final_out_data(uint8_t* data, int size); + virtual srs_error_t on_handshake_done(); + virtual bool is_dtls_client(); +}; + +class SrsDtls +{ +private: + SrsDtlsImpl* impl; + ISrsDtlsCallback* callback_; public: SrsDtls(ISrsDtlsCallback* callback); virtual ~SrsDtls(); @@ -141,17 +203,6 @@ public: // When got DTLS packet, may handshake packets or application data. // @remark When we are passive(DTLS server), we start handshake when got DTLS packet. srs_error_t on_dtls(char* data, int nb_data); -private: - srs_error_t do_on_dtls(char* data, int nb_data); - srs_error_t do_handshake(); -// interface ISrsCoroutineHandler -public: - virtual srs_error_t cycle(); -private: - void state_trace(uint8_t* data, int length, bool incoming, int r0, int r1, bool cache, bool arq); -private: - srs_error_t start_arq(); - void stop_arq(); public: srs_error_t get_srtp_key(std::string& recv_key, std::string& send_key); }; diff --git a/trunk/src/app/srs_app_rtc_queue.cpp b/trunk/src/app/srs_app_rtc_queue.cpp index ff820ab8b..f569e178c 100644 --- a/trunk/src/app/srs_app_rtc_queue.cpp +++ b/trunk/src/app/srs_app_rtc_queue.cpp @@ -189,6 +189,7 @@ SrsRtpNackForReceiver::SrsRtpNackForReceiver(SrsRtpRingBuffer* rtp, size_t queue rtp_ = rtp; pre_check_time_ = 0; last_remove_packet_time_ = -1; + rtt_ = 0; srs_info("max_queue_size=%u, nack opt: max_count=%d, max_alive_time=%us, first_nack_interval=%" PRId64 ", nack_interval=%" PRId64, max_queue_size_, opts_.max_count, opts_.max_alive_time, opts.first_nack_interval, opts_.nack_interval); diff --git a/trunk/src/app/srs_app_rtc_sdp.cpp b/trunk/src/app/srs_app_rtc_sdp.cpp index 84e56f758..4b428ace1 100644 --- a/trunk/src/app/srs_app_rtc_sdp.cpp +++ b/trunk/src/app/srs_app_rtc_sdp.cpp @@ -254,6 +254,7 @@ srs_error_t SrsSSRCGroup::encode(std::ostringstream& os) SrsMediaPayloadType::SrsMediaPayloadType(int payload_type) { payload_type_ = payload_type; + clock_rate_ = 0; } SrsMediaPayloadType::~SrsMediaPayloadType() @@ -288,7 +289,9 @@ SrsMediaDesc::SrsMediaDesc(const std::string& type) { type_ = type; + port_ = 0; rtcp_mux_ = false; + rtcp_rsize_ = false; sendrecv_ = false; recvonly_ = false; @@ -315,7 +318,7 @@ vector SrsMediaDesc::find_media_with_encoding_name(const st { std::vector payloads; - std::string lower_name, upper_name; + std::string lower_name(encoding_name), upper_name(encoding_name); transform(encoding_name.begin(), encoding_name.end(), lower_name.begin(), ::tolower); transform(encoding_name.begin(), encoding_name.end(), upper_name.begin(), ::toupper); @@ -1062,3 +1065,10 @@ srs_error_t SrsSdp::parse_media_description(const std::string& content) return err; } + +bool SrsSdp::is_unified() const +{ + // TODO: FIXME: Maybe we should consider other situations. + return media_descs_.size() > 2; +} + diff --git a/trunk/src/app/srs_app_rtc_sdp.hpp b/trunk/src/app/srs_app_rtc_sdp.hpp index 443923efc..d4edd6e55 100644 --- a/trunk/src/app/srs_app_rtc_sdp.hpp +++ b/trunk/src/app/srs_app_rtc_sdp.hpp @@ -246,6 +246,8 @@ public: // m-line, media sessions std::vector media_descs_; + + bool is_unified() const; }; #endif diff --git a/trunk/src/app/srs_app_rtc_server.cpp b/trunk/src/app/srs_app_rtc_server.cpp index 393ee709e..1ae4468e8 100644 --- a/trunk/src/app/srs_app_rtc_server.cpp +++ b/trunk/src/app/srs_app_rtc_server.cpp @@ -513,7 +513,7 @@ srs_error_t SrsRtcServer::do_create_session( return err; } -srs_error_t SrsRtcServer::create_session2(SrsRequest* req, SrsSdp& local_sdp, const std::string& mock_eip, SrsRtcConnection** psession) +srs_error_t SrsRtcServer::create_session2(SrsRequest* req, SrsSdp& local_sdp, const std::string& mock_eip, bool unified_plan, SrsRtcConnection** psession) { srs_error_t err = srs_success; @@ -525,7 +525,7 @@ srs_error_t SrsRtcServer::create_session2(SrsRequest* req, SrsSdp& local_sdp, co SrsRtcConnection* session = new SrsRtcConnection(this, cid); // first add player for negotiate local sdp media info - if ((err = session->add_player2(req, local_sdp)) != srs_success) { + if ((err = session->add_player2(req, unified_plan, local_sdp)) != srs_success) { srs_freep(session); return srs_error_wrap(err, "add player2"); } diff --git a/trunk/src/app/srs_app_rtc_server.hpp b/trunk/src/app/srs_app_rtc_server.hpp index 4a166f659..fbc50859b 100644 --- a/trunk/src/app/srs_app_rtc_server.hpp +++ b/trunk/src/app/srs_app_rtc_server.hpp @@ -126,7 +126,7 @@ private: ); public: // We start offering, create_session2 to generate offer, setup_session2 to handle answer. - srs_error_t create_session2(SrsRequest* req, SrsSdp& local_sdp, const std::string& mock_eip, SrsRtcConnection** psession); + srs_error_t create_session2(SrsRequest* req, SrsSdp& local_sdp, const std::string& mock_eip, bool unified_plan, SrsRtcConnection** psession); srs_error_t setup_session2(SrsRtcConnection* session, SrsRequest* req, const SrsSdp& remote_sdp); // Destroy the session from server. void destroy(SrsRtcConnection* session); diff --git a/trunk/src/app/srs_app_rtc_source.cpp b/trunk/src/app/srs_app_rtc_source.cpp index 452083fc6..9bbdbc2c7 100644 --- a/trunk/src/app/srs_app_rtc_source.cpp +++ b/trunk/src/app/srs_app_rtc_source.cpp @@ -41,6 +41,7 @@ #include #include #include +#include #ifdef SRS_FFMPEG_FIT #include @@ -539,6 +540,8 @@ SrsRtcFromRtmpBridger::SrsRtcFromRtmpBridger(SrsRtcStream* source) // audio track description if (true) { SrsRtcTrackDescription* audio_track_desc = new SrsRtcTrackDescription(); + SrsAutoFree(SrsRtcTrackDescription, audio_track_desc); + audio_track_desc->type_ = "audio"; audio_track_desc->id_ = "audio-" + srs_random_str(8); @@ -553,6 +556,8 @@ SrsRtcFromRtmpBridger::SrsRtcFromRtmpBridger(SrsRtcStream* source) // video track description if (true) { SrsRtcTrackDescription* video_track_desc = new SrsRtcTrackDescription(); + SrsAutoFree(SrsRtcTrackDescription, video_track_desc); + video_track_desc->type_ = "video"; video_track_desc->id_ = "video-" + srs_random_str(8); @@ -1117,6 +1122,8 @@ void SrsRtcDummyBridger::on_unpublish() SrsCodecPayload::SrsCodecPayload() { + pt_ = 0; + sample_ = 0; } SrsCodecPayload::SrsCodecPayload(uint8_t pt, std::string encode_name, int sample) @@ -1249,7 +1256,7 @@ srs_error_t SrsVideoPayload::set_h264_param_desc(std::string fmtp) SrsAudioPayload::SrsAudioPayload() { - type_ = "audio"; + channel_ = 0; } SrsAudioPayload::SrsAudioPayload(uint8_t pt, std::string encode_name, int sample, int channel) @@ -1331,6 +1338,7 @@ srs_error_t SrsAudioPayload::set_opus_param_desc(std::string fmtp) SrsRedPayload::SrsRedPayload() { + channel_ = 0; } SrsRedPayload::SrsRedPayload(uint8_t pt, std::string encode_name, int sample, int channel) @@ -1604,6 +1612,8 @@ SrsRtcRecvTrack::SrsRtcRecvTrack(SrsRtcConnection* session, SrsRtcTrackDescripti rtp_queue_ = new SrsRtpRingBuffer(1000); nack_receiver_ = new SrsRtpNackForReceiver(rtp_queue_, 1000 * 2 / 3); } + + last_sender_report_sys_time = 0; } SrsRtcRecvTrack::~SrsRtcRecvTrack() @@ -1804,6 +1814,8 @@ SrsRtcSendTrack::SrsRtcSendTrack(SrsRtcConnection* session, SrsRtcTrackDescripti } else { rtp_queue_ = new SrsRtpRingBuffer(1000); } + + nack_epp = new SrsErrorPithyPrint(); } SrsRtcSendTrack::~SrsRtcSendTrack() @@ -1811,6 +1823,7 @@ SrsRtcSendTrack::~SrsRtcSendTrack() srs_freep(rtp_queue_); srs_freep(track_desc_); srs_freep(statistic_); + srs_freep(nack_epp); } bool SrsRtcSendTrack::has_ssrc(uint32_t ssrc) @@ -1827,12 +1840,18 @@ SrsRtpPacket2* SrsRtcSendTrack::fetch_rtp_packet(uint16_t seq) } // For NACK, it sequence must match exactly, or it cause SRTP fail. - if (pkt->header.get_sequence() != seq) { - srs_trace("miss match seq=%u, pkt seq=%u", seq, pkt->header.get_sequence()); - return NULL; + // Return packet only when sequence is equal. + if (pkt->header.get_sequence() == seq) { + return pkt; } - return pkt; + // Ignore if sequence not match. + uint32_t nn = 0; + if (nack_epp->can_print(pkt->header.get_ssrc(), &nn)) { + srs_trace("RTC NACK miss seq=%u, require_seq=%u, ssrc=%u, ts=%u, count=%u/%u, %d bytes", seq, pkt->header.get_sequence(), + pkt->header.get_ssrc(), pkt->header.get_timestamp(), nn, nack_epp->nn_count, pkt->nb_bytes()); + } + return NULL; } // TODO: FIXME: Should refine logs, set tracks in a time. diff --git a/trunk/src/app/srs_app_rtc_source.hpp b/trunk/src/app/srs_app_rtc_source.hpp index 09893c6a1..cd292b767 100644 --- a/trunk/src/app/srs_app_rtc_source.hpp +++ b/trunk/src/app/srs_app_rtc_source.hpp @@ -55,6 +55,7 @@ class SrsRtpRingBuffer; class SrsRtpNackForReceiver; class SrsJsonObject; class SrsRtcPlayStreamStatistic; +class SrsErrorPithyPrint; class SrsNtp { @@ -308,6 +309,12 @@ class SrsAudioPayload : public SrsCodecPayload int minptime; bool use_inband_fec; bool usedtx; + + SrsOpusParameter() { + minptime = 0; + use_inband_fec = false; + usedtx = false; + } }; public: @@ -524,11 +531,14 @@ protected: // send track description SrsRtcTrackDescription* track_desc_; SrsRtcTrackStatistic* statistic_; - +protected: // The owner connection for this track. SrsRtcConnection* session_; // NACK ARQ ring buffer. SrsRtpRingBuffer* rtp_queue_; +private: + // The pithy print for special stage. + SrsErrorPithyPrint* nack_epp; public: SrsRtcSendTrack(SrsRtcConnection* session, SrsRtcTrackDescription* track_desc, bool is_audio); virtual ~SrsRtcSendTrack(); diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index d46ed99db..27e6d2e4f 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -121,6 +121,9 @@ SrsRtmpConn::SrsRtmpConn(SrsServer* svr, srs_netfd_t c, string cip, int port) : send_min_interval = 0; tcp_nodelay = false; info = new SrsClientInfo(); + + publish_1stpkt_timeout = 0; + publish_normal_timeout = 0; _srs_config->subscribe(this); } diff --git a/trunk/src/app/srs_app_rtsp.cpp b/trunk/src/app/srs_app_rtsp.cpp index 330968c2f..a3c66728d 100644 --- a/trunk/src/app/srs_app_rtsp.cpp +++ b/trunk/src/app/srs_app_rtsp.cpp @@ -193,6 +193,11 @@ SrsRtspConn::SrsRtspConn(SrsRtspCaster* c, srs_netfd_t fd, std::string o) skt = new SrsStSocket(); rtsp = new SrsRtspStack(skt); trd = new SrsSTCoroutine("rtsp", this); + + audio_id = 0; + video_id = 0; + audio_sample_rate = 0; + audio_channel = 0; req = NULL; sdk = NULL; @@ -223,6 +228,9 @@ SrsRtspConn::~SrsRtspConn() srs_freep(vjitter); srs_freep(ajitter); + + srs_freep(avc); + srs_freep(aac); srs_freep(acodec); srs_freep(acache); } diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index 3fa14461e..80819c69b 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -380,7 +380,7 @@ SrsSignalManager::SrsSignalManager(SrsServer* s) server = s; sig_pipe[0] = sig_pipe[1] = -1; - trd = new SrsSTCoroutine("signal", this); + trd = new SrsSTCoroutine("signal", this, _srs_context->get_id()); signal_read_stfd = NULL; } @@ -539,7 +539,7 @@ srs_error_t SrsInotifyWorker::start() } if (((err = srs_fd_closeexec(fd))) != srs_success) { - return srs_error_new(ERROR_INOTIFY_OPENFD, "closeexec fd=%d", fd); + return srs_error_wrap(err, "closeexec fd=%d", fd); } // /* the following are legal, implemented events that user-space can watch for */ @@ -877,7 +877,7 @@ srs_error_t SrsServer::acquire_pid_file() // write the pid string pid = srs_int2str(getpid()); if (write(fd, pid.c_str(), pid.length()) != (int)pid.length()) { - return srs_error_new(ERROR_SYSTEM_PID_WRITE_FILE, "write pid=%d to file=%s", pid.c_str(), pid_file.c_str()); + return srs_error_new(ERROR_SYSTEM_PID_WRITE_FILE, "write pid=%s to file=%s", pid.c_str(), pid_file.c_str()); } // auto close when fork child process. diff --git a/trunk/src/app/srs_app_utility.cpp b/trunk/src/app/srs_app_utility.cpp index a6c72f722..f3857b178 100644 --- a/trunk/src/app/srs_app_utility.cpp +++ b/trunk/src/app/srs_app_utility.cpp @@ -940,6 +940,9 @@ SrsNetworkRtmpServer::SrsNetworkRtmpServer() nb_conn_sys = nb_conn_srs = 0; nb_conn_sys_et = nb_conn_sys_tw = 0; nb_conn_sys_udp = 0; + rkbps = skbps = 0; + rkbps_30s = skbps_30s = 0; + rkbps_5m = skbps_5m = 0; } static SrsNetworkRtmpServer _srs_network_rtmp_server; @@ -1298,24 +1301,25 @@ string srs_string_dumps_hex(const char* str, int length, int limit) string srs_string_dumps_hex(const char* str, int length, int limit, char seperator, int line_limit, char newline) { - const int LIMIT = 1024*16; + // 1 byte trailing '\0'. + const int LIMIT = 1024*16 + 1; static char buf[LIMIT]; int len = 0; - for (int i = 0; i < length && i < limit && i < LIMIT; ++i) { + for (int i = 0; i < length && i < limit && len < LIMIT; ++i) { int nb = snprintf(buf + len, LIMIT - len, "%02x", (uint8_t)str[i]); - if (nb < 0 || nb > LIMIT - len) { + if (nb < 0 || nb >= LIMIT - len) { break; } len += nb; // Only append seperator and newline when not last byte. - if (i < length - 1 && i < limit - 1 && i < LIMIT - 1) { - if (seperator && len < LIMIT) { + if (i < length - 1 && i < limit - 1 && len < LIMIT) { + if (seperator) { buf[len++] = seperator; } - if (newline && line_limit && i > 0 && ((i + 1) % line_limit) == 0 && len < LIMIT) { + if (newline && line_limit && i > 0 && ((i + 1) % line_limit) == 0) { buf[len++] = newline; } } @@ -1325,6 +1329,17 @@ string srs_string_dumps_hex(const char* str, int length, int limit, char seperat if (len <= 0) { return ""; } + + // If overflow, cut the trailing newline. + if (newline && len >= LIMIT - 2 && buf[len - 1] == newline) { + len--; + } + + // If overflow, cut the trailing seperator. + if (seperator && len >= LIMIT - 3 && buf[len - 1] == seperator) { + len--; + } + return string(buf, len); } diff --git a/trunk/src/core/srs_core.cpp b/trunk/src/core/srs_core.cpp index 539f4e267..29b882ebe 100644 --- a/trunk/src/core/srs_core.cpp +++ b/trunk/src/core/srs_core.cpp @@ -27,11 +27,6 @@ _SrsContextId::_SrsContextId() { } -_SrsContextId::_SrsContextId(std::string v) -{ - v_ = v; -} - _SrsContextId::_SrsContextId(const _SrsContextId& cp) { v_ = cp.v_; @@ -62,3 +57,9 @@ int _SrsContextId::compare(const _SrsContextId& to) const return v_.compare(to.v_); } +_SrsContextId& _SrsContextId::set_value(const std::string& v) +{ + v_ = v; + return *this; +} + diff --git a/trunk/src/core/srs_core.hpp b/trunk/src/core/srs_core.hpp index 9fa326aff..e3dd74ce9 100644 --- a/trunk/src/core/srs_core.hpp +++ b/trunk/src/core/srs_core.hpp @@ -124,7 +124,6 @@ private: std::string v_; public: _SrsContextId(); - _SrsContextId(std::string v); _SrsContextId(const _SrsContextId& cp); _SrsContextId& operator=(const _SrsContextId& cp); virtual ~_SrsContextId(); @@ -136,6 +135,8 @@ public: // <0 Either the value of the first character that does not match is lower in the compared string, or all compared characters match but the compared string is shorter. // >0 Either the value of the first character that does not match is greater in the compared string, or all compared characters match but the compared string is longer. int compare(const _SrsContextId& to) const; + // Set the value of context id. + _SrsContextId& set_value(const std::string& v); }; typedef _SrsContextId SrsContextId; #else diff --git a/trunk/src/core/srs_core_version4.hpp b/trunk/src/core/srs_core_version4.hpp index 71356be0b..bdc884573 100644 --- a/trunk/src/core/srs_core_version4.hpp +++ b/trunk/src/core/srs_core_version4.hpp @@ -24,6 +24,6 @@ #ifndef SRS_CORE_VERSION4_HPP #define SRS_CORE_VERSION4_HPP -#define SRS_VERSION4_REVISION 38 +#define SRS_VERSION4_REVISION 39 #endif diff --git a/trunk/src/kernel/srs_kernel_aac.cpp b/trunk/src/kernel/srs_kernel_aac.cpp index 95fa34b7b..5a4d42499 100644 --- a/trunk/src/kernel/srs_kernel_aac.cpp +++ b/trunk/src/kernel/srs_kernel_aac.cpp @@ -46,6 +46,8 @@ SrsAacTransmuxer::SrsAacTransmuxer() writer = NULL; got_sequence_header = false; aac_object = SrsAacObjectTypeReserved; + aac_sample_rate = 0; + aac_channels = 0; } SrsAacTransmuxer::~SrsAacTransmuxer() diff --git a/trunk/src/kernel/srs_kernel_mp4.cpp b/trunk/src/kernel/srs_kernel_mp4.cpp index 5f76caf37..e754a0c83 100644 --- a/trunk/src/kernel/srs_kernel_mp4.cpp +++ b/trunk/src/kernel/srs_kernel_mp4.cpp @@ -3912,7 +3912,7 @@ srs_error_t SrsMp4DecodingTime2SampleBox::on_sample(uint32_t sample_index, SrsMp index++; if (index >= entries.size()) { - return srs_error_new(ERROR_MP4_ILLEGAL_TIMESTAMP, "illegal ts, stts overflow, count=%d", entries.size()); + return srs_error_new(ERROR_MP4_ILLEGAL_TIMESTAMP, "illegal ts, stts overflow, count=%zd", entries.size()); } count += entries[index].sample_count; @@ -4038,7 +4038,7 @@ srs_error_t SrsMp4CompositionTime2SampleBox::on_sample(uint32_t sample_index, Sr index++; if (index >= entries.size()) { - return srs_error_new(ERROR_MP4_ILLEGAL_TIMESTAMP, "illegal ts, ctts overflow, count=%d", entries.size()); + return srs_error_new(ERROR_MP4_ILLEGAL_TIMESTAMP, "illegal ts, ctts overflow, count=%d", (int)entries.size()); } count += entries[index].sample_count; @@ -4581,6 +4581,8 @@ SrsMp4SegmentIndexBox::SrsMp4SegmentIndexBox() { type = SrsMp4BoxTypeSIDX; version = 0; + flags = reference_id = timescale = 0; + earliest_presentation_time = first_offset = 0; } SrsMp4SegmentIndexBox::~SrsMp4SegmentIndexBox() @@ -5215,7 +5217,7 @@ srs_error_t SrsMp4BoxReader::read(SrsSimpleStream* stream, SrsMp4Box** ppbox) while (stream->length() < (int)required) { ssize_t nread; if ((err = rsio->read(buf, SRS_MP4_BUF_SIZE, &nread)) != srs_success) { - return srs_error_wrap(err, "load failed, nread=%d, required=%d", nread, required); + return srs_error_wrap(err, "load failed, nread=%d, required=%d", (int)nread, (int)required); } srs_assert(nread > 0); @@ -6280,6 +6282,7 @@ SrsMp4M2tsSegmentEncoder::SrsMp4M2tsSegmentEncoder() decode_basetime = 0; styp_bytes = 0; mdat_bytes = 0; + track_id = 0; } SrsMp4M2tsSegmentEncoder::~SrsMp4M2tsSegmentEncoder() diff --git a/trunk/src/kernel/srs_kernel_rtc_rtcp.cpp b/trunk/src/kernel/srs_kernel_rtc_rtcp.cpp index 785d21eef..feebb14fe 100644 --- a/trunk/src/kernel/srs_kernel_rtc_rtcp.cpp +++ b/trunk/src/kernel/srs_kernel_rtc_rtcp.cpp @@ -31,6 +31,7 @@ using namespace std; SrsRtcpCommon::SrsRtcpCommon(): ssrc_(0), data_(NULL), nb_data_(0) { + payload_len_ = 0; } SrsRtcpCommon::~SrsRtcpCommon() @@ -165,7 +166,7 @@ uint8_t SrsRtcpApp::get_subtype() const string SrsRtcpApp::get_name() const { - return string((char*)name_); + return string((char*)name_, strnlen((char*)name_, 4)); } srs_error_t SrsRtcpApp::get_payload(uint8_t*& payload, int& len) @@ -190,7 +191,7 @@ srs_error_t SrsRtcpApp::set_subtype(uint8_t type) srs_error_t SrsRtcpApp::set_name(std::string name) { if(name.length() > 4) { - return srs_error_new(ERROR_RTC_RTCP, "invalid name length %d", name.length()); + return srs_error_new(ERROR_RTC_RTCP, "invalid name length %zu", name.length()); } memset(name_, 0, sizeof(name_)); @@ -298,6 +299,13 @@ SrsRtcpSR::SrsRtcpSR() header_.rc = 0; header_.version = kRtcpVersion; header_.length = 6; + + ssrc_ = 0; + ntp_ = 0; + rtp_ts_ = 0; + send_rtp_packets_ = 0; + send_rtp_bytes_ = 0; + send_rtp_bytes_ = 0; } SrsRtcpSR::~SrsRtcpSR() @@ -706,6 +714,11 @@ SrsRtcpTWCC::SrsRtcpTWCC(uint32_t sender_ssrc) : pkt_len(0) header_.rc = 15; header_.version = kRtcpVersion; ssrc_ = sender_ssrc; + media_ssrc_ = 0; + base_sn_ = 0; + packet_count_ = 0; + reference_time_ = 0; + fb_pkt_count_ = 0; } SrsRtcpTWCC::~SrsRtcpTWCC() @@ -1032,7 +1045,7 @@ srs_error_t SrsRtcpTWCC::process_pkt_chunk(SrsRtcpTWCC::SrsRtcpTWCCChunk& chunk, return err; } if ((err = encode_chunk(chunk)) != srs_success) { - return srs_error_new(ERROR_RTC_RTCP, "encode chunk, delta_size %u", delta_size); + return srs_error_wrap(err, "encode chunk, delta_size %u", delta_size); } add_to_chunk(chunk, delta_size); return err; @@ -1118,7 +1131,7 @@ srs_error_t SrsRtcpTWCC::do_encode(SrsBuffer *buffer) // FIXME 24-bit base receive delta not supported int recv_delta_size = (delta >= 0 && delta <= 0xff) ? 1 : 2; if ((err = process_pkt_chunk(chunk, recv_delta_size)) != srs_success) { - return srs_error_new(ERROR_RTC_RTCP, "delta_size %d, failed to append_recv_delta", recv_delta_size); + return srs_error_wrap(err, "delta_size %d, failed to append_recv_delta", recv_delta_size); } pkt_deltas_.push_back(delta); @@ -1178,6 +1191,7 @@ SrsRtcpNack::SrsRtcpNack(uint32_t sender_ssrc) header_.rc = 1; header_.version = kRtcpVersion; ssrc_ = sender_ssrc; + media_ssrc_ = 0; } SrsRtcpNack::~SrsRtcpNack() diff --git a/trunk/src/kernel/srs_kernel_rtc_rtcp.hpp b/trunk/src/kernel/srs_kernel_rtc_rtcp.hpp index c59759dd2..588acb89a 100644 --- a/trunk/src/kernel/srs_kernel_rtc_rtcp.hpp +++ b/trunk/src/kernel/srs_kernel_rtc_rtcp.hpp @@ -66,6 +66,14 @@ struct SrsRtcpHeader uint16_t type:8; uint16_t length:16; + + SrsRtcpHeader() { + rc = 0; + padding = 0; + version = 0; + type = 0; + length = 0; + } }; class SrsRtcpCommon: public ISrsCodec @@ -134,6 +142,16 @@ struct SrsRtcpRB uint32_t jitter; uint32_t lsr; uint32_t dlsr; + + SrsRtcpRB() { + ssrc = 0; + fraction_lost = 0; + lost_packets = 0; + highest_sn = 0; + jitter = 0; + lsr = 0; + dlsr = 0; + } }; class SrsRtcpSR : public SrsRtcpCommon diff --git a/trunk/src/kernel/srs_kernel_rtc_rtp.cpp b/trunk/src/kernel/srs_kernel_rtc_rtp.cpp index 87f4c86da..481d24bfe 100644 --- a/trunk/src/kernel/srs_kernel_rtc_rtp.cpp +++ b/trunk/src/kernel/srs_kernel_rtc_rtp.cpp @@ -63,7 +63,7 @@ SrsRtpExtensionTypes::~SrsRtpExtensionTypes() bool SrsRtpExtensionTypes::register_by_uri(int id, std::string uri) { - for (int i = 0; i < (int)sizeof(kExtensions); ++i) { + for (int i = 0; i < (int)(sizeof(kExtensions)/sizeof(kExtensions[0])); ++i) { if (kExtensions[i].uri == uri) { return register_id(id, kExtensions[i].type, kExtensions[i].uri); } @@ -247,7 +247,7 @@ srs_error_t SrsRtpExtensions::decode_0xbede(SrsBuffer* buf) SrsRtpExtensionType xtype = types_.get_type(id); if (xtype == kRtpExtensionTransportSequenceNumber) { - if (srs_success != (err = twcc_.decode(buf))) { + if ((err = twcc_.decode(buf)) != srs_success) { return srs_error_wrap(err, "decode twcc extension"); } has_ext_ = true; @@ -287,7 +287,7 @@ srs_error_t SrsRtpExtensions::encode(SrsBuffer* buf) // Write extensions. if (twcc_.has_twcc_ext()) { - if (srs_success != (err = twcc_.encode(buf))) { + if ((err = twcc_.encode(buf)) != srs_success) { return srs_error_wrap(err, "encode twcc extension"); } } diff --git a/trunk/src/kernel/srs_kernel_ts.cpp b/trunk/src/kernel/srs_kernel_ts.cpp index 27d40315c..d0e0d44a0 100644 --- a/trunk/src/kernel/srs_kernel_ts.cpp +++ b/trunk/src/kernel/srs_kernel_ts.cpp @@ -1275,6 +1275,41 @@ SrsTsPayloadPES::SrsTsPayloadPES(SrsTsPacket* p) : SrsTsPayload(p) nb_paddings = 0; const2bits = 0x02; const1_value0 = 0x07; + + packet_start_code_prefix = 0; + stream_id = 0; + PES_packet_length = 0; + PES_scrambling_control = 0; + PES_priority = 0; + data_alignment_indicator = 0; + copyright = 0; + original_or_copy = 0; + PTS_DTS_flags = 0; + ESCR_flag = 0; + ES_rate_flag = 0; + DSM_trick_mode_flag = 0; + additional_copy_info_flag = 0; + PES_CRC_flag = 0; + PES_extension_flag = 0; + PES_header_data_length = 0; + pts = dts = 0; + ESCR_base = 0; + ESCR_extension = 0; + ES_rate = 0; + trick_mode_control = 0; + trick_mode_value = 0; + additional_copy_info = 0; + previous_PES_packet_CRC = 0; + PES_private_data_flag = 0; + pack_header_field_flag = 0; + program_packet_sequence_counter_flag = 0; + P_STD_buffer_flag = 0; + PES_extension_flag_2 = 0; + program_packet_sequence_counter = 0; + MPEG1_MPEG2_identifier = 0; + original_stuff_length = 0; + P_STD_buffer_scale = 0; + P_STD_buffer_size = 0; } SrsTsPayloadPES::~SrsTsPayloadPES() @@ -1977,6 +2012,9 @@ SrsTsPayloadPSI::SrsTsPayloadPSI(SrsTsPacket* p) : SrsTsPayload(p) const0_value = 0; const1_value = 3; CRC_32 = 0; + section_length = 0; + section_syntax_indicator = 0; + table_id = SrsTsPsiIdPas; } SrsTsPayloadPSI::~SrsTsPayloadPSI() @@ -2188,7 +2226,12 @@ srs_error_t SrsTsPayloadPATProgram::encode(SrsBuffer* stream) SrsTsPayloadPAT::SrsTsPayloadPAT(SrsTsPacket* p) : SrsTsPayloadPSI(p) { + transport_stream_id = 0; const3_value = 3; + version_number = 0; + current_next_indicator = 0; + section_number = 0; + last_section_number = 0; } SrsTsPayloadPAT::~SrsTsPayloadPAT() @@ -2387,6 +2430,12 @@ SrsTsPayloadPMT::SrsTsPayloadPMT(SrsTsPacket* p) : SrsTsPayloadPSI(p) const1_value0 = 3; const1_value1 = 7; const1_value2 = 0x0f; + PCR_PID = 0; + last_section_number = 0; + program_number = 0; + version_number = 0; + current_next_indicator = 0; + section_number = 0; } SrsTsPayloadPMT::~SrsTsPayloadPMT() diff --git a/trunk/src/main/srs_main_ingest_hls.cpp b/trunk/src/main/srs_main_ingest_hls.cpp index 29b928a61..91895f422 100644 --- a/trunk/src/main/srs_main_ingest_hls.cpp +++ b/trunk/src/main/srs_main_ingest_hls.cpp @@ -147,6 +147,7 @@ private: skip = false; sent = false; dirty = false; + duration = 0.0; } int fetch(std::string m3u8); diff --git a/trunk/src/protocol/srs_http_stack.cpp b/trunk/src/protocol/srs_http_stack.cpp index 93cda0fca..904ad419a 100644 --- a/trunk/src/protocol/srs_http_stack.cpp +++ b/trunk/src/protocol/srs_http_stack.cpp @@ -457,7 +457,7 @@ srs_error_t SrsHttpFileServer::serve_file(ISrsHttpResponseWriter* w, ISrsHttpMes // write body. int64_t left = length; if ((err = copy(w, fs, r, (int)left)) != srs_success) { - return srs_error_wrap(err, "copy file=%s size=%d", fullpath.c_str(), left); + return srs_error_wrap(err, "copy file=%s size=%d", fullpath.c_str(), (int)left); } if ((err = w->final_request()) != srs_success) { @@ -557,7 +557,7 @@ srs_error_t SrsHttpFileServer::copy(ISrsHttpResponseWriter* w, SrsFileReader* fs left -= nread; if ((err = w->write(buf, (int)nread)) != srs_success) { - return srs_error_wrap(err, "write limit=%d, bytes=%d, left=%d", max_read, nread, left); + return srs_error_wrap(err, "write limit=%d, bytes=%d, left=%d", max_read, (int)nread, left); } } diff --git a/trunk/src/protocol/srs_protocol_amf0.cpp b/trunk/src/protocol/srs_protocol_amf0.cpp index 3af4d2d7e..c0a478c1f 100644 --- a/trunk/src/protocol/srs_protocol_amf0.cpp +++ b/trunk/src/protocol/srs_protocol_amf0.cpp @@ -1761,7 +1761,7 @@ namespace _srs_internal // data if (!stream->require((int)value.length())) { - return srs_error_new(ERROR_RTMP_AMF0_ENCODE, "requires %d only %d bytes", value.length(), stream->left()); + return srs_error_new(ERROR_RTMP_AMF0_ENCODE, "requires %" PRIu64 " only %d bytes", value.length(), stream->left()); } stream->write_string(value); diff --git a/trunk/src/protocol/srs_rtc_stun_stack.cpp b/trunk/src/protocol/srs_rtc_stun_stack.cpp index f35b4f942..fe7f2385f 100644 --- a/trunk/src/protocol/srs_rtc_stun_stack.cpp +++ b/trunk/src/protocol/srs_rtc_stun_stack.cpp @@ -92,6 +92,8 @@ SrsStunPacket::SrsStunPacket() use_candidate = false; ice_controlled = false; ice_controlling = false; + mapped_port = 0; + mapped_address = 0; } SrsStunPacket::~SrsStunPacket() diff --git a/trunk/src/protocol/srs_rtmp_handshake.cpp b/trunk/src/protocol/srs_rtmp_handshake.cpp index b7f92ae59..9a419a6ce 100644 --- a/trunk/src/protocol/srs_rtmp_handshake.cpp +++ b/trunk/src/protocol/srs_rtmp_handshake.cpp @@ -865,6 +865,8 @@ namespace _srs_internal c1s1::c1s1() { payload = NULL; + version = 0; + time = 0; } c1s1::~c1s1() { @@ -978,7 +980,7 @@ namespace _srs_internal srs_random_generate(random, 1504); int size = snprintf(random, 1504, "%s", RTMP_SIG_SRS_HANDSHAKE); - srs_assert(++size < 1504); + srs_assert(size < 1504); snprintf(random + 1504 - size, size, "%s", RTMP_SIG_SRS_HANDSHAKE); srs_random_generate(digest, 32); @@ -1225,6 +1227,7 @@ srs_error_t SrsComplexHandshake::handshake_with_client(SrsHandshakeBytes* hs_byt } // verify s2 if ((err = s2.s2_validate(&c1, is_valid)) != srs_success || !is_valid) { + srs_freep(err); return srs_error_new(ERROR_RTMP_TRY_SIMPLE_HS, "verify s2 failed, try simple handshake"); } diff --git a/trunk/src/protocol/srs_rtmp_stack.cpp b/trunk/src/protocol/srs_rtmp_stack.cpp index f56fcdcb4..f80cf77cd 100644 --- a/trunk/src/protocol/srs_rtmp_stack.cpp +++ b/trunk/src/protocol/srs_rtmp_stack.cpp @@ -3977,7 +3977,7 @@ srs_error_t SrsPlayPacket::decode(SrsBuffer* stream) SrsAmf0Any* reset_value = NULL; if ((err = srs_amf0_read_any(stream, &reset_value)) != srs_success) { - return srs_error_new(ERROR_RTMP_AMF0_DECODE, "reset"); + return srs_error_wrap(err, "reset"); } SrsAutoFree(SrsAmf0Any, reset_value); diff --git a/trunk/src/protocol/srs_rtsp_stack.cpp b/trunk/src/protocol/srs_rtsp_stack.cpp index c1e828e44..1237f6454 100644 --- a/trunk/src/protocol/srs_rtsp_stack.cpp +++ b/trunk/src/protocol/srs_rtsp_stack.cpp @@ -792,6 +792,9 @@ SrsRtspSetupResponse::SrsRtspSetupResponse(int seq) : SrsRtspResponse(seq) { local_port_min = 0; local_port_max = 0; + + client_port_min = 0; + client_port_max = 0; } SrsRtspSetupResponse::~SrsRtspSetupResponse() diff --git a/trunk/src/protocol/srs_service_http_conn.cpp b/trunk/src/protocol/srs_service_http_conn.cpp index 3f642e2f8..185634046 100644 --- a/trunk/src/protocol/srs_service_http_conn.cpp +++ b/trunk/src/protocol/srs_service_http_conn.cpp @@ -139,7 +139,7 @@ srs_error_t SrsHttpParser::parse_message_imp(ISrsReader* reader) enum http_errno code; if ((code = HTTP_PARSER_ERRNO(&parser)) != HPE_OK) { return srs_error_new(ERROR_HTTP_PARSE_HEADER, "parse %dB, nparsed=%d, err=%d/%s %s", - buffer->size(), consumed, code, http_errno_name(code), http_errno_description(code)); + buffer->size(), (int)consumed, code, http_errno_name(code), http_errno_description(code)); } // When buffer consumed these bytes, it's dropped so the new ptr is actually the HTTP body. But http-parser @@ -319,6 +319,7 @@ SrsHttpMessage::SrsHttpMessage(ISrsReader* reader, SrsFastStream* buffer) : ISrs _content_length = -1; // From HTTP/1.1, default to keep alive. _keep_alive = true; + type_ = 0; } SrsHttpMessage::~SrsHttpMessage() @@ -778,7 +779,7 @@ srs_error_t SrsHttpResponseWriter::write(char* data, int size) iovs[3].iov_base = (char*)SRS_HTTP_CRLF; iovs[3].iov_len = 2; - ssize_t nwrite; + ssize_t nwrite = 0; if ((err = skt->writev(iovs, 4, &nwrite)) != srs_success) { return srs_error_wrap(err, "write chunk"); } @@ -854,7 +855,7 @@ srs_error_t SrsHttpResponseWriter::writev(const iovec* iov, int iovcnt, ssize_t* iovss[2+iovcnt].iov_len = 2; // sendout all ioves. - ssize_t nwrite; + ssize_t nwrite = 0; if ((err = srs_write_large_iovs(skt, iovss, nb_iovss, &nwrite)) != srs_success) { return srs_error_wrap(err, "writev large iovs"); } @@ -940,6 +941,7 @@ SrsHttpResponseReader::SrsHttpResponseReader(SrsHttpMessage* msg, ISrsReader* re nb_total_read = 0; nb_left_chunk = 0; buffer = body; + nb_chunk = 0; } SrsHttpResponseReader::~SrsHttpResponseReader() diff --git a/trunk/src/protocol/srs_service_log.cpp b/trunk/src/protocol/srs_service_log.cpp index b251ff310..baf9f5f4f 100644 --- a/trunk/src/protocol/srs_service_log.cpp +++ b/trunk/src/protocol/srs_service_log.cpp @@ -45,8 +45,8 @@ SrsThreadContext::~SrsThreadContext() SrsContextId SrsThreadContext::generate_id() { - SrsContextId cid = SrsContextId(srs_random_str(8)); - return cid; + SrsContextId cid = SrsContextId(); + return cid.set_value(srs_random_str(8)); } const SrsContextId& SrsThreadContext::get_id() diff --git a/trunk/src/utest/srs_utest_app.cpp b/trunk/src/utest/srs_utest_app.cpp index e04a859de..010ed4272 100644 --- a/trunk/src/utest/srs_utest_app.cpp +++ b/trunk/src/utest/srs_utest_app.cpp @@ -92,7 +92,8 @@ public: // Quit without error. bool quit; public: - MockCoroutineHandler() : trd(NULL), err(srs_success), cid("0"), quit(false) { + MockCoroutineHandler() : trd(NULL), err(srs_success), quit(false) { + cid.set_value("0"); running = srs_cond_new(); exited = srs_cond_new(); } @@ -220,16 +221,17 @@ VOID TEST(AppCoroutineTest, Cycle) if (true) { MockCoroutineHandler ch; - SrsSTCoroutine sc("test", &ch, SrsContextId("250")); + SrsContextId cid; + SrsSTCoroutine sc("test", &ch, cid.set_value("250")); ch.trd = ≻ - EXPECT_TRUE(!sc.cid().compare(SrsContextId("250"))); + EXPECT_TRUE(!sc.cid().compare(cid)); EXPECT_TRUE(srs_success == sc.start()); EXPECT_TRUE(srs_success == sc.pull()); // After running, the cid in cycle should equal to the thread. srs_cond_timedwait(ch.running, 100 * SRS_UTIME_MILLISECONDS); - EXPECT_TRUE(!ch.cid.compare(SrsContextId("250"))); + EXPECT_TRUE(!ch.cid.compare(cid)); } if (true) { diff --git a/trunk/src/utest/srs_utest_rtc.cpp b/trunk/src/utest/srs_utest_rtc.cpp index a8182c441..2a00be08b 100644 --- a/trunk/src/utest/srs_utest_rtc.cpp +++ b/trunk/src/utest/srs_utest_rtc.cpp @@ -34,6 +34,72 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include using namespace std; +VOID TEST(KernelRTCTest, StringDumpHexTest) +{ + // Typical normal case. + if (false) { + char data[8]; + data[0] = (char)0x3c; data[sizeof(data) - 2] = (char)0x67; data[sizeof(data) - 1] = (char)0xc3; + string r = srs_string_dumps_hex(data, sizeof(data), INT_MAX, 0, 0, 0); + EXPECT_EQ(16, (int)r.length()); + EXPECT_EQ('3', r.at(0)); EXPECT_EQ('c', r.at(1)); + EXPECT_EQ('c', r.at(r.length() - 2)); EXPECT_EQ('3', r.at(r.length() - 1)); + EXPECT_EQ('6', r.at(r.length() - 4)); EXPECT_EQ('7', r.at(r.length() - 3)); + } + + // Fill all buffer. + if (false) { + char data[8 * 1024]; + data[0] = (char)0x3c; data[sizeof(data) - 2] = (char)0x67; data[sizeof(data) - 1] = (char)0xc3; + string r = srs_string_dumps_hex(data, sizeof(data), INT_MAX, 0, 0, 0); + EXPECT_EQ(16 * 1024, (int)r.length()); + EXPECT_EQ('3', r.at(0)); EXPECT_EQ('c', r.at(1)); + EXPECT_EQ('c', r.at(r.length() - 2)); EXPECT_EQ('3', r.at(r.length() - 1)); + EXPECT_EQ('6', r.at(r.length() - 4)); EXPECT_EQ('7', r.at(r.length() - 3)); + } + + // Overflow 1 byte. + if (true) { + char data[8 * 1024 + 1]; + data[0] = (char)0x3c; data[sizeof(data) - 2] = (char)0x67; data[sizeof(data) - 1] = (char)0xc3; + string r = srs_string_dumps_hex(data, sizeof(data), INT_MAX, 0, 0, 0); + EXPECT_EQ(16 * 1024, (int)r.length()); + EXPECT_EQ('3', r.at(0)); EXPECT_EQ('c', r.at(1)); + EXPECT_EQ('6', r.at(r.length() - 2)); EXPECT_EQ('7', r.at(r.length() - 1)); + } + + // Fill all buffer, with seperator. + if (true) { + char data[5461]; + data[0] = (char)0x3c; data[sizeof(data) - 2] = (char)0x67; data[sizeof(data) - 1] = (char)0xc3; + string r = srs_string_dumps_hex(data, sizeof(data), INT_MAX, ',', 0, 0); + EXPECT_EQ(16383 - 1, (int)r.length()); + EXPECT_EQ('3', r.at(0)); EXPECT_EQ('c', r.at(1)); + EXPECT_EQ('c', r.at(r.length() - 2)); EXPECT_EQ('3', r.at(r.length() - 1)); + EXPECT_EQ('6', r.at(r.length() - 5)); EXPECT_EQ('7', r.at(r.length() - 4)); + } + + // Overflow 1 byte, with seperator. + if (true) { + char data[5461 + 1]; + data[0] = (char)0x3c; data[sizeof(data) - 2] = (char)0x67; data[sizeof(data) - 1] = (char)0xc3; + string r = srs_string_dumps_hex(data, sizeof(data), INT_MAX, ',', 0, 0); + EXPECT_EQ(16383 - 1, (int)r.length()); + EXPECT_EQ('3', r.at(0)); EXPECT_EQ('c', r.at(1)); + EXPECT_EQ('6', r.at(r.length() - 2)); EXPECT_EQ('7', r.at(r.length() - 1)); + } + + // Overflow 1 byte, with seperator and newline. + if (true) { + char data[5461 + 1]; + data[0] = (char)0x3c; data[sizeof(data) - 2] = (char)0x67; data[sizeof(data) - 1] = (char)0xc3; + string r = srs_string_dumps_hex(data, sizeof(data), INT_MAX, ',', 5461, '\n'); + EXPECT_EQ(16383 - 1, (int)r.length()); + EXPECT_EQ('3', r.at(0)); EXPECT_EQ('c', r.at(1)); + EXPECT_EQ('6', r.at(r.length() - 2)); EXPECT_EQ('7', r.at(r.length() - 1)); + } +} + extern SSL_CTX* srs_build_dtls_ctx(SrsDtlsVersion version); class MockDtls @@ -188,6 +254,23 @@ public: srs_error_t r0; bool done; std::vector samples; +public: + int nn_client_hello_lost; + int nn_server_hello_lost; + int nn_certificate_lost; + int nn_new_session_lost; + int nn_change_cipher_lost; +public: + // client -> server + int nn_client_hello; + // server -> client + int nn_server_hello; + // client -> server + int nn_certificate; + // server -> client + int nn_new_session; + int nn_change_cipher; +public: MockDtlsCallback(); virtual ~MockDtlsCallback(); virtual srs_error_t on_dtls_handshake_done(); @@ -204,6 +287,18 @@ MockDtlsCallback::MockDtlsCallback() done = false; trd = new SrsSTCoroutine("mock", this); srs_assert(trd->start() == srs_success); + + nn_client_hello_lost = 0; + nn_server_hello_lost = 0; + nn_certificate_lost = 0; + nn_new_session_lost = 0; + nn_change_cipher_lost = 0; + + nn_client_hello = 0; + nn_server_hello = 0; + nn_certificate = 0; + nn_new_session = 0; + nn_change_cipher = 0; } MockDtlsCallback::~MockDtlsCallback() @@ -228,10 +323,49 @@ srs_error_t MockDtlsCallback::on_dtls_application_data(const char* data, const i srs_error_t MockDtlsCallback::write_dtls_data(void* data, int size) { + int nn_lost = 0; + if (true) { + uint8_t content_type = 0; + if (size >= 1) { + content_type = (uint8_t)((uint8_t*)data)[0]; + } + + uint8_t handshake_type = 0; + if (size >= 14) { + handshake_type = (uint8_t)((uint8_t*)data)[13]; + } + + if (content_type == 22) { + if (handshake_type == 1) { + nn_lost = nn_client_hello_lost--; + nn_client_hello++; + } else if (handshake_type == 2) { + nn_lost = nn_server_hello_lost--; + nn_server_hello++; + } else if (handshake_type == 11) { + nn_lost = nn_certificate_lost--; + nn_certificate++; + } else if (handshake_type == 4) { + nn_lost = nn_new_session_lost--; + nn_new_session++; + } + } else if (content_type == 20) { + nn_lost = nn_change_cipher_lost--; + nn_change_cipher++; + } + } + + // Simulate to drop packet. + if (nn_lost > 0) { + return srs_success; + } + + // Send out it. char* cp = new char[size]; memcpy(cp, data, size); samples.push_back(SrsSample((char*)cp, size)); + return srs_success; } @@ -249,10 +383,12 @@ srs_error_t MockDtlsCallback::cycle() continue; } - SrsSample p = *samples.erase(samples.begin()); + SrsSample p = samples.at(0); + samples.erase(samples.begin()); + if (peer) { err = peer->on_dtls((char*)p.bytes, p.size); - } else { + } else if (peer2) { err = peer2->on_dtls((char*)p.bytes, p.size); } @@ -266,10 +402,29 @@ srs_error_t MockDtlsCallback::cycle() } // Wait for mock io to done, try to switch to coroutine many times. -#define mock_wait_dtls_io_done(cio, sio) \ - for (int i = 0; i < 100 && (!cio.samples.empty() || !sio.samples.empty()); i++) { \ - srs_usleep(0 * SRS_UTIME_MILLISECONDS); \ +void mock_wait_dtls_io_done(int count = 100, int interval = 0) +{ + for (int i = 0; i < count; i++) { + srs_usleep(interval * SRS_UTIME_MILLISECONDS); } +} + +// To avoid the crash when peer or peer2 is freed before io. +class MockBridgeDtlsIO +{ +private: + MockDtlsCallback* io_; +public: + MockBridgeDtlsIO(MockDtlsCallback* io, SrsDtls* peer, MockDtls* peer2) { + io_ = io; + io->peer = peer; + io->peer2 = peer2; + } + virtual ~MockBridgeDtlsIO() { + io_->peer = NULL; + io_->peer2 = NULL; + } +}; struct DTLSServerFlowCase { @@ -293,6 +448,366 @@ std::ostream& operator<< (std::ostream& stream, const DTLSServerFlowCase& c) return stream; } +VOID TEST(KernelRTCTest, DTLSARQLimitTest) +{ + srs_error_t err = srs_success; + + // ClientHello lost, client retransmit the ClientHello. + if (true) { + MockDtlsCallback cio; SrsDtls client(&cio); + MockDtlsCallback sio; SrsDtls server(&sio); + MockBridgeDtlsIO b0(&cio, &server, NULL); MockBridgeDtlsIO b1(&sio, &client, NULL); + HELPER_EXPECT_SUCCESS(client.initialize("active", "dtls1.0")); + HELPER_EXPECT_SUCCESS(server.initialize("passive", "dtls1.0")); + + // Use very short interval for utest. + dynamic_cast(client.impl)->arq_first = 1 * SRS_UTIME_MILLISECONDS; + dynamic_cast(client.impl)->arq_interval = 1 * SRS_UTIME_MILLISECONDS; + + // Lost 10 packets, total packets should be 8(max to 8). + // Note that only one server hello. + cio.nn_client_hello_lost = 10; + + HELPER_EXPECT_SUCCESS(client.start_active_handshake()); + mock_wait_dtls_io_done(10, 3); + + EXPECT_TRUE(sio.r0 == srs_success); + EXPECT_TRUE(cio.r0 == srs_success); + + EXPECT_FALSE(cio.done); + EXPECT_FALSE(sio.done); + + EXPECT_EQ(8, cio.nn_client_hello); + EXPECT_EQ(0, sio.nn_server_hello); + EXPECT_EQ(0, cio.nn_certificate); + EXPECT_EQ(0, sio.nn_new_session); + EXPECT_EQ(0, sio.nn_change_cipher); + } + + // Certificate lost, client retransmit the Certificate. + if (true) { + MockDtlsCallback cio; SrsDtls client(&cio); + MockDtlsCallback sio; SrsDtls server(&sio); + MockBridgeDtlsIO b0(&cio, &server, NULL); MockBridgeDtlsIO b1(&sio, &client, NULL); + HELPER_EXPECT_SUCCESS(client.initialize("active", "dtls1.0")); + HELPER_EXPECT_SUCCESS(server.initialize("passive", "dtls1.0")); + + // Use very short interval for utest. + dynamic_cast(client.impl)->arq_first = 1 * SRS_UTIME_MILLISECONDS; + dynamic_cast(client.impl)->arq_interval = 1 * SRS_UTIME_MILLISECONDS; + + // Lost 10 packets, total packets should be 8(max to 8). + // Note that only one server NewSessionTicket. + cio.nn_certificate_lost = 10; + + HELPER_EXPECT_SUCCESS(client.start_active_handshake()); + mock_wait_dtls_io_done(10, 3); + + EXPECT_TRUE(sio.r0 == srs_success); + EXPECT_TRUE(cio.r0 == srs_success); + + EXPECT_FALSE(cio.done); + EXPECT_FALSE(sio.done); + + EXPECT_EQ(1, cio.nn_client_hello); + EXPECT_EQ(1, sio.nn_server_hello); + EXPECT_EQ(8, cio.nn_certificate); + EXPECT_EQ(0, sio.nn_new_session); + EXPECT_EQ(0, sio.nn_change_cipher); + } + + // ServerHello lost, client retransmit the ClientHello. + if (true) { + MockDtlsCallback cio; SrsDtls client(&cio); + MockDtlsCallback sio; SrsDtls server(&sio); + MockBridgeDtlsIO b0(&cio, &server, NULL); MockBridgeDtlsIO b1(&sio, &client, NULL); + HELPER_EXPECT_SUCCESS(client.initialize("active", "dtls1.0")); + HELPER_EXPECT_SUCCESS(server.initialize("passive", "dtls1.0")); + + // Use very short interval for utest. + dynamic_cast(client.impl)->arq_first = 1 * SRS_UTIME_MILLISECONDS; + dynamic_cast(client.impl)->arq_interval = 1 * SRS_UTIME_MILLISECONDS; + + // Lost 10 packets, total packets should be 8(max to 8). + sio.nn_server_hello_lost = 10; + + HELPER_EXPECT_SUCCESS(client.start_active_handshake()); + mock_wait_dtls_io_done(10, 3); + + EXPECT_TRUE(sio.r0 == srs_success); + EXPECT_TRUE(cio.r0 == srs_success); + + EXPECT_FALSE(cio.done); + EXPECT_FALSE(sio.done); + + EXPECT_EQ(8, cio.nn_client_hello); + EXPECT_EQ(8, sio.nn_server_hello); + EXPECT_EQ(0, cio.nn_certificate); + EXPECT_EQ(0, sio.nn_new_session); + EXPECT_EQ(0, sio.nn_change_cipher); + } + + // NewSessionTicket lost, client retransmit the Certificate. + if (true) { + MockDtlsCallback cio; SrsDtls client(&cio); + MockDtlsCallback sio; SrsDtls server(&sio); + MockBridgeDtlsIO b0(&cio, &server, NULL); MockBridgeDtlsIO b1(&sio, &client, NULL); + HELPER_EXPECT_SUCCESS(client.initialize("active", "dtls1.0")); + HELPER_EXPECT_SUCCESS(server.initialize("passive", "dtls1.0")); + + // Use very short interval for utest. + dynamic_cast(client.impl)->arq_first = 1 * SRS_UTIME_MILLISECONDS; + dynamic_cast(client.impl)->arq_interval = 1 * SRS_UTIME_MILLISECONDS; + + // Lost 10 packets, total packets should be 8(max to 8). + sio.nn_new_session_lost = 10; + + HELPER_EXPECT_SUCCESS(client.start_active_handshake()); + mock_wait_dtls_io_done(10, 3); + + EXPECT_TRUE(sio.r0 == srs_success); + EXPECT_TRUE(cio.r0 == srs_success); + + // Although the packet is lost, but it's done for server, and not done for client. + EXPECT_FALSE(cio.done); + EXPECT_TRUE(sio.done); + + EXPECT_EQ(1, cio.nn_client_hello); + EXPECT_EQ(1, sio.nn_server_hello); + EXPECT_EQ(8, cio.nn_certificate); + EXPECT_EQ(8, sio.nn_new_session); + EXPECT_EQ(0, sio.nn_change_cipher); + } +} + +VOID TEST(KernelRTCTest, DTLSClientARQTest) +{ + srs_error_t err = srs_success; + + // No ARQ, check the number of packets. + if (true) { + MockDtlsCallback cio; SrsDtls client(&cio); + MockDtlsCallback sio; SrsDtls server(&sio); + cio.peer = &server; sio.peer = &client; + HELPER_EXPECT_SUCCESS(client.initialize("active", "dtls1.0")); + HELPER_EXPECT_SUCCESS(server.initialize("passive", "dtls1.0")); + + HELPER_EXPECT_SUCCESS(client.start_active_handshake()); + mock_wait_dtls_io_done(30, 1); + + EXPECT_TRUE(sio.r0 == srs_success); + EXPECT_TRUE(cio.r0 == srs_success); + + EXPECT_TRUE(cio.done); + EXPECT_TRUE(sio.done); + + EXPECT_EQ(1, cio.nn_client_hello); + EXPECT_EQ(1, sio.nn_server_hello); + EXPECT_EQ(1, cio.nn_certificate); + EXPECT_EQ(1, sio.nn_new_session); + EXPECT_EQ(0, sio.nn_change_cipher); + } + + // ClientHello lost, client retransmit the ClientHello. + if (true) { + MockDtlsCallback cio; SrsDtls client(&cio); + MockDtlsCallback sio; SrsDtls server(&sio); + MockBridgeDtlsIO b0(&cio, &server, NULL); MockBridgeDtlsIO b1(&sio, &client, NULL); + HELPER_EXPECT_SUCCESS(client.initialize("active", "dtls1.0")); + HELPER_EXPECT_SUCCESS(server.initialize("passive", "dtls1.0")); + + // Use very short interval for utest. + dynamic_cast(client.impl)->arq_first = 1 * SRS_UTIME_MILLISECONDS; + dynamic_cast(client.impl)->arq_interval = 1 * SRS_UTIME_MILLISECONDS; + + // Lost 2 packets, total packets should be 3. + // Note that only one server hello. + cio.nn_client_hello_lost = 2; + + HELPER_EXPECT_SUCCESS(client.start_active_handshake()); + mock_wait_dtls_io_done(10, 3); + + EXPECT_TRUE(sio.r0 == srs_success); + EXPECT_TRUE(cio.r0 == srs_success); + + EXPECT_TRUE(cio.done); + EXPECT_TRUE(sio.done); + + EXPECT_EQ(3, cio.nn_client_hello); + EXPECT_EQ(1, sio.nn_server_hello); + EXPECT_EQ(1, cio.nn_certificate); + EXPECT_EQ(1, sio.nn_new_session); + EXPECT_EQ(0, sio.nn_change_cipher); + } + + // Certificate lost, client retransmit the Certificate. + if (true) { + MockDtlsCallback cio; SrsDtls client(&cio); + MockDtlsCallback sio; SrsDtls server(&sio); + MockBridgeDtlsIO b0(&cio, &server, NULL); MockBridgeDtlsIO b1(&sio, &client, NULL); + HELPER_EXPECT_SUCCESS(client.initialize("active", "dtls1.0")); + HELPER_EXPECT_SUCCESS(server.initialize("passive", "dtls1.0")); + + // Use very short interval for utest. + dynamic_cast(client.impl)->arq_first = 1 * SRS_UTIME_MILLISECONDS; + dynamic_cast(client.impl)->arq_interval = 1 * SRS_UTIME_MILLISECONDS; + + // Lost 2 packets, total packets should be 3. + // Note that only one server NewSessionTicket. + cio.nn_certificate_lost = 2; + + HELPER_EXPECT_SUCCESS(client.start_active_handshake()); + mock_wait_dtls_io_done(10, 3); + + EXPECT_TRUE(sio.r0 == srs_success); + EXPECT_TRUE(cio.r0 == srs_success); + + EXPECT_TRUE(cio.done); + EXPECT_TRUE(sio.done); + + EXPECT_EQ(1, cio.nn_client_hello); + EXPECT_EQ(1, sio.nn_server_hello); + EXPECT_EQ(3, cio.nn_certificate); + EXPECT_EQ(1, sio.nn_new_session); + EXPECT_EQ(0, sio.nn_change_cipher); + } +} + +VOID TEST(KernelRTCTest, DTLSServerARQTest) +{ + srs_error_t err = srs_success; + + // No ARQ, check the number of packets. + if (true) { + MockDtlsCallback cio; SrsDtls client(&cio); + MockDtlsCallback sio; SrsDtls server(&sio); + cio.peer = &server; sio.peer = &client; + HELPER_EXPECT_SUCCESS(client.initialize("active", "dtls1.0")); + HELPER_EXPECT_SUCCESS(server.initialize("passive", "dtls1.0")); + + HELPER_EXPECT_SUCCESS(client.start_active_handshake()); + mock_wait_dtls_io_done(30, 1); + + EXPECT_TRUE(sio.r0 == srs_success); + EXPECT_TRUE(cio.r0 == srs_success); + + EXPECT_TRUE(cio.done); + EXPECT_TRUE(sio.done); + + EXPECT_EQ(1, cio.nn_client_hello); + EXPECT_EQ(1, sio.nn_server_hello); + EXPECT_EQ(1, cio.nn_certificate); + EXPECT_EQ(1, sio.nn_new_session); + EXPECT_EQ(0, sio.nn_change_cipher); + } + + // ServerHello lost, client retransmit the ClientHello. + if (true) { + MockDtlsCallback cio; SrsDtls client(&cio); + MockDtlsCallback sio; SrsDtls server(&sio); + MockBridgeDtlsIO b0(&cio, &server, NULL); MockBridgeDtlsIO b1(&sio, &client, NULL); + HELPER_EXPECT_SUCCESS(client.initialize("active", "dtls1.0")); + HELPER_EXPECT_SUCCESS(server.initialize("passive", "dtls1.0")); + + // Use very short interval for utest. + dynamic_cast(client.impl)->arq_first = 1 * SRS_UTIME_MILLISECONDS; + dynamic_cast(client.impl)->arq_interval = 1 * SRS_UTIME_MILLISECONDS; + + // Lost 2 packets, total packets should be 3. + sio.nn_server_hello_lost = 2; + + HELPER_EXPECT_SUCCESS(client.start_active_handshake()); + mock_wait_dtls_io_done(10, 3); + + EXPECT_TRUE(sio.r0 == srs_success); + EXPECT_TRUE(cio.r0 == srs_success); + + EXPECT_TRUE(cio.done); + EXPECT_TRUE(sio.done); + + EXPECT_EQ(3, cio.nn_client_hello); + EXPECT_EQ(3, sio.nn_server_hello); + EXPECT_EQ(1, cio.nn_certificate); + EXPECT_EQ(1, sio.nn_new_session); + EXPECT_EQ(0, sio.nn_change_cipher); + } + + // NewSessionTicket lost, client retransmit the Certificate. + if (true) { + MockDtlsCallback cio; SrsDtls client(&cio); + MockDtlsCallback sio; SrsDtls server(&sio); + MockBridgeDtlsIO b0(&cio, &server, NULL); MockBridgeDtlsIO b1(&sio, &client, NULL); + HELPER_EXPECT_SUCCESS(client.initialize("active", "dtls1.0")); + HELPER_EXPECT_SUCCESS(server.initialize("passive", "dtls1.0")); + + // Use very short interval for utest. + dynamic_cast(client.impl)->arq_first = 1 * SRS_UTIME_MILLISECONDS; + dynamic_cast(client.impl)->arq_interval = 1 * SRS_UTIME_MILLISECONDS; + + // Lost 2 packets, total packets should be 3. + sio.nn_new_session_lost = 2; + + HELPER_EXPECT_SUCCESS(client.start_active_handshake()); + mock_wait_dtls_io_done(10, 3); + + EXPECT_TRUE(sio.r0 == srs_success); + EXPECT_TRUE(cio.r0 == srs_success); + + EXPECT_TRUE(cio.done); + EXPECT_TRUE(sio.done); + + EXPECT_EQ(1, cio.nn_client_hello); + EXPECT_EQ(1, sio.nn_server_hello); + EXPECT_EQ(3, cio.nn_certificate); + EXPECT_EQ(3, sio.nn_new_session); + EXPECT_EQ(0, sio.nn_change_cipher); + } +} + +VOID TEST(KernelRTCTest, DTLSClientFlowTest) +{ + srs_error_t err = srs_success; + + DTLSServerFlowCase cases[] = { + // OK, Client, Server: DTLS v1.0 + {0, "dtls1.0", "dtls1.0", true, true, false, false}, + // OK, Client, Server: DTLS v1.2 + {1, "dtls1.2", "dtls1.2", true, true, false, false}, + // OK, Client: DTLS v1.0, Server: DTLS auto(v1.0 or v1.2). + {2, "dtls1.0", "auto", true, true, false, false}, + // OK, Client: DTLS v1.2, Server: DTLS auto(v1.0 or v1.2). + {3, "dtls1.2", "auto", true, true, false, false}, + // OK, Client: DTLS auto(v1.0 or v1.2), Server: DTLS v1.0 + {4, "auto", "dtls1.0", true, true, false, false}, + // OK, Client: DTLS auto(v1.0 or v1.2), Server: DTLS v1.0 + {5, "auto", "dtls1.2", true, true, false, false}, + // Fail, Client: DTLS v1.0, Server: DTLS v1.2 + {6, "dtls1.0", "dtls1.2", false, false, false, true}, + // Fail, Client: DTLS v1.2, Server: DTLS v1.0 + {7, "dtls1.2", "dtls1.0", false, false, true, false}, + }; + + for (int i = 0; i < (int)(sizeof(cases) / sizeof(DTLSServerFlowCase)); i++) { + DTLSServerFlowCase c = cases[i]; + + MockDtlsCallback cio; SrsDtls client(&cio); + MockDtlsCallback sio; MockDtls server(&sio); + MockBridgeDtlsIO b0(&cio, NULL, &server); MockBridgeDtlsIO b1(&sio, &client, NULL); + HELPER_EXPECT_SUCCESS(client.initialize("active", c.ClientVersion)) << c; + HELPER_EXPECT_SUCCESS(server.initialize("passive", c.ServerVersion)) << c; + + HELPER_EXPECT_SUCCESS(client.start_active_handshake()) << c; + mock_wait_dtls_io_done(); + + // Note that the cio error is generated from server, vice versa. + EXPECT_EQ(c.ClientError, sio.r0 != srs_success) << c; + EXPECT_EQ(c.ServerError, cio.r0 != srs_success) << c; + + EXPECT_EQ(c.ClientDone, cio.done) << c; + EXPECT_EQ(c.ServerDone, sio.done) << c; + } +} + VOID TEST(KernelRTCTest, DTLSServerFlowTest) { srs_error_t err = srs_success; @@ -321,12 +836,12 @@ VOID TEST(KernelRTCTest, DTLSServerFlowTest) MockDtlsCallback cio; MockDtls client(&cio); MockDtlsCallback sio; SrsDtls server(&sio); - cio.peer = &server; sio.peer2 = &client; + MockBridgeDtlsIO b0(&cio, &server, NULL); MockBridgeDtlsIO b1(&sio, NULL, &client); HELPER_EXPECT_SUCCESS(client.initialize("active", c.ClientVersion)) << c; HELPER_EXPECT_SUCCESS(server.initialize("passive", c.ServerVersion)) << c; HELPER_EXPECT_SUCCESS(client.start_active_handshake()) << c; - mock_wait_dtls_io_done(cio, sio); + mock_wait_dtls_io_done(); // Note that the cio error is generated from server, vice versa. EXPECT_EQ(c.ClientError, sio.r0 != srs_success) << c; @@ -668,7 +1183,7 @@ VOID TEST(KernelRTCTest, DefaultTrackStatus) // Enable it by publisher. if (true) { - SrsRtcConnection s(NULL, SrsContextId()); SrsRtcPublishStream publish(&s); + SrsRtcConnection s(NULL, SrsContextId()); SrsRtcPublishStream publish(&s, SrsContextId()); SrsRtcAudioRecvTrack* audio; SrsRtcVideoRecvTrack *video; if (true) { diff --git a/trunk/src/utest/srs_utest_service.cpp b/trunk/src/utest/srs_utest_service.cpp index 9f075e477..75e33b546 100644 --- a/trunk/src/utest/srs_utest_service.cpp +++ b/trunk/src/utest/srs_utest_service.cpp @@ -1382,19 +1382,34 @@ VOID TEST(TCPServerTest, ContextUtility) if (true) { SrsThreadContext ctx; - EXPECT_TRUE(!ctx.set_id(SrsContextId("100")).compare(SrsContextId("100"))); - EXPECT_TRUE(!ctx.set_id(SrsContextId("1000")).compare(SrsContextId("1000"))); - EXPECT_TRUE(!ctx.get_id().compare(SrsContextId("1000"))); + if (true) { + SrsContextId cid; + EXPECT_TRUE(!ctx.set_id(cid.set_value("100")).compare(cid)); + } + if (true) { + SrsContextId cid; + EXPECT_TRUE(!ctx.set_id(cid.set_value("1000")).compare(cid)); + } + if (true) { + SrsContextId cid; + EXPECT_TRUE(!ctx.get_id().compare(cid.set_value("1000"))); + } ctx.clear_cid(); - EXPECT_TRUE(!ctx.set_id(SrsContextId("100")).compare(SrsContextId("100"))); + if (true) { + SrsContextId cid; + EXPECT_TRUE(!ctx.set_id(cid.set_value("100")).compare(cid)); + } } + SrsContextId cid; + cid.set_value("100"); + int base_size = 0; if (true) { errno = 0; int size = 0; char buf[1024]; HELPER_ARRAY_INIT(buf, 1024, 0); - ASSERT_TRUE(srs_log_header(buf, 1024, true, true, "SRS", SrsContextId("100"), "Trace", &size)); + ASSERT_TRUE(srs_log_header(buf, 1024, true, true, "SRS", cid, "Trace", &size)); base_size = size; EXPECT_TRUE(base_size > 0); } @@ -1402,21 +1417,21 @@ VOID TEST(TCPServerTest, ContextUtility) if (true) { errno = 0; int size = 0; char buf[1024]; HELPER_ARRAY_INIT(buf, 1024, 0); - ASSERT_TRUE(srs_log_header(buf, 1024, false, true, "SRS", SrsContextId("100"), "Trace", &size)); + ASSERT_TRUE(srs_log_header(buf, 1024, false, true, "SRS", cid, "Trace", &size)); EXPECT_EQ(base_size, size); } if (true) { errno = 0; int size = 0; char buf[1024]; HELPER_ARRAY_INIT(buf, 1024, 0); - ASSERT_TRUE(srs_log_header(buf, 1024, false, true, NULL, SrsContextId("100"), "Trace", &size)); + ASSERT_TRUE(srs_log_header(buf, 1024, false, true, NULL, cid, "Trace", &size)); EXPECT_EQ(base_size - 5, size); } if (true) { errno = 0; int size = 0; char buf[1024]; HELPER_ARRAY_INIT(buf, 1024, 0); - ASSERT_TRUE(srs_log_header(buf, 1024, false, false, NULL, SrsContextId("100"), "Trace", &size)); + ASSERT_TRUE(srs_log_header(buf, 1024, false, false, NULL, cid, "Trace", &size)); EXPECT_EQ(base_size - 8, size); }