1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

Merge branch 'feature/rtc' into develop

This commit is contained in:
winlin 2020-07-03 09:23:27 +08:00
commit c62479b112
36 changed files with 1583 additions and 1091 deletions

View file

@ -3935,7 +3935,7 @@ srs_error_t SrsConfig::check_normal_config()
for (int j = 0; j < (int)conf->directives.size(); j++) {
string m = conf->at(j)->name;
if (m != "enabled" && m != "bframe" && m != "aac" && m != "stun_timeout" && m != "stun_strict_check"
&& m != "keep_sequence") {
&& m != "dtls_role" && m != "dtls_version" && m != "drop_for_pt") {
return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal vhost.rtc.%s of %s", m.c_str(), vhost->arg0().c_str());
}
}
@ -4848,7 +4848,7 @@ int SrsConfig::get_rtc_server_reuseport2()
bool SrsConfig::get_rtc_server_merge_nalus()
{
static int DEFAULT = true;
static int DEFAULT = false;
SrsConfDirective* conf = root->get("rtc_server");
if (!conf) {
@ -5020,9 +5020,9 @@ bool SrsConfig::get_rtc_stun_strict_check(string vhost)
return SRS_CONF_PERFER_FALSE(conf->arg0());
}
bool SrsConfig::get_rtc_keep_sequence(string vhost)
std::string SrsConfig::get_rtc_dtls_role(string vhost)
{
static bool DEFAULT = false;
static std::string DEFAULT = "passive";
SrsConfDirective* conf = get_rtc(vhost);
@ -5030,12 +5030,47 @@ bool SrsConfig::get_rtc_keep_sequence(string vhost)
return DEFAULT;
}
conf = conf->get("keep_sequence");
conf = conf->get("dtls_role");
if (!conf || conf->arg0().empty()) {
return DEFAULT;
}
return SRS_CONF_PERFER_FALSE(conf->arg0());
return conf->arg0();
}
std::string SrsConfig::get_rtc_dtls_version(string vhost)
{
static std::string DEFAULT = "auto";
SrsConfDirective* conf = get_rtc(vhost);
if (!conf) {
return DEFAULT;
}
conf = conf->get("dtls_version");
if (!conf || conf->arg0().empty()) {
return DEFAULT;
}
return conf->arg0();
}
int SrsConfig::get_rtc_drop_for_pt(string vhost)
{
static int DEFAULT = 0;
SrsConfDirective* conf = get_vhost(vhost);
if (!conf) {
return DEFAULT;
}
conf = conf->get("drop_for_pt");
if (!conf || conf->arg0().empty()) {
return DEFAULT;
}
return ::atoi(conf->arg0().c_str());
}
bool SrsConfig::get_rtc_nack_enabled(string vhost)
@ -5078,6 +5113,7 @@ bool SrsConfig::get_rtc_twcc_enabled(string vhost)
}
return SRS_CONF_PERFER_TRUE(conf->arg0());
}
SrsConfDirective* SrsConfig::get_vhost(string vhost, bool try_default_vhost)
{
srs_assert(root);

View file

@ -544,7 +544,9 @@ public:
bool get_rtc_aac_discard(std::string vhost);
srs_utime_t get_rtc_stun_timeout(std::string vhost);
bool get_rtc_stun_strict_check(std::string vhost);
bool get_rtc_keep_sequence(std::string vhost);
std::string get_rtc_dtls_role(std::string vhost);
std::string get_rtc_dtls_version(std::string vhost);
int get_rtc_drop_for_pt(std::string vhost);
bool get_rtc_nack_enabled(std::string vhost);
bool get_rtc_twcc_enabled(std::string vhost);

View file

@ -1773,9 +1773,8 @@ srs_error_t SrsHttpApi::process_request(ISrsHttpResponseWriter* w, ISrsHttpMessa
SrsHttpMessage* hm = dynamic_cast<SrsHttpMessage*>(r);
srs_assert(hm);
srs_trace("HTTP API %s %s, content-length=%" PRId64 ", chunked=%d/%d",
r->method_str().c_str(), r->url().c_str(), r->content_length(),
hm->is_chunked(), hm->is_infinite_chunked());
srs_trace("HTTP API %s %s, content-length=%" PRId64 ", chunked=%d", r->method_str().c_str(), r->url().c_str(),
r->content_length(), hm->is_chunked());
// use cors server mux to serve http request, which will proxy to mux.
if ((err = cors->serve_http(w, r)) != srs_success) {

View file

@ -44,7 +44,7 @@
// reserved for the end of log data, it must be strlen(LOG_TAIL)
#define LOG_TAIL_SIZE 1
SrsFastLog::SrsFastLog()
SrsFileLog::SrsFileLog()
{
level = SrsLogLevelTrace;
log_data = new char[LOG_MAX_SIZE];
@ -54,7 +54,7 @@ SrsFastLog::SrsFastLog()
utc = false;
}
SrsFastLog::~SrsFastLog()
SrsFileLog::~SrsFileLog()
{
srs_freepa(log_data);
@ -68,7 +68,7 @@ SrsFastLog::~SrsFastLog()
}
}
srs_error_t SrsFastLog::initialize()
srs_error_t SrsFileLog::initialize()
{
if (_srs_config) {
_srs_config->subscribe(this);
@ -81,7 +81,7 @@ srs_error_t SrsFastLog::initialize()
return srs_success;
}
void SrsFastLog::reopen()
void SrsFileLog::reopen()
{
if (fd > 0) {
::close(fd);
@ -94,7 +94,7 @@ void SrsFastLog::reopen()
open_log_file();
}
void SrsFastLog::verbose(const char* tag, const char* context_id, const char* fmt, ...)
void SrsFileLog::verbose(const char* tag, const char* context_id, const char* fmt, ...)
{
if (level > SrsLogLevelVerbose) {
return;
@ -114,7 +114,7 @@ void SrsFastLog::verbose(const char* tag, const char* context_id, const char* fm
write_log(fd, log_data, size, SrsLogLevelVerbose);
}
void SrsFastLog::info(const char* tag, const char* context_id, const char* fmt, ...)
void SrsFileLog::info(const char* tag, const char* context_id, const char* fmt, ...)
{
if (level > SrsLogLevelInfo) {
return;
@ -134,7 +134,7 @@ void SrsFastLog::info(const char* tag, const char* context_id, const char* fmt,
write_log(fd, log_data, size, SrsLogLevelInfo);
}
void SrsFastLog::trace(const char* tag, const char* context_id, const char* fmt, ...)
void SrsFileLog::trace(const char* tag, const char* context_id, const char* fmt, ...)
{
if (level > SrsLogLevelTrace) {
return;
@ -154,7 +154,7 @@ void SrsFastLog::trace(const char* tag, const char* context_id, const char* fmt,
write_log(fd, log_data, size, SrsLogLevelTrace);
}
void SrsFastLog::warn(const char* tag, const char* context_id, const char* fmt, ...)
void SrsFileLog::warn(const char* tag, const char* context_id, const char* fmt, ...)
{
if (level > SrsLogLevelWarn) {
return;
@ -174,7 +174,7 @@ void SrsFastLog::warn(const char* tag, const char* context_id, const char* fmt,
write_log(fd, log_data, size, SrsLogLevelWarn);
}
void SrsFastLog::error(const char* tag, const char* context_id, const char* fmt, ...)
void SrsFileLog::error(const char* tag, const char* context_id, const char* fmt, ...)
{
if (level > SrsLogLevelError) {
return;
@ -200,14 +200,14 @@ void SrsFastLog::error(const char* tag, const char* context_id, const char* fmt,
write_log(fd, log_data, size, SrsLogLevelError);
}
srs_error_t SrsFastLog::on_reload_utc_time()
srs_error_t SrsFileLog::on_reload_utc_time()
{
utc = _srs_config->get_utc_time();
return srs_success;
}
srs_error_t SrsFastLog::on_reload_log_tank()
srs_error_t SrsFileLog::on_reload_log_tank()
{
srs_error_t err = srs_success;
@ -234,7 +234,7 @@ srs_error_t SrsFastLog::on_reload_log_tank()
return err;
}
srs_error_t SrsFastLog::on_reload_log_level()
srs_error_t SrsFileLog::on_reload_log_level()
{
srs_error_t err = srs_success;
@ -247,7 +247,7 @@ srs_error_t SrsFastLog::on_reload_log_level()
return err;
}
srs_error_t SrsFastLog::on_reload_log_file()
srs_error_t SrsFileLog::on_reload_log_file()
{
srs_error_t err = srs_success;
@ -267,7 +267,7 @@ srs_error_t SrsFastLog::on_reload_log_file()
return err;
}
void SrsFastLog::write_log(int& fd, char *str_log, int size, int level)
void SrsFileLog::write_log(int& fd, char *str_log, int size, int level)
{
// ensure the tail and EOF of string
// LOG_TAIL_SIZE for the TAIL char.
@ -307,7 +307,7 @@ void SrsFastLog::write_log(int& fd, char *str_log, int size, int level)
}
}
void SrsFastLog::open_log_file()
void SrsFileLog::open_log_file()
{
if (!_srs_config) {
return;

View file

@ -35,7 +35,7 @@
// Use memory/disk cache and donot flush when write log.
// it's ok to use it without config, which will log to console, and default trace level.
// when you want to use different level, override this classs, set the protected _level.
class SrsFastLog : public ISrsLog, public ISrsReloadHandler
class SrsFileLog : public ISrsLog, public ISrsReloadHandler
{
private:
// Defined in SrsLogLevel.
@ -49,8 +49,8 @@ private:
// Whether use utc time.
bool utc;
public:
SrsFastLog();
virtual ~SrsFastLog();
SrsFileLog();
virtual ~SrsFileLog();
// Interface ISrsLog
public:
virtual srs_error_t initialize();

View file

@ -136,16 +136,9 @@ srs_error_t SrsGoApiRtcPlay::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMe
string eip = r->query_get("eip");
// For client to specifies whether encrypt by SRTP.
string encrypt = r->query_get("encrypt");
// If keep_sequence is off, for client to specifies the startup sequence.
string sequence_startup = r->query_get("sequence_startup");
// If keep_sequence is on, for client to specifies the delta value for sequence.
string sequence_delta = r->query_get("sequence_delta");
// Whether keep sequence, overwrite the config for debugging each session.
string sequence_keep = r->query_get("sequence_keep");
srs_trace("RTC play %s, api=%s, clientip=%s, app=%s, stream=%s, offer=%dB, eip=%s, encrypt=%s, sequence(startup=%s,delta=%s,keep=%s)",
streamurl.c_str(), api.c_str(), clientip.c_str(), app.c_str(), stream_name.c_str(), remote_sdp_str.length(), eip.c_str(), encrypt.c_str(),
sequence_startup.c_str(), sequence_delta.c_str(), sequence_keep.c_str());
srs_trace("RTC play %s, api=%s, clientip=%s, app=%s, stream=%s, offer=%dB, eip=%s, encrypt=%s",
streamurl.c_str(), api.c_str(), clientip.c_str(), app.c_str(), stream_name.c_str(), remote_sdp_str.length(), eip.c_str(), encrypt.c_str());
// TODO: FIXME: It seems remote_sdp doesn't represents the full SDP information.
SrsSdp remote_sdp;
@ -169,6 +162,11 @@ srs_error_t SrsGoApiRtcPlay::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMe
}
SrsSdp local_sdp;
// Config for SDP and session.
local_sdp.session_config_.dtls_role = _srs_config->get_rtc_dtls_role(request.vhost);
local_sdp.session_config_.dtls_version = _srs_config->get_rtc_dtls_version(request.vhost);
if ((err = exchange_sdp(&request, remote_sdp, local_sdp)) != srs_success) {
return srs_error_wrap(err, "remote sdp have error or unsupport attributes");
}
@ -195,11 +193,6 @@ srs_error_t SrsGoApiRtcPlay::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMe
session->set_encrypt(encrypt != "false");
}
// Set the optional parameters from client.
session->sequence_startup = sequence_startup;
session->sequence_delta = sequence_delta;
session->sequence_keep = sequence_keep;
ostringstream os;
if ((err = local_sdp.encode(os)) != srs_success) {
return srs_error_wrap(err, "encode sdp");
@ -369,7 +362,7 @@ srs_error_t SrsGoApiRtcPlay::exchange_sdp(SrsRequest* req, const SrsSdp& remote_
} else if (remote_media_desc.session_info_.setup_ == "passive") {
local_media_desc.session_info_.setup_ = "active";
} else if (remote_media_desc.session_info_.setup_ == "actpass") {
local_media_desc.session_info_.setup_ = "passive";
local_media_desc.session_info_.setup_ = local_sdp.session_config_.dtls_role;
} else {
// @see: https://tools.ietf.org/html/rfc4145#section-4.1
// The default value of the setup attribute in an offer/answer exchange
@ -527,6 +520,11 @@ srs_error_t SrsGoApiRtcPublish::do_serve_http(ISrsHttpResponseWriter* w, ISrsHtt
}
SrsSdp local_sdp;
// Config for SDP and session.
local_sdp.session_config_.dtls_role = _srs_config->get_rtc_dtls_role(request.vhost);
local_sdp.session_config_.dtls_version = _srs_config->get_rtc_dtls_version(request.vhost);
if ((err = exchange_sdp(&request, remote_sdp, local_sdp)) != srs_success) {
return srs_error_wrap(err, "remote sdp have error or unsupport attributes");
}
@ -635,6 +633,21 @@ srs_error_t SrsGoApiRtcPublish::exchange_sdp(SrsRequest* req, const SrsSdp& remo
SrsMediaDesc& local_media_desc = local_sdp.media_descs_.back();
// Whether feature enabled in remote extmap.
int remote_twcc_id = 0;
if (true) {
map<int, string> extmaps = remote_media_desc.get_extmaps();
for(map<int, string>::iterator it = extmaps.begin(); it != extmaps.end(); ++it) {
if (it->second == kTWCCExt) {
remote_twcc_id = it->first;
break;
}
}
}
if (twcc_enabled && remote_twcc_id) {
local_media_desc.extmaps_[remote_twcc_id] = kTWCCExt;
}
if (remote_media_desc.is_audio()) {
// TODO: check opus format specific param
std::vector<SrsMediaPayloadType> payloads = remote_media_desc.find_media_with_encoding_name("opus");
@ -651,7 +664,7 @@ srs_error_t SrsGoApiRtcPublish::exchange_sdp(SrsRequest* req, const SrsSdp& remo
payload_type.rtcp_fb_.push_back(rtcp_fb.at(j));
}
}
if (twcc_enabled) {
if (twcc_enabled && remote_twcc_id) {
if (rtcp_fb.at(j) == "transport-cc") {
payload_type.rtcp_fb_.push_back(rtcp_fb.at(j));
}
@ -661,13 +674,6 @@ srs_error_t SrsGoApiRtcPublish::exchange_sdp(SrsRequest* req, const SrsSdp& remo
// Only choose one match opus codec.
break;
}
map<int, string> extmaps = remote_media_desc.get_extmaps();
for(map<int, string>::iterator it_ext = extmaps.begin(); it_ext != extmaps.end(); ++it_ext) {
if (it_ext->second == kTWCCExt) {
local_media_desc.extmaps_[it_ext->first] = kTWCCExt;
break;
}
}
if (local_media_desc.payload_types_.empty()) {
return srs_error_new(ERROR_RTC_SDP_EXCHANGE, "no valid found opus payload type");
@ -700,7 +706,7 @@ srs_error_t SrsGoApiRtcPublish::exchange_sdp(SrsRequest* req, const SrsSdp& remo
payload_type.rtcp_fb_.push_back(rtcp_fb.at(j));
}
}
if (twcc_enabled) {
if (twcc_enabled && remote_twcc_id) {
if (rtcp_fb.at(j) == "transport-cc") {
payload_type.rtcp_fb_.push_back(rtcp_fb.at(j));
}
@ -713,13 +719,6 @@ srs_error_t SrsGoApiRtcPublish::exchange_sdp(SrsRequest* req, const SrsSdp& remo
backup_payloads.push_back(*iter);
}
map<int, string> extmaps = remote_media_desc.get_extmaps();
for(map<int, string>::iterator it_ext = extmaps.begin(); it_ext != extmaps.end(); ++it_ext) {
if (it_ext->second == kTWCCExt) {
local_media_desc.extmaps_[it_ext->first] = kTWCCExt;
break;
}
}
// Try my best to pick at least one media payload type.
if (local_media_desc.payload_types_.empty() && ! backup_payloads.empty()) {
@ -746,7 +745,7 @@ srs_error_t SrsGoApiRtcPublish::exchange_sdp(SrsRequest* req, const SrsSdp& remo
} else if (remote_media_desc.session_info_.setup_ == "passive") {
local_media_desc.session_info_.setup_ = "active";
} else if (remote_media_desc.session_info_.setup_ == "actpass") {
local_media_desc.session_info_.setup_ = "passive";
local_media_desc.session_info_.setup_ = local_sdp.session_config_.dtls_role;
} else {
// @see: https://tools.ietf.org/html/rfc4145#section-4.1
// The default value of the setup attribute in an offer/answer exchange

View file

@ -43,7 +43,6 @@ using namespace std;
#include <srs_rtc_stun_stack.hpp>
#include <srs_rtmp_stack.hpp>
#include <srs_rtmp_msg_array.hpp>
#include <srs_app_rtc_dtls.hpp>
#include <srs_app_utility.hpp>
#include <srs_app_config.hpp>
#include <srs_app_rtc_queue.hpp>
@ -72,9 +71,6 @@ string gen_random_str(int len)
return ret;
}
const int SRTP_MASTER_KEY_KEY_LEN = 16;
const int SRTP_MASTER_KEY_SALT_LEN = 14;
uint64_t SrsNtp::kMagicNtpFractionalUnit = 1ULL << 32;
SrsNtp::SrsNtp()
@ -111,156 +107,63 @@ SrsNtp SrsNtp::to_time_ms(uint64_t ntp)
}
SrsRtcDtls::SrsRtcDtls(SrsRtcSession* s)
SrsSecurityTransport::SrsSecurityTransport(SrsRtcSession* s)
{
session_ = s;
dtls = NULL;
bio_in = NULL;
bio_out = NULL;
client_key = "";
server_key = "";
srtp_send = NULL;
srtp_recv = NULL;
dtls_ = new SrsDtls((ISrsDtlsCallback*)this);
srtp_ = new SrsSRTP();
handshake_done = false;
}
SrsRtcDtls::~SrsRtcDtls()
SrsSecurityTransport::~SrsSecurityTransport()
{
if (dtls) {
// this function will free bio_in and bio_out
SSL_free(dtls);
dtls = NULL;
if (dtls_) {
srs_freep(dtls_);
dtls_ = NULL;
}
if (srtp_send) {
srtp_dealloc(srtp_send);
}
if (srtp_recv) {
srtp_dealloc(srtp_recv);
if (srtp_) {
srs_freep(srtp_);
srtp_ = NULL;
}
}
srs_error_t SrsRtcDtls::initialize(SrsRequest* r)
{
srs_error_t err = srs_success;
if ((err = SrsDtls::instance()->init(r)) != srs_success) {
return srs_error_wrap(err, "DTLS init");
}
// TODO: FIXME: Support config by vhost to use RSA or ECDSA certificate.
if ((dtls = SSL_new(SrsDtls::instance()->get_dtls_ctx())) == NULL) {
return srs_error_new(ERROR_OpenSslCreateSSL, "SSL_new dtls");
}
// Dtls setup passive, as server role.
SSL_set_accept_state(dtls);
if ((bio_in = BIO_new(BIO_s_mem())) == NULL) {
return srs_error_new(ERROR_OpenSslBIONew, "BIO_new in");
}
if ((bio_out = BIO_new(BIO_s_mem())) == NULL) {
BIO_free(bio_in);
return srs_error_new(ERROR_OpenSslBIONew, "BIO_new out");
}
SSL_set_bio(dtls, bio_in, bio_out);
return err;
srs_error_t SrsSecurityTransport::initialize(SrsSessionConfig* cfg)
{
return dtls_->initialize(cfg->dtls_role, cfg->dtls_version);
}
srs_error_t SrsRtcDtls::handshake()
srs_error_t SrsSecurityTransport::start_active_handshake()
{
return dtls_->start_active_handshake();
}
srs_error_t SrsSecurityTransport::write_dtls_data(void* data, int size)
{
srs_error_t err = srs_success;
int ret = SSL_do_handshake(dtls);
unsigned char *out_bio_data;
int out_bio_len = BIO_get_mem_data(bio_out, &out_bio_data);
int ssl_err = SSL_get_error(dtls, ret);
switch(ssl_err) {
case SSL_ERROR_NONE: {
if ((err = on_dtls_handshake_done()) != srs_success) {
return srs_error_wrap(err, "dtls handshake done handle");
}
break;
}
case SSL_ERROR_WANT_READ: {
break;
}
case SSL_ERROR_WANT_WRITE: {
break;
}
default: {
break;
}
}
if (out_bio_len) {
if ((err = session_->sendonly_skt->sendto(out_bio_data, out_bio_len, 0)) != srs_success) {
if (size) {
if ((err = session_->sendonly_skt->sendto(data, size, 0)) != srs_success) {
return srs_error_wrap(err, "send dtls packet");
}
}
if (session_->blackhole && session_->blackhole_addr && session_->blackhole_stfd) {
// Ignore any error for black-hole.
void* p = out_bio_data; int len = out_bio_len; SrsRtcSession* s = session_;
void* p = data; int len = size; SrsRtcSession* s = session_;
srs_sendto(s->blackhole_stfd, p, len, (sockaddr*)s->blackhole_addr, sizeof(sockaddr_in), SRS_UTIME_NO_TIMEOUT);
}
return err;
}
srs_error_t SrsRtcDtls::on_dtls(char* data, int nb_data)
srs_error_t SrsSecurityTransport::on_dtls(char* data, int nb_data)
{
srs_error_t err = srs_success;
if (BIO_reset(bio_in) != 1) {
return srs_error_new(ERROR_OpenSslBIOReset, "BIO_reset");
}
if (BIO_reset(bio_out) != 1) {
return srs_error_new(ERROR_OpenSslBIOReset, "BIO_reset");
}
if (BIO_write(bio_in, data, nb_data) <= 0) {
// TODO: 0 or -1 maybe block, use BIO_should_retry to check.
return srs_error_new(ERROR_OpenSslBIOWrite, "BIO_write");
}
if (session_->blackhole && session_->blackhole_addr && session_->blackhole_stfd) {
// Ignore any error for black-hole.
void* p = data; int len = nb_data; SrsRtcSession* s = session_;
srs_sendto(s->blackhole_stfd, p, len, (sockaddr*)s->blackhole_addr, sizeof(sockaddr_in), SRS_UTIME_NO_TIMEOUT);
}
if (!handshake_done) {
err = handshake();
} else {
while (BIO_ctrl_pending(bio_in) > 0) {
char dtls_read_buf[8092];
int nb = SSL_read(dtls, dtls_read_buf, sizeof(dtls_read_buf));
if (nb > 0) {
if ((err =on_dtls_application_data(dtls_read_buf, nb)) != srs_success) {
return srs_error_wrap(err, "dtls application data process");
}
}
}
}
return err;
return dtls_->on_dtls(data, nb_data);
}
srs_error_t SrsRtcDtls::on_dtls_handshake_done()
srs_error_t SrsSecurityTransport::on_dtls_handshake_done()
{
srs_error_t err = srs_success;
srs_trace("rtc session=%s, DTLS handshake done.", session_->id().c_str());
@ -273,7 +176,7 @@ srs_error_t SrsRtcDtls::on_dtls_handshake_done()
return session_->on_connection_established();
}
srs_error_t SrsRtcDtls::on_dtls_application_data(const char* buf, const int nb_buf)
srs_error_t SrsSecurityTransport::on_dtls_application_data(const char* buf, const int nb_buf)
{
srs_error_t err = srs_success;
@ -282,192 +185,68 @@ srs_error_t SrsRtcDtls::on_dtls_application_data(const char* buf, const int nb_b
return err;
}
srs_error_t SrsRtcDtls::srtp_initialize()
srs_error_t SrsSecurityTransport::srtp_initialize()
{
srs_error_t err = srs_success;
unsigned char material[SRTP_MASTER_KEY_LEN * 2] = {0}; // client(SRTP_MASTER_KEY_KEY_LEN + SRTP_MASTER_KEY_SALT_LEN) + server
static const string dtls_srtp_lable = "EXTRACTOR-dtls_srtp";
if (!SSL_export_keying_material(dtls, material, sizeof(material), dtls_srtp_lable.c_str(), dtls_srtp_lable.size(), NULL, 0, 0)) {
return srs_error_new(ERROR_RTC_SRTP_INIT, "SSL_export_keying_material failed");
}
size_t offset = 0;
std::string client_master_key(reinterpret_cast<char*>(material), SRTP_MASTER_KEY_KEY_LEN);
offset += SRTP_MASTER_KEY_KEY_LEN;
std::string server_master_key(reinterpret_cast<char*>(material + offset), SRTP_MASTER_KEY_KEY_LEN);
offset += SRTP_MASTER_KEY_KEY_LEN;
std::string client_master_salt(reinterpret_cast<char*>(material + offset), SRTP_MASTER_KEY_SALT_LEN);
offset += SRTP_MASTER_KEY_SALT_LEN;
std::string server_master_salt(reinterpret_cast<char*>(material + offset), SRTP_MASTER_KEY_SALT_LEN);
client_key = client_master_key + client_master_salt;
server_key = server_master_key + server_master_salt;
if ((err = srtp_send_init()) != srs_success) {
return srs_error_wrap(err, "srtp send init failed");
}
if ((err = srtp_recv_init()) != srs_success) {
return srs_error_wrap(err, "srtp recv init failed");
}
return err;
}
srs_error_t SrsRtcDtls::srtp_send_init()
{
srs_error_t err = srs_success;
srtp_policy_t policy;
bzero(&policy, sizeof(policy));
// TODO: Maybe we can use SRTP-GCM in future.
// @see https://bugs.chromium.org/p/chromium/issues/detail?id=713701
// @see https://groups.google.com/forum/#!topic/discuss-webrtc/PvCbWSetVAQ
srtp_crypto_policy_set_aes_cm_128_hmac_sha1_80(&policy.rtp);
srtp_crypto_policy_set_aes_cm_128_hmac_sha1_80(&policy.rtcp);
policy.ssrc.type = ssrc_any_outbound;
policy.ssrc.value = 0;
// TODO: adjust window_size
policy.window_size = 8192;
policy.allow_repeat_tx = 1;
policy.next = NULL;
uint8_t *key = new uint8_t[server_key.size()];
memcpy(key, server_key.data(), server_key.size());
policy.key = key;
if (srtp_create(&srtp_send, &policy) != srtp_err_status_ok) {
srs_freepa(key);
return srs_error_new(ERROR_RTC_SRTP_INIT, "srtp_create failed");
}
srs_freepa(key);
return err;
}
srs_error_t SrsRtcDtls::srtp_recv_init()
{
srs_error_t err = srs_success;
srtp_policy_t policy;
bzero(&policy, sizeof(policy));
srtp_crypto_policy_set_aes_cm_128_hmac_sha1_80(&policy.rtp);
srtp_crypto_policy_set_aes_cm_128_hmac_sha1_80(&policy.rtcp);
policy.ssrc.type = ssrc_any_inbound;
policy.ssrc.value = 0;
// TODO: adjust window_size
policy.window_size = 8192;
policy.allow_repeat_tx = 1;
policy.next = NULL;
uint8_t *key = new uint8_t[client_key.size()];
memcpy(key, client_key.data(), client_key.size());
policy.key = key;
// TODO: FIXME: Wrap error code.
if (srtp_create(&srtp_recv, &policy) != srtp_err_status_ok) {
srs_freepa(key);
return srs_error_new(ERROR_RTC_SRTP_INIT, "srtp_create failed");
}
srs_freepa(key);
return err;
}
srs_error_t SrsRtcDtls::protect_rtp(char* out_buf, const char* in_buf, int& nb_out_buf)
{
srs_error_t err = srs_success;
if (srtp_send) {
memcpy(out_buf, in_buf, nb_out_buf);
// TODO: FIXME: Wrap error code.
if (srtp_protect(srtp_send, out_buf, &nb_out_buf) != 0) {
return srs_error_new(ERROR_RTC_SRTP_PROTECT, "rtp protect failed");
}
std::string send_key;
std::string recv_key;
if ((err = dtls_->get_srtp_key(recv_key, send_key)) != srs_success) {
return err;
}
if ((err = srtp_->initialize(recv_key, send_key)) != srs_success) {
return srs_error_wrap(err, "srtp init failed");
}
return srs_error_new(ERROR_RTC_SRTP_PROTECT, "rtp protect failed");
return err;
}
srs_error_t SrsSecurityTransport::protect_rtp(const char* plaintext, char* cipher, int& nb_cipher)
{
if (!srtp_) {
return srs_error_new(ERROR_RTC_SRTP_PROTECT, "rtp protect failed");
}
return srtp_->protect_rtp(plaintext, cipher, nb_cipher);
}
srs_error_t SrsSecurityTransport::protect_rtcp(const char* plaintext, char* cipher, int& nb_cipher)
{
if (!srtp_) {
return srs_error_new(ERROR_RTC_SRTP_PROTECT, "rtcp protect failed");
}
return srtp_->protect_rtcp(plaintext, cipher, nb_cipher);
}
// TODO: FIXME: Merge with protect_rtp.
srs_error_t SrsRtcDtls::protect_rtp2(void* rtp_hdr, int* len_ptr)
srs_error_t SrsSecurityTransport::protect_rtp2(void* rtp_hdr, int* len_ptr)
{
srs_error_t err = srs_success;
if (!srtp_send) {
if (!srtp_) {
return srs_error_new(ERROR_RTC_SRTP_PROTECT, "rtp protect");
}
// TODO: FIXME: Wrap error code.
if (srtp_protect(srtp_send, rtp_hdr, len_ptr) != 0) {
return srs_error_new(ERROR_RTC_SRTP_PROTECT, "rtp protect");
}
return err;
return srtp_->protect_rtp2(rtp_hdr, len_ptr);
}
srs_error_t SrsRtcDtls::unprotect_rtp(char* out_buf, const char* in_buf, int& nb_out_buf)
srs_error_t SrsSecurityTransport::unprotect_rtp(const char* cipher, char* plaintext, int& nb_plaintext)
{
srs_error_t err = srs_success;
if (srtp_recv) {
memcpy(out_buf, in_buf, nb_out_buf);
srtp_err_status_t r0 = srtp_unprotect(srtp_recv, out_buf, &nb_out_buf);
if (r0 != srtp_err_status_ok) {
return srs_error_new(ERROR_RTC_SRTP_UNPROTECT, "unprotect r0=%u", r0);
}
return err;
if (!srtp_) {
return srs_error_new(ERROR_RTC_SRTP_UNPROTECT, "rtp unprotect failed");
}
return srs_error_new(ERROR_RTC_SRTP_UNPROTECT, "rtp unprotect failed");
return srtp_->unprotect_rtp(cipher, plaintext, nb_plaintext);
}
srs_error_t SrsRtcDtls::protect_rtcp(char* out_buf, const char* in_buf, int& nb_out_buf)
srs_error_t SrsSecurityTransport::unprotect_rtcp(const char* cipher, char* plaintext, int& nb_plaintext)
{
srs_error_t err = srs_success;
if (srtp_send) {
memcpy(out_buf, in_buf, nb_out_buf);
// TODO: FIXME: Wrap error code.
if (srtp_protect_rtcp(srtp_send, out_buf, &nb_out_buf) != 0) {
return srs_error_new(ERROR_RTC_SRTP_PROTECT, "rtcp protect failed");
}
return err;
if (!srtp_) {
return srs_error_new(ERROR_RTC_SRTP_UNPROTECT, "rtcp unprotect failed");
}
return srs_error_new(ERROR_RTC_SRTP_PROTECT, "rtcp protect failed");
}
srs_error_t SrsRtcDtls::unprotect_rtcp(char* out_buf, const char* in_buf, int& nb_out_buf)
{
srs_error_t err = srs_success;
if (srtp_recv) {
memcpy(out_buf, in_buf, nb_out_buf);
// TODO: FIXME: Wrap error code.
if (srtp_unprotect_rtcp(srtp_recv, out_buf, &nb_out_buf) != srtp_err_status_ok) {
return srs_error_new(ERROR_RTC_SRTP_UNPROTECT, "rtcp unprotect failed");
}
return err;
}
return srs_error_new(ERROR_RTC_SRTP_UNPROTECT, "rtcp unprotect failed");
return srtp_->unprotect_rtcp(cipher, plaintext, nb_plaintext);
}
SrsRtcOutgoingInfo::SrsRtcOutgoingInfo()
@ -494,9 +273,6 @@ SrsRtcPlayer::SrsRtcPlayer(SrsRtcSession* s, string parent_cid)
session_ = s;
audio_sequence = 0;
video_sequence = 0;
sequence_delta = 0;
mw_msgs = 0;
realtime = true;
@ -506,7 +282,6 @@ SrsRtcPlayer::SrsRtcPlayer(SrsRtcSession* s, string parent_cid)
nn_simulate_nack_drop = 0;
nack_enabled_ = false;
keep_sequence_ = false;
_srs_config->subscribe(this);
}
@ -520,7 +295,7 @@ SrsRtcPlayer::~SrsRtcPlayer()
srs_freep(video_queue_);
}
srs_error_t SrsRtcPlayer::initialize(const uint32_t& vssrc, const uint32_t& assrc, const uint16_t& v_pt, const uint16_t& a_pt)
srs_error_t SrsRtcPlayer::initialize(uint32_t vssrc, uint32_t assrc, uint16_t v_pt, uint16_t a_pt)
{
srs_error_t err = srs_success;
@ -532,18 +307,8 @@ srs_error_t SrsRtcPlayer::initialize(const uint32_t& vssrc, const uint32_t& assr
// TODO: FIXME: Support reload.
nack_enabled_ = _srs_config->get_rtc_nack_enabled(session_->req->vhost);
keep_sequence_ = _srs_config->get_rtc_keep_sequence(session_->req->vhost);
if (!session_->sequence_startup.empty()) {
audio_sequence = video_sequence = uint16_t(::atoi(session_->sequence_startup.c_str()));
}
if (!session_->sequence_delta.empty()) {
sequence_delta = uint16_t(::atoi(session_->sequence_delta.c_str()));
}
if (!session_->sequence_keep.empty()) {
keep_sequence_ = (session_->sequence_keep == "true");
}
srs_trace("RTC player video(ssrc=%d, pt=%d), audio(ssrc=%d, pt=%d), nack=%d, keep-seq=%d, sequence(audio=%u,video=%u,delta=%u)",
video_ssrc, video_payload_type, audio_ssrc, audio_payload_type, nack_enabled_, keep_sequence_, audio_sequence, video_sequence, sequence_delta);
srs_trace("RTC player video(ssrc=%d, pt=%d), audio(ssrc=%d, pt=%d), nack=%d",
video_ssrc, video_payload_type, audio_ssrc, audio_payload_type, nack_enabled_);
if (_srs_rtc_hijacker) {
if ((err = _srs_rtc_hijacker->on_start_play(session_, this, session_->req)) != srs_success) {
@ -705,7 +470,7 @@ srs_error_t SrsRtcPlayer::send_packets(SrsRtcSource* source, const vector<SrsRtp
srs_error_t err = srs_success;
// If DTLS is not OK, drop all messages.
if (!session_->dtls_) {
if (!session_->transport_) {
return err;
}
@ -716,36 +481,22 @@ srs_error_t SrsRtcPlayer::send_packets(SrsRtcSource* source, const vector<SrsRtp
// Update stats.
info.nn_bytes += pkt->nb_bytes();
uint16_t oseq = pkt->header.get_sequence();
// For audio, we transcoded AAC to opus in extra payloads.
if (pkt->is_audio()) {
info.nn_audios++;
if (!keep_sequence_) {
// TODO: FIXME: Should keep the order by original sequence.
pkt->header.set_sequence(sequence_delta + audio_sequence++);
} else {
pkt->header.set_sequence(sequence_delta + oseq);
}
pkt->header.set_ssrc(audio_ssrc);
pkt->header.set_payload_type(audio_payload_type);
// TODO: FIXME: Padding audio to the max payload in RTP packets.
} else {
info.nn_videos++;
if (!keep_sequence_) {
// TODO: FIXME: Should keep the order by original sequence.
pkt->header.set_sequence(sequence_delta + video_sequence++);
} else {
pkt->header.set_sequence(sequence_delta + oseq);
}
pkt->header.set_ssrc(video_ssrc);
pkt->header.set_payload_type(video_payload_type);
}
// Detail log, should disable it in release version.
srs_info("RTC: Update PT=%u, SSRC=%#x, OSEQ=%u, SEQ=%u, Time=%u, %u bytes", pkt->header.get_payload_type(), pkt->header.get_ssrc(),
oseq, pkt->header.get_sequence(), pkt->header.get_timestamp(), pkt->nb_bytes());
srs_info("RTC: Update PT=%u, SSRC=%#x, Time=%u, %u bytes", pkt->header.get_payload_type(), pkt->header.get_ssrc(),
pkt->header.get_timestamp(), pkt->nb_bytes());
}
// By default, we send packets by sendmmsg.
@ -788,7 +539,7 @@ srs_error_t SrsRtcPlayer::do_send_packets(const std::vector<SrsRtpPacket2*>& pkt
// Whether encrypt the RTP bytes.
if (encrypt) {
int nn_encrypt = (int)iov->iov_len;
if ((err = session_->dtls_->protect_rtp2(iov->iov_base, &nn_encrypt)) != srs_success) {
if ((err = session_->transport_->protect_rtp2(iov->iov_base, &nn_encrypt)) != srs_success) {
return srs_error_wrap(err, "srtp protect");
}
iov->iov_len = (size_t)nn_encrypt;
@ -1105,9 +856,10 @@ SrsRtcPublisher::SrsRtcPublisher(SrsRtcSession* session)
source = NULL;
nn_simulate_nack_drop = 0;
nack_enabled_ = false;
pt_to_drop_ = 0;
nn_audio_frames = 0;
twcc_ext_id_ = 0;
twcc_id_ = 0;
last_twcc_feedback_time_ = 0;
twcc_fb_count_ = 0;
}
@ -1127,24 +879,28 @@ SrsRtcPublisher::~SrsRtcPublisher()
srs_freep(audio_queue_);
}
srs_error_t SrsRtcPublisher::initialize(uint32_t vssrc, uint32_t assrc, uint8_t twcc_ext_id, SrsRequest* r)
srs_error_t SrsRtcPublisher::initialize(uint32_t vssrc, uint32_t assrc, int twcc_id, SrsRequest* r)
{
srs_error_t err = srs_success;
video_ssrc = vssrc;
audio_ssrc = assrc;
twcc_ext_id_ = twcc_ext_id;
rtcp_twcc_.set_media_ssrc(video_ssrc);
req = r;
if (twcc_ext_id_ != 0) {
extension_map_.register_by_uri(twcc_ext_id_, kTWCCExt);
}
// TODO: FIXME: Support reload.
nack_enabled_ = _srs_config->get_rtc_nack_enabled(session_->req->vhost);
pt_to_drop_ = (uint16_t)_srs_config->get_rtc_drop_for_pt(session_->req->vhost);
bool twcc_enabled = _srs_config->get_rtc_twcc_enabled(req->vhost);
if (twcc_enabled) {
twcc_id_ = twcc_id;
}
srs_trace("RTC publisher video(ssrc=%u), audio(ssrc=%u), nack=%d, pt-drop=%u, twcc=%u/%d",
video_ssrc, audio_ssrc, nack_enabled_, pt_to_drop_, twcc_enabled, twcc_id);
srs_trace("RTC publisher video(ssrc=%u), audio(ssrc=%u), nack=%d",
video_ssrc, audio_ssrc, nack_enabled_);
if (twcc_id_) {
extension_types_.register_by_uri(twcc_id_, kTWCCExt);
rtcp_twcc_.set_media_ssrc(video_ssrc);
}
if ((err = report_timer->tick(0 * SRS_UTIME_MILLISECONDS)) != srs_success) {
return srs_error_wrap(err, "hourglass tick");
@ -1176,7 +932,7 @@ srs_error_t SrsRtcPublisher::initialize(uint32_t vssrc, uint32_t assrc, uint8_t
void SrsRtcPublisher::check_send_nacks(SrsRtpNackForReceiver* nack, uint32_t ssrc)
{
// If DTLS is not OK, drop all messages.
if (!session_->dtls_) {
if (!session_->transport_) {
return;
}
@ -1214,7 +970,7 @@ void SrsRtcPublisher::check_send_nacks(SrsRtpNackForReceiver* nack, uint32_t ssr
int nb_protected_buf = stream.pos();
// FIXME: Merge nack rtcp into one packets.
if (session_->dtls_->protect_rtcp(protected_buf, stream.data(), nb_protected_buf) == srs_success) {
if (session_->transport_->protect_rtcp(protected_buf, stream.data(), nb_protected_buf) == srs_success) {
// TODO: FIXME: Check error.
session_->sendonly_skt->sendto(protected_buf, nb_protected_buf, 0);
}
@ -1228,7 +984,7 @@ srs_error_t SrsRtcPublisher::send_rtcp_rr(uint32_t ssrc, SrsRtpRingBuffer* rtp_q
srs_error_t err = srs_success;
// If DTLS is not OK, drop all messages.
if (!session_->dtls_) {
if (!session_->transport_) {
return err;
}
@ -1272,7 +1028,7 @@ srs_error_t SrsRtcPublisher::send_rtcp_rr(uint32_t ssrc, SrsRtpRingBuffer* rtp_q
char protected_buf[kRtpPacketSize];
int nb_protected_buf = stream.pos();
if ((err = session_->dtls_->protect_rtcp(protected_buf, stream.data(), nb_protected_buf)) != srs_success) {
if ((err = session_->transport_->protect_rtcp(stream.data(), protected_buf, nb_protected_buf)) != srs_success) {
return srs_error_wrap(err, "protect rtcp rr");
}
@ -1286,7 +1042,7 @@ srs_error_t SrsRtcPublisher::send_rtcp_xr_rrtr(uint32_t ssrc)
srs_error_t err = srs_success;
// If DTLS is not OK, drop all messages.
if (!session_->dtls_) {
if (!session_->transport_) {
return err;
}
@ -1332,7 +1088,7 @@ srs_error_t SrsRtcPublisher::send_rtcp_xr_rrtr(uint32_t ssrc)
char protected_buf[kRtpPacketSize];
int nb_protected_buf = stream.pos();
if ((err = session_->dtls_->protect_rtcp(protected_buf, stream.data(), nb_protected_buf)) != srs_success) {
if ((err = session_->transport_->protect_rtcp(stream.data(), protected_buf, nb_protected_buf)) != srs_success) {
return srs_error_wrap(err, "protect rtcp xr");
}
@ -1347,7 +1103,7 @@ srs_error_t SrsRtcPublisher::send_rtcp_fb_pli(uint32_t ssrc)
srs_error_t err = srs_success;
// If DTLS is not OK, drop all messages.
if (!session_->dtls_) {
if (!session_->transport_) {
return err;
}
@ -1369,7 +1125,7 @@ srs_error_t SrsRtcPublisher::send_rtcp_fb_pli(uint32_t ssrc)
char protected_buf[kRtpPacketSize];
int nb_protected_buf = stream.pos();
if ((err = session_->dtls_->protect_rtcp(protected_buf, stream.data(), nb_protected_buf)) != srs_success) {
if ((err = session_->transport_->protect_rtcp(stream.data(), protected_buf, nb_protected_buf)) != srs_success) {
return srs_error_wrap(err, "protect rtcp psfb pli");
}
@ -1380,51 +1136,59 @@ srs_error_t SrsRtcPublisher::send_rtcp_fb_pli(uint32_t ssrc)
}
srs_error_t SrsRtcPublisher::on_twcc(uint16_t sn) {
srs_error_t err = srs_success;
srs_utime_t now = srs_get_system_time();
rtcp_twcc_.recv_packet(sn, now);
if(0 == last_twcc_feedback_time_) {
last_twcc_feedback_time_ = now;
return err;
}
srs_utime_t diff = now - last_twcc_feedback_time_;
if( diff >= 50 * SRS_UTIME_MILLISECONDS) {
last_twcc_feedback_time_ = now;
char pkt[kRtcpPacketSize];
SrsBuffer *buffer = new SrsBuffer(pkt, sizeof(pkt));
SrsAutoFree(SrsBuffer, buffer);
rtcp_twcc_.set_feedback_count(twcc_fb_count_);
twcc_fb_count_++;
if((err = rtcp_twcc_.encode(buffer)) != srs_success) {
return srs_error_wrap(err, "fail to generate twcc feedback packet");
}
int nb_protected_buf = buffer->pos();
char protected_buf[kRtpPacketSize];
if (session_->dtls_->protect_rtcp(protected_buf, pkt, nb_protected_buf) == srs_success) {
session_->sendonly_skt->sendto(protected_buf, nb_protected_buf, 0);
}
}
return err;
return rtcp_twcc_.recv_packet(sn, now);
}
srs_error_t SrsRtcPublisher::on_rtp(char* data, int nb_data)
{
srs_error_t err = srs_success;
// For NACK simulator, drop packet.
if (nn_simulate_nack_drop) {
SrsBuffer b0(data, nb_data); SrsRtpHeader h0; h0.decode(&b0);
simulate_drop_packet(&h0, nb_data);
SrsBuffer b(data, nb_data); SrsRtpHeader h; h.decode(&b);
simulate_drop_packet(&h, nb_data);
return err;
}
// Decode the header first.
SrsRtpHeader h;
if (pt_to_drop_ && twcc_id_) {
SrsBuffer b(data, nb_data);
h.ignore_padding(true); h.set_extensions(&extension_types_);
if ((err = h.decode(&b)) != srs_success) {
return srs_error_wrap(err, "twcc decode header");
}
}
// We must parse the TWCC from RTP header before SRTP unprotect, because:
// 1. Client may send some padding packets with invalid SequenceNumber, which causes the SRTP fail.
// 2. Server may send multiple duplicated NACK to client, and got more than one ARQ packet, which also fail SRTP.
// so, we must parse the header before SRTP unprotect(which may fail and drop packet).
if (twcc_id_) {
uint16_t twcc_sn = 0;
if ((err = h.get_twcc_sequence_number(twcc_sn)) == srs_success) {
if((err = on_twcc(twcc_sn)) != srs_success) {
return srs_error_wrap(err, "on twcc");
}
} else {
srs_error_reset(err);
}
}
// If payload type is configed to drop, ignore this packet.
if (pt_to_drop_ && pt_to_drop_ == h.get_payload_type()) {
return err;
}
// Decrypt the cipher to plaintext RTP data.
int nb_unprotected_buf = nb_data;
char* unprotected_buf = new char[kRtpPacketSize];
if ((err = session_->dtls_->unprotect_rtp(unprotected_buf, data, nb_unprotected_buf)) != srs_success) {
if ((err = session_->transport_->unprotect_rtp(data, unprotected_buf, nb_unprotected_buf)) != srs_success) {
// We try to decode the RTP header for more detail error informations.
SrsBuffer b0(data, nb_data); SrsRtpHeader h0; h0.decode(&b0);
err = srs_error_wrap(err, "marker=%u, pt=%u, seq=%u, ts=%u, ssrc=%u, pad=%u, payload=%uB", h0.get_marker(), h0.get_payload_type(),
h0.get_sequence(), h0.get_timestamp(), h0.get_ssrc(), h0.get_padding(), nb_data - b0.pos());
SrsBuffer b(data, nb_data); SrsRtpHeader h; h.decode(&b);
err = srs_error_wrap(err, "marker=%u, pt=%u, seq=%u, ts=%u, ssrc=%u, pad=%u, payload=%uB", h.get_marker(), h.get_payload_type(),
h.get_sequence(), h.get_timestamp(), h.get_ssrc(), h.get_padding(), nb_data - b.pos());
srs_freepa(unprotected_buf);
return err;
@ -1445,7 +1209,7 @@ srs_error_t SrsRtcPublisher::on_rtp(char* data, int nb_data)
if (true) {
pkt->set_decode_handler(this);
pkt->set_rtp_header_extensions(&extension_map_);
pkt->set_extension_types(&extension_types_);
pkt->shared_msg = new SrsSharedPtrMessage();
pkt->shared_msg->wrap(buf, nb_buf);
@ -1453,18 +1217,6 @@ srs_error_t SrsRtcPublisher::on_rtp(char* data, int nb_data)
if ((err = pkt->decode(&b)) != srs_success) {
return srs_error_wrap(err, "decode rtp packet");
}
if (0 != twcc_ext_id_) {
uint16_t twcc_sn = 0;
if ((err = pkt->header.get_twcc_sequence_number(twcc_sn)) == srs_success) {
if((err = on_twcc(twcc_sn))) {
return srs_error_wrap(err, "fail to process twcc packet");
}
} else {
// TODO: FIXME: process no twcc seq number for audio ssrc
srs_error_reset(err);
}
}
}
// For source to consume packet.
@ -1595,6 +1347,35 @@ srs_error_t SrsRtcPublisher::on_nack(SrsRtpPacket2* pkt)
return err;
}
srs_error_t SrsRtcPublisher::send_periodic_twcc()
{
srs_error_t err = srs_success;
srs_utime_t now = srs_get_system_time();
if(0 == last_twcc_feedback_time_) {
last_twcc_feedback_time_ = now;
return err;
}
srs_utime_t diff = now - last_twcc_feedback_time_;
if( diff >= 50 * SRS_UTIME_MILLISECONDS) {
last_twcc_feedback_time_ = now;
char pkt[kRtcpPacketSize];
SrsBuffer *buffer = new SrsBuffer(pkt, sizeof(pkt));
SrsAutoFree(SrsBuffer, buffer);
rtcp_twcc_.set_feedback_count(twcc_fb_count_);
twcc_fb_count_++;
if((err = rtcp_twcc_.encode(buffer)) != srs_success) {
return srs_error_wrap(err, "fail to generate twcc feedback packet");
}
int nb_protected_buf = buffer->pos();
char protected_buf[kRtpPacketSize];
if (session_->transport_->protect_rtcp(pkt, protected_buf, nb_protected_buf) == srs_success) {
session_->sendonly_skt->sendto(protected_buf, nb_protected_buf, 0);
}
}
return err;
}
srs_error_t SrsRtcPublisher::on_rtcp(char* data, int nb_data)
{
srs_error_t err = srs_success;
@ -1949,11 +1730,17 @@ void SrsRtcPublisher::request_keyframe()
srs_error_t SrsRtcPublisher::notify(int type, srs_utime_t interval, srs_utime_t tick)
{
srs_error_t err = srs_success;
// TODO: FIXME: Check error.
send_rtcp_rr(video_ssrc, video_queue_);
send_rtcp_rr(audio_ssrc, audio_queue_);
send_rtcp_xr_rrtr(video_ssrc);
send_rtcp_xr_rrtr(audio_ssrc);
// TODO: FIXME: Check error.
// We should not depends on the received packet,
// instead we should send feedback every Nms.
send_periodic_twcc();
return err;
}
@ -1984,7 +1771,7 @@ SrsRtcSession::SrsRtcSession(SrsRtcServer* s)
player_ = NULL;
sendonly_skt = NULL;
server_ = s;
dtls_ = new SrsRtcDtls(this);
transport_ = new SrsSecurityTransport(this);
state_ = INIT;
last_stun_time = 0;
@ -2000,7 +1787,7 @@ SrsRtcSession::~SrsRtcSession()
{
srs_freep(player_);
srs_freep(publisher_);
srs_freep(dtls_);
srs_freep(transport_);
srs_freep(req);
srs_close_stfd(blackhole_stfd);
srs_freep(blackhole_addr);
@ -2083,7 +1870,8 @@ srs_error_t SrsRtcSession::initialize(SrsRtcSource* source, SrsRequest* r, bool
is_publisher_ = is_publisher;
source_ = source;
if ((err = dtls_->initialize(req)) != srs_success) {
SrsSessionConfig* cfg = &local_sdp.session_config_;
if ((err = transport_->initialize(cfg)) != srs_success) {
return srs_error_wrap(err, "init");
}
@ -2093,7 +1881,8 @@ srs_error_t SrsRtcSession::initialize(SrsRtcSource* source, SrsRequest* r, bool
blackhole = _srs_config->get_rtc_server_black_hole();
srs_trace("RTC init session, timeout=%dms, blackhole=%d", srsu2msi(sessionStunTimeout), blackhole);
srs_trace("RTC init session, DTLS(role=%s, version=%s), timeout=%dms, blackhole=%d",
cfg->dtls_role.c_str(), cfg->dtls_version.c_str(), srsu2msi(sessionStunTimeout), blackhole);
if (blackhole) {
string blackhole_ep = _srs_config->get_rtc_server_black_hole_addr();
@ -2150,20 +1939,20 @@ srs_error_t SrsRtcSession::on_stun(SrsUdpMuxSocket* skt, SrsStunPacket* r)
srs_error_t SrsRtcSession::on_dtls(char* data, int nb_data)
{
return dtls_->on_dtls(data, nb_data);
return transport_->on_dtls(data, nb_data);
}
srs_error_t SrsRtcSession::on_rtcp(char* data, int nb_data)
{
srs_error_t err = srs_success;
if (dtls_ == NULL) {
if (transport_ == NULL) {
return srs_error_new(ERROR_RTC_RTCP, "recv unexpect rtp packet before dtls done");
}
char unprotected_buf[kRtpPacketSize];
int nb_unprotected_buf = nb_data;
if ((err = dtls_->unprotect_rtcp(unprotected_buf, data, nb_unprotected_buf)) != srs_success) {
if ((err = transport_->unprotect_rtcp(data, unprotected_buf, nb_unprotected_buf)) != srs_success) {
return srs_error_wrap(err, "rtcp unprotect failed");
}
@ -2190,7 +1979,7 @@ srs_error_t SrsRtcSession::on_rtp(char* data, int nb_data)
return srs_error_new(ERROR_RTC_RTCP, "rtc publisher null");
}
if (dtls_ == NULL) {
if (transport_ == NULL) {
return srs_error_new(ERROR_RTC_RTCP, "recv unexpect rtp packet before dtls done");
}
@ -2221,7 +2010,11 @@ srs_error_t SrsRtcSession::start_play()
{
srs_error_t err = srs_success;
srs_freep(player_);
// If player is initialized, we think the session is started.
// To prevent play multiple times for the DTLS ARQ packet.
if (player_) {
return err;
}
player_ = new SrsRtcPlayer(this, _srs_context->get_id());
uint32_t video_ssrc = 0;
@ -2254,8 +2047,13 @@ srs_error_t SrsRtcSession::start_publish()
{
srs_error_t err = srs_success;
srs_freep(publisher_);
// If publisher is initialized, we think the session is started.
// To prevent publish multiple times for the DTLS ARQ packet.
if (publisher_) {
return err;
}
publisher_ = new SrsRtcPublisher(this);
// Request PLI for exists players?
//publisher_->request_keyframe();
@ -2371,6 +2169,10 @@ srs_error_t SrsRtcSession::on_binding_request(SrsStunPacket* r)
state_ = DOING_DTLS_HANDSHAKE;
srs_trace("rtc session=%s, STUN done, waitting DTLS handshake.", id().c_str());
if((err = transport_->start_active_handshake()) != srs_success) {
return srs_error_wrap(err, "fail to dtls handshake");
}
}
if (blackhole && blackhole_addr && blackhole_stfd) {

View file

@ -37,15 +37,13 @@
#include <srs_kernel_rtc_rtcp.hpp>
#include <srs_app_rtc_queue.hpp>
#include <srs_app_rtc_source.hpp>
#include <srs_app_rtc_dtls.hpp>
#include <string>
#include <map>
#include <vector>
#include <sys/socket.h>
#include <openssl/ssl.h>
#include <srtp2/srtp.h>
class SrsUdpMuxSocket;
class SrsConsumer;
class SrsStunPacket;
@ -108,44 +106,42 @@ enum SrsRtcSessionStateType
CLOSED = 5,
};
class SrsRtcDtls
class SrsSecurityTransport : public ISrsDtlsCallback
{
private:
SrsRtcSession* session_;
SSL* dtls;
BIO* bio_in;
BIO* bio_out;
std::string client_key;
std::string server_key;
srtp_t srtp_send;
srtp_t srtp_recv;
SrsDtls* dtls_;
SrsSRTP* srtp_;
bool handshake_done;
public:
SrsRtcDtls(SrsRtcSession* s);
virtual ~SrsRtcDtls();
srs_error_t initialize(SrsRequest* r);
SrsSecurityTransport(SrsRtcSession* s);
virtual ~SrsSecurityTransport();
srs_error_t initialize(SrsSessionConfig* cfg);
// When play role of dtls client, it send handshake.
srs_error_t start_active_handshake();
srs_error_t on_dtls(char* data, int nb_data);
srs_error_t on_dtls_handshake_done();
srs_error_t on_dtls_application_data(const char* data, const int len);
public:
srs_error_t protect_rtp(char* protected_buf, const char* ori_buf, int& nb_protected_buf);
// Encrypt the input plaintext to output cipher with nb_cipher bytes.
// @remark Note that the nb_cipher is the size of input plaintext, and
// it also is the length of output cipher when return.
srs_error_t protect_rtp(const char* plaintext, char* cipher, int& nb_cipher);
srs_error_t protect_rtcp(const char* plaintext, char* cipher, int& nb_cipher);
// Encrypt the input rtp_hdr with *len_ptr bytes.
// @remark the input plaintext and out cipher reuse rtp_hdr.
srs_error_t protect_rtp2(void* rtp_hdr, int* len_ptr);
srs_error_t unprotect_rtp(char* unprotected_buf, const char* ori_buf, int& nb_unprotected_buf);
srs_error_t protect_rtcp(char* protected_buf, const char* ori_buf, int& nb_protected_buf);
srs_error_t unprotect_rtcp(char* unprotected_buf, const char* ori_buf, int& nb_unprotected_buf);
private:
srs_error_t handshake();
// Decrypt the input cipher to output cipher with nb_cipher bytes.
// @remark Note that the nb_plaintext is the size of input cipher, and
// it also is the length of output plaintext when return.
srs_error_t unprotect_rtp(const char* cipher, char* plaintext, int& nb_plaintext);
srs_error_t unprotect_rtcp(const char* cipher, char* plaintext, int& nb_plaintext);
// implement ISrsDtlsCallback
public:
virtual srs_error_t on_dtls_handshake_done();
virtual srs_error_t on_dtls_application_data(const char* data, const int len);
virtual srs_error_t write_dtls_data(void* data, int size);
private:
srs_error_t srtp_initialize();
srs_error_t srtp_send_init();
srs_error_t srtp_recv_init();
};
// A group of RTP packets for outgoing(send to players).
@ -195,18 +191,15 @@ protected:
private:
// TODO: FIXME: How to handle timestamp overflow?
// Information for audio.
uint16_t audio_sequence;
uint32_t audio_ssrc;
uint16_t audio_payload_type;
// Information for video.
uint16_t video_sequence;
uint16_t video_payload_type;
uint32_t video_ssrc;
// NACK ARQ ring buffer.
SrsRtpRingBuffer* audio_queue_;
SrsRtpRingBuffer* video_queue_;
// Simulators.
uint16_t sequence_delta;
int nn_simulate_nack_drop;
private:
// For merged-write messages.
@ -214,13 +207,11 @@ private:
bool realtime;
// Whether enabled nack.
bool nack_enabled_;
// Whether keep original sequence number.
bool keep_sequence_;
public:
SrsRtcPlayer(SrsRtcSession* s, std::string parent_cid);
virtual ~SrsRtcPlayer();
public:
srs_error_t initialize(const uint32_t& vssrc, const uint32_t& assrc, const uint16_t& v_pt, const uint16_t& a_pt);
srs_error_t initialize(uint32_t vssrc, uint32_t assrc, uint16_t v_pt, uint16_t a_pt);
// interface ISrsReloadHandler
public:
virtual srs_error_t on_reload_vhost_play(std::string vhost);
@ -260,6 +251,9 @@ private:
SrsRtcSession* session_;
uint32_t video_ssrc;
uint32_t audio_ssrc;
uint16_t pt_to_drop_;
// Whether enabled nack.
bool nack_enabled_;
private:
bool request_keyframe_;
SrsRtpRingBuffer* video_queue_;
@ -269,8 +263,6 @@ private:
private:
SrsRequest* req;
SrsRtcSource* source;
// Whether enabled nack.
bool nack_enabled_;
// Simulators.
int nn_simulate_nack_drop;
private:
@ -278,15 +270,15 @@ private:
std::map<uint32_t, SrsNtp> last_sender_report_ntp;
private:
srs_utime_t last_twcc_feedback_time_;
uint8_t twcc_ext_id_;
int twcc_id_;
uint8_t twcc_fb_count_;
SrsRtcpTWCC rtcp_twcc_;
SrsRtpHeaderExtensionMap extension_map_;
SrsRtpExtensionTypes extension_types_;
public:
SrsRtcPublisher(SrsRtcSession* session);
virtual ~SrsRtcPublisher();
public:
srs_error_t initialize(uint32_t vssrc, uint32_t assrc, uint8_t twcc_ext_id, SrsRequest* req);
srs_error_t initialize(uint32_t vssrc, uint32_t assrc, int twcc_id, SrsRequest* req);
private:
void check_send_nacks(SrsRtpNackForReceiver* nack, uint32_t ssrc);
srs_error_t send_rtcp_rr(uint32_t ssrc, SrsRtpRingBuffer* rtp_queue);
@ -299,6 +291,7 @@ private:
srs_error_t on_audio(SrsRtpPacket2* pkt);
srs_error_t on_video(SrsRtpPacket2* pkt);
srs_error_t on_nack(SrsRtpPacket2* pkt);
srs_error_t send_periodic_twcc();
public:
srs_error_t on_rtcp(char* data, int nb_data);
private:
@ -322,7 +315,7 @@ private:
class SrsRtcSession
{
friend class SrsRtcDtls;
friend class SrsSecurityTransport;
friend class SrsRtcPlayer;
friend class SrsRtcPublisher;
public:
@ -330,7 +323,7 @@ public:
private:
SrsRtcServer* server_;
SrsRtcSessionStateType state_;
SrsRtcDtls* dtls_;
SrsSecurityTransport* transport_;
SrsRtcPlayer* player_;
SrsRtcPublisher* publisher_;
bool is_publisher_;
@ -381,6 +374,7 @@ public:
void switch_to_context();
std::string context_id();
public:
// Before initialize, user must set the local SDP, which is used to inititlize DTLS.
srs_error_t initialize(SrsRtcSource* source, SrsRequest* r, bool is_publisher, std::string username, std::string context_id);
// The peer address may change, we can identify that by STUN messages.
srs_error_t on_stun(SrsUdpMuxSocket* skt, SrsStunPacket* r);

View file

@ -30,22 +30,12 @@ using namespace std;
#include <srs_kernel_log.hpp>
#include <srs_kernel_error.hpp>
#include <srs_app_config.hpp>
#include <srs_core_autofree.hpp>
#include <srs_rtmp_stack.hpp>
#include <srtp2/srtp.h>
#include <openssl/ssl.h>
SrsDtls* SrsDtls::_instance = NULL;
SrsDtls::SrsDtls()
{
dtls_ctx = NULL;
}
SrsDtls::~SrsDtls()
{
SSL_CTX_free(dtls_ctx);
}
// The return value of verify_callback controls the strategy of the further verification process. If verify_callback
// returns 0, the verification process is immediately stopped with "verification failed" state. If SSL_VERIFY_PEER is
// set, a verification failure alert is sent to the peer and the TLS/SSL handshake is terminated. If verify_callback
@ -61,12 +51,34 @@ static int verify_callback(int preverify_ok, X509_STORE_CTX *ctx)
return 1;
}
srs_error_t SrsDtls::init(SrsRequest* r)
SrsDtlsCertificate::SrsDtlsCertificate()
{
dtls_cert = NULL;
dtls_pkey = NULL;
eckey = NULL;
}
SrsDtlsCertificate::~SrsDtlsCertificate()
{
if (eckey) {
EC_KEY_free(eckey);
}
if (dtls_pkey) {
EVP_PKEY_free(dtls_pkey);
}
if (dtls_cert) {
X509_free(dtls_cert);
}
}
srs_error_t SrsDtlsCertificate::initialize()
{
srs_error_t err = srs_success;
// Initialize once.
if (dtls_ctx) {
if (dtls_cert) {
return err;
}
@ -78,24 +90,16 @@ srs_error_t SrsDtls::init(SrsRequest* r)
OpenSSL_add_ssl_algorithms();
#endif
#if OPENSSL_VERSION_NUMBER < 0x10002000L // v1.0.2
dtls_ctx = SSL_CTX_new(DTLSv1_method());
#else
dtls_ctx = SSL_CTX_new(DTLS_method());
//dtls_ctx = SSL_CTX_new(DTLSv1_method());
//dtls_ctx = SSL_CTX_new(DTLSv1_2_method());
#endif
// Initialize SRTP first.
srs_assert(srtp_init() == 0);
// Whether use ECDSA certificate.
bool is_ecdsa = _srs_config->get_rtc_server_ecdsa();
ecdsa_mode = _srs_config->get_rtc_server_ecdsa();
// Create keys by RSA or ECDSA.
EVP_PKEY* dtls_pkey = EVP_PKEY_new();
dtls_pkey = EVP_PKEY_new();
srs_assert(dtls_pkey);
if (!is_ecdsa) { // By RSA
if (!ecdsa_mode) { // By RSA
RSA* rsa = RSA_new();
srs_assert(rsa);
@ -115,15 +119,10 @@ srs_error_t SrsDtls::init(SrsRequest* r)
RSA_free(rsa);
BN_free(exponent);
}
if (is_ecdsa) { // By ECDSA, https://stackoverflow.com/a/6006898
EC_KEY* eckey = EC_KEY_new();
if (ecdsa_mode) { // By ECDSA, https://stackoverflow.com/a/6006898
eckey = EC_KEY_new();
srs_assert(eckey);
#if OPENSSL_VERSION_NUMBER >= 0x10002000L // v1.0.2
// For ECDSA, we could set the curves list.
// @see https://www.openssl.org/docs/man1.0.2/man3/SSL_CTX_set1_curves_list.html
SSL_CTX_set1_curves_list(dtls_ctx, "P-521:P-384:P-256");
#endif
// Should use the curves in ClientHello.supported_groups
// For example:
// Supported Group: x25519 (0x001d)
@ -145,25 +144,15 @@ srs_error_t SrsDtls::init(SrsRequest* r)
srs_assert(EC_KEY_set_group(eckey, ecgroup) == 1);
srs_assert(EC_KEY_generate_key(eckey) == 1);
// For openssl <1.1, we must set the ECDH manually.
// @see https://stackoverrun.com/cn/q/10791887
#if OPENSSL_VERSION_NUMBER < 0x10100000L // v1.1.x
#if OPENSSL_VERSION_NUMBER < 0x10002000L // v1.0.2
SSL_CTX_set_tmp_ecdh(dtls_ctx, eckey);
#else
SSL_CTX_set_ecdh_auto(dtls_ctx, 1);
#endif
#endif
// @see https://www.openssl.org/docs/man1.1.0/man3/EVP_PKEY_type.html
srs_assert(EVP_PKEY_set1_EC_KEY(dtls_pkey, eckey) == 1);
EC_GROUP_free(ecgroup);
EC_KEY_free(eckey);
}
// Create certificate, from previous generated pkey.
// TODO: Support ECDSA certificate.
X509* dtls_cert = X509_new();
dtls_cert = X509_new();
srs_assert(dtls_cert);
if (true) {
X509_NAME* subject = X509_NAME_new();
@ -191,36 +180,6 @@ srs_error_t SrsDtls::init(SrsRequest* r)
X509_NAME_free(subject);
}
// Setup DTLS context.
if (true) {
// We use "ALL", while you can use "DEFAULT" means "ALL:!EXPORT:!LOW:!aNULL:!eNULL:!SSLv2"
// @see https://www.openssl.org/docs/man1.0.2/man1/ciphers.html
srs_assert(SSL_CTX_set_cipher_list(dtls_ctx, "ALL") == 1);
// Setup the certificate.
srs_assert(SSL_CTX_use_certificate(dtls_ctx, dtls_cert) == 1);
srs_assert(SSL_CTX_use_PrivateKey(dtls_ctx, dtls_pkey) == 1);
// Server will send Certificate Request.
// @see https://www.openssl.org/docs/man1.0.2/man3/SSL_CTX_set_verify.html
// TODO: FIXME: Config it, default to off to make the packet smaller.
SSL_CTX_set_verify(dtls_ctx, SSL_VERIFY_PEER | SSL_VERIFY_CLIENT_ONCE, verify_callback);
// The depth count is "level 0:peer certificate", "level 1: CA certificate",
// "level 2: higher level CA certificate", and so on.
// @see https://www.openssl.org/docs/man1.0.2/man3/SSL_CTX_set_verify.html
SSL_CTX_set_verify_depth(dtls_ctx, 4);
// Whether we should read as many input bytes as possible (for non-blocking reads) or not.
// @see https://www.openssl.org/docs/man1.0.2/man3/SSL_CTX_set_read_ahead.html
SSL_CTX_set_read_ahead(dtls_ctx, 1);
// TODO: Maybe we can use SRTP-GCM in future.
// @see https://bugs.chromium.org/p/chromium/issues/detail?id=713701
// @see https://groups.google.com/forum/#!topic/discuss-webrtc/PvCbWSetVAQ
// @remark Only support SRTP_AES128_CM_SHA1_80, please read ssl/d1_srtp.c
srs_assert(SSL_CTX_set_tlsext_use_srtp(dtls_ctx, "SRTP_AES128_CM_SHA1_80") == 0);
}
// Show DTLS fingerprint
if (true) {
char fp[100] = {0};
@ -249,21 +208,412 @@ srs_error_t SrsDtls::init(SrsRequest* r)
return err;
}
SrsDtls* SrsDtls::instance()
X509* SrsDtlsCertificate::get_cert()
{
if (!_instance) {
_instance = new SrsDtls();
}
return _instance;
return dtls_cert;
}
SSL_CTX* SrsDtls::get_dtls_ctx()
EVP_PKEY* SrsDtlsCertificate::get_public_key()
{
return dtls_ctx;
return dtls_pkey;
}
std::string SrsDtls::get_fingerprint() const
EC_KEY* SrsDtlsCertificate::get_ecdsa_key()
{
return eckey;
}
std::string SrsDtlsCertificate::get_fingerprint()
{
return fingerprint;
}
bool SrsDtlsCertificate::is_ecdsa()
{
return ecdsa_mode;
}
ISrsDtlsCallback::ISrsDtlsCallback()
{
}
ISrsDtlsCallback::~ISrsDtlsCallback()
{
}
SrsDtls::SrsDtls(ISrsDtlsCallback* cb)
{
callback = cb;
handshake_done = false;
role_ = SrsDtlsRoleServer;
version_ = SrsDtlsVersionAuto;
}
SrsDtls::~SrsDtls()
{
if (dtls_ctx) {
SSL_CTX_free(dtls_ctx);
dtls_ctx = NULL;
}
if (dtls) {
// this function will free bio_in and bio_out
SSL_free(dtls);
dtls = NULL;
}
}
SSL_CTX* SrsDtls::build_dtls_ctx()
{
SSL_CTX* dtls_ctx;
#if OPENSSL_VERSION_NUMBER < 0x10002000L // v1.0.2
dtls_ctx = SSL_CTX_new(DTLSv1_method());
#else
if (version_ == SrsDtlsVersion1_0) {
dtls_ctx = SSL_CTX_new(DTLSv1_method());
} else if (version_ == SrsDtlsVersion1_2) {
dtls_ctx = SSL_CTX_new(DTLSv1_2_method());
} else {
// SrsDtlsVersionAuto, use version-flexible DTLS methods
dtls_ctx = SSL_CTX_new(DTLS_method());
}
#endif
if (_srs_rtc_dtls_certificate->is_ecdsa()) { // By ECDSA, https://stackoverflow.com/a/6006898
#if OPENSSL_VERSION_NUMBER >= 0x10002000L // v1.0.2
// For ECDSA, we could set the curves list.
// @see https://www.openssl.org/docs/man1.0.2/man3/SSL_CTX_set1_curves_list.html
SSL_CTX_set1_curves_list(dtls_ctx, "P-521:P-384:P-256");
#endif
// For openssl <1.1, we must set the ECDH manually.
// @see https://stackoverrun.com/cn/q/10791887
#if OPENSSL_VERSION_NUMBER < 0x10100000L // v1.1.x
#if OPENSSL_VERSION_NUMBER < 0x10002000L // v1.0.2
SSL_CTX_set_tmp_ecdh(dtls_ctx, _srs_rtc_dtls_certificate->get_ecdsa_key());
#else
SSL_CTX_set_ecdh_auto(dtls_ctx, 1);
#endif
#endif
}
// Setup DTLS context.
if (true) {
// We use "ALL", while you can use "DEFAULT" means "ALL:!EXPORT:!LOW:!aNULL:!eNULL:!SSLv2"
// @see https://www.openssl.org/docs/man1.0.2/man1/ciphers.html
srs_assert(SSL_CTX_set_cipher_list(dtls_ctx, "ALL") == 1);
// Setup the certificate.
srs_assert(SSL_CTX_use_certificate(dtls_ctx, _srs_rtc_dtls_certificate->get_cert()) == 1);
srs_assert(SSL_CTX_use_PrivateKey(dtls_ctx, _srs_rtc_dtls_certificate->get_public_key()) == 1);
// Server will send Certificate Request.
// @see https://www.openssl.org/docs/man1.0.2/man3/SSL_CTX_set_verify.html
// TODO: FIXME: Config it, default to off to make the packet smaller.
SSL_CTX_set_verify(dtls_ctx, SSL_VERIFY_PEER | SSL_VERIFY_CLIENT_ONCE, verify_callback);
// The depth count is "level 0:peer certificate", "level 1: CA certificate",
// "level 2: higher level CA certificate", and so on.
// @see https://www.openssl.org/docs/man1.0.2/man3/SSL_CTX_set_verify.html
SSL_CTX_set_verify_depth(dtls_ctx, 4);
// Whether we should read as many input bytes as possible (for non-blocking reads) or not.
// @see https://www.openssl.org/docs/man1.0.2/man3/SSL_CTX_set_read_ahead.html
SSL_CTX_set_read_ahead(dtls_ctx, 1);
// TODO: Maybe we can use SRTP-GCM in future.
// @see https://bugs.chromium.org/p/chromium/issues/detail?id=713701
// @see https://groups.google.com/forum/#!topic/discuss-webrtc/PvCbWSetVAQ
// @remark Only support SRTP_AES128_CM_SHA1_80, please read ssl/d1_srtp.c
srs_assert(SSL_CTX_set_tlsext_use_srtp(dtls_ctx, "SRTP_AES128_CM_SHA1_80") == 0);
}
return dtls_ctx;
}
srs_error_t SrsDtls::initialize(std::string role, std::string version)
{
srs_error_t err = srs_success;
role_ = SrsDtlsRoleServer;
if (role == "active") {
role_ = SrsDtlsRoleClient;
}
if (version == "dtls1.0") {
version_ = SrsDtlsVersion1_0;
} else if (version == "dtls1.2") {
version_ = SrsDtlsVersion1_2;
} else {
version_ = SrsDtlsVersionAuto;
}
dtls_ctx = build_dtls_ctx();
// TODO: FIXME: Leak for SSL_CTX* return by build_dtls_ctx.
if ((dtls = SSL_new(dtls_ctx)) == NULL) {
return srs_error_new(ERROR_OpenSslCreateSSL, "SSL_new dtls");
}
if (role == "active") {
// Dtls setup active, as client role.
SSL_set_connect_state(dtls);
SSL_set_max_send_fragment(dtls, 1500);
} else {
// Dtls setup passive, as server role.
SSL_set_accept_state(dtls);
}
if ((bio_in = BIO_new(BIO_s_mem())) == NULL) {
return srs_error_new(ERROR_OpenSslBIONew, "BIO_new in");
}
if ((bio_out = BIO_new(BIO_s_mem())) == NULL) {
BIO_free(bio_in);
return srs_error_new(ERROR_OpenSslBIONew, "BIO_new out");
}
SSL_set_bio(dtls, bio_in, bio_out);
return err;
}
srs_error_t SrsDtls::do_handshake()
{
srs_error_t err = srs_success;
int ret = SSL_do_handshake(dtls);
unsigned char *out_bio_data;
int out_bio_len = BIO_get_mem_data(bio_out, &out_bio_data);
int ssl_err = SSL_get_error(dtls, ret);
switch(ssl_err) {
case SSL_ERROR_NONE: {
if ((callback == NULL) || ((err = callback->on_dtls_handshake_done()) != srs_success)) {
return srs_error_wrap(err, "dtls handshake done handle");
}
break;
}
case SSL_ERROR_WANT_READ: {
break;
}
case SSL_ERROR_WANT_WRITE: {
break;
}
default: {
break;
}
}
if (out_bio_len && callback) {
if ((err = callback->write_dtls_data(out_bio_data, out_bio_len)) != srs_success) {
return srs_error_wrap(err, "send dtls packet");
}
}
return err;
}
srs_error_t SrsDtls::on_dtls(char* data, int nb_data)
{
srs_error_t err = srs_success;
if (BIO_reset(bio_in) != 1) {
return srs_error_new(ERROR_OpenSslBIOReset, "BIO_reset");
}
if (BIO_reset(bio_out) != 1) {
return srs_error_new(ERROR_OpenSslBIOReset, "BIO_reset");
}
if (BIO_write(bio_in, data, nb_data) <= 0) {
// TODO: 0 or -1 maybe block, use BIO_should_retry to check.
return srs_error_new(ERROR_OpenSslBIOWrite, "BIO_write");
}
if (!handshake_done) {
err = do_handshake();
} else {
while (BIO_ctrl_pending(bio_in) > 0) {
char dtls_read_buf[8092];
int nb = SSL_read(dtls, dtls_read_buf, sizeof(dtls_read_buf));
if (nb > 0 && callback) {
if ((err = callback->on_dtls_application_data(dtls_read_buf, nb)) != srs_success) {
return srs_error_wrap(err, "dtls application data process");
}
}
}
}
return err;
}
srs_error_t SrsDtls::start_active_handshake()
{
if (role_ == SrsDtlsRoleClient) {
return do_handshake();
}
return srs_success;
}
const int SRTP_MASTER_KEY_KEY_LEN = 16;
const int SRTP_MASTER_KEY_SALT_LEN = 14;
srs_error_t SrsDtls::get_srtp_key(std::string& recv_key, std::string& send_key)
{
srs_error_t err = srs_success;
unsigned char material[SRTP_MASTER_KEY_LEN * 2] = {0}; // client(SRTP_MASTER_KEY_KEY_LEN + SRTP_MASTER_KEY_SALT_LEN) + server
static const string dtls_srtp_lable = "EXTRACTOR-dtls_srtp";
if (!SSL_export_keying_material(dtls, material, sizeof(material), dtls_srtp_lable.c_str(), dtls_srtp_lable.size(), NULL, 0, 0)) {
return srs_error_new(ERROR_RTC_SRTP_INIT, "SSL_export_keying_material failed");
}
size_t offset = 0;
std::string client_master_key(reinterpret_cast<char*>(material), SRTP_MASTER_KEY_KEY_LEN);
offset += SRTP_MASTER_KEY_KEY_LEN;
std::string server_master_key(reinterpret_cast<char*>(material + offset), SRTP_MASTER_KEY_KEY_LEN);
offset += SRTP_MASTER_KEY_KEY_LEN;
std::string client_master_salt(reinterpret_cast<char*>(material + offset), SRTP_MASTER_KEY_SALT_LEN);
offset += SRTP_MASTER_KEY_SALT_LEN;
std::string server_master_salt(reinterpret_cast<char*>(material + offset), SRTP_MASTER_KEY_SALT_LEN);
if (role_ == SrsDtlsRoleClient) {
recv_key = server_master_key + server_master_salt;
send_key = client_master_key + client_master_salt;
} else {
recv_key = client_master_key + client_master_salt;
send_key = server_master_key + server_master_salt;
}
return err;
}
SrsSRTP::SrsSRTP()
{
recv_ctx_ = NULL;
send_ctx_ = NULL;
}
SrsSRTP::~SrsSRTP()
{
if (recv_ctx_) {
srtp_dealloc(recv_ctx_);
}
if (send_ctx_) {
srtp_dealloc(send_ctx_);
}
}
srs_error_t SrsSRTP::initialize(string recv_key, std::string send_key)
{
srs_error_t err = srs_success;
srtp_policy_t policy;
bzero(&policy, sizeof(policy));
// TODO: Maybe we can use SRTP-GCM in future.
// @see https://bugs.chromium.org/p/chromium/issues/detail?id=713701
// @see https://groups.google.com/forum/#!topic/discuss-webrtc/PvCbWSetVAQ
srtp_crypto_policy_set_aes_cm_128_hmac_sha1_80(&policy.rtp);
srtp_crypto_policy_set_aes_cm_128_hmac_sha1_80(&policy.rtcp);
policy.ssrc.value = 0;
// TODO: adjust window_size
policy.window_size = 8192;
policy.allow_repeat_tx = 1;
policy.next = NULL;
// init recv context
policy.ssrc.type = ssrc_any_inbound;
uint8_t *rkey = new uint8_t[recv_key.size()];
SrsAutoFreeA(uint8_t, rkey);
memcpy(rkey, recv_key.data(), recv_key.size());
policy.key = rkey;
if (srtp_create(&recv_ctx_, &policy) != srtp_err_status_ok) {
return srs_error_new(ERROR_RTC_SRTP_INIT, "srtp_create recv failed");
}
policy.ssrc.type = ssrc_any_outbound;
uint8_t *skey = new uint8_t[send_key.size()];
SrsAutoFreeA(uint8_t, skey);
memcpy(skey, send_key.data(), send_key.size());
policy.key = skey;
if (srtp_create(&send_ctx_, &policy) != srtp_err_status_ok) {
return srs_error_new(ERROR_RTC_SRTP_INIT, "srtp_create recv failed");
}
return err;
}
srs_error_t SrsSRTP::protect_rtp(const char* plaintext, char* cipher, int& nb_cipher)
{
srs_error_t err = srs_success;
memcpy(cipher, plaintext, nb_cipher);
// TODO: FIXME: Wrap error code.
if (srtp_protect(send_ctx_, cipher, &nb_cipher) != 0) {
return srs_error_new(ERROR_RTC_SRTP_PROTECT, "rtp protect failed");
}
return err;
}
srs_error_t SrsSRTP::protect_rtcp(const char* plaintext, char* cipher, int& nb_cipher)
{
srs_error_t err = srs_success;
memcpy(cipher, plaintext, nb_cipher);
// TODO: FIXME: Wrap error code.
if (srtp_protect_rtcp(send_ctx_, cipher, &nb_cipher) != 0) {
return srs_error_new(ERROR_RTC_SRTP_PROTECT, "rtcp protect failed");
}
return err;
}
srs_error_t SrsSRTP::protect_rtp2(void* rtp_hdr, int* len_ptr)
{
srs_error_t err = srs_success;
// TODO: FIXME: Wrap error code.
if (srtp_protect(send_ctx_, rtp_hdr, len_ptr) != 0) {
return srs_error_new(ERROR_RTC_SRTP_PROTECT, "rtp protect");
}
return err;
}
srs_error_t SrsSRTP::unprotect_rtp(const char* cipher, char* plaintext, int& nb_plaintext)
{
srs_error_t err = srs_success;
memcpy(plaintext, cipher, nb_plaintext);
srtp_err_status_t r0 = srtp_unprotect(recv_ctx_, plaintext, &nb_plaintext);
if (r0 != srtp_err_status_ok) {
return srs_error_new(ERROR_RTC_SRTP_UNPROTECT, "unprotect r0=%u", r0);
}
return err;
}
srs_error_t SrsSRTP::unprotect_rtcp(const char* cipher, char* plaintext, int& nb_plaintext)
{
srs_error_t err = srs_success;
memcpy(plaintext, cipher, nb_plaintext);
// TODO: FIXME: Wrap error code.
if (srtp_unprotect_rtcp(recv_ctx_, plaintext, &nb_plaintext) != srtp_err_status_ok) {
return srs_error_new(ERROR_RTC_SRTP_UNPROTECT, "rtcp unprotect failed");
}
return err;
}

View file

@ -31,24 +31,123 @@
class SrsRequest;
#include <openssl/ssl.h>
#include <srtp2/srtp.h>
class SrsDtlsCertificate
{
private:
std::string fingerprint;
bool ecdsa_mode;
X509* dtls_cert;
EVP_PKEY* dtls_pkey;
EC_KEY* eckey;
public:
SrsDtlsCertificate();
virtual ~SrsDtlsCertificate();
public:
// Initialize DTLS certificate.
srs_error_t initialize();
// dtls_cert
X509* get_cert();
// public key
EVP_PKEY* get_public_key();
// ECDSA key
EC_KEY* get_ecdsa_key();
// certificate fingerprint
std::string get_fingerprint();
// whether is ecdsa
bool is_ecdsa();
};
// @global config object.
extern SrsDtlsCertificate* _srs_rtc_dtls_certificate;
// @remark: play the role of DTLS_CLIENT, will send handshake
// packet first.
enum SrsDtlsRole {
SrsDtlsRoleClient,
SrsDtlsRoleServer
};
// @remark: DTLS_10 will all be ignored, and only DTLS1_2 will be accepted,
// DTLS_10 Support will be completely removed in M84 or later.
// TODO(https://bugs.webrtc.org/10261).
enum SrsDtlsVersion {
SrsDtlsVersionAuto = -1,
SrsDtlsVersion1_0,
SrsDtlsVersion1_2
};
class ISrsDtlsCallback
{
public:
ISrsDtlsCallback();
virtual ~ISrsDtlsCallback();
public:
// DTLS handshake done callback.
virtual srs_error_t on_dtls_handshake_done() = 0;
// DTLS receive application data callback.
virtual srs_error_t on_dtls_application_data(const char* data, const int len) = 0;
// DTLS write dtls data.
virtual srs_error_t write_dtls_data(void* data, int size) = 0;
};
class SrsDtls
{
private:
static SrsDtls* _instance;
private:
std::string fingerprint;
SSL_CTX* dtls_ctx;
private:
SrsDtls();
SSL* dtls;
BIO* bio_in;
BIO* bio_out;
ISrsDtlsCallback* callback;
bool handshake_done;
// @remark: dtls_role_ default value is DTLS_SERVER.
SrsDtlsRole role_;
// @remark: dtls_version_ default value is SrsDtlsVersionAuto.
SrsDtlsVersion version_;
public:
SrsDtls(ISrsDtlsCallback* callback);
virtual ~SrsDtls();
public:
srs_error_t init(SrsRequest* r);
srs_error_t initialize(std::string role, std::string version);
// As DTLS client, start handshake actively, send the ClientHello packet.
srs_error_t start_active_handshake();
// When got DTLS packet, may handshake packets or application data.
// @remark When we are passive(DTLS server), we start handshake when got DTLS packet.
srs_error_t on_dtls(char* data, int nb_data);
srs_error_t get_srtp_key(std::string& recv_key, std::string& send_key);
private:
SSL_CTX* build_dtls_ctx();
srs_error_t do_handshake();
};
class SrsSRTP
{
private:
srtp_t recv_ctx_;
srtp_t send_ctx_;
public:
static SrsDtls* instance();
SSL_CTX* get_dtls_ctx();
SrsSRTP();
virtual ~SrsSRTP();
public:
std::string get_fingerprint() const;
// Intialize srtp context with recv_key and send_key.
srs_error_t initialize(std::string recv_key, std::string send_key);
public:
// Encrypt the input plaintext to output cipher with nb_cipher bytes.
// @remark Note that the nb_cipher is the size of input plaintext, and
// it also is the length of output cipher when return.
srs_error_t protect_rtp(const char* plaintext, char* cipher, int& nb_cipher);
srs_error_t protect_rtcp(const char* plaintext, char* cipher, int& nb_cipher);
// Encrypt the input rtp_hdr with *len_ptr bytes.
// @remark the input plaintext and out cipher reuse rtp_hdr.
srs_error_t protect_rtp2(void* rtp_hdr, int* len_ptr);
// Decrypt the input cipher to output cipher with nb_cipher bytes.
// @remark Note that the nb_plaintext is the size of input cipher, and
// it also is the length of output plaintext when return.
srs_error_t unprotect_rtp(const char* cipher, char* plaintext, int& nb_plaintext);
srs_error_t unprotect_rtcp(const char* cipher, char* plaintext, int& nb_plaintext);
};
#endif

View file

@ -206,6 +206,40 @@ srs_error_t SrsSSRCInfo::encode(std::ostringstream& os)
return err;
}
SrsSSRCGroup::SrsSSRCGroup()
{
}
SrsSSRCGroup::~SrsSSRCGroup()
{
}
SrsSSRCGroup::SrsSSRCGroup(const std::string& semantic, const std::vector<uint32_t>& ssrcs)
{
semantic_ = semantic;
ssrcs_ = ssrcs;
}
srs_error_t SrsSSRCGroup::encode(std::ostringstream& os)
{
srs_error_t err = srs_success;
if (semantic_.empty()) {
return srs_error_new(ERROR_RTC_SDP_DECODE, "invalid semantics");
}
if (ssrcs_.size() == 0) {
return srs_error_new(ERROR_RTC_SDP_DECODE, "invalid ssrcs");
}
os << "a=ssrc-group:" << semantic_;
for (int i = 0; i < (int)ssrcs_.size(); i++) {
os << " " << ssrcs_[i];
}
return err;
}
SrsMediaPayloadType::SrsMediaPayloadType(int payload_type)
{
payload_type_ = payload_type;
@ -589,6 +623,7 @@ srs_error_t SrsMediaDesc::parse_attr_ssrc(const std::string& value)
ssrc_info.cname_ = ssrc_value;
ssrc_info.ssrc_ = ssrc;
} else if (ssrc_attr == "msid") {
// @see: https://tools.ietf.org/html/draft-alvestrand-mmusic-msid-00#section-2
std::vector<std::string> vec = split_str(ssrc_value, " ");
if (vec.empty()) {
return srs_error_new(ERROR_RTC_SDP_DECODE, "invalid ssrc line=%s", value.c_str());
@ -618,10 +653,23 @@ srs_error_t SrsMediaDesc::parse_attr_ssrc_group(const std::string& value)
std::string semantics;
FETCH(is, semantics);
// TODO: ssrc group process
if (semantics == "FID") {
std::string ssrc_ids = is.str().substr(is.tellg());
skip_first_spaces(ssrc_ids);
std::vector<std::string> vec = split_str(ssrc_ids, " ");
if (vec.size() == 0) {
return srs_error_new(ERROR_RTC_SDP_DECODE, "invalid ssrc-group line=%s", value.c_str());
}
std::vector<uint32_t> ssrcs;
for (size_t i = 0; i < vec.size(); ++i) {
std::istringstream in_stream(vec[i]);
uint32_t ssrc = 0;
in_stream >> ssrc;
ssrcs.push_back(ssrc);
}
ssrc_groups_.push_back(SrsSSRCGroup(semantics, ssrcs));
return err;
}

View file

@ -34,6 +34,13 @@
#include <map>
const std::string kTWCCExt = "http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01";
struct SrsSessionConfig
{
public:
std::string dtls_role;
std::string dtls_version;
};
class SrsSessionInfo
{
public:
@ -71,6 +78,17 @@ public:
class SrsSSRCGroup
{
public:
SrsSSRCGroup();
SrsSSRCGroup(const std::string& usage, const std::vector<uint32_t>& ssrcs);
virtual ~SrsSSRCGroup();
public:
srs_error_t encode(std::ostringstream& os);
public:
// e.g FIX, FEC, SIM.
std::string semantic_;
// SSRCs of this type.
std::vector<uint32_t> ssrcs_;
};
struct H264SpecificParam
@ -212,6 +230,7 @@ public:
int64_t end_time_;
SrsSessionInfo session_info_;
SrsSessionConfig session_config_;
std::vector<std::string> groups_;
std::string group_policy_;

View file

@ -40,6 +40,9 @@
#include <srs_app_rtc_source.hpp>
#include <srs_app_rtc_api.hpp>
// @global dtls certficate for rtc module.
SrsDtlsCertificate* _srs_rtc_dtls_certificate = new SrsDtlsCertificate();
using namespace std;
static bool is_stun(const uint8_t* data, const int size)
@ -320,20 +323,10 @@ srs_error_t SrsRtcServer::create_session(
}
}
std::string cid = _srs_context->get_id();
SrsRtcSession* session = new SrsRtcSession(this);
if ((err = session->initialize(source, req, publish, username, cid)) != srs_success) {
srs_freep(session);
return srs_error_wrap(err, "init");
}
map_username_session.insert(make_pair(username, session));
*psession = session;
local_sdp.set_ice_ufrag(local_ufrag);
local_sdp.set_ice_pwd(local_pwd);
local_sdp.set_fingerprint_algo("sha-256");
local_sdp.set_fingerprint(SrsDtls::instance()->get_fingerprint());
local_sdp.set_fingerprint(_srs_rtc_dtls_certificate->get_fingerprint());
// We allows to mock the eip of server.
if (!mock_eip.empty()) {
@ -345,10 +338,22 @@ srs_error_t SrsRtcServer::create_session(
}
}
SrsRtcSession* session = new SrsRtcSession(this);
session->set_remote_sdp(remote_sdp);
// We must setup the local SDP, then initialize the session object.
session->set_local_sdp(local_sdp);
session->set_state(WAITING_STUN);
std::string cid = _srs_context->get_id();
// Before session initialize, we must setup the local SDP.
if ((err = session->initialize(source, req, publish, username, cid)) != srs_success) {
srs_freep(session);
return srs_error_wrap(err, "init");
}
map_username_session.insert(make_pair(username, session));
*psession = session;
return err;
}
@ -366,7 +371,7 @@ srs_error_t SrsRtcServer::create_session2(SrsSdp& local_sdp, SrsRtcSession** pse
local_sdp.set_ice_ufrag(local_ufrag);
local_sdp.set_ice_pwd(local_pwd);
local_sdp.set_fingerprint_algo("sha-256");
local_sdp.set_fingerprint(SrsDtls::instance()->get_fingerprint());
local_sdp.set_fingerprint(_srs_rtc_dtls_certificate->get_fingerprint());
// We allows to mock the eip of server.
std::vector<string> candidate_ips = get_candidate_ips();
@ -527,6 +532,10 @@ srs_error_t RtcServerAdapter::initialize()
{
srs_error_t err = srs_success;
if ((err = _srs_rtc_dtls_certificate->initialize()) != srs_success) {
return srs_error_wrap(err, "rtc dtls certificate initialize");
}
if ((err = rtc->initialize()) != srs_success) {
return srs_error_wrap(err, "rtc server initialize");
}

View file

@ -127,7 +127,7 @@ public:
bool require(int required_size);
public:
// Skip some size.
// @param size can be any value. positive to forward; nagetive to backward.
// @param size can be any value. positive to forward; negative to backward.
// @remark to skip(pos()) to reset buffer.
// @remark assert initialized, the data() not NULL.
void skip(int size);

View file

@ -23,8 +23,6 @@
#include <srs_kernel_log.hpp>
#include <srs_kernel_error.hpp>
ISrsLog::ISrsLog()
{
}
@ -33,56 +31,12 @@ ISrsLog::~ISrsLog()
{
}
srs_error_t ISrsLog::initialize()
{
return srs_success;
}
void ISrsLog::reopen()
ISrsContext::ISrsContext()
{
}
void ISrsLog::verbose(const char* /*tag*/, const char* /*context_id*/, const char* /*fmt*/, ...)
ISrsContext::~ISrsContext()
{
}
void ISrsLog::info(const char* /*tag*/, const char* /*context_id*/, const char* /*fmt*/, ...)
{
}
void ISrsLog::trace(const char* /*tag*/, const char* /*context_id*/, const char* /*fmt*/, ...)
{
}
void ISrsLog::warn(const char* /*tag*/, const char* /*context_id*/, const char* /*fmt*/, ...)
{
}
void ISrsLog::error(const char* /*tag*/, const char* /*context_id*/, const char* /*fmt*/, ...)
{
}
ISrsThreadContext::ISrsThreadContext()
{
}
ISrsThreadContext::~ISrsThreadContext()
{
}
std::string ISrsThreadContext::generate_id()
{
return "";
}
std::string ISrsThreadContext::get_id()
{
return "";
}
std::string ISrsThreadContext::set_id(std::string /*v*/)
{
return "";
}

View file

@ -60,49 +60,49 @@ public:
virtual ~ISrsLog();
public:
// Initialize log utilities.
virtual srs_error_t initialize();
virtual srs_error_t initialize() = 0;
// Reopen the log file for log rotate.
virtual void reopen();
virtual void reopen() = 0;
public:
// The log for verbose, very verbose information.
virtual void verbose(const char* tag, const char* context_id, const char* fmt, ...);
virtual void verbose(const char* tag, const char* context_id, const char* fmt, ...) = 0;
// The log for debug, detail information.
virtual void info(const char* tag, const char* context_id, const char* fmt, ...);
virtual void info(const char* tag, const char* context_id, const char* fmt, ...) = 0;
// The log for trace, important information.
virtual void trace(const char* tag, const char* context_id, const char* fmt, ...);
virtual void trace(const char* tag, const char* context_id, const char* fmt, ...) = 0;
// The log for warn, warn is something should take attention, but not a error.
virtual void warn(const char* tag, const char* context_id, const char* fmt, ...);
virtual void warn(const char* tag, const char* context_id, const char* fmt, ...) = 0;
// The log for error, something error occur, do something about the error, ie. close the connection,
// but we will donot abort the program.
virtual void error(const char* tag, const char* context_id, const char* fmt, ...);
virtual void error(const char* tag, const char* context_id, const char* fmt, ...) = 0;
};
// The context id manager to identify context, for instance, the green-thread.
// Usage:
// _srs_context->generate_id(); // when thread start.
// _srs_context->get_id(); // get current generated id.
// int old_id = _srs_context->set_id(1000); // set context id if need to merge thread context.
// The context for multiple clients.
class ISrsThreadContext
// The logic context for a RTMP connection, or RTC Session.
// We can grep the context id to identify the logic unit, for debugging.
// For example:
// _srs_context->generate_id(); // Generate a new context.
// _srs_context->get_id(); // Get current context.
// int old_id = _srs_context->set_id("1000"); // Change the context.
class ISrsContext
{
public:
ISrsThreadContext();
virtual ~ISrsThreadContext();
ISrsContext();
virtual ~ISrsContext();
public:
// Generate the id for current context.
virtual std::string generate_id();
virtual std::string generate_id() = 0;
// Get the generated id of current context.
virtual std::string get_id();
virtual std::string get_id() = 0;
// Set the id of current context.
// @return the previous id value; 0 if no context.
virtual std::string set_id(std::string v);
virtual std::string set_id(std::string v) = 0;
};
// @global User must provides a log object
extern ISrsLog* _srs_log;
// @global User must implements the LogContext and define a global instance.
extern ISrsThreadContext* _srs_context;
extern ISrsContext* _srs_context;
// Log style.
// Use __FUNCTION__ to print c method

View file

@ -682,15 +682,15 @@ srs_error_t SrsRtcpTWCC::encode_chunk_two_bit(SrsRtcpTWCC::SrsRtcpTWCCChunk& chu
pkt_len += sizeof(encoded_chunk);
if (shift) {
chunk.size -= size;
chunk.all_same = true;
chunk.has_large_delta = false;
for (i = size; i < chunk.size; ++i) {
delta_size = chunk.delta_sizes[i];
chunk.delta_sizes[i - size] = delta_size;
for (i = 0; i < chunk.size; ++i) {
delta_size = chunk.delta_sizes[i + size];
chunk.delta_sizes[i] = delta_size;
chunk.all_same = (chunk.all_same && delta_size == chunk.delta_sizes[0]);
chunk.has_large_delta = chunk.has_large_delta || delta_size == kTwccFbLargeRecvDeltaBytes;
}
chunk.size -= size;
}
return srs_success;
@ -773,6 +773,17 @@ srs_error_t SrsRtcpTWCC::encode(SrsBuffer *buffer)
{
srs_error_t err = srs_success;
err = do_encode(buffer);
clear();
return err;
}
srs_error_t SrsRtcpTWCC::do_encode(SrsBuffer *buffer)
{
srs_error_t err = srs_success;
if(!buffer->require(nb_bytes())) {
return srs_error_new(ERROR_RTC_RTCP, "requires %d bytes", nb_bytes());
}
@ -787,85 +798,79 @@ srs_error_t SrsRtcpTWCC::encode(SrsBuffer *buffer)
uint16_t last_sn = base_sn_;
packet_count_ = recv_packes_.size();
do {
// encode chunk
SrsRtcpTWCC::SrsRtcpTWCCChunk chunk;
for(; it_sn != recv_sns_.end(); ++it_sn) {
uint16_t current_sn = *it_sn;
// calculate delta
it_ts = recv_packes_.find(current_sn);
srs_utime_t delta_us = calculate_delta_us(it_ts->second, last_ts);
uint16_t delta = delta_us;
if(delta != delta_us) {
return srs_error_new(ERROR_RTC_RTCP, "twcc: delta:%lld, exceeds the 16-bit base receive delta", delta_us);
}
if(current_sn > (last_sn + 1)) {
// lost packet
for(uint16_t lost_sn = last_sn + 1; lost_sn < current_sn; ++lost_sn) {
process_pkt_chunk(chunk, 0);
packet_count_++;
}
}
// FIXME 24-bit base receive delta not supported
int recv_delta_size = (delta >= 0 && delta <= 0xff) ? 1 : 2;
if ((err = process_pkt_chunk(chunk, recv_delta_size)) != srs_success) {
return srs_error_new(ERROR_RTC_RTCP, "delta_size %d, failed to append_recv_delta", recv_delta_size);
}
pkt_deltas_.push_back(delta);
last_ts += delta * kTwccFbDeltaUnit;
pkt_len += recv_delta_size;
last_sn = current_sn;
// encode chunk
SrsRtcpTWCC::SrsRtcpTWCCChunk chunk;
for(; it_sn != recv_sns_.end(); ++it_sn) {
uint16_t current_sn = *it_sn;
// calculate delta
it_ts = recv_packes_.find(current_sn);
srs_utime_t delta_us = calculate_delta_us(it_ts->second, last_ts);
int16_t delta = delta_us;
if(delta != delta_us) {
return srs_error_new(ERROR_RTC_RTCP, "twcc: delta:%" PRId64 ", exceeds the 16bits", delta_us);
}
if(0 < chunk.size) {
if((err = encode_remaining_chunk(chunk)) != srs_success) {
return srs_error_wrap(err, "encode chunk");
if(current_sn > (last_sn + 1)) {
// lost packet
for(uint16_t lost_sn = last_sn + 1; lost_sn < current_sn; ++lost_sn) {
process_pkt_chunk(chunk, 0);
packet_count_++;
}
}
// encode rtcp twcc packet
if((pkt_len % 4) == 0) {
header_.length = pkt_len / 4;
// FIXME 24-bit base receive delta not supported
int recv_delta_size = (delta >= 0 && delta <= 0xff) ? 1 : 2;
if ((err = process_pkt_chunk(chunk, recv_delta_size)) != srs_success) {
return srs_error_new(ERROR_RTC_RTCP, "delta_size %d, failed to append_recv_delta", recv_delta_size);
}
pkt_deltas_.push_back(delta);
last_ts += delta * kTwccFbDeltaUnit;
pkt_len += recv_delta_size;
last_sn = current_sn;
}
if(0 < chunk.size) {
if((err = encode_remaining_chunk(chunk)) != srs_success) {
return srs_error_wrap(err, "encode chunk");
}
}
// encode rtcp twcc packet
if((pkt_len % 4) == 0) {
header_.length = pkt_len / 4;
} else {
header_.length = (pkt_len + 4 - (pkt_len%4)) / 4;
}
header_.length -= 1;
if(srs_success != (err = encode_header(buffer))) {
return srs_error_wrap(err, "encode header");
}
buffer->write_4bytes(sender_ssrc_);
buffer->write_4bytes(media_ssrc_);
buffer->write_2bytes(base_sn_);
buffer->write_2bytes(packet_count_);
buffer->write_3bytes(reference_time_);
buffer->write_1bytes(fb_pkt_count_);
for(vector<uint16_t>::iterator it = encoded_chucks_.begin(); it != encoded_chucks_.end(); ++it) {
buffer->write_2bytes(*it);
}
for(vector<uint16_t>::iterator it = pkt_deltas_.begin(); it != pkt_deltas_.end(); ++it) {
if(0 <= *it && 0xFF >= *it) {
// small delta
uint8_t delta = *it;
buffer->write_1bytes(delta);
} else {
header_.length = (pkt_len + 4 - (pkt_len%4)) / 4;
}
header_.length -= 1;
if(srs_success != (err = encode_header(buffer))) {
err = srs_error_wrap(err, "encode header");
break;
}
buffer->write_4bytes(sender_ssrc_);
buffer->write_4bytes(media_ssrc_);
buffer->write_2bytes(base_sn_);
buffer->write_2bytes(packet_count_);
buffer->write_3bytes(reference_time_);
buffer->write_1bytes(fb_pkt_count_);
for(vector<uint16_t>::iterator it = encoded_chucks_.begin(); it != encoded_chucks_.end(); ++it) {
// large or negative delta
buffer->write_2bytes(*it);
}
for(vector<uint16_t>::iterator it = pkt_deltas_.begin(); it != pkt_deltas_.end(); ++it) {
if(0 <= *it && 0xFF >= *it) {
// small delta
uint8_t delta = *it;
buffer->write_1bytes(delta);
} else {
// large or negative delta
buffer->write_2bytes(*it);
}
}
while((pkt_len % 4) != 0) {
buffer->write_1bytes(0);
pkt_len++;
}
} while(0);
clear();
}
while((pkt_len % 4) != 0) {
buffer->write_1bytes(0);
pkt_len++;
}
return err;
}

View file

@ -293,7 +293,8 @@ public:
virtual srs_error_t decode(SrsBuffer *buffer);
virtual int nb_bytes();
virtual srs_error_t encode(SrsBuffer *buffer);
private:
srs_error_t do_encode(SrsBuffer *buffer);
};
class SrsRtcpNack : public SrsRtcpCommon

View file

@ -53,15 +53,15 @@ int32_t srs_seq_distance(uint16_t value, uint16_t pre_value)
return srs_rtp_seq_distance(pre_value, value);
}
SrsRtpHeaderExtensionMap::SrsRtpHeaderExtensionMap()
SrsRtpExtensionTypes::SrsRtpExtensionTypes()
{
}
SrsRtpHeaderExtensionMap::~SrsRtpHeaderExtensionMap()
SrsRtpExtensionTypes::~SrsRtpExtensionTypes()
{
}
bool SrsRtpHeaderExtensionMap::register_by_uri(int id, std::string uri)
bool SrsRtpExtensionTypes::register_by_uri(int id, std::string uri)
{
for (int i = 0; i < (int)sizeof(kExtensions); ++i) {
if (kExtensions[i].uri == uri) {
@ -71,7 +71,7 @@ bool SrsRtpHeaderExtensionMap::register_by_uri(int id, std::string uri)
return false;
}
bool SrsRtpHeaderExtensionMap::register_id(int id, SrsRtpExtensionType type, std::string uri)
bool SrsRtpExtensionTypes::register_id(int id, SrsRtpExtensionType type, std::string uri)
{
if (id < 1 || id > 255) {
return false;
@ -81,7 +81,7 @@ bool SrsRtpHeaderExtensionMap::register_id(int id, SrsRtpExtensionType type, std
return true;
}
SrsRtpExtensionType SrsRtpHeaderExtensionMap::get_type(int id) const
SrsRtpExtensionType SrsRtpExtensionTypes::get_type(int id) const
{
for (int type = kRtpExtensionNone + 1; type < kRtpExtensionNumberOfExtensions; ++type) {
if (ids_[type] == id) {
@ -91,85 +91,241 @@ SrsRtpExtensionType SrsRtpHeaderExtensionMap::get_type(int id) const
return kInvalidType;
}
SrsRtpHeaderExtension::SrsRtpHeaderExtension()
SrsRtpExtensionTwcc::SrsRtpExtensionTwcc(): has_twcc_(false), id_(0), sn_(0)
{
has_transport_sequence_number = false;
transport_sequence_number = 0;
transport_cc_ext_id = 0;
}
SrsRtpHeaderExtension::~SrsRtpHeaderExtension()
SrsRtpExtensionTwcc::~SrsRtpExtensionTwcc()
{
}
bool SrsRtpExtensionTwcc::has_twcc_ext()
{
return has_twcc_;
}
srs_error_t SrsRtpExtensionTwcc::decode(SrsBuffer* buf)
{
srs_error_t err = srs_success;
// 0 1 2
// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | ID | L=1 |transport wide sequence number |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
if (!buf->require(1)) {
return srs_error_new(ERROR_RTC_RTP_MUXER, "requires %d bytes", 1);
}
uint8_t v = buf->read_1bytes();
id_ = (v & 0xF0) >> 4;
uint8_t len = (v & 0x0F);
if(!id_ || len != 1) {
return srs_error_new(ERROR_RTC_RTP, "invalid twcc id=%d, len=%d", id_, len);
}
if (!buf->require(3)) {
return srs_error_new(ERROR_RTC_RTP_MUXER, "requires %d bytes", 3);
}
sn_ = buf->read_2bytes();
buf->read_1bytes();
has_twcc_ = true;
return err;
}
int SrsRtpExtensionTwcc::nb_bytes()
{
return 4;
}
srs_error_t SrsRtpExtensionTwcc::encode(SrsBuffer* buf)
{
srs_error_t err = srs_success;
uint8_t id_len = (id_ & 0x0F)<< 4| 0x01;
buf->write_1bytes(id_len);
buf->write_2bytes(sn_);
buf->write_1bytes(0x00);
return err;
}
uint8_t SrsRtpExtensionTwcc::get_id()
{
return id_;
}
void SrsRtpExtensionTwcc::set_id(uint8_t id)
{
id_ = id;
has_twcc_ = true;
}
uint16_t SrsRtpExtensionTwcc::get_sn()
{
return sn_;
}
void SrsRtpExtensionTwcc::set_sn(uint16_t sn)
{
sn_ = sn;
has_twcc_ = true;
}
SrsRtpExtensions::SrsRtpExtensions() : has_ext_(false)
{
}
SrsRtpExtensions::~SrsRtpExtensions()
{
}
srs_error_t SrsRtpExtensions::decode(SrsBuffer* buf)
{
srs_error_t err = srs_success;
/* @see https://tools.ietf.org/html/rfc3550#section-5.3.1
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| defined by profile | length |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| header extension |
| .... |
*/
if (!buf->require(4)) {
return srs_error_new(ERROR_RTC_RTP_MUXER, "requires 4 bytes");
}
uint16_t profile_id = buf->read_2bytes();
uint16_t extension_length = buf->read_2bytes();
// @see: https://tools.ietf.org/html/rfc5285#section-4.2
if (profile_id == 0xBEDE) {
SrsBuffer xbuf(buf->head(), extension_length * 4);
buf->skip(extension_length * 4);
return decode_0xbede(&xbuf);
} else if (profile_id == 0x1000) {
buf->skip(extension_length * 4);
} else {
return srs_error_new(ERROR_RTC_RTP_MUXER, "fail to parse extension");
}
return err;
}
srs_error_t SrsRtpExtensions::decode_0xbede(SrsBuffer* buf)
{
srs_error_t err = srs_success;
while (!buf->empty()) {
// The first byte maybe padding or id+len.
if (!buf->require(1)) {
return srs_error_new(ERROR_RTC_RTP_MUXER, "requires %d bytes", 1);
}
uint8_t v = *((uint8_t*)buf->head());
// Padding, ignore
if(v == 0) {
buf->skip(1);
continue;
}
// 0
// 0 1 2 3 4 5 6 7
// +-+-+-+-+-+-+-+-+
// | ID | len |
// +-+-+-+-+-+-+-+-+
// Note that 'len' is the header extension element length, which is the
// number of bytes - 1.
uint8_t id = (v & 0xF0) >> 4;
uint8_t len = (v & 0x0F);
SrsRtpExtensionType xtype = types_.get_type(id);
if (xtype == kRtpExtensionTransportSequenceNumber) {
if(srs_success != (err = twcc_.decode(buf))) {
return srs_error_wrap(err, "decode twcc extension");
}
has_ext_ = true;
} else {
buf->skip(1 + (len + 1));
}
}
return err;
}
int SrsRtpExtensions::nb_bytes()
{
return 4 + (twcc_.has_twcc_ext() ? twcc_.nb_bytes() : 0);
}
srs_error_t SrsRtpExtensions::encode(SrsBuffer* buf)
{
srs_error_t err = srs_success;
buf->write_2bytes(0xBEDE);
int len = 0;
//TODO: When add new rtp extension, it should add the extension length into len
if(twcc_.has_twcc_ext()) {
len += twcc_.nb_bytes();
}
buf->write_2bytes(len / 4);
if(twcc_.has_twcc_ext()) {
if(srs_success != (err = twcc_.encode(buf))) {
return srs_error_wrap(err, "encode twcc extension");
}
}
return err;
}
bool SrsRtpExtensions::exists()
{
return has_ext_;
}
void SrsRtpExtensions::set_types_(const SrsRtpExtensionTypes* types)
{
if(types) {
types_ = *types;
}
}
srs_error_t SrsRtpExtensions::get_twcc_sequence_number(uint16_t& twcc_sn)
{
if(twcc_.has_twcc_ext()) {
twcc_sn = twcc_.get_sn();
return srs_success;
}
return srs_error_new(ERROR_RTC_RTP_MUXER, "not find twcc sequence number");
}
srs_error_t SrsRtpExtensions::set_twcc_sequence_number(uint8_t id, uint16_t sn)
{
has_ext_ = true;
twcc_.set_id(id);
twcc_.set_sn(sn);
return srs_success;
}
SrsRtpHeader::SrsRtpHeader()
{
padding_length = 0;
extension = false;
cc = 0;
marker = false;
payload_type = 0;
sequence = 0;
timestamp = 0;
ssrc = 0;
extension_length = 0;
ignore_padding_ = false;
}
SrsRtpHeader::~SrsRtpHeader()
{
}
srs_error_t SrsRtpHeader::parse_extension(SrsBuffer* buf) {
srs_error_t err = srs_success;
uint16_t profile_id = buf->read_2bytes();
extension_length = buf->read_2bytes();
// @see: https://tools.ietf.org/html/rfc5285#section-4.2
if (profile_id == 0xBEDE) {
uint32_t xlen = extension_length * 4;
while (xlen > 0) {
// parse id and len
uint8_t id_len = buf->read_1bytes();
xlen--;
if(id_len == 0) {
// padding, ignore
continue;
}
// 0
// 0 1 2 3 4 5 6 7
// +-+-+-+-+-+-+-+-+
// | ID | len |
// +-+-+-+-+-+-+-+-+
// Note that 'len' is the header extension element length, which is the
// number of bytes - 1.
uint8_t id = (id_len & 0xF0) >> 4;
uint8_t len = (id_len & 0x0F);
SrsRtpExtensionType xtype = extension_map_.get_type(id);
if (xtype == kRtpExtensionTransportSequenceNumber) {
// 0 1 2
// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | ID | L=1 |transport wide sequence number |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
header_extension.has_transport_sequence_number = true;
header_extension.transport_sequence_number = buf->read_2bytes();
header_extension.transport_cc_ext_id = id;
xlen -= 2;
} else {
buf->skip(len + 1);
xlen -= len + 1;
}
}
} else if (profile_id == 0x1000) {
buf->skip(extension_length * 4);
} else {
return srs_error_new(ERROR_RTC_RTP_MUXER, "fail to parse extension");
}
return err;
}
srs_error_t SrsRtpHeader::decode(SrsBuffer* buf)
{
srs_error_t err = srs_success;
@ -178,7 +334,7 @@ srs_error_t SrsRtpHeader::decode(SrsBuffer* buf)
return srs_error_new(ERROR_RTC_RTP_MUXER, "requires %d+ bytes", kRtpHeaderFixedSize);
}
/*
/* @see https://tools.ietf.org/html/rfc1889#section-5.1
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
@ -195,7 +351,7 @@ srs_error_t SrsRtpHeader::decode(SrsBuffer* buf)
uint8_t first = buf->read_1bytes();
bool padding = (first & 0x20);
extension = (first & 0x10);
bool extension = (first & 0x10);
cc = (first & 0x0F);
uint8_t second = buf->read_1bytes();
@ -216,21 +372,12 @@ srs_error_t SrsRtpHeader::decode(SrsBuffer* buf)
}
if (extension) {
/* RTP header extension, RFC 3550.
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| defined by profile | length |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| header extension |
| .... |
*/
if ((err = parse_extension(buf)) != srs_success) {
if ((err = parse_extensions(buf)) != srs_success) {
return srs_error_wrap(err, "fail to parse extension");
}
}
if (padding && !buf->empty()) {
if (padding && !ignore_padding_ && !buf->empty()) {
padding_length = *(reinterpret_cast<uint8_t*>(buf->data() + buf->size() - 1));
if (!buf->require(padding_length)) {
return srs_error_new(ERROR_RTC_RTP_MUXER, "padding requires %d bytes", padding_length);
@ -240,6 +387,16 @@ srs_error_t SrsRtpHeader::decode(SrsBuffer* buf)
return err;
}
srs_error_t SrsRtpHeader::parse_extensions(SrsBuffer* buf) {
srs_error_t err = srs_success;
if(srs_success != (err = extensions_.decode(buf))) {
return srs_error_wrap(err, "decode rtp extension");
}
return err;
}
srs_error_t SrsRtpHeader::encode(SrsBuffer* buf)
{
srs_error_t err = srs_success;
@ -251,7 +408,7 @@ srs_error_t SrsRtpHeader::encode(SrsBuffer* buf)
if (padding_length > 0) {
v |= 0x20;
}
if (extension) {
if (extensions_.exists()) {
v |= 0x10;
}
buf->write_1bytes(v);
@ -277,40 +434,42 @@ srs_error_t SrsRtpHeader::encode(SrsBuffer* buf)
buf->write_4bytes(csrc[i]);
}
if (extension) {
buf->write_2bytes(0xBEDE);
// TODO: FIXME: extension_length should caculate by extension length
buf->write_2bytes(extension_length);
if (header_extension.has_transport_sequence_number) {
uint8_t id_len = (header_extension.transport_cc_ext_id & 0x0F)<< 4| 0x01;
buf->write_1bytes(id_len);
buf->write_2bytes(header_extension.transport_sequence_number);
buf->write_1bytes(0x00);
if (extensions_.exists()) {
if(srs_success != (err = extensions_.encode(buf))) {
return srs_error_wrap(err, "encode rtp extension");
}
}
return err;
}
void SrsRtpHeader::set_extensions(const SrsRtpHeaderExtensionMap* extmap)
void SrsRtpHeader::set_extensions(const SrsRtpExtensionTypes* extmap)
{
if (extmap) {
extension_map_ = *extmap;
extensions_.set_types_(extmap);
}
}
void SrsRtpHeader::ignore_padding(bool v)
{
ignore_padding_ = v;
}
srs_error_t SrsRtpHeader::get_twcc_sequence_number(uint16_t& twcc_sn)
{
if (header_extension.has_transport_sequence_number == true) {
twcc_sn = header_extension.transport_sequence_number;
return srs_success;
if (extensions_.exists()) {
return extensions_.get_twcc_sequence_number(twcc_sn);
}
return srs_error_new(ERROR_RTC_RTP_MUXER, "not find twcc sequence number");
return srs_error_new(ERROR_RTC_RTP_MUXER, "no rtp extension");
}
srs_error_t SrsRtpHeader::set_twcc_sequence_number(uint8_t id, uint16_t sn)
{
return extensions_.set_twcc_sequence_number(id, sn);
}
int SrsRtpHeader::nb_bytes()
{
return kRtpHeaderFixedSize + cc * 4 + (extension ? (extension_length + 1) * 4 : 0);
return kRtpHeaderFixedSize + cc * 4 + (extensions_.exists() ? extensions_.nb_bytes() : 0);
}
void SrsRtpHeader::set_marker(bool v)
@ -449,9 +608,9 @@ SrsRtpPacket2* SrsRtpPacket2::copy()
return cp;
}
void SrsRtpPacket2::set_rtp_header_extensions(const SrsRtpHeaderExtensionMap* extmap)
void SrsRtpPacket2::set_extension_types(const SrsRtpExtensionTypes* v)
{
return header.set_extensions(extmap);
return header.set_extensions(v);
}
int SrsRtpPacket2::nb_bytes()

View file

@ -99,7 +99,7 @@ const SrsExtensionInfo kExtensions[] = {
{kRtpExtensionTransportSequenceNumber, std::string("http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01")}
};
class SrsRtpHeaderExtensionMap
class SrsRtpExtensionTypes
{
public:
static const SrsRtpExtensionType kInvalidType = kRtpExtensionNone;
@ -108,30 +108,65 @@ public:
bool register_by_uri(int id, std::string uri);
SrsRtpExtensionType get_type(int id) const;
public:
SrsRtpHeaderExtensionMap();
virtual ~SrsRtpHeaderExtensionMap();
SrsRtpExtensionTypes();
virtual ~SrsRtpExtensionTypes();
private:
bool register_id(int id, SrsRtpExtensionType type, std::string uri);
private:
uint8_t ids_[kRtpExtensionNumberOfExtensions];
};
class SrsRtpHeaderExtension
class SrsRtpExtensionTwcc : public ISrsCodec
{
bool has_twcc_;
uint8_t id_;
uint16_t sn_;
public:
bool has_transport_sequence_number;
uint16_t transport_sequence_number;
uint8_t transport_cc_ext_id;
public:
SrsRtpHeaderExtension();
virtual ~SrsRtpHeaderExtension();
SrsRtpExtensionTwcc();
virtual ~SrsRtpExtensionTwcc();
bool has_twcc_ext();
uint8_t get_id();
void set_id(uint8_t id);
uint16_t get_sn();
void set_sn(uint16_t sn);
public:
// ISrsCodec
virtual srs_error_t decode(SrsBuffer* buf);
virtual srs_error_t encode(SrsBuffer* buf);
virtual int nb_bytes();
};
class SrsRtpHeader
class SrsRtpExtensions : public ISrsCodec
{
private:
bool has_ext_;
SrsRtpExtensionTypes types_;
SrsRtpExtensionTwcc twcc_;
public:
SrsRtpExtensions();
virtual ~SrsRtpExtensions();
bool exists();
void set_types_(const SrsRtpExtensionTypes* types);
srs_error_t get_twcc_sequence_number(uint16_t& twcc_sn);
srs_error_t set_twcc_sequence_number(uint8_t id, uint16_t sn);
// ISrsCodec
public:
virtual srs_error_t decode(SrsBuffer* buf);
private:
srs_error_t decode_0xbede(SrsBuffer* buf);
public:
virtual srs_error_t encode(SrsBuffer* buf);
virtual int nb_bytes();
};
class SrsRtpHeader : public ISrsCodec
{
private:
uint8_t padding_length;
bool extension;
uint8_t cc;
bool marker;
uint8_t payload_type;
@ -139,16 +174,16 @@ private:
uint32_t timestamp;
uint32_t ssrc;
uint32_t csrc[15];
uint16_t extension_length;
SrsRtpHeaderExtensionMap extension_map_;
SrsRtpHeaderExtension header_extension;
SrsRtpExtensions extensions_;
bool ignore_padding_;
public:
SrsRtpHeader();
virtual ~SrsRtpHeader();
private:
srs_error_t parse_extension(SrsBuffer* buf);
public:
virtual srs_error_t decode(SrsBuffer* buf);
private:
srs_error_t parse_extensions(SrsBuffer* buf);
public:
virtual srs_error_t encode(SrsBuffer* buf);
virtual int nb_bytes();
public:
@ -164,8 +199,10 @@ public:
uint32_t get_ssrc() const;
void set_padding(uint8_t v);
uint8_t get_padding() const;
void set_extensions(const SrsRtpHeaderExtensionMap* extmap);
void set_extensions(const SrsRtpExtensionTypes* extmap);
void ignore_padding(bool v);
srs_error_t get_twcc_sequence_number(uint16_t& twcc_sn);
srs_error_t set_twcc_sequence_number(uint8_t id, uint16_t sn);
};
class ISrsRtpPayloader : public ISrsCodec
@ -222,7 +259,7 @@ public:
// Copy the RTP packet.
SrsRtpPacket2* copy();
// Set RTP header extensions for encoding or decoding header extension
void set_rtp_header_extensions(const SrsRtpHeaderExtensionMap* extmap);
void set_extension_types(const SrsRtpExtensionTypes* v);
// interface ISrsEncoder
public:
virtual int nb_bytes();

View file

@ -52,7 +52,7 @@ srs_error_t proxy_hls2rtmp(std::string hls, std::string rtmp);
// @global log and context.
ISrsLog* _srs_log = new SrsConsoleLog(SrsLogLevelTrace, false);
ISrsThreadContext* _srs_context = new SrsThreadContext();
ISrsContext* _srs_context = new SrsThreadContext();
/**
* main entrance.

View file

@ -38,7 +38,7 @@ using namespace std;
// @global log and context.
ISrsLog* _srs_log = new SrsConsoleLog(SrsLogLevelTrace, false);
ISrsThreadContext* _srs_context = new SrsThreadContext();
ISrsContext* _srs_context = new SrsThreadContext();
srs_error_t parse(std::string mp4_file, bool verbose)
{

View file

@ -69,8 +69,8 @@ srs_error_t run_hybrid_server();
void show_macro_features();
// @global log and context.
ISrsLog* _srs_log = new SrsFastLog();
ISrsThreadContext* _srs_context = new SrsThreadContext();
ISrsLog* _srs_log = new SrsFileLog();
ISrsContext* _srs_context = new SrsThreadContext();
// @global config object for app module.
SrsConfig* _srs_config = new SrsConfig();

View file

@ -449,15 +449,11 @@ public:
//
// There are some modes to determine the length of body:
// 1. content-length and chunked.
// 2. user confirmed infinite chunked.
// 3. no body or user not confirmed infinite chunked.
// 2. infinite chunked.
// 3. no body.
// For example:
// ISrsHttpMessage* r = ...;
// while (!r->eof()) r->read(); // Read in mode 1 or 3.
// For some server, we can confirm the body is infinite chunked:
// ISrsHttpMessage* r = ...;
// r->enter_infinite_chunked();
// while (!r->eof()) r->read(); // Read in mode 2
// @rmark for mode 2, the infinite chunked, all left data is body.
class ISrsHttpMessage
{
@ -492,10 +488,6 @@ public:
// @return the REST id; -1 if not matched.
virtual std::string parse_rest_id(std::string pattern) = 0;
public:
// The left all data is chunked body, the infinite chunked mode,
// which is chunked encoding without chunked header.
// @remark error when message is in chunked or content-length specified.
virtual srs_error_t enter_infinite_chunked() = 0;
// Read body to string.
// @remark for small http body.
virtual srs_error_t body_read_all(std::string& body) = 0;

View file

@ -291,7 +291,6 @@ SrsHttpMessage::SrsHttpMessage(ISrsReader* reader, SrsFastStream* buffer) : ISrs
{
owner_conn = NULL;
chunked = false;
infinite_chunked = false;
_uri = new SrsHttpUri();
_body = new SrsHttpResponseReader(this, reader, buffer);
@ -335,6 +334,12 @@ void SrsHttpMessage::set_header(SrsHttpHeader* header, bool keep_alive)
if (!clv.empty()) {
_content_length = ::atoll(clv.c_str());
}
// If method is OPTIONS, and no size(content-length or chunked), it's not infinite chunked,
// it means there is no body, so we must close the body reader.
if (_method == SRS_CONSTS_HTTP_OPTIONS && !chunked && _content_length == -1) {
_body->close();
}
}
srs_error_t SrsHttpMessage::set_url(string url, bool allow_jsonp)
@ -476,11 +481,6 @@ bool SrsHttpMessage::is_keep_alive()
return _keep_alive;
}
bool SrsHttpMessage::is_infinite_chunked()
{
return infinite_chunked;
}
string SrsHttpMessage::uri()
{
std::string uri = _uri->get_schema();
@ -550,23 +550,6 @@ std::string SrsHttpMessage::parse_rest_id(string pattern)
return "";
}
srs_error_t SrsHttpMessage::enter_infinite_chunked()
{
srs_error_t err = srs_success;
if (infinite_chunked) {
return err;
}
if (is_chunked() || content_length() != -1) {
return srs_error_new(ERROR_HTTP_DATA_INVALID, "not infinited chunked");
}
infinite_chunked = true;
return err;
}
srs_error_t SrsHttpMessage::body_read_all(string& body)
{
srs_error_t err = srs_success;
@ -944,6 +927,11 @@ SrsHttpResponseReader::~SrsHttpResponseReader()
{
}
void SrsHttpResponseReader::close()
{
is_eof = true;
}
bool SrsHttpResponseReader::eof()
{
return is_eof;
@ -975,16 +963,17 @@ srs_error_t SrsHttpResponseReader::read(void* data, size_t nb_data, ssize_t* nb_
return read_specified(data, nb_data, nb_read);
}
// infinite chunked mode, directly read.
if (owner->is_infinite_chunked()) {
srs_assert(!owner->is_chunked() && owner->content_length() == -1);
return read_specified(data, nb_data, nb_read);
// Infinite chunked mode.
// If not chunked encoding, and no content-length, it's infinite chunked.
// In this mode, all body is data and never EOF util socket closed.
if ((err = read_specified(data, nb_data, nb_read)) != srs_success) {
// For infinite chunked, the socket close event is EOF.
if (srs_error_code(err) == ERROR_SOCKET_READ) {
srs_freep(err); is_eof = true;
return err;
}
}
// infinite chunked mode, but user not set it,
// we think there is no data left.
is_eof = true;
return err;
}

View file

@ -99,8 +99,6 @@ private:
// The body object, reader object.
// @remark, user can get body in string by get_body().
SrsHttpResponseReader* _body;
// Whether the body is infinite chunked.
bool infinite_chunked;
// Use a buffer to read and send ts file.
// The transport connection, can be NULL.
ISrsConnection* owner_conn;
@ -157,9 +155,6 @@ public:
virtual bool is_http_options();
// Whether body is chunked encoding, for reader only.
virtual bool is_chunked();
// Whether body is infinite chunked encoding.
// @remark set by enter_infinite_chunked.
virtual bool is_infinite_chunked();
// Whether should keep the connection alive.
virtual bool is_keep_alive();
// The uri contains the host and path.
@ -173,8 +168,6 @@ public:
virtual std::string ext();
// Get the RESTful matched id.
virtual std::string parse_rest_id(std::string pattern);
public:
virtual srs_error_t enter_infinite_chunked();
public:
// Read body to string.
// @remark for small http body.
@ -271,6 +264,11 @@ public:
// while buffer is a fast cache which may have cached some data from reader.
SrsHttpResponseReader(SrsHttpMessage* msg, ISrsReader* reader, SrsFastStream* buffer);
virtual ~SrsHttpResponseReader();
public:
// User close the HTTP response reader.
// For example, OPTIONS has no body, no content-length and not chunked,
// so we must close it(set to eof) to avoid reading the response body.
void close();
// Interface ISrsHttpResponseReader
public:
virtual bool eof();

View file

@ -34,7 +34,7 @@
// The st thread context, get_id will get the st-thread id,
// which identify the client.
class SrsThreadContext : public ISrsThreadContext
class SrsThreadContext : public ISrsContext
{
private:
std::map<srs_thread_t, std::string> cache;

View file

@ -57,6 +57,7 @@ SrsBasicRtmpClient::~SrsBasicRtmpClient()
close();
srs_freep(kbps);
srs_freep(clk);
srs_freep(req);
}
srs_error_t SrsBasicRtmpClient::connect()

View file

@ -41,7 +41,7 @@ srs_utime_t _srs_tmp_timeout = (100 * SRS_UTIME_MILLISECONDS);
// kernel module.
ISrsLog* _srs_log = new MockEmptyLog(SrsLogLevelDisabled);
ISrsThreadContext* _srs_context = new ISrsThreadContext();
ISrsContext* _srs_context = new SrsThreadContext();
// app module.
SrsConfig* _srs_config = NULL;
SrsServer* _srs_server = NULL;

View file

@ -67,7 +67,7 @@ extern srs_utime_t _srs_tmp_timeout;
// For init array data.
#define HELPER_ARRAY_INIT(buf, sz, val) \
for (int i = 0; i < (int)sz; i++) (buf)[i]=val
memset(buf, val, sz)
// Dump simple stream to string.
#define HELPER_BUFFER2STR(io) \
@ -95,7 +95,7 @@ extern srs_utime_t _srs_tmp_timeout;
// print the bytes.
void srs_bytes_print(char* pa, int size);
class MockEmptyLog : public SrsFastLog
class MockEmptyLog : public SrsFileLog
{
public:
MockEmptyLog(SrsLogLevel l);

View file

@ -4090,28 +4090,6 @@ VOID TEST(KernelFLVTest, CoverSharedPtrMessage)
}
}
VOID TEST(KernelLogTest, CoverAll)
{
srs_error_t err;
if (true) {
ISrsLog l;
HELPER_EXPECT_SUCCESS(l.initialize());
l.reopen();
l.verbose("TAG", "0", "log");
l.info("TAG", "0", "log");
l.trace("TAG", "0", "log");
l.warn("TAG", "0", "log");
l.error("TAG", "0", "log");
ISrsThreadContext ctx;
ctx.set_id("10");
EXPECT_EQ("", ctx.get_id());
EXPECT_EQ("", ctx.generate_id());
}
}
VOID TEST(KernelMp3Test, CoverAll)
{
srs_error_t err;

View file

@ -480,7 +480,7 @@ VOID TEST(TCPServerTest, WritevIOVC)
}
}
VOID TEST(TCPServerTest, MessageConnection)
VOID TEST(HTTPServerTest, MessageConnection)
{
srs_error_t err;
@ -534,7 +534,6 @@ VOID TEST(TCPServerTest, MessageConnection)
if (true) {
SrsHttpMessage m;
EXPECT_TRUE(m.is_keep_alive());
EXPECT_FALSE(m.is_infinite_chunked());
}
if (true) {
@ -562,42 +561,7 @@ VOID TEST(TCPServerTest, MessageConnection)
}
}
VOID TEST(TCPServerTest, MessageInfinityChunked)
{
srs_error_t err;
if (true) {
SrsHttpMessage m;
EXPECT_FALSE(m.is_infinite_chunked());
HELPER_EXPECT_SUCCESS(m.enter_infinite_chunked());
EXPECT_TRUE(m.is_infinite_chunked());
}
if (true) {
SrsHttpMessage m;
HELPER_EXPECT_SUCCESS(m.enter_infinite_chunked());
HELPER_EXPECT_SUCCESS(m.enter_infinite_chunked());
EXPECT_TRUE(m.is_infinite_chunked());
}
if (true) {
SrsHttpMessage m;
SrsHttpHeader hdr;
hdr.set("Transfer-Encoding", "chunked");
m.set_header(&hdr, false);
HELPER_EXPECT_FAILED(m.enter_infinite_chunked());
}
if (true) {
SrsHttpMessage m;
SrsHttpHeader hdr;
hdr.set("Content-Length", "100");
m.set_header(&hdr, false);
HELPER_EXPECT_FAILED(m.enter_infinite_chunked());
}
}
VOID TEST(TCPServerTest, MessageTurnRequest)
VOID TEST(HTTPServerTest, MessageTurnRequest)
{
srs_error_t err;
@ -645,7 +609,63 @@ VOID TEST(TCPServerTest, MessageTurnRequest)
}
}
VOID TEST(TCPServerTest, MessageWritev)
VOID TEST(HTTPServerTest, ContentLength)
{
srs_error_t err;
if (true) {
MockBufferIO io;
io.append("HTTP/1.1 200 OK\r\nContent-Length: 11\r\n\r\n");
SrsHttpParser hp; HELPER_ASSERT_SUCCESS(hp.initialize(HTTP_RESPONSE, false));
ISrsHttpMessage* msg = NULL; HELPER_ASSERT_SUCCESS(hp.parse_message(&io, &msg));
char buf[32]; ssize_t nread = 0;
ISrsHttpResponseReader* r = msg->body_reader();
io.append("Hello");
HELPER_ARRAY_INIT(buf, sizeof(buf), 0);
HELPER_ASSERT_SUCCESS(r->read(buf, 5, &nread));
EXPECT_EQ(5, nread);
EXPECT_STREQ("Hello", buf);
io.append("World!");
HELPER_ARRAY_INIT(buf, sizeof(buf), 0);
HELPER_ASSERT_SUCCESS(r->read(buf, 6, &nread));
EXPECT_EQ(6, nread);
EXPECT_STREQ("World!", buf);
}
}
VOID TEST(HTTPServerTest, HTTPChunked)
{
srs_error_t err;
if (true) {
MockBufferIO io;
io.append("HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n");
SrsHttpParser hp; HELPER_ASSERT_SUCCESS(hp.initialize(HTTP_RESPONSE, false));
ISrsHttpMessage* msg = NULL; HELPER_ASSERT_SUCCESS(hp.parse_message(&io, &msg));
char buf[32]; ssize_t nread = 0;
ISrsHttpResponseReader* r = msg->body_reader();
io.append("5\r\nHello\r\n");
HELPER_ARRAY_INIT(buf, sizeof(buf), 0);
HELPER_ASSERT_SUCCESS(r->read(buf, 5, &nread));
EXPECT_EQ(5, nread);
EXPECT_STREQ("Hello", buf);
io.append("6\r\nWorld!\r\n");
HELPER_ARRAY_INIT(buf, sizeof(buf), 0);
HELPER_ASSERT_SUCCESS(r->read(buf, 6, &nread));
EXPECT_EQ(6, nread);
EXPECT_STREQ("World!", buf);
}
}
VOID TEST(HTTPServerTest, InfiniteChunked)
{
srs_error_t err;
@ -656,12 +676,7 @@ VOID TEST(TCPServerTest, MessageWritev)
SrsHttpParser hp; HELPER_ASSERT_SUCCESS(hp.initialize(HTTP_RESPONSE, false));
ISrsHttpMessage* msg = NULL; HELPER_ASSERT_SUCCESS(hp.parse_message(&io, &msg));
if (true) {
SrsHttpMessage* hm = dynamic_cast<SrsHttpMessage*>(msg);
ASSERT_TRUE(hm != NULL);
hm->enter_infinite_chunked();
}
SrsAutoFree(ISrsHttpMessage, msg);
char buf[32]; ssize_t nread = 0;
ISrsHttpResponseReader* r = msg->body_reader();
@ -677,8 +692,96 @@ VOID TEST(TCPServerTest, MessageWritev)
HELPER_ASSERT_SUCCESS(r->read(buf, 8, &nread));
EXPECT_EQ(8, nread);
EXPECT_STREQ("\r\nWorld!", buf);
EXPECT_FALSE(r->eof());
}
// If read error, it's EOF.
if (true) {
MockBufferIO io;
io.append("HTTP/1.1 200 OK\r\n\r\n");
SrsHttpParser hp; HELPER_ASSERT_SUCCESS(hp.initialize(HTTP_RESPONSE, false));
ISrsHttpMessage* msg = NULL; HELPER_ASSERT_SUCCESS(hp.parse_message(&io, &msg));
char buf[32]; ssize_t nread = 0;
ISrsHttpResponseReader* r = msg->body_reader();
io.append("Hello");
HELPER_ARRAY_INIT(buf, sizeof(buf), 0);
HELPER_ASSERT_SUCCESS(r->read(buf, 10, &nread));
EXPECT_EQ(5, nread);
EXPECT_STREQ("Hello", buf);
io.in_err = srs_error_new(ERROR_SOCKET_READ, "EOF");
HELPER_ASSERT_SUCCESS(r->read(buf, 10, &nread));
EXPECT_TRUE(r->eof());
}
}
VOID TEST(HTTPServerTest, OPTIONSRead)
{
srs_error_t err;
// If OPTIONS, it has no content-length, not chunkted, but not infinite chunked,
// instead, it has no body.
if (true) {
MockBufferIO io;
io.append("OPTIONS /rtc/v1/play HTTP/1.1\r\n\r\n");
SrsHttpParser hp; HELPER_ASSERT_SUCCESS(hp.initialize(HTTP_REQUEST, false));
ISrsHttpMessage* req = NULL; HELPER_ASSERT_SUCCESS(hp.parse_message(&io, &req));
SrsAutoFree(ISrsHttpMessage, req);
ISrsHttpResponseReader* br = req->body_reader();
EXPECT_TRUE(br->eof());
}
// So if OPTIONS has body, with chunked or content-length, it's ok to parsing it.
if (true) {
MockBufferIO io;
io.append("OPTIONS /rtc/v1/play HTTP/1.1\r\nContent-Length: 5\r\n\r\nHello");
SrsHttpParser hp; HELPER_ASSERT_SUCCESS(hp.initialize(HTTP_REQUEST, false));
ISrsHttpMessage* req = NULL; HELPER_ASSERT_SUCCESS(hp.parse_message(&io, &req));
SrsAutoFree(ISrsHttpMessage, req);
ISrsHttpResponseReader* br = req->body_reader();
EXPECT_FALSE(br->eof());
string b; HELPER_ASSERT_SUCCESS(req->body_read_all(b));
EXPECT_STREQ("Hello", b.c_str());
// The body will use as next HTTP request message.
io.append("GET /rtc/v1/play HTTP/1.1\r\n\r\n");
ISrsHttpMessage* req2 = NULL; HELPER_ASSERT_SUCCESS(hp.parse_message(&io, &req2));
SrsAutoFree(ISrsHttpMessage, req2);
}
// So if OPTIONS has body, but not specified the size, we think it has no body,
// and the body is parsed fail as the next parsing.
if (true) {
MockBufferIO io;
io.append("OPTIONS /rtc/v1/play HTTP/1.1\r\n\r\n");
SrsHttpParser hp; HELPER_ASSERT_SUCCESS(hp.initialize(HTTP_REQUEST, false));
ISrsHttpMessage* req = NULL; HELPER_ASSERT_SUCCESS(hp.parse_message(&io, &req));
SrsAutoFree(ISrsHttpMessage, req);
ISrsHttpResponseReader* br = req->body_reader();
EXPECT_TRUE(br->eof());
// The body will use as next HTTP request message.
io.append("Hello");
ISrsHttpMessage* req2 = NULL; HELPER_ASSERT_FAILED(hp.parse_message(&io, &req2));
SrsAutoFree(ISrsHttpMessage, req2);
}
}
VOID TEST(HTTPServerTest, MessageWritev)
{
srs_error_t err;
// Directly writev, merge to one chunk.
if (true) {
MockResponseWriter w;
@ -1000,8 +1103,6 @@ VOID TEST(TCPServerTest, CoverUtility)
EXPECT_FALSE(srs_net_device_is_internet((sockaddr*)r->ai_addr));
}
EXPECT_FALSE(srs_net_device_is_internet("eth0"));
if (true) {
sockaddr_in addr;
addr.sin_family = AF_INET;
@ -1176,7 +1277,7 @@ public:
}
};
VOID TEST(TCPServerTest, HTTPClientUtility)
VOID TEST(HTTPClientTest, HTTPClientUtility)
{
srs_error_t err;
@ -1278,6 +1379,7 @@ VOID TEST(TCPServerTest, ContextUtility)
int base_size = 0;
if (true) {
errno = 0;
int size = 0; char buf[1024]; HELPER_ARRAY_INIT(buf, 1024, 0);
ASSERT_TRUE(srs_log_header(buf, 1024, true, true, "SRS", "100", "Trace", &size));
base_size = size;
@ -1285,6 +1387,7 @@ VOID TEST(TCPServerTest, ContextUtility)
}
if (true) {
errno = 0;
int size = 0; char buf[1024]; HELPER_ARRAY_INIT(buf, 1024, 0);
ASSERT_TRUE(srs_log_header(buf, 1024, false, true, "SRS", "100", "Trace", &size));
EXPECT_EQ(base_size, size);
@ -1298,6 +1401,7 @@ VOID TEST(TCPServerTest, ContextUtility)
}
if (true) {
errno = 0;
int size = 0; char buf[1024]; HELPER_ARRAY_INIT(buf, 1024, 0);
ASSERT_TRUE(srs_log_header(buf, 1024, false, false, NULL, "100", "Trace", &size));
EXPECT_EQ(base_size - 8, size);