1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

Merge branch 'feature/rtc' into develop

This commit is contained in:
winlin 2020-08-30 12:16:13 +08:00
commit 8722bd2e0c
52 changed files with 1265 additions and 331 deletions

View file

@ -151,7 +151,7 @@ srs_error_t SrsBandwidth::bandwidth_check(SrsRtmpServer* rtmp, ISrsProtocolStati
// reject the connection in the interval window.
if (last_check_time > 0 && time_now - last_check_time < interval) {
_rtmp->response_connect_reject(_req, "bandcheck rejected");
return srs_error_new(ERROR_SYSTEM_BANDWIDTH_DENIED, "reject, last_check=%" PRId64 ", now=%" PRId64 ", interval=%d", last_check_time, time_now, interval);
return srs_error_new(ERROR_SYSTEM_BANDWIDTH_DENIED, "reject, last_check=%" PRId64 ", now=%" PRId64 ", interval=%" PRId64 "", last_check_time, time_now, interval);
}
// accept and do bandwidth check.

View file

@ -130,7 +130,7 @@ namespace _srs_internal
// read total content from file.
ssize_t nread = 0;
if ((err = reader.read(start, filesize, &nread)) != srs_success) {
return srs_error_wrap(err, "read %d only %d bytes", filesize, nread);
return srs_error_wrap(err, "read %d only %d bytes", filesize, (int)nread);
}
return err;
@ -620,6 +620,7 @@ srs_error_t srs_config_dumps_engine(SrsConfDirective* dir, SrsJsonObject* engine
SrsConfDirective::SrsConfDirective()
{
conf_line = 0;
}
SrsConfDirective::~SrsConfDirective()
@ -1140,6 +1141,7 @@ SrsConfig::SrsConfig()
show_help = false;
show_version = false;
test_conf = false;
show_signature = false;
root = new SrsConfDirective();
root->conf_line = 0;
@ -3661,7 +3663,7 @@ srs_error_t SrsConfig::check_normal_config()
for (int i = 0; i < (int)listens.size(); i++) {
string port = listens[i];
if (port.empty() || ::atoi(port.c_str()) <= 0) {
return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "listen.port=%d is invalid", port.c_str());
return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "listen.port=%s is invalid", port.c_str());
}
}
}

View file

@ -307,6 +307,7 @@ SrsDashController::SrsDashController()
vfragments = new SrsFragmentWindow();
afragments = new SrsFragmentWindow();
audio_dts = video_dts = 0;
fragment = 0;
}
SrsDashController::~SrsDashController()

View file

@ -48,6 +48,7 @@ SrsDvrSegmenter::SrsDvrSegmenter()
req = NULL;
jitter = NULL;
plan = NULL;
wait_keyframe = true;
fragment = new SrsFragment();
fs = new SrsFileWriter();
@ -585,6 +586,7 @@ string SrsDvrAsyncCallOnDvr::to_string()
SrsDvrPlan::SrsDvrPlan()
{
req = NULL;
hub = NULL;
dvr_enabled = false;
segment = NULL;

View file

@ -65,6 +65,7 @@ SrsEdgeRtmpUpstream::SrsEdgeRtmpUpstream(string r)
{
redirect = r;
sdk = NULL;
selected_port = 0;
}
SrsEdgeRtmpUpstream::~SrsEdgeRtmpUpstream()
@ -440,6 +441,7 @@ SrsEdgeForwarder::SrsEdgeForwarder()
edge = NULL;
req = NULL;
send_error_code = ERROR_SUCCESS;
source = NULL;
sdk = NULL;
lb = new SrsLbRoundRobin();

View file

@ -200,6 +200,7 @@ SrsHlsMuxer::SrsHlsMuxer()
accept_floor_ts = 0;
hls_ts_floor = false;
max_td = 0;
writer = NULL;
_sequence_no = 0;
current = NULL;
hls_keys = false;
@ -214,6 +215,7 @@ SrsHlsMuxer::SrsHlsMuxer()
SrsHlsMuxer::~SrsHlsMuxer()
{
srs_freep(segments);
srs_freep(current);
srs_freep(req);
srs_freep(async);
@ -757,6 +759,10 @@ srs_error_t SrsHlsMuxer::_refresh_m3u8(string m3u8_file)
// #EXT-X-MEDIA-SEQUENCE:4294967295\n
SrsHlsSegment* first = dynamic_cast<SrsHlsSegment*>(segments->first());
if (first == NULL) {
return srs_error_new(ERROR_HLS_WRITE_FAILED, "segments cast");
}
ss << "#EXT-X-MEDIA-SEQUENCE:" << first->sequence_no << SRS_CONSTS_LF;
// #EXT-X-TARGETDURATION:4294967295\n

View file

@ -532,7 +532,7 @@ srs_error_t SrsHttpHooks::do_post(SrsHttpClient* hc, std::string url, std::strin
}
if ((res_code->to_integer()) != ERROR_SUCCESS) {
return srs_error_new(ERROR_RESPONSE_CODE, "http: response object code %d %s", res_code->to_integer(), res.c_str());
return srs_error_new(ERROR_RESPONSE_CODE, "http: response object code %" PRId64 " %s", res_code->to_integer(), res.c_str());
}
return err;

View file

@ -134,7 +134,7 @@ srs_error_t SrsVodStream::serve_flv_stream(ISrsHttpResponseWriter* w, ISrsHttpMe
// send data
if ((err = copy(w, fs, r, (int)left)) != srs_success) {
return srs_error_wrap(err, "read flv=%s size=%d", fullpath.c_str(), left);
return srs_error_wrap(err, "read flv=%s size=%d", fullpath.c_str(), (int)left);
}
return err;
@ -184,7 +184,7 @@ srs_error_t SrsVodStream::serve_mp4_stream(ISrsHttpResponseWriter* w, ISrsHttpMe
// send data
if ((err = copy(w, fs, r, (int)left)) != srs_success) {
return srs_error_wrap(err, "read mp4=%s size=%d", fullpath.c_str(), left);
return srs_error_wrap(err, "read mp4=%s size=%d", fullpath.c_str(), (int)left);
}
return err;

View file

@ -38,6 +38,7 @@ using namespace std;
SrsIngesterFFMPEG::SrsIngesterFFMPEG()
{
ffmpeg = NULL;
starttime = 0;
}
SrsIngesterFFMPEG::~SrsIngesterFFMPEG()

View file

@ -125,6 +125,7 @@ void SrsUdpListener::set_socket_buffer()
int r0_sndbuf = 0;
if (true) {
socklen_t opt_len = sizeof(default_sndbuf);
// TODO: FIXME: check err
getsockopt(fd(), SOL_SOCKET, SO_SNDBUF, (void*)&default_sndbuf, &opt_len);
if ((r0_sndbuf = setsockopt(fd(), SOL_SOCKET, SO_SNDBUF, (void*)&actual_sndbuf, sizeof(actual_sndbuf))) < 0) {
@ -132,6 +133,7 @@ void SrsUdpListener::set_socket_buffer()
}
opt_len = sizeof(actual_sndbuf);
// TODO: FIXME: check err
getsockopt(fd(), SOL_SOCKET, SO_SNDBUF, (void*)&actual_sndbuf, &opt_len);
}
@ -142,6 +144,7 @@ void SrsUdpListener::set_socket_buffer()
int r0_rcvbuf = 0;
if (true) {
socklen_t opt_len = sizeof(default_rcvbuf);
// TODO: FIXME: check err
getsockopt(fd(), SOL_SOCKET, SO_RCVBUF, (void*)&default_rcvbuf, &opt_len);
if ((r0_rcvbuf = setsockopt(fd(), SOL_SOCKET, SO_RCVBUF, (void*)&actual_rcvbuf, sizeof(actual_rcvbuf))) < 0) {
@ -149,6 +152,7 @@ void SrsUdpListener::set_socket_buffer()
}
opt_len = sizeof(actual_rcvbuf);
// TODO: FIXME: check err
getsockopt(fd(), SOL_SOCKET, SO_RCVBUF, (void*)&actual_rcvbuf, &opt_len);
}
@ -288,6 +292,7 @@ SrsUdpMuxSocket::SrsUdpMuxSocket(srs_netfd_t fd)
lfd = fd;
fromlen = 0;
peer_port = 0;
}
SrsUdpMuxSocket::~SrsUdpMuxSocket()

View file

@ -158,12 +158,14 @@ srs_error_t srs_redirect_output(string from_file, int to_fd)
if ((fd = ::open(from_file.c_str(), flags, mode)) < 0) {
return srs_error_new(ERROR_FORK_OPEN_LOG, "open process %d %s failed", to_fd, from_file.c_str());
}
if (dup2(fd, to_fd) < 0) {
return srs_error_new(ERROR_FORK_DUP2_LOG, "dup2 process %d failed", to_fd);
}
int r0 = dup2(fd, to_fd);
::close(fd);
if (r0 < 0) {
return srs_error_new(ERROR_FORK_DUP2_LOG, "dup2 fd=%d, to=%d, file=%s failed, r0=%d",
fd, to_fd, from_file.c_str(), r0);
}
return err;
}

View file

@ -340,7 +340,7 @@ srs_error_t SrsPublishRecvThread::error_code()
return srs_error_copy(recv_error);
}
void SrsPublishRecvThread::set_cid(std::string v)
void SrsPublishRecvThread::set_cid(SrsContextId v)
{
ncid = v;
}

View file

@ -180,7 +180,7 @@ public:
virtual int64_t nb_msgs();
virtual uint64_t nb_video_frames();
virtual srs_error_t error_code();
virtual void set_cid(std::string v);
virtual void set_cid(SrsContextId v);
virtual SrsContextId get_cid();
public:
virtual srs_error_t start();

View file

@ -170,6 +170,7 @@ srs_error_t SrsAudioEncoder::initialize()
case 24000:
opus_encoder_ctl(opus_, OPUS_SET_MAX_BANDWIDTH(OPUS_BANDWIDTH_SUPERWIDEBAND));
break;
case 16000:
opus_encoder_ctl(opus_, OPUS_SET_MAX_BANDWIDTH(OPUS_BANDWIDTH_WIDEBAND));
@ -343,6 +344,10 @@ SrsAudioRecode::SrsAudioRecode(int channels, int samplerate)
{
size_ = 0;
data_ = new char[kPcmBufMax];
dec_ = NULL;
enc_ = NULL;
resample_ = NULL;
}
SrsAudioRecode::~SrsAudioRecode()
@ -395,7 +400,7 @@ srs_error_t SrsAudioRecode::transcode(SrsSample *pkt, char **buf, int *buf_len,
int decode_len = kPacketBufMax;
static char decode_buffer[kPacketBufMax];
if ((err = dec_->decode(pkt, decode_buffer, decode_len)) != srs_success) {
return srs_error_new(ERROR_RTC_RTP_MUXER, "decode error");
return srs_error_wrap(err, "decode error");
}
if (!resample_) {
@ -419,7 +424,7 @@ srs_error_t SrsAudioRecode::transcode(SrsSample *pkt, char **buf, int *buf_len,
int resample_len = kFrameBufMax;
static char resample_buffer[kFrameBufMax];
if ((err = resample_->resample(&pcm, resample_buffer, resample_len)) != srs_success) {
return srs_error_new(ERROR_RTC_RTP_MUXER, "resample error");
return srs_error_wrap(err, "resample error");
}
n = 0;
@ -446,7 +451,7 @@ srs_error_t SrsAudioRecode::transcode(SrsSample *pkt, char **buf, int *buf_len,
pcm.size = size_;
static char encode_buffer[kPacketBufMax];
if ((err = enc_->encode(&pcm, encode_buffer, encode_len)) != srs_success) {
return srs_error_new(ERROR_RTC_RTP_MUXER, "encode error");
return srs_error_wrap(err, "encode error");
}
memcpy(buf[n], encode_buffer, encode_len);

View file

@ -313,16 +313,17 @@ SrsRtcPlayStream::SrsRtcPlayStream(SrsRtcConnection* s, SrsContextId parent_cid)
_srs_config->subscribe(this);
timer_ = new SrsHourGlass(this, 1000 * SRS_UTIME_MILLISECONDS);
nack_epp = new SrsErrorPithyPrint();
}
SrsRtcPlayStream::~SrsRtcPlayStream()
{
_srs_config->unsubscribe(this);
srs_freep(req_);
srs_freep(trd);
srs_freep(timer_);
srs_freep(req_);
if (true) {
std::map<uint32_t, SrsRtcAudioSendTrack*>::iterator it;
@ -337,6 +338,8 @@ SrsRtcPlayStream::~SrsRtcPlayStream()
srs_freep(it->second);
}
}
srs_freep(nack_epp);
}
srs_error_t SrsRtcPlayStream::initialize(SrsRequest* req, std::map<uint32_t, SrsRtcTrackDescription*> sub_relations)
@ -724,7 +727,7 @@ srs_error_t SrsRtcPlayStream::on_rtcp_nack(SrsRtcpNack* rtcp)
vector<SrsRtpPacket2*> resend_pkts;
vector<uint16_t> sns = rtcp->get_lost_sns();
for(int i = 0; i < sns.size(); ++i) {
for(int i = 0; i < (int)sns.size(); ++i) {
uint16_t seq = sns.at(i);
nack_fetch(resend_pkts, rtcp->get_media_ssrc(), seq);
}
@ -732,8 +735,12 @@ srs_error_t SrsRtcPlayStream::on_rtcp_nack(SrsRtcpNack* rtcp)
for (int i = 0; i < (int)resend_pkts.size(); ++i) {
SrsRtpPacket2* pkt = resend_pkts[i];
info.nn_bytes += pkt->nb_bytes();
srs_trace("RTC NACK ARQ seq=%u, ssrc=%u, ts=%u, %d bytes", pkt->header.get_sequence(),
pkt->header.get_ssrc(), pkt->header.get_timestamp(), pkt->nb_bytes());
uint32_t nn = 0;
if (nack_epp->can_print(pkt->header.get_ssrc(), &nn)) {
srs_trace("RTC NACK ARQ seq=%u, ssrc=%u, ts=%u, count=%u/%u, %d bytes", pkt->header.get_sequence(),
pkt->header.get_ssrc(), pkt->header.get_timestamp(), nn, nack_epp->nn_count, pkt->nb_bytes());
}
}
// By default, we send packets by sendmmsg.
@ -797,10 +804,11 @@ uint32_t SrsRtcPlayStream::get_video_publish_ssrc(uint32_t play_ssrc)
return 0;
}
SrsRtcPublishStream::SrsRtcPublishStream(SrsRtcConnection* session)
SrsRtcPublishStream::SrsRtcPublishStream(SrsRtcConnection* session, SrsContextId parent_cid)
{
timer_ = new SrsHourGlass(this, 200 * SRS_UTIME_MILLISECONDS);
parent_cid_ = parent_cid;
is_started = false;
session_ = session;
request_keyframe_ = false;
@ -869,6 +877,12 @@ srs_error_t SrsRtcPublishStream::initialize(SrsRequest* r, SrsRtcStreamDescripti
session_->stat_->nn_publishers++;
// Setup the publish stream in source to enable PLI as such.
if ((err = _srs_rtc_sources->fetch_or_create(req, &source)) != srs_success) {
return srs_error_wrap(err, "create source");
}
source->set_publish_stream(this);
return err;
}
@ -888,16 +902,10 @@ srs_error_t SrsRtcPublishStream::start()
return srs_error_wrap(err, "start timer");
}
if ((err = _srs_rtc_sources->fetch_or_create(req, &source)) != srs_success) {
return srs_error_wrap(err, "create source");
}
if ((err = source->on_publish()) != srs_success) {
return srs_error_wrap(err, "on publish");
}
source->set_publish_stream(this);
if (_srs_rtc_hijacker) {
if ((err = _srs_rtc_hijacker->on_start_publish(session_, this, req)) != srs_success) {
return srs_error_wrap(err, "on start publish");
@ -1003,7 +1011,8 @@ srs_error_t SrsRtcPublishStream::on_rtp(char* data, int nb_data)
// For NACK simulator, drop packet.
if (nn_simulate_nack_drop) {
SrsBuffer b(data, nb_data); SrsRtpHeader h; h.ignore_padding(true); h.decode(&b);
SrsBuffer b(data, nb_data); SrsRtpHeader h; h.ignore_padding(true);
err = h.decode(&b); srs_freep(err); // Ignore any error for simluate drop.
simulate_drop_packet(&h, nb_data);
return err;
}
@ -1043,7 +1052,8 @@ srs_error_t SrsRtcPublishStream::on_rtp(char* data, int nb_data)
char* unprotected_buf = new char[kRtpPacketSize];
if ((err = session_->transport_->unprotect_rtp(data, unprotected_buf, nb_unprotected_buf)) != srs_success) {
// We try to decode the RTP header for more detail error informations.
SrsBuffer b(data, nb_data); SrsRtpHeader h; h.ignore_padding(true); h.decode(&b);
SrsBuffer b(data, nb_data); SrsRtpHeader h; h.ignore_padding(true);
srs_error_t r0 = h.decode(&b); srs_freep(r0); // Ignore any error for header decoding.
err = srs_error_wrap(err, "marker=%u, pt=%u, seq=%u, ts=%u, ssrc=%u, pad=%u, payload=%uB", h.get_marker(), h.get_payload_type(),
h.get_sequence(), h.get_timestamp(), h.get_ssrc(), h.get_padding(), nb_data - b.pos());
@ -1449,19 +1459,7 @@ SrsRtcConnection::SrsRtcConnection(SrsRtcServer* s, SrsContextId context_id)
SrsRtcConnection::~SrsRtcConnection()
{
srs_freep(timer_);
srs_freep(transport_);
srs_freep(req);
srs_freep(stat_);
srs_freep(pp_address_change);
// Note that we should never delete the sendonly_skt,
// it's just point to the object in peer_addresses_.
map<string, SrsUdpMuxSocket*>::iterator it;
for (it = peer_addresses_.begin(); it != peer_addresses_.end(); ++it) {
SrsUdpMuxSocket* addr = it->second;
srs_freep(addr);
}
// Cleanup publishers.
for(map<string, SrsRtcPublishStream*>::iterator it = publishers_.begin(); it != publishers_.end(); ++it) {
SrsRtcPublishStream* publisher = it->second;
@ -1477,6 +1475,19 @@ SrsRtcConnection::~SrsRtcConnection()
}
players_.clear();
players_ssrc_map_.clear();
// Note that we should never delete the sendonly_skt,
// it's just point to the object in peer_addresses_.
map<string, SrsUdpMuxSocket*>::iterator it;
for (it = peer_addresses_.begin(); it != peer_addresses_.end(); ++it) {
SrsUdpMuxSocket* addr = it->second;
srs_freep(addr);
}
srs_freep(transport_);
srs_freep(req);
srs_freep(stat_);
srs_freep(pp_address_change);
}
SrsSdp* SrsRtcConnection::get_local_sdp()
@ -1548,7 +1559,7 @@ srs_error_t SrsRtcConnection::add_publisher(SrsRequest* req, const SrsSdp& remot
return srs_error_wrap(err, "publish negotiate");
}
if ((err = generate_publish_local_sdp(req, local_sdp, stream_desc)) != srs_success) {
if ((err = generate_publish_local_sdp(req, local_sdp, stream_desc, remote_sdp.is_unified())) != srs_success) {
return srs_error_wrap(err, "generate local sdp");
}
@ -1611,7 +1622,7 @@ srs_error_t SrsRtcConnection::add_player(SrsRequest* req, const SrsSdp& remote_s
++it;
}
if ((err = generate_play_local_sdp(req, local_sdp, stream_desc)) != srs_success) {
if ((err = generate_play_local_sdp(req, local_sdp, stream_desc, remote_sdp.is_unified())) != srs_success) {
return srs_error_wrap(err, "generate local sdp");
}
@ -1622,7 +1633,7 @@ srs_error_t SrsRtcConnection::add_player(SrsRequest* req, const SrsSdp& remote_s
return err;
}
srs_error_t SrsRtcConnection::add_player2(SrsRequest* req, SrsSdp& local_sdp)
srs_error_t SrsRtcConnection::add_player2(SrsRequest* req, bool unified_plan, SrsSdp& local_sdp)
{
srs_error_t err = srs_success;
@ -1658,7 +1669,7 @@ srs_error_t SrsRtcConnection::add_player2(SrsRequest* req, SrsSdp& local_sdp)
++it;
}
if ((err = generate_play_local_sdp(req, local_sdp, stream_desc)) != srs_success) {
if ((err = generate_play_local_sdp(req, local_sdp, stream_desc, unified_plan)) != srs_success) {
return srs_error_wrap(err, "generate local sdp");
}
@ -2091,7 +2102,7 @@ void SrsRtcConnection::check_send_nacks(SrsRtpNackForReceiver* nack, uint32_t ss
int nb_protected_buf = stream.pos();
// FIXME: Merge nack rtcp into one packets.
if (transport_->protect_rtcp(protected_buf, stream.data(), nb_protected_buf) == srs_success) {
if (transport_->protect_rtcp(stream.data(), protected_buf, nb_protected_buf) == srs_success) {
// TODO: FIXME: Check error.
sendonly_skt->sendto(protected_buf, nb_protected_buf, 0);
}
@ -2569,7 +2580,7 @@ srs_error_t SrsRtcConnection::negotiate_publish_capability(SrsRequest* req, cons
return err;
}
srs_error_t SrsRtcConnection::generate_publish_local_sdp(SrsRequest* req, SrsSdp& local_sdp, SrsRtcStreamDescription* stream_desc)
srs_error_t SrsRtcConnection::generate_publish_local_sdp(SrsRequest* req, SrsSdp& local_sdp, SrsRtcStreamDescription* stream_desc, bool unified_plan)
{
srs_error_t err = srs_success;
@ -2665,8 +2676,10 @@ srs_error_t SrsRtcConnection::generate_publish_local_sdp(SrsRequest* req, SrsSdp
local_media_desc.payload_types_.push_back(payload->generate_media_payload_type());
}
// only need media desc info, not ssrc info;
break;
if(!unified_plan) {
// For PlanB, only need media desc info, not ssrc info;
break;
}
}
return err;
@ -2789,9 +2802,9 @@ srs_error_t SrsRtcConnection::negotiate_play_capability(SrsRequest* req, SrsRtcS
//negotiate video media
std::vector<SrsRtcTrackDescription*> req_video_tracks = req_stream_desc->video_track_descs_;
src_track_descs = source->get_track_desc("video", "h264");
for(int i = 0; i < req_video_tracks.size(); ++i) {
for(int i = 0; i < (int)req_video_tracks.size(); ++i) {
SrsRtcTrackDescription* req_video = req_video_tracks.at(i);
for(int j = 0; j < src_track_descs.size(); ++j) {
for(int j = 0; j < (int)src_track_descs.size(); ++j) {
SrsRtcTrackDescription* src_video = src_track_descs.at(j);
if(req_video->id_ == src_video->id_) {
// FIXME: use source sdp or subscribe sdp? native subscribe may have no sdp
@ -2859,7 +2872,42 @@ srs_error_t SrsRtcConnection::fetch_source_capability(SrsRequest* req, std::map<
return err;
}
srs_error_t SrsRtcConnection::generate_play_local_sdp(SrsRequest* req, SrsSdp& local_sdp, SrsRtcStreamDescription* stream_desc)
void video_track_generate_play_offer(SrsRtcTrackDescription* track, SrsSdp& local_sdp)
{
local_sdp.media_descs_.push_back(SrsMediaDesc("video"));
SrsMediaDesc& local_media_desc = local_sdp.media_descs_.back();
local_media_desc.port_ = 9;
local_media_desc.protos_ = "UDP/TLS/RTP/SAVPF";
local_media_desc.rtcp_mux_ = true;
local_media_desc.rtcp_rsize_ = true;
local_media_desc.extmaps_ = track->extmaps_;
local_media_desc.mid_ = track->mid_;
local_sdp.groups_.push_back(local_media_desc.mid_);
if (track->direction_ == "recvonly") {
local_media_desc.recvonly_ = true;
} else if (track->direction_ == "sendonly") {
local_media_desc.sendonly_ = true;
} else if (track->direction_ == "sendrecv") {
local_media_desc.sendrecv_ = true;
} else if (track->direction_ == "inactive_") {
local_media_desc.inactive_ = true;
}
SrsVideoPayload* payload = (SrsVideoPayload*)track->media_;
local_media_desc.payload_types_.push_back(payload->generate_media_payload_type());
if (track->red_) {
SrsRedPayload* red_payload = (SrsRedPayload*)track->red_;
local_media_desc.payload_types_.push_back(red_payload->generate_media_payload_type());
}
}
srs_error_t SrsRtcConnection::generate_play_local_sdp(SrsRequest* req, SrsSdp& local_sdp, SrsRtcStreamDescription* stream_desc, bool unified_plan)
{
srs_error_t err = srs_success;
@ -2948,39 +2996,14 @@ srs_error_t SrsRtcConnection::generate_play_local_sdp(SrsRequest* req, SrsSdp& l
for (int i = 0; i < (int)stream_desc->video_track_descs_.size(); ++i) {
SrsRtcTrackDescription* track = stream_desc->video_track_descs_[i];
// for plan b, we only add one m=
if (i == 0) {
local_sdp.media_descs_.push_back(SrsMediaDesc("video"));
SrsMediaDesc& local_media_desc = local_sdp.media_descs_.back();
local_media_desc.port_ = 9;
local_media_desc.protos_ = "UDP/TLS/RTP/SAVPF";
local_media_desc.rtcp_mux_ = true;
local_media_desc.rtcp_rsize_ = true;
local_media_desc.extmaps_ = track->extmaps_;
local_media_desc.mid_ = track->mid_;
local_sdp.groups_.push_back(local_media_desc.mid_);
if (track->direction_ == "recvonly") {
local_media_desc.recvonly_ = true;
} else if (track->direction_ == "sendonly") {
local_media_desc.sendonly_ = true;
} else if (track->direction_ == "sendrecv") {
local_media_desc.sendrecv_ = true;
} else if (track->direction_ == "inactive_") {
local_media_desc.inactive_ = true;
}
SrsVideoPayload* payload = (SrsVideoPayload*)track->media_;
local_media_desc.payload_types_.push_back(payload->generate_media_payload_type());
if (track->red_) {
SrsRedPayload* red_payload = (SrsRedPayload*)track->red_;
local_media_desc.payload_types_.push_back(red_payload->generate_media_payload_type());
if (!unified_plan) {
// for plan b, we only add one m= for video track.
if (i == 0) {
video_track_generate_play_offer(track, local_sdp);
}
} else {
// unified plan SDP, generate a m= for each video track.
video_track_generate_play_offer(track, local_sdp);
}
SrsMediaDesc& local_media_desc = local_sdp.media_descs_.back();
@ -3089,7 +3112,7 @@ srs_error_t SrsRtcConnection::create_publisher(SrsRequest* req, SrsRtcStreamDesc
return err;
}
SrsRtcPublishStream* publisher = new SrsRtcPublishStream(this);
SrsRtcPublishStream* publisher = new SrsRtcPublishStream(this, _srs_context->get_id());
if ((err = publisher->initialize(req, stream_desc)) != srs_success) {
srs_freep(publisher);
return srs_error_wrap(err, "rtc publisher init");
@ -3122,7 +3145,7 @@ srs_error_t SrsRtcConnection::create_publisher(SrsRequest* req, SrsRtcStreamDesc
}
}
for(int i = 0; i < stream_desc->video_track_descs_.size(); ++i) {
for(int i = 0; i < (int)stream_desc->video_track_descs_.size(); ++i) {
SrsRtcTrackDescription* track_desc = stream_desc->video_track_descs_.at(i);
if(publishers_ssrc_map_.end() != publishers_ssrc_map_.find(track_desc->ssrc_)) {
return srs_error_new(ERROR_RTC_DUPLICATED_SSRC, " duplicate ssrc %d, track id: %s",

View file

@ -221,6 +221,8 @@ private:
// key: publish_ssrc, value: send track to process rtp/rtcp
std::map<uint32_t, SrsRtcAudioSendTrack*> audio_tracks_;
std::map<uint32_t, SrsRtcVideoSendTrack*> video_tracks_;
// The pithy print for special stage.
SrsErrorPithyPrint* nack_epp;
private:
// For merged-write messages.
int mw_msgs;
@ -271,6 +273,7 @@ private:
class SrsRtcPublishStream : virtual public ISrsHourGlass, virtual public ISrsRtpPacketDecodeHandler, virtual public ISrsRtcPublishStream
{
private:
SrsContextId parent_cid_;
SrsHourGlass* timer_;
uint64_t nn_audio_frames;
private:
@ -296,7 +299,7 @@ private:
SrsRtpExtensionTypes extension_types_;
bool is_started;
public:
SrsRtcPublishStream(SrsRtcConnection* session);
SrsRtcPublishStream(SrsRtcConnection* session, SrsContextId parent_cid);
virtual ~SrsRtcPublishStream();
public:
srs_error_t initialize(SrsRequest* req, SrsRtcStreamDescription* stream_desc);
@ -439,7 +442,7 @@ public:
srs_error_t add_publisher(SrsRequest* request, const SrsSdp& remote_sdp, SrsSdp& local_sdp);
srs_error_t add_player(SrsRequest* request, const SrsSdp& remote_sdp, SrsSdp& local_sdp);
// server send offer sdp to client, local sdp derivate from source stream desc.
srs_error_t add_player2(SrsRequest* request, SrsSdp& local_sdp);
srs_error_t add_player2(SrsRequest* request, bool unified_plan, SrsSdp& local_sdp);
public:
// Before initialize, user must set the local SDP, which is used to inititlize DTLS.
srs_error_t initialize(SrsRequest* r, bool dtls, bool srtp, std::string username);
@ -483,13 +486,13 @@ private:
srs_error_t on_binding_request(SrsStunPacket* r);
// publish media capabilitiy negotiate
srs_error_t negotiate_publish_capability(SrsRequest* req, const SrsSdp& remote_sdp, SrsRtcStreamDescription* stream_desc);
srs_error_t generate_publish_local_sdp(SrsRequest* req, SrsSdp& local_sdp, SrsRtcStreamDescription* stream_desc);
srs_error_t generate_publish_local_sdp(SrsRequest* req, SrsSdp& local_sdp, SrsRtcStreamDescription* stream_desc, bool unified_plan);
// play media capabilitiy negotiate
//TODO: Use StreamDescription to negotiate and remove first negotiate_play_capability function
srs_error_t negotiate_play_capability(SrsRequest* req, const SrsSdp& remote_sdp, std::map<uint32_t, SrsRtcTrackDescription*>& sub_relations);
srs_error_t negotiate_play_capability(SrsRequest* req, SrsRtcStreamDescription* req_stream_desc, std::map<uint32_t, SrsRtcTrackDescription*>& sub_relations);
srs_error_t fetch_source_capability(SrsRequest* req, std::map<uint32_t, SrsRtcTrackDescription*>& sub_relations);
srs_error_t generate_play_local_sdp(SrsRequest* req, SrsSdp& local_sdp, SrsRtcStreamDescription* stream_desc);
srs_error_t generate_play_local_sdp(SrsRequest* req, SrsSdp& local_sdp, SrsRtcStreamDescription* stream_desc, bool unified_plan);
srs_error_t create_player(SrsRequest* request, std::map<uint32_t, SrsRtcTrackDescription*> sub_relations);
srs_error_t create_publisher(SrsRequest* request, SrsRtcStreamDescription* stream_desc);
};

View file

@ -312,10 +312,12 @@ ISrsDtlsCallback::~ISrsDtlsCallback()
{
}
SrsDtls::SrsDtls(ISrsDtlsCallback* callback)
SrsDtlsImpl::SrsDtlsImpl(ISrsDtlsCallback* callback)
{
dtls_ctx = NULL;
dtls = NULL;
bio_in = NULL;
bio_out = NULL;
callback_ = callback;
handshake_done_for_us = false;
@ -323,17 +325,11 @@ SrsDtls::SrsDtls(ISrsDtlsCallback* callback)
last_outgoing_packet_cache = new uint8_t[kRtpPacketSize];
nn_last_outgoing_packet = 0;
role_ = SrsDtlsRoleServer;
version_ = SrsDtlsVersionAuto;
trd = NULL;
state_ = SrsDtlsStateInit;
}
SrsDtls::~SrsDtls()
SrsDtlsImpl::~SrsDtlsImpl()
{
srs_freep(trd);
if (dtls_ctx) {
SSL_CTX_free(dtls_ctx);
dtls_ctx = NULL;
@ -348,15 +344,10 @@ SrsDtls::~SrsDtls()
srs_freepa(last_outgoing_packet_cache);
}
srs_error_t SrsDtls::initialize(std::string role, std::string version)
srs_error_t SrsDtlsImpl::initialize(std::string version)
{
srs_error_t err = srs_success;
role_ = SrsDtlsRoleServer;
if (role == "active") {
role_ = SrsDtlsRoleClient;
}
if (version == "dtls1.0") {
version_ = SrsDtlsVersion1_0;
} else if (version == "dtls1.2") {
@ -371,15 +362,6 @@ srs_error_t SrsDtls::initialize(std::string role, std::string version)
return srs_error_new(ERROR_OpenSslCreateSSL, "SSL_new dtls");
}
if (role_ == SrsDtlsRoleClient) {
// Dtls setup active, as client role.
SSL_set_connect_state(dtls);
SSL_set_max_send_fragment(dtls, kRtpPacketSize);
} else {
// Dtls setup passive, as server role.
SSL_set_accept_state(dtls);
}
if ((bio_in = BIO_new(BIO_s_mem())) == NULL) {
return srs_error_new(ERROR_OpenSslBIONew, "BIO_new in");
}
@ -394,29 +376,10 @@ srs_error_t SrsDtls::initialize(std::string role, std::string version)
return err;
}
srs_error_t SrsDtls::start_active_handshake()
srs_error_t SrsDtlsImpl::on_dtls(char* data, int nb_data)
{
srs_error_t err = srs_success;
if (role_ == SrsDtlsRoleClient) {
return do_handshake();
}
return err;
}
srs_error_t SrsDtls::on_dtls(char* data, int nb_data)
{
srs_error_t err = srs_success;
// When got packet, stop the ARQ if server in the first ARQ state SrsDtlsStateServerHello.
// @note But for ARQ state, we should never stop the ARQ, for example, we are in the second ARQ sate
// SrsDtlsStateServerDone, but we got previous late wrong packet ServeHello, which is not the expect
// packet SessionNewTicket, we should never stop the ARQ thread.
if (role_ == SrsDtlsRoleClient && state_ == SrsDtlsStateServerHello) {
stop_arq();
}
if ((err = do_on_dtls(data, nb_data)) != srs_success) {
return srs_error_wrap(err, "on_dtls size=%u, data=[%s]", nb_data,
srs_string_dumps_hex(data, nb_data, 32).c_str());
@ -425,7 +388,7 @@ srs_error_t SrsDtls::on_dtls(char* data, int nb_data)
return err;
}
srs_error_t SrsDtls::do_on_dtls(char* data, int nb_data)
srs_error_t SrsDtlsImpl::do_on_dtls(char* data, int nb_data)
{
srs_error_t err = srs_success;
@ -468,7 +431,7 @@ srs_error_t SrsDtls::do_on_dtls(char* data, int nb_data)
return err;
}
srs_error_t SrsDtls::do_handshake()
srs_error_t SrsDtlsImpl::do_handshake()
{
srs_error_t err = srs_success;
@ -490,16 +453,9 @@ srs_error_t SrsDtls::do_handshake()
uint8_t* data = NULL;
int size = BIO_get_mem_data(bio_out, &data);
// If outgoing packet is empty, we use the last cache.
// @remark Only for DTLS server, because DTLS client use ARQ thread to send cached packet.
// Callback when got SSL original data.
bool cache = false;
if (role_ != SrsDtlsRoleClient && size <= 0 && nn_last_outgoing_packet) {
size = nn_last_outgoing_packet;
data = last_outgoing_packet_cache;
cache = true;
}
// Trace the detail of DTLS packet.
on_ssl_out_data(data, size, cache);
state_trace((uint8_t*)data, size, false, r0, r1, cache, false);
// Update the packet cache.
@ -508,29 +464,9 @@ srs_error_t SrsDtls::do_handshake()
nn_last_outgoing_packet = size;
}
// Driven ARQ and state for DTLS client.
if (role_ == SrsDtlsRoleClient) {
// If we are sending client hello, change from init to new state.
if (state_ == SrsDtlsStateInit && size > 14 && data[13] == 1) {
state_ = SrsDtlsStateClientHello;
}
// If we are sending certificate, change from SrsDtlsStateServerHello to new state.
if (state_ == SrsDtlsStateServerHello && size > 14 && data[13] == 11) {
state_ = SrsDtlsStateClientCertificate;
}
// Try to start the ARQ for client.
if ((state_ == SrsDtlsStateClientHello || state_ == SrsDtlsStateClientCertificate)) {
if (state_ == SrsDtlsStateClientHello) {
state_ = SrsDtlsStateServerHello;
} else if (state_ == SrsDtlsStateClientCertificate) {
state_ = SrsDtlsStateServerDone;
}
if ((err = start_arq()) != srs_success) {
return srs_error_wrap(err, "start arq");
}
}
// Callback for the final output data, before send-out.
if ((err = on_final_out_data(data, size)) != srs_success) {
return srs_error_wrap(err, "handle");
}
if (size > 0 && (err = callback_->write_dtls_data(data, size)) != srs_success) {
@ -539,30 +475,213 @@ srs_error_t SrsDtls::do_handshake()
}
if (handshake_done_for_us) {
// When handshake done, stop the ARQ.
if (role_ == SrsDtlsRoleClient) {
state_ = SrsDtlsStateClientDone;
stop_arq();
}
// Notify connection the DTLS is done.
if (((err = callback_->on_dtls_handshake_done()) != srs_success)) {
return srs_error_wrap(err, "dtls done");
if (((err = on_handshake_done()) != srs_success)) {
return srs_error_wrap(err, "done");
}
}
return err;
}
srs_error_t SrsDtls::cycle()
void SrsDtlsImpl::state_trace(uint8_t* data, int length, bool incoming, int r0, int r1, bool cache, bool arq)
{
uint8_t content_type = 0;
if (length >= 1) {
content_type = (uint8_t)data[0];
}
uint16_t size = 0;
if (length >= 13) {
size = uint16_t(data[11])<<8 | uint16_t(data[12]);
}
uint8_t handshake_type = 0;
if (length >= 14) {
handshake_type = (uint8_t)data[13];
}
srs_trace("DTLS: %s %s, done=%u, cache=%u, arq=%u, r0=%d, r1=%d, len=%u, cnt=%u, size=%u, hs=%u",
(is_dtls_client()? "Active":"Passive"), (incoming? "RECV":"SEND"), handshake_done_for_us, cache, arq,
r0, r1, length, content_type, size, handshake_type);
}
const int SRTP_MASTER_KEY_KEY_LEN = 16;
const int SRTP_MASTER_KEY_SALT_LEN = 14;
srs_error_t SrsDtlsImpl::get_srtp_key(std::string& recv_key, std::string& send_key)
{
srs_error_t err = srs_success;
unsigned char material[SRTP_MASTER_KEY_LEN * 2] = {0}; // client(SRTP_MASTER_KEY_KEY_LEN + SRTP_MASTER_KEY_SALT_LEN) + server
static const string dtls_srtp_lable = "EXTRACTOR-dtls_srtp";
if (!SSL_export_keying_material(dtls, material, sizeof(material), dtls_srtp_lable.c_str(), dtls_srtp_lable.size(), NULL, 0, 0)) {
return srs_error_new(ERROR_RTC_SRTP_INIT, "SSL export key r0=%lu", ERR_get_error());
}
size_t offset = 0;
std::string client_master_key(reinterpret_cast<char*>(material), SRTP_MASTER_KEY_KEY_LEN);
offset += SRTP_MASTER_KEY_KEY_LEN;
std::string server_master_key(reinterpret_cast<char*>(material + offset), SRTP_MASTER_KEY_KEY_LEN);
offset += SRTP_MASTER_KEY_KEY_LEN;
std::string client_master_salt(reinterpret_cast<char*>(material + offset), SRTP_MASTER_KEY_SALT_LEN);
offset += SRTP_MASTER_KEY_SALT_LEN;
std::string server_master_salt(reinterpret_cast<char*>(material + offset), SRTP_MASTER_KEY_SALT_LEN);
if (is_dtls_client()) {
recv_key = server_master_key + server_master_salt;
send_key = client_master_key + client_master_salt;
} else {
recv_key = client_master_key + client_master_salt;
send_key = server_master_key + server_master_salt;
}
return err;
}
SrsDtlsClientImpl::SrsDtlsClientImpl(ISrsDtlsCallback* callback) : SrsDtlsImpl(callback)
{
trd = NULL;
state_ = SrsDtlsStateInit;
arq_first = 50 * SRS_UTIME_MILLISECONDS;
arq_interval = 100 * SRS_UTIME_MILLISECONDS;
}
SrsDtlsClientImpl::~SrsDtlsClientImpl()
{
srs_freep(trd);
}
srs_error_t SrsDtlsClientImpl::initialize(std::string version)
{
srs_error_t err = srs_success;
if ((err = SrsDtlsImpl::initialize(version)) != srs_success) {
return err;
}
// Dtls setup active, as client role.
SSL_set_connect_state(dtls);
SSL_set_max_send_fragment(dtls, kRtpPacketSize);
return err;
}
srs_error_t SrsDtlsClientImpl::start_active_handshake()
{
return do_handshake();
}
srs_error_t SrsDtlsClientImpl::on_dtls(char* data, int nb_data)
{
srs_error_t err = srs_success;
// When got packet, stop the ARQ if server in the first ARQ state SrsDtlsStateServerHello.
// @note But for ARQ state, we should never stop the ARQ, for example, we are in the second ARQ sate
// SrsDtlsStateServerDone, but we got previous late wrong packet ServeHello, which is not the expect
// packet SessionNewTicket, we should never stop the ARQ thread.
if (state_ == SrsDtlsStateServerHello) {
stop_arq();
}
if ((err = SrsDtlsImpl::on_dtls(data, nb_data)) != srs_success) {
return err;
}
return err;
}
void SrsDtlsClientImpl::on_ssl_out_data(uint8_t*& data, int& size, bool& cached)
{
// DTLS client use ARQ thread to send cached packet.
cached = false;
}
srs_error_t SrsDtlsClientImpl::on_final_out_data(uint8_t* data, int size)
{
srs_error_t err = srs_success;
// Driven ARQ and state for DTLS client.
// If we are sending client hello, change from init to new state.
if (state_ == SrsDtlsStateInit && size > 14 && data[13] == 1) {
state_ = SrsDtlsStateClientHello;
}
// If we are sending certificate, change from SrsDtlsStateServerHello to new state.
if (state_ == SrsDtlsStateServerHello && size > 14 && data[13] == 11) {
state_ = SrsDtlsStateClientCertificate;
}
// Try to start the ARQ for client.
if ((state_ == SrsDtlsStateClientHello || state_ == SrsDtlsStateClientCertificate)) {
if (state_ == SrsDtlsStateClientHello) {
state_ = SrsDtlsStateServerHello;
} else if (state_ == SrsDtlsStateClientCertificate) {
state_ = SrsDtlsStateServerDone;
}
if ((err = start_arq()) != srs_success) {
return srs_error_wrap(err, "start arq");
}
}
return err;
}
srs_error_t SrsDtlsClientImpl::on_handshake_done()
{
srs_error_t err = srs_success;
// When handshake done, stop the ARQ.
state_ = SrsDtlsStateClientDone;
stop_arq();
// Notify connection the DTLS is done.
if (((err = callback_->on_dtls_handshake_done()) != srs_success)) {
return srs_error_wrap(err, "dtls done");
}
return err;
}
bool SrsDtlsClientImpl::is_dtls_client()
{
return true;
}
srs_error_t SrsDtlsClientImpl::start_arq()
{
srs_error_t err = srs_success;
srs_info("start arq, state=%u", state_);
// Dispose the previous ARQ thread.
srs_freep(trd);
trd = new SrsSTCoroutine("dtls", this, _srs_context->get_id());
// We should start the ARQ thread for DTLS client.
if ((err = trd->start()) != srs_success) {
return srs_error_wrap(err, "arq start");
}
return err;
}
void SrsDtlsClientImpl::stop_arq()
{
srs_info("stop arq, state=%u", state_);
srs_freep(trd);
srs_info("stop arq, done");
}
srs_error_t SrsDtlsClientImpl::cycle()
{
srs_error_t err = srs_success;
// The first ARQ delay.
srs_usleep(50 * SRS_UTIME_MILLISECONDS);
srs_usleep(arq_first);
while (true) {
srs_info("arq cycle, state=%u", state_);
// Limit the max retry for ARQ.
for (int arq_retry_left = 7; arq_retry_left > 0; arq_retry_left--) {
srs_info("arq cycle, state=%u, retry=%d", state_, arq_retry_left);
// We ignore any error for ARQ thread.
if ((err = trd->pull()) != srs_success) {
@ -595,96 +714,110 @@ srs_error_t SrsDtls::cycle()
}
// TODO: Use ARQ step timeouts.
srs_usleep(100 * SRS_UTIME_MILLISECONDS);
srs_usleep(arq_interval);
}
return err;
}
void SrsDtls::state_trace(uint8_t* data, int length, bool incoming, int r0, int r1, bool cache, bool arq)
SrsDtlsServerImpl::SrsDtlsServerImpl(ISrsDtlsCallback* callback) : SrsDtlsImpl(callback)
{
uint8_t content_type = 0;
if (length >= 1) {
content_type = (uint8_t)data[0];
}
uint16_t size = 0;
if (length >= 13) {
size = uint16_t(data[11])<<8 | uint16_t(data[12]);
}
uint8_t handshake_type = 0;
if (length >= 14) {
handshake_type = (uint8_t)data[13];
}
srs_trace("DTLS: %s %s, done=%u, cache=%u, arq=%u, state=%u, r0=%d, r1=%d, len=%u, cnt=%u, size=%u, hs=%u",
(role_ == SrsDtlsRoleClient? "Active":"Passive"), (incoming? "RECV":"SEND"), handshake_done_for_us, cache, arq,
state_, r0, r1, length, content_type, size, handshake_type);
}
srs_error_t SrsDtls::start_arq()
SrsDtlsServerImpl::~SrsDtlsServerImpl()
{
}
srs_error_t SrsDtlsServerImpl::initialize(std::string version)
{
srs_error_t err = srs_success;
if (role_ != SrsDtlsRoleClient) {
if ((err = SrsDtlsImpl::initialize(version)) != srs_success) {
return err;
}
srs_info("start arq, state=%u", state_);
// Dispose the previous ARQ thread.
srs_freep(trd);
trd = new SrsSTCoroutine("dtls", this, _srs_context->get_id());
// We should start the ARQ thread for DTLS client.
if ((err = trd->start()) != srs_success) {
return srs_error_wrap(err, "arq start");
}
// Dtls setup passive, as server role.
SSL_set_accept_state(dtls);
return err;
}
void SrsDtls::stop_arq()
srs_error_t SrsDtlsServerImpl::start_active_handshake()
{
srs_info("stop arq, state=%u", state_);
srs_freep(trd);
srs_info("stop arq, done");
return srs_success;
}
const int SRTP_MASTER_KEY_KEY_LEN = 16;
const int SRTP_MASTER_KEY_SALT_LEN = 14;
srs_error_t SrsDtls::get_srtp_key(std::string& recv_key, std::string& send_key)
void SrsDtlsServerImpl::on_ssl_out_data(uint8_t*& data, int& size, bool& cached)
{
// If outgoing packet is empty, we use the last cache.
// @remark Only for DTLS server, because DTLS client use ARQ thread to send cached packet.
if (size <= 0 && nn_last_outgoing_packet) {
size = nn_last_outgoing_packet;
data = last_outgoing_packet_cache;
cached = true;
}
}
srs_error_t SrsDtlsServerImpl::on_final_out_data(uint8_t* data, int size)
{
return srs_success;
}
srs_error_t SrsDtlsServerImpl::on_handshake_done()
{
srs_error_t err = srs_success;
unsigned char material[SRTP_MASTER_KEY_LEN * 2] = {0}; // client(SRTP_MASTER_KEY_KEY_LEN + SRTP_MASTER_KEY_SALT_LEN) + server
static const string dtls_srtp_lable = "EXTRACTOR-dtls_srtp";
if (!SSL_export_keying_material(dtls, material, sizeof(material), dtls_srtp_lable.c_str(), dtls_srtp_lable.size(), NULL, 0, 0)) {
return srs_error_new(ERROR_RTC_SRTP_INIT, "SSL export key r0=%u", ERR_get_error());
}
size_t offset = 0;
std::string client_master_key(reinterpret_cast<char*>(material), SRTP_MASTER_KEY_KEY_LEN);
offset += SRTP_MASTER_KEY_KEY_LEN;
std::string server_master_key(reinterpret_cast<char*>(material + offset), SRTP_MASTER_KEY_KEY_LEN);
offset += SRTP_MASTER_KEY_KEY_LEN;
std::string client_master_salt(reinterpret_cast<char*>(material + offset), SRTP_MASTER_KEY_SALT_LEN);
offset += SRTP_MASTER_KEY_SALT_LEN;
std::string server_master_salt(reinterpret_cast<char*>(material + offset), SRTP_MASTER_KEY_SALT_LEN);
if (role_ == SrsDtlsRoleClient) {
recv_key = server_master_key + server_master_salt;
send_key = client_master_key + client_master_salt;
} else {
recv_key = client_master_key + client_master_salt;
send_key = server_master_key + server_master_salt;
// Notify connection the DTLS is done.
if (((err = callback_->on_dtls_handshake_done()) != srs_success)) {
return srs_error_wrap(err, "dtls done");
}
return err;
}
bool SrsDtlsServerImpl::is_dtls_client()
{
return false;
}
SrsDtls::SrsDtls(ISrsDtlsCallback* callback)
{
callback_ = callback;
impl = new SrsDtlsServerImpl(callback);
}
SrsDtls::~SrsDtls()
{
srs_freep(impl);
}
srs_error_t SrsDtls::initialize(std::string role, std::string version)
{
srs_freep(impl);
if (role == "active") {
impl = new SrsDtlsClientImpl(callback_);
} else {
impl = new SrsDtlsServerImpl(callback_);
}
return impl->initialize(version);
}
srs_error_t SrsDtls::start_active_handshake()
{
return impl->start_active_handshake();
}
srs_error_t SrsDtls::on_dtls(char* data, int nb_data)
{
return impl->on_dtls(data, nb_data);
}
srs_error_t SrsDtls::get_srtp_key(std::string& recv_key, std::string& send_key)
{
return impl->get_srtp_key(recv_key, send_key);
}
SrsSRTP::SrsSRTP()
{
recv_ctx_ = NULL;

View file

@ -105,31 +105,93 @@ enum SrsDtlsState {
SrsDtlsStateClientDone, // Done.
};
class SrsDtls : public ISrsCoroutineHandler
class SrsDtlsImpl
{
private:
protected:
SSL_CTX* dtls_ctx;
SSL* dtls;
BIO* bio_in;
BIO* bio_out;
ISrsDtlsCallback* callback_;
private:
// @remark: dtls_version_ default value is SrsDtlsVersionAuto.
SrsDtlsVersion version_;
protected:
// Whether the handhshake is done, for us only.
// @remark For us only, means peer maybe not done, we also need to handle the DTLS packet.
bool handshake_done_for_us;
// DTLS packet cache, only last out-going packet.
uint8_t* last_outgoing_packet_cache;
int nn_last_outgoing_packet;
public:
SrsDtlsImpl(ISrsDtlsCallback* callback);
virtual ~SrsDtlsImpl();
public:
virtual srs_error_t initialize(std::string version);
virtual srs_error_t start_active_handshake() = 0;
virtual srs_error_t on_dtls(char* data, int nb_data);
protected:
srs_error_t do_on_dtls(char* data, int nb_data);
srs_error_t do_handshake();
void state_trace(uint8_t* data, int length, bool incoming, int r0, int r1, bool cache, bool arq);
public:
srs_error_t get_srtp_key(std::string& recv_key, std::string& send_key);
protected:
virtual void on_ssl_out_data(uint8_t*& data, int& size, bool& cached) = 0;
virtual srs_error_t on_final_out_data(uint8_t* data, int size) = 0;
virtual srs_error_t on_handshake_done() = 0;
virtual bool is_dtls_client() = 0;
};
class SrsDtlsClientImpl : virtual public SrsDtlsImpl, virtual public ISrsCoroutineHandler
{
private:
// ARQ thread, for role active(DTLS client).
// @note If passive(DTLS server), the ARQ is driven by DTLS client.
SrsCoroutine* trd;
// The DTLS-client state to drive the ARQ thread.
SrsDtlsState state_;
// The timeout for ARQ.
srs_utime_t arq_first;
srs_utime_t arq_interval;
public:
SrsDtlsClientImpl(ISrsDtlsCallback* callback);
virtual ~SrsDtlsClientImpl();
public:
virtual srs_error_t initialize(std::string version);
virtual srs_error_t start_active_handshake();
virtual srs_error_t on_dtls(char* data, int nb_data);
protected:
virtual void on_ssl_out_data(uint8_t*& data, int& size, bool& cached);
virtual srs_error_t on_final_out_data(uint8_t* data, int size);
virtual srs_error_t on_handshake_done();
virtual bool is_dtls_client();
private:
// @remark: dtls_role_ default value is DTLS_SERVER.
SrsDtlsRole role_;
// @remark: dtls_version_ default value is SrsDtlsVersionAuto.
SrsDtlsVersion version_;
srs_error_t start_arq();
void stop_arq();
public:
virtual srs_error_t cycle();
};
class SrsDtlsServerImpl : public SrsDtlsImpl
{
public:
SrsDtlsServerImpl(ISrsDtlsCallback* callback);
virtual ~SrsDtlsServerImpl();
public:
virtual srs_error_t initialize(std::string version);
virtual srs_error_t start_active_handshake();
protected:
virtual void on_ssl_out_data(uint8_t*& data, int& size, bool& cached);
virtual srs_error_t on_final_out_data(uint8_t* data, int size);
virtual srs_error_t on_handshake_done();
virtual bool is_dtls_client();
};
class SrsDtls
{
private:
SrsDtlsImpl* impl;
ISrsDtlsCallback* callback_;
public:
SrsDtls(ISrsDtlsCallback* callback);
virtual ~SrsDtls();
@ -141,17 +203,6 @@ public:
// When got DTLS packet, may handshake packets or application data.
// @remark When we are passive(DTLS server), we start handshake when got DTLS packet.
srs_error_t on_dtls(char* data, int nb_data);
private:
srs_error_t do_on_dtls(char* data, int nb_data);
srs_error_t do_handshake();
// interface ISrsCoroutineHandler
public:
virtual srs_error_t cycle();
private:
void state_trace(uint8_t* data, int length, bool incoming, int r0, int r1, bool cache, bool arq);
private:
srs_error_t start_arq();
void stop_arq();
public:
srs_error_t get_srtp_key(std::string& recv_key, std::string& send_key);
};

View file

@ -189,6 +189,7 @@ SrsRtpNackForReceiver::SrsRtpNackForReceiver(SrsRtpRingBuffer* rtp, size_t queue
rtp_ = rtp;
pre_check_time_ = 0;
last_remove_packet_time_ = -1;
rtt_ = 0;
srs_info("max_queue_size=%u, nack opt: max_count=%d, max_alive_time=%us, first_nack_interval=%" PRId64 ", nack_interval=%" PRId64,
max_queue_size_, opts_.max_count, opts_.max_alive_time, opts.first_nack_interval, opts_.nack_interval);

View file

@ -254,6 +254,7 @@ srs_error_t SrsSSRCGroup::encode(std::ostringstream& os)
SrsMediaPayloadType::SrsMediaPayloadType(int payload_type)
{
payload_type_ = payload_type;
clock_rate_ = 0;
}
SrsMediaPayloadType::~SrsMediaPayloadType()
@ -288,7 +289,9 @@ SrsMediaDesc::SrsMediaDesc(const std::string& type)
{
type_ = type;
port_ = 0;
rtcp_mux_ = false;
rtcp_rsize_ = false;
sendrecv_ = false;
recvonly_ = false;
@ -315,7 +318,7 @@ vector<SrsMediaPayloadType> SrsMediaDesc::find_media_with_encoding_name(const st
{
std::vector<SrsMediaPayloadType> payloads;
std::string lower_name, upper_name;
std::string lower_name(encoding_name), upper_name(encoding_name);
transform(encoding_name.begin(), encoding_name.end(), lower_name.begin(), ::tolower);
transform(encoding_name.begin(), encoding_name.end(), upper_name.begin(), ::toupper);
@ -1062,3 +1065,10 @@ srs_error_t SrsSdp::parse_media_description(const std::string& content)
return err;
}
bool SrsSdp::is_unified() const
{
// TODO: FIXME: Maybe we should consider other situations.
return media_descs_.size() > 2;
}

View file

@ -246,6 +246,8 @@ public:
// m-line, media sessions
std::vector<SrsMediaDesc> media_descs_;
bool is_unified() const;
};
#endif

View file

@ -513,7 +513,7 @@ srs_error_t SrsRtcServer::do_create_session(
return err;
}
srs_error_t SrsRtcServer::create_session2(SrsRequest* req, SrsSdp& local_sdp, const std::string& mock_eip, SrsRtcConnection** psession)
srs_error_t SrsRtcServer::create_session2(SrsRequest* req, SrsSdp& local_sdp, const std::string& mock_eip, bool unified_plan, SrsRtcConnection** psession)
{
srs_error_t err = srs_success;
@ -525,7 +525,7 @@ srs_error_t SrsRtcServer::create_session2(SrsRequest* req, SrsSdp& local_sdp, co
SrsRtcConnection* session = new SrsRtcConnection(this, cid);
// first add player for negotiate local sdp media info
if ((err = session->add_player2(req, local_sdp)) != srs_success) {
if ((err = session->add_player2(req, unified_plan, local_sdp)) != srs_success) {
srs_freep(session);
return srs_error_wrap(err, "add player2");
}

View file

@ -126,7 +126,7 @@ private:
);
public:
// We start offering, create_session2 to generate offer, setup_session2 to handle answer.
srs_error_t create_session2(SrsRequest* req, SrsSdp& local_sdp, const std::string& mock_eip, SrsRtcConnection** psession);
srs_error_t create_session2(SrsRequest* req, SrsSdp& local_sdp, const std::string& mock_eip, bool unified_plan, SrsRtcConnection** psession);
srs_error_t setup_session2(SrsRtcConnection* session, SrsRequest* req, const SrsSdp& remote_sdp);
// Destroy the session from server.
void destroy(SrsRtcConnection* session);

View file

@ -41,6 +41,7 @@
#include <srs_app_rtc_conn.hpp>
#include <srs_protocol_utility.hpp>
#include <srs_protocol_json.hpp>
#include <srs_app_pithy_print.hpp>
#ifdef SRS_FFMPEG_FIT
#include <srs_app_rtc_codec.hpp>
@ -539,6 +540,8 @@ SrsRtcFromRtmpBridger::SrsRtcFromRtmpBridger(SrsRtcStream* source)
// audio track description
if (true) {
SrsRtcTrackDescription* audio_track_desc = new SrsRtcTrackDescription();
SrsAutoFree(SrsRtcTrackDescription, audio_track_desc);
audio_track_desc->type_ = "audio";
audio_track_desc->id_ = "audio-" + srs_random_str(8);
@ -553,6 +556,8 @@ SrsRtcFromRtmpBridger::SrsRtcFromRtmpBridger(SrsRtcStream* source)
// video track description
if (true) {
SrsRtcTrackDescription* video_track_desc = new SrsRtcTrackDescription();
SrsAutoFree(SrsRtcTrackDescription, video_track_desc);
video_track_desc->type_ = "video";
video_track_desc->id_ = "video-" + srs_random_str(8);
@ -1117,6 +1122,8 @@ void SrsRtcDummyBridger::on_unpublish()
SrsCodecPayload::SrsCodecPayload()
{
pt_ = 0;
sample_ = 0;
}
SrsCodecPayload::SrsCodecPayload(uint8_t pt, std::string encode_name, int sample)
@ -1249,7 +1256,7 @@ srs_error_t SrsVideoPayload::set_h264_param_desc(std::string fmtp)
SrsAudioPayload::SrsAudioPayload()
{
type_ = "audio";
channel_ = 0;
}
SrsAudioPayload::SrsAudioPayload(uint8_t pt, std::string encode_name, int sample, int channel)
@ -1331,6 +1338,7 @@ srs_error_t SrsAudioPayload::set_opus_param_desc(std::string fmtp)
SrsRedPayload::SrsRedPayload()
{
channel_ = 0;
}
SrsRedPayload::SrsRedPayload(uint8_t pt, std::string encode_name, int sample, int channel)
@ -1604,6 +1612,8 @@ SrsRtcRecvTrack::SrsRtcRecvTrack(SrsRtcConnection* session, SrsRtcTrackDescripti
rtp_queue_ = new SrsRtpRingBuffer(1000);
nack_receiver_ = new SrsRtpNackForReceiver(rtp_queue_, 1000 * 2 / 3);
}
last_sender_report_sys_time = 0;
}
SrsRtcRecvTrack::~SrsRtcRecvTrack()
@ -1804,6 +1814,8 @@ SrsRtcSendTrack::SrsRtcSendTrack(SrsRtcConnection* session, SrsRtcTrackDescripti
} else {
rtp_queue_ = new SrsRtpRingBuffer(1000);
}
nack_epp = new SrsErrorPithyPrint();
}
SrsRtcSendTrack::~SrsRtcSendTrack()
@ -1811,6 +1823,7 @@ SrsRtcSendTrack::~SrsRtcSendTrack()
srs_freep(rtp_queue_);
srs_freep(track_desc_);
srs_freep(statistic_);
srs_freep(nack_epp);
}
bool SrsRtcSendTrack::has_ssrc(uint32_t ssrc)
@ -1827,12 +1840,18 @@ SrsRtpPacket2* SrsRtcSendTrack::fetch_rtp_packet(uint16_t seq)
}
// For NACK, it sequence must match exactly, or it cause SRTP fail.
if (pkt->header.get_sequence() != seq) {
srs_trace("miss match seq=%u, pkt seq=%u", seq, pkt->header.get_sequence());
return NULL;
// Return packet only when sequence is equal.
if (pkt->header.get_sequence() == seq) {
return pkt;
}
return pkt;
// Ignore if sequence not match.
uint32_t nn = 0;
if (nack_epp->can_print(pkt->header.get_ssrc(), &nn)) {
srs_trace("RTC NACK miss seq=%u, require_seq=%u, ssrc=%u, ts=%u, count=%u/%u, %d bytes", seq, pkt->header.get_sequence(),
pkt->header.get_ssrc(), pkt->header.get_timestamp(), nn, nack_epp->nn_count, pkt->nb_bytes());
}
return NULL;
}
// TODO: FIXME: Should refine logs, set tracks in a time.

View file

@ -55,6 +55,7 @@ class SrsRtpRingBuffer;
class SrsRtpNackForReceiver;
class SrsJsonObject;
class SrsRtcPlayStreamStatistic;
class SrsErrorPithyPrint;
class SrsNtp
{
@ -308,6 +309,12 @@ class SrsAudioPayload : public SrsCodecPayload
int minptime;
bool use_inband_fec;
bool usedtx;
SrsOpusParameter() {
minptime = 0;
use_inband_fec = false;
usedtx = false;
}
};
public:
@ -524,11 +531,14 @@ protected:
// send track description
SrsRtcTrackDescription* track_desc_;
SrsRtcTrackStatistic* statistic_;
protected:
// The owner connection for this track.
SrsRtcConnection* session_;
// NACK ARQ ring buffer.
SrsRtpRingBuffer* rtp_queue_;
private:
// The pithy print for special stage.
SrsErrorPithyPrint* nack_epp;
public:
SrsRtcSendTrack(SrsRtcConnection* session, SrsRtcTrackDescription* track_desc, bool is_audio);
virtual ~SrsRtcSendTrack();

View file

@ -121,6 +121,9 @@ SrsRtmpConn::SrsRtmpConn(SrsServer* svr, srs_netfd_t c, string cip, int port) :
send_min_interval = 0;
tcp_nodelay = false;
info = new SrsClientInfo();
publish_1stpkt_timeout = 0;
publish_normal_timeout = 0;
_srs_config->subscribe(this);
}

View file

@ -193,6 +193,11 @@ SrsRtspConn::SrsRtspConn(SrsRtspCaster* c, srs_netfd_t fd, std::string o)
skt = new SrsStSocket();
rtsp = new SrsRtspStack(skt);
trd = new SrsSTCoroutine("rtsp", this);
audio_id = 0;
video_id = 0;
audio_sample_rate = 0;
audio_channel = 0;
req = NULL;
sdk = NULL;
@ -223,6 +228,9 @@ SrsRtspConn::~SrsRtspConn()
srs_freep(vjitter);
srs_freep(ajitter);
srs_freep(avc);
srs_freep(aac);
srs_freep(acodec);
srs_freep(acache);
}

View file

@ -380,7 +380,7 @@ SrsSignalManager::SrsSignalManager(SrsServer* s)
server = s;
sig_pipe[0] = sig_pipe[1] = -1;
trd = new SrsSTCoroutine("signal", this);
trd = new SrsSTCoroutine("signal", this, _srs_context->get_id());
signal_read_stfd = NULL;
}
@ -539,7 +539,7 @@ srs_error_t SrsInotifyWorker::start()
}
if (((err = srs_fd_closeexec(fd))) != srs_success) {
return srs_error_new(ERROR_INOTIFY_OPENFD, "closeexec fd=%d", fd);
return srs_error_wrap(err, "closeexec fd=%d", fd);
}
// /* the following are legal, implemented events that user-space can watch for */
@ -877,7 +877,7 @@ srs_error_t SrsServer::acquire_pid_file()
// write the pid
string pid = srs_int2str(getpid());
if (write(fd, pid.c_str(), pid.length()) != (int)pid.length()) {
return srs_error_new(ERROR_SYSTEM_PID_WRITE_FILE, "write pid=%d to file=%s", pid.c_str(), pid_file.c_str());
return srs_error_new(ERROR_SYSTEM_PID_WRITE_FILE, "write pid=%s to file=%s", pid.c_str(), pid_file.c_str());
}
// auto close when fork child process.

View file

@ -940,6 +940,9 @@ SrsNetworkRtmpServer::SrsNetworkRtmpServer()
nb_conn_sys = nb_conn_srs = 0;
nb_conn_sys_et = nb_conn_sys_tw = 0;
nb_conn_sys_udp = 0;
rkbps = skbps = 0;
rkbps_30s = skbps_30s = 0;
rkbps_5m = skbps_5m = 0;
}
static SrsNetworkRtmpServer _srs_network_rtmp_server;
@ -1298,24 +1301,25 @@ string srs_string_dumps_hex(const char* str, int length, int limit)
string srs_string_dumps_hex(const char* str, int length, int limit, char seperator, int line_limit, char newline)
{
const int LIMIT = 1024*16;
// 1 byte trailing '\0'.
const int LIMIT = 1024*16 + 1;
static char buf[LIMIT];
int len = 0;
for (int i = 0; i < length && i < limit && i < LIMIT; ++i) {
for (int i = 0; i < length && i < limit && len < LIMIT; ++i) {
int nb = snprintf(buf + len, LIMIT - len, "%02x", (uint8_t)str[i]);
if (nb < 0 || nb > LIMIT - len) {
if (nb < 0 || nb >= LIMIT - len) {
break;
}
len += nb;
// Only append seperator and newline when not last byte.
if (i < length - 1 && i < limit - 1 && i < LIMIT - 1) {
if (seperator && len < LIMIT) {
if (i < length - 1 && i < limit - 1 && len < LIMIT) {
if (seperator) {
buf[len++] = seperator;
}
if (newline && line_limit && i > 0 && ((i + 1) % line_limit) == 0 && len < LIMIT) {
if (newline && line_limit && i > 0 && ((i + 1) % line_limit) == 0) {
buf[len++] = newline;
}
}
@ -1325,6 +1329,17 @@ string srs_string_dumps_hex(const char* str, int length, int limit, char seperat
if (len <= 0) {
return "";
}
// If overflow, cut the trailing newline.
if (newline && len >= LIMIT - 2 && buf[len - 1] == newline) {
len--;
}
// If overflow, cut the trailing seperator.
if (seperator && len >= LIMIT - 3 && buf[len - 1] == seperator) {
len--;
}
return string(buf, len);
}