1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

Merge branch 'feature/rtc' into develop

This commit is contained in:
winlin 2020-03-22 18:53:39 +08:00
commit caf7e9e6ea
771 changed files with 201217 additions and 105 deletions

View file

@ -0,0 +1,469 @@
/**
* The MIT License (MIT)
*
* Copyright (c) 2013-2020 Winlin
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#include <srs_kernel_codec.hpp>
#include <srs_kernel_error.hpp>
#include <srs_app_audio_recode.hpp>
static const int kOpusPacketMs = 20;
static const int kOpusMaxbytes = 8000;
static const int kFrameBufMax = 40960;
static const int kPacketBufMax = 8192;
static const int kPcmBufMax = 4096*4;
SrsAudioDecoder::SrsAudioDecoder(std::string codec)
: codec_name_(codec)
{
frame_ = NULL;
packet_ = NULL;
codec_ctx_ = NULL;
}
SrsAudioDecoder::~SrsAudioDecoder()
{
if (codec_ctx_) {
avcodec_free_context(&codec_ctx_);
codec_ctx_ = NULL;
}
if (frame_) {
av_frame_free(&frame_);
frame_ = NULL;
}
if (packet_) {
av_packet_free(&packet_);
packet_ = NULL;
}
}
srs_error_t SrsAudioDecoder::initialize()
{
srs_error_t err = srs_success;
if (codec_name_.compare("aac")) {
return srs_error_wrap(err, "Invalid codec name");
}
const AVCodec *codec = avcodec_find_decoder_by_name(codec_name_.c_str());
if (!codec) {
return srs_error_wrap(err, "Codec not found by name");
}
codec_ctx_ = avcodec_alloc_context3(codec);
if (!codec_ctx_) {
return srs_error_wrap(err, "Could not allocate audio codec context");
}
if (avcodec_open2(codec_ctx_, codec, NULL) < 0) {
return srs_error_wrap(err, "Could not open codec");
}
frame_ = av_frame_alloc();
if (!frame_) {
return srs_error_wrap(err, "Could not allocate audio frame");
}
packet_ = av_packet_alloc();
if (!packet_) {
return srs_error_wrap(err, "Could not allocate audio packet");
}
return err;
}
srs_error_t SrsAudioDecoder::decode(SrsSample *pkt, char *buf, int &size)
{
srs_error_t err = srs_success;
packet_->data = (uint8_t *)pkt->bytes;
packet_->size = pkt->size;
int ret = avcodec_send_packet(codec_ctx_, packet_);
if (ret < 0) {
return srs_error_wrap(err, "Error submitting the packet to the decoder");
}
int max = size;
size = 0;
while (ret >= 0) {
ret = avcodec_receive_frame(codec_ctx_, frame_);
if (ret == AVERROR(EAGAIN) || ret == AVERROR_EOF) {
return err;
} else if (ret < 0) {
return srs_error_wrap(err, "Error during decoding");
}
int pcm_size = av_get_bytes_per_sample(codec_ctx_->sample_fmt);
if (pcm_size < 0) {
return srs_error_wrap(err, "Failed to calculate data size");
}
for (int i = 0; i < frame_->nb_samples; i++) {
if (size + pcm_size * codec_ctx_->channels <= max) {
memcpy(buf + size,frame_->data[0] + pcm_size*codec_ctx_->channels * i, pcm_size * codec_ctx_->channels);
size += pcm_size * codec_ctx_->channels;
}
}
}
return err;
}
AVCodecContext* SrsAudioDecoder::codec_ctx()
{
return codec_ctx_;
}
SrsAudioEncoder::SrsAudioEncoder(int samplerate, int channels, int fec, int complexity)
: inband_fec_(fec),
channels_(channels),
sampling_rate_(samplerate),
complexity_(complexity)
{
opus_ = NULL;
}
SrsAudioEncoder::~SrsAudioEncoder()
{
if (opus_) {
opus_encoder_destroy(opus_);
opus_ = NULL;
}
}
srs_error_t SrsAudioEncoder::initialize()
{
srs_error_t err = srs_success;
int error = 0;
opus_ = opus_encoder_create(sampling_rate_, channels_, OPUS_APPLICATION_VOIP, &error);
if (error != OPUS_OK) {
return srs_error_wrap(err, "Error create Opus encoder");
}
switch (sampling_rate_)
{
case 48000:
opus_encoder_ctl(opus_, OPUS_SET_MAX_BANDWIDTH(OPUS_BANDWIDTH_FULLBAND));
break;
case 24000:
opus_encoder_ctl(opus_, OPUS_SET_MAX_BANDWIDTH(OPUS_BANDWIDTH_SUPERWIDEBAND));
case 16000:
opus_encoder_ctl(opus_, OPUS_SET_MAX_BANDWIDTH(OPUS_BANDWIDTH_WIDEBAND));
break;
case 12000:
opus_encoder_ctl(opus_, OPUS_SET_MAX_BANDWIDTH(OPUS_BANDWIDTH_MEDIUMBAND));
break;
case 8000:
opus_encoder_ctl(opus_, OPUS_SET_MAX_BANDWIDTH(OPUS_BANDWIDTH_NARROWBAND));
break;
default:
sampling_rate_ = 16000;
opus_encoder_ctl(opus_, OPUS_SET_MAX_BANDWIDTH(OPUS_BANDWIDTH_WIDEBAND));
break;
}
opus_encoder_ctl(opus_, OPUS_SET_INBAND_FEC(inband_fec_));
opus_encoder_ctl(opus_, OPUS_SET_COMPLEXITY(complexity_));
return err;
}
srs_error_t SrsAudioEncoder::encode(SrsSample *frame, char *buf, int &size)
{
srs_error_t err = srs_success;
int nb_samples = sampling_rate_ * kOpusPacketMs / 1000;
if (frame->size != nb_samples * 2 * channels_) {
return srs_error_wrap(err, "invalid frame size %d, should be %d", frame->size, nb_samples * 2 * channels_);
}
opus_int16 *data = (opus_int16 *)frame->bytes;
size = opus_encode(opus_, data, nb_samples, (unsigned char *)buf, kOpusMaxbytes);
return err;
}
SrsAudioResample::SrsAudioResample(int src_rate, int src_layout, enum AVSampleFormat src_fmt,
int src_nb, int dst_rate, int dst_layout, enum AVSampleFormat dst_fmt)
: src_rate_(src_rate),
src_ch_layout_(src_layout),
src_sample_fmt_(src_fmt),
src_nb_samples_(src_nb),
dst_rate_(dst_rate),
dst_ch_layout_(dst_layout),
dst_sample_fmt_(dst_fmt)
{
src_nb_channels_ = 0;
dst_nb_channels_ = 0;
src_linesize_ = 0;
dst_linesize_ = 0;
dst_nb_samples_ = 0;
src_data_ = NULL;
dst_data_ = 0;
max_dst_nb_samples_ = 0;
swr_ctx_ = NULL;
}
SrsAudioResample::~SrsAudioResample()
{
if (src_data_) {
av_freep(&src_data_[0]);
av_freep(&src_data_);
src_data_ = NULL;
}
if (dst_data_) {
av_freep(&dst_data_[0]);
av_freep(&dst_data_);
dst_data_ = NULL;
}
if (swr_ctx_) {
swr_free(&swr_ctx_);
swr_ctx_ = NULL;
}
}
srs_error_t SrsAudioResample::initialize()
{
srs_error_t err = srs_success;
swr_ctx_ = swr_alloc();
if (!swr_ctx_) {
return srs_error_wrap(err, "Could not allocate resampler context");
}
av_opt_set_int(swr_ctx_, "in_channel_layout", src_ch_layout_, 0);
av_opt_set_int(swr_ctx_, "in_sample_rate", src_rate_, 0);
av_opt_set_sample_fmt(swr_ctx_, "in_sample_fmt", src_sample_fmt_, 0);
av_opt_set_int(swr_ctx_, "out_channel_layout", dst_ch_layout_, 0);
av_opt_set_int(swr_ctx_, "out_sample_rate", dst_rate_, 0);
av_opt_set_sample_fmt(swr_ctx_, "out_sample_fmt", dst_sample_fmt_, 0);
int ret;
if ((ret = swr_init(swr_ctx_)) < 0) {
return srs_error_wrap(err, "Failed to initialize the resampling context");
}
src_nb_channels_ = av_get_channel_layout_nb_channels(src_ch_layout_);
ret = av_samples_alloc_array_and_samples(&src_data_, &src_linesize_, src_nb_channels_,
src_nb_samples_, src_sample_fmt_, 0);
if (ret < 0) {
return srs_error_wrap(err, "Could not allocate source samples");
}
max_dst_nb_samples_ = dst_nb_samples_ =
av_rescale_rnd(src_nb_samples_, dst_rate_, src_rate_, AV_ROUND_UP);
dst_nb_channels_ = av_get_channel_layout_nb_channels(dst_ch_layout_);
ret = av_samples_alloc_array_and_samples(&dst_data_, &dst_linesize_, dst_nb_channels_,
dst_nb_samples_, dst_sample_fmt_, 0);
if (ret < 0) {
return srs_error_wrap(err, "Could not allocate destination samples");
}
return err;
}
srs_error_t SrsAudioResample::resample(SrsSample *pcm, char *buf, int &size)
{
srs_error_t err = srs_success;
int ret, plane = 1;
if (src_sample_fmt_ == AV_SAMPLE_FMT_FLTP) {
plane = 2;
}
if (src_linesize_ * plane < pcm->size || pcm->size < 0) {
return srs_error_wrap(err, "size not ok");
}
memcpy(src_data_[0], pcm->bytes, pcm->size);
dst_nb_samples_ = av_rescale_rnd(swr_get_delay(swr_ctx_, src_rate_) +
src_nb_samples_, dst_rate_, src_rate_, AV_ROUND_UP);
if (dst_nb_samples_ > max_dst_nb_samples_) {
av_freep(&dst_data_[0]);
ret = av_samples_alloc(dst_data_, &dst_linesize_, dst_nb_channels_,
dst_nb_samples_, dst_sample_fmt_, 1);
if (ret < 0) {
return srs_error_wrap(err, "alloc error");
}
max_dst_nb_samples_ = dst_nb_samples_;
}
ret = swr_convert(swr_ctx_, dst_data_, dst_nb_samples_, (const uint8_t **)src_data_, src_nb_samples_);
if (ret < 0) {
return srs_error_wrap(err, "Error while converting");
}
int dst_bufsize = av_samples_get_buffer_size(&dst_linesize_, dst_nb_channels_,
ret, dst_sample_fmt_, 1);
if (dst_bufsize < 0) {
return srs_error_wrap(err, "Could not get sample buffer size");
}
int max = size;
size = 0;
if (max > dst_bufsize) {
memcpy(buf, dst_data_[0], dst_bufsize);
size = dst_bufsize;
}
return err;
}
SrsAudioRecode::SrsAudioRecode(int channels, int samplerate)
: dst_channels_(channels),
dst_samplerate_(samplerate)
{
size_ = 0;
data_ = new char[kPcmBufMax];
}
SrsAudioRecode::~SrsAudioRecode()
{
if (dec_) {
delete dec_;
dec_ = NULL;
}
if (enc_) {
delete enc_;
enc_ = NULL;
}
if (resample_) {
delete resample_;
resample_ = NULL;
}
delete[] data_;
}
srs_error_t SrsAudioRecode::initialize()
{
srs_error_t err = srs_success;
dec_ = new SrsAudioDecoder("aac");
if (!dec_) {
return srs_error_wrap(err, "SrsAudioDecoder failed");
}
dec_->initialize();
enc_ = new SrsAudioEncoder(dst_samplerate_, dst_channels_, 1, 1);
if (!enc_) {
return srs_error_wrap(err, "SrsAudioEncoder failed");
}
enc_->initialize();
resample_ = NULL;
return err;
}
// TODO: FIXME: Rename to transcode.
srs_error_t SrsAudioRecode::recode(SrsSample *pkt, char **buf, int *buf_len, int &n)
{
srs_error_t err = srs_success;
static char decode_buffer[kPacketBufMax];
static char resample_buffer[kFrameBufMax];
static char encode_buffer[kPacketBufMax];
if (!dec_) {
return srs_error_wrap(err, "dec_ nullptr");
}
int decode_len = kPacketBufMax;
if ((err = dec_->decode(pkt, decode_buffer, decode_len)) != srs_success) {
return srs_error_wrap(err, "decode error");
}
if (!resample_) {
int channel_layout = av_get_default_channel_layout(dst_channels_);
AVCodecContext *codec_ctx = dec_->codec_ctx();
resample_ = new SrsAudioResample(codec_ctx->sample_rate, (int)codec_ctx->channel_layout, \
codec_ctx->sample_fmt, codec_ctx->frame_size, dst_samplerate_, channel_layout, \
AV_SAMPLE_FMT_S16);
if (!resample_) {
return srs_error_wrap(err, "SrsAudioResample failed");
}
resample_->initialize();
}
SrsSample pcm;
pcm.bytes = decode_buffer;
pcm.size = decode_len;
int resample_len = kFrameBufMax;
if ((err = resample_->resample(&pcm, resample_buffer, resample_len)) != srs_success) {
return srs_error_wrap(err, "decode error");
}
n = 0;
int data_left = resample_len;
int total;
total = (dst_samplerate_ * kOpusPacketMs / 1000) * 2 * dst_channels_;
if (size_ + data_left < total) {
memcpy(data_ + size_, resample_buffer, data_left);
size_ += data_left;
} else {
int index = 0;
while (1) {
data_left = data_left - (total - size_);
memcpy(data_ + size_, resample_buffer + index, total - size_);
index += total - size_;
size_ += total - size_;
if (!enc_) {
return srs_error_wrap(err, "enc_ nullptr");
}
int encode_len;
pcm.bytes = (char *)data_;
pcm.size = size_;
if ((err = enc_->encode(&pcm, encode_buffer, encode_len)) != srs_success) {
return srs_error_wrap(err, "decode error");
}
memcpy(buf[n], encode_buffer, encode_len);
buf_len[n] = encode_len;
n++;
size_ = 0;
if(!data_left)
break;
if(data_left < total) {
memcpy(data_ + size_, resample_buffer + index, data_left);
size_ += data_left;
break;
}
}
}
return err;
}

View file

@ -0,0 +1,126 @@
/**
* The MIT License (MIT)
*
* Copyright (c) 2013-2020 Winlin
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#ifndef SRS_APP_AUDIO_RECODE_HPP
#define SRS_APP_AUDIO_RECODE_HPP
#include <string>
#include <srs_core.hpp>
#ifdef __cplusplus
extern "C" {
#endif
#include <libavutil/frame.h>
#include <libavutil/mem.h>
#include <libavcodec/avcodec.h>
#include <libavutil/opt.h>
#include <libavutil/channel_layout.h>
#include <libavutil/samplefmt.h>
#include <libswresample/swresample.h>
#include <opus/opus.h>
#ifdef __cplusplus
}
#endif
class SrsSample;
class SrsAudioDecoder
{
private:
AVFrame* frame_;
AVPacket* packet_;
AVCodecContext* codec_ctx_;
std::string codec_name_;
public:
SrsAudioDecoder(std::string codec);
virtual ~SrsAudioDecoder();
srs_error_t initialize();
virtual srs_error_t decode(SrsSample *pkt, char *buf, int &size);
AVCodecContext* codec_ctx();
};
class SrsAudioEncoder
{
private:
int inband_fec_;
int channels_;
int sampling_rate_;
int complexity_;
OpusEncoder *opus_;
public:
SrsAudioEncoder(int samplerate, int channels, int fec, int complexity);
virtual ~SrsAudioEncoder();
srs_error_t initialize();
virtual srs_error_t encode(SrsSample *frame, char *buf, int &size);
};
class SrsAudioResample
{
private:
int src_rate_;
int src_ch_layout_;
int src_nb_channels_;
enum AVSampleFormat src_sample_fmt_;
int src_linesize_;
int src_nb_samples_;
uint8_t **src_data_;
int dst_rate_;
int dst_ch_layout_;
int dst_nb_channels_;
enum AVSampleFormat dst_sample_fmt_;
int dst_linesize_;
int dst_nb_samples_;
uint8_t **dst_data_;
int max_dst_nb_samples_;
struct SwrContext *swr_ctx_;
public:
SrsAudioResample(int src_rate, int src_layout, enum AVSampleFormat src_fmt,
int src_nb, int dst_rate, int dst_layout, enum AVSampleFormat dst_fmt);
virtual ~SrsAudioResample();
srs_error_t initialize();
virtual srs_error_t resample(SrsSample *pcm, char *buf, int &size);
};
class SrsAudioRecode
{
private:
SrsAudioDecoder *dec_;
SrsAudioEncoder *enc_;
SrsAudioResample *resample_;
int dst_channels_;
int dst_samplerate_;
int size_;
char *data_;
public:
SrsAudioRecode(int channels, int samplerate);
virtual ~SrsAudioRecode();
srs_error_t initialize();
virtual srs_error_t recode(SrsSample *pkt, char **buf, int *buf_len, int &n);
};
#endif /* SRS_APP_AUDIO_RECODE_HPP */

View file

@ -3486,7 +3486,7 @@ srs_error_t SrsConfig::check_normal_config()
&& n != "srs_log_tank" && n != "srs_log_level" && n != "srs_log_file"
&& n != "max_connections" && n != "daemon" && n != "heartbeat"
&& n != "http_api" && n != "stats" && n != "vhost" && n != "pithy_print_ms"
&& n != "http_server" && n != "stream_caster" && n != "srt_server"
&& n != "http_server" && n != "stream_caster" && n != "rtc_server" && n != "srt_server"
&& n != "utc_time" && n != "work_dir" && n != "asprocess"
&& n != "ff_log_level" && n != "grace_final_wait" && n != "force_grace_quit"
&& n != "grace_start_wait" && n != "empty_ip_ok" && n != "disable_daemon_for_docker"
@ -3673,7 +3673,7 @@ srs_error_t SrsConfig::check_normal_config()
&& n != "play" && n != "publish" && n != "cluster"
&& n != "security" && n != "http_remux" && n != "dash"
&& n != "http_static" && n != "hds" && n != "exec"
&& n != "in_ack_size" && n != "out_ack_size") {
&& n != "in_ack_size" && n != "out_ack_size" && n != "rtc") {
return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal vhost.%s", n.c_str());
}
// for each sub directives of vhost.
@ -3819,6 +3819,13 @@ srs_error_t SrsConfig::check_normal_config()
return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal vhost.bandcheck.%s of %s", m.c_str(), vhost->arg0().c_str());
}
}
} else if (n == "rtc") {
for (int j = 0; j < (int)conf->directives.size(); j++) {
string m = conf->at(j)->name;
if (m != "enabled" && m != "bframe" && m != "aac") {
return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal vhost.rtc.%s of %s", m.c_str(), vhost->arg0().c_str());
}
}
}
}
}
@ -4266,6 +4273,132 @@ int SrsConfig::get_stream_caster_rtp_port_max(SrsConfDirective* conf)
return ::atoi(conf->arg0().c_str());
}
int SrsConfig::get_rtc_server_enabled()
{
SrsConfDirective* conf = root->get("rtc_server");
return get_rtc_server_enabled(conf);
}
bool SrsConfig::get_rtc_server_enabled(SrsConfDirective* conf)
{
static bool DEFAULT = false;
if (!conf) {
return DEFAULT;
}
conf = conf->get("enabled");
if (!conf || conf->arg0().empty()) {
return DEFAULT;
}
return SRS_CONF_PERFER_FALSE(conf->arg0());
}
int SrsConfig::get_rtc_server_listen()
{
static int DEFAULT = 8000;
SrsConfDirective* conf = root->get("rtc_server");
if (!conf) {
return DEFAULT;
}
conf = conf->get("listen");
if (!conf || conf->arg0().empty()) {
return DEFAULT;
}
return ::atoi(conf->arg0().c_str());
}
std::string SrsConfig::get_rtc_server_candidates()
{
static string DEFAULT = "*";
SrsConfDirective* conf = root->get("rtc_server");
if (!conf) {
return DEFAULT;
}
conf = conf->get("candidate");
if (!conf || conf->arg0().empty()) {
return DEFAULT;
}
string eip = srs_getenv(conf->arg0());
if (!eip.empty()) {
return eip;
}
// If configed as ENV, but no ENV set, use default value.
if (srs_string_starts_with(conf->arg0(), "$")) {
return DEFAULT;
}
return (conf->arg0().c_str());
}
SrsConfDirective* SrsConfig::get_rtc(string vhost)
{
SrsConfDirective* conf = get_vhost(vhost);
return conf? conf->get("rtc") : NULL;
}
bool SrsConfig::get_rtc_enabled(string vhost)
{
static bool DEFAULT = false;
SrsConfDirective* conf = get_rtc(vhost);
if (!conf) {
return DEFAULT;
}
conf = conf->get("enabled");
if (!conf || conf->arg0().empty()) {
return DEFAULT;
}
return SRS_CONF_PERFER_FALSE(conf->arg0());
}
bool SrsConfig::get_rtc_bframe_discard(string vhost)
{
static bool DEFAULT = false;
SrsConfDirective* conf = get_rtc(vhost);
if (!conf) {
return DEFAULT;
}
conf = conf->get("bframe");
if (!conf || conf->arg0().empty()) {
return DEFAULT;
}
return conf->arg0() == "discard";
}
bool SrsConfig::get_rtc_aac_discard(string vhost)
{
static bool DEFAULT = false;
SrsConfDirective* conf = get_rtc(vhost);
if (!conf) {
return DEFAULT;
}
conf = conf->get("aac");
if (!conf || conf->arg0().empty()) {
return DEFAULT;
}
return conf->arg0() == "discard";
}
SrsConfDirective* SrsConfig::get_vhost(string vhost, bool try_default_vhost)
{
srs_assert(root);

View file

@ -498,6 +498,19 @@ public:
virtual int get_stream_caster_rtp_port_min(SrsConfDirective* conf);
// Get the max udp port for rtp of stream caster rtsp.
virtual int get_stream_caster_rtp_port_max(SrsConfDirective* conf);
// rtc section
public:
virtual int get_rtc_server_enabled();
virtual bool get_rtc_server_enabled(SrsConfDirective* conf);
virtual int get_rtc_server_listen();
virtual std::string get_rtc_server_candidates();
SrsConfDirective* get_rtc(std::string vhost);
bool get_rtc_enabled(std::string vhost);
bool get_rtc_bframe_discard(std::string vhost);
bool get_rtc_aac_discard(std::string vhost);
// vhost specified section
public:
// Get the vhost directive by vhost name.

View file

@ -178,6 +178,13 @@ srs_error_t SrsConnection::cycle()
srs_trace("client finished.");
return err;
}
// It maybe success with message.
if (srs_error_code(err) == ERROR_SUCCESS) {
srs_trace("client finished%s.", srs_error_summary(err).c_str());
srs_freep(err);
return err;
}
// client close peer.
// TODO: FIXME: Only reset the error when client closed it.

View file

@ -0,0 +1,138 @@
/**
* The MIT License (MIT)
*
* Copyright (c) 2013-2020 Winlin
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#include <srs_app_dtls.hpp>
using namespace std;
#include <string.h>
#include <srs_kernel_log.hpp>
#include <srtp2/srtp.h>
SrsDtls* SrsDtls::_instance = NULL;
SrsDtls::SrsDtls()
{
}
SrsDtls::~SrsDtls()
{
}
SrsDtls* SrsDtls::instance()
{
if (!_instance) {
_instance = new SrsDtls();
_instance->init();
}
return _instance;
}
void SrsDtls::init()
{
// srtp init first
srs_assert(srtp_init() == 0);
// init dtls context
EVP_PKEY* dtls_private_key = EVP_PKEY_new();
srs_assert(dtls_private_key);
RSA* rsa = RSA_new();
srs_assert(rsa);
BIGNUM* exponent = BN_new();
srs_assert(exponent);
BN_set_word(exponent, RSA_F4);
const std::string& aor = "www.hw.com";
int expire_day = 365;
int private_key_len = 1024;
RSA_generate_key_ex(rsa, private_key_len, exponent, NULL);
srs_assert(EVP_PKEY_set1_RSA(dtls_private_key, rsa) == 1);
X509* dtls_cert = X509_new();
srs_assert(dtls_cert);
X509_NAME* subject = X509_NAME_new();
srs_assert(subject);
int serial = rand();
ASN1_INTEGER_set(X509_get_serialNumber(dtls_cert), serial);
X509_NAME_add_entry_by_txt(subject, "CN", MBSTRING_ASC, (unsigned char *) aor.data(), aor.size(), -1, 0);
X509_set_issuer_name(dtls_cert, subject);
X509_set_subject_name(dtls_cert, subject);
const long cert_duration = 60*60*24*expire_day;
X509_gmtime_adj(X509_get_notBefore(dtls_cert), 0);
X509_gmtime_adj(X509_get_notAfter(dtls_cert), cert_duration);
srs_assert(X509_set_pubkey(dtls_cert, dtls_private_key) == 1);
srs_assert(X509_sign(dtls_cert, dtls_private_key, EVP_sha1()) != 0);
// cleanup
RSA_free(rsa);
BN_free(exponent);
X509_NAME_free(subject);
dtls_ctx = SSL_CTX_new(DTLSv1_2_method());
srs_assert(SSL_CTX_use_certificate(dtls_ctx, dtls_cert) == 1);
srs_assert(SSL_CTX_use_PrivateKey(dtls_ctx, dtls_private_key) == 1);
srs_assert(SSL_CTX_set_cipher_list(dtls_ctx, "ALL:!ADH:!LOW:!EXP:!MD5:@STRENGTH") == 1);
srs_assert(SSL_CTX_set_tlsext_use_srtp(dtls_ctx, "SRTP_AES128_CM_SHA1_80") == 0);
SSL_CTX_set_verify_depth (dtls_ctx, 4);
SSL_CTX_set_read_ahead(dtls_ctx, 1);
// dtls fingerprint
char fp[100] = {0};
char *p = fp;
unsigned char md[EVP_MAX_MD_SIZE];
unsigned int n = 0;
// TODO: FIXME: Unused variable.
/*int r = */X509_digest(dtls_cert, EVP_sha256(), md, &n);
for (unsigned int i = 0; i < n; i++, ++p) {
sprintf(p, "%02X", md[i]);
p += 2;
if(i < (n-1)) {
*p = ':';
} else {
*p = '\0';
}
}
fingerprint.assign(fp, strlen(fp));
srs_trace("fingerprint=%s", fingerprint.c_str());
}

View file

@ -0,0 +1,52 @@
/**
* The MIT License (MIT)
*
* Copyright (c) 2013-2020 Winlin
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#ifndef SRS_APP_DTLS_HPP
#define SRS_APP_DTLS_HPP
#include <srs_core.hpp>
#include <string>
#include <openssl/ssl.h>
class SrsDtls
{
private:
static SrsDtls* _instance;
private:
std::string fingerprint;
SSL_CTX* dtls_ctx;
private:
SrsDtls();
virtual ~SrsDtls();
void init();
public:
static SrsDtls* instance();
SSL_CTX* get_dtls_ctx() { return dtls_ctx; }
public:
std::string get_fingerprint() const { return fingerprint; }
};
#endif

View file

@ -579,7 +579,6 @@ srs_error_t SrsEdgeForwarder::do_cycle()
SrsCommonMessage* msg = NULL;
err = sdk->recv_message(&msg);
srs_verbose("edge loop recv message. ret=%d", ret);
if (err != srs_success && srs_error_code(err) != ERROR_SOCKET_TIMEOUT) {
srs_error("edge push get server control message failed. err=%s", srs_error_desc(err).c_str());
send_error_code = srs_error_code(err);

View file

@ -26,7 +26,6 @@
using namespace std;
#include <srs_kernel_error.hpp>
#include <srs_app_st.hpp>
#include <srs_kernel_log.hpp>
ISrsHourGlass::ISrsHourGlass()
@ -42,13 +41,31 @@ SrsHourGlass::SrsHourGlass(ISrsHourGlass* h, srs_utime_t resolution)
handler = h;
_resolution = resolution;
total_elapse = 0;
trd = new SrsSTCoroutine("timer", this, _srs_context->get_id());
}
SrsHourGlass::~SrsHourGlass()
{
srs_freep(trd);
}
srs_error_t SrsHourGlass::tick(int type, srs_utime_t interval)
srs_error_t SrsHourGlass::start()
{
srs_error_t err = srs_success;
if ((err = trd->start()) != srs_success) {
return srs_error_wrap(err, "start timer");
}
return err;
}
srs_error_t SrsHourGlass::tick(srs_utime_t interval)
{
return tick(0, interval);
}
srs_error_t SrsHourGlass::tick(int event, srs_utime_t interval)
{
srs_error_t err = srs_success;
@ -57,7 +74,7 @@ srs_error_t SrsHourGlass::tick(int type, srs_utime_t interval)
"invalid interval=%dms, resolution=%dms", srsu2msi(interval), srsu2msi(_resolution));
}
ticks[type] = interval;
ticks[event] = interval;
return err;
}
@ -65,22 +82,28 @@ srs_error_t SrsHourGlass::tick(int type, srs_utime_t interval)
srs_error_t SrsHourGlass::cycle()
{
srs_error_t err = srs_success;
while (true) {
if ((err = trd->pull()) != srs_success) {
return srs_error_wrap(err, "quit");
}
map<int, srs_utime_t>::iterator it;
for (it = ticks.begin(); it != ticks.end(); ++it) {
int type = it->first;
srs_utime_t interval = it->second;
if (interval == 0 || (total_elapse % interval) == 0) {
if ((err = handler->notify(type, interval, total_elapse)) != srs_success) {
return srs_error_wrap(err, "notify");
map<int, srs_utime_t>::iterator it;
for (it = ticks.begin(); it != ticks.end(); ++it) {
int event = it->first;
srs_utime_t interval = it->second;
if (interval == 0 || (total_elapse % interval) == 0) {
if ((err = handler->notify(event, interval, total_elapse)) != srs_success) {
return srs_error_wrap(err, "notify");
}
}
}
}
// TODO: FIXME: Maybe we should use wallclock.
total_elapse += _resolution;
srs_usleep(_resolution);
// TODO: FIXME: Maybe we should use wallclock.
total_elapse += _resolution;
srs_usleep(_resolution);
}
return err;
}

View file

@ -26,8 +26,12 @@
#include <srs_core.hpp>
#include <srs_app_st.hpp>
#include <map>
class SrsCoroutine;
// The handler for the tick.
class ISrsHourGlass
{
@ -36,40 +40,41 @@ public:
virtual ~ISrsHourGlass();
public:
// When time is ticked, this function is called.
virtual srs_error_t notify(int type, srs_utime_t interval, srs_utime_t tick) = 0;
virtual srs_error_t notify(int event, srs_utime_t interval, srs_utime_t tick) = 0;
};
// he hourglass used to do some specieal task,
// while these task is cycle when some interval, for example,
// there are N=3 tasks to do:
// 1. heartbeat every 3s.
// 2. print message every 5s.
// 3. notify backend every 7s.
// The hourglass(timer or SrsTimer) for special tasks,
// while these tasks are attached to some intervals, for example,
// there are N=3 tasks bellow:
// 1. A heartbeat every 3s.
// 2. A print message every 5s.
// 3. A notify backend every 7s.
// The hourglass will call back when ticks:
// 1. notify(type=1, time=3)
// 2. notify(type=2, time=5)
// 3. notify(type=1, time=6)
// 4. notify(type=3, time=7)
// 5. notify(type=1, time=9)
// 6. notify(type=2, time=10)
// This is used for server and bocar server and other manager.
// 1. Got notify(event=1, time=3)
// 2. Got notify(event=2, time=5)
// 3. Got notify(event=1, time=6)
// 4. Got notify(event=3, time=7)
// 5. Got notify(event=1, time=9)
// 6. Got notify(event=2, time=10)
// It's a complex and high-performance timer.
//
// Usage:
// SrsHourGlass* hg = new SrsHourGlass(handler, 1 * SRS_UTIME_MILLISECONDS);
//
// hg->tick(1, 3 * SRS_UTIME_MILLISECONDS);
// hg->tick(2, 5 * SRS_UTIME_MILLISECONDS);
// hg->tick(3, 7 * SRS_UTIME_MILLISECONDS);
// // create a thread to cycle, which will call handerl when ticked.
// while (true) {
// hg->cycle();
// }
class SrsHourGlass
//
// // The hg will create a thread for timer.
// hg->start();
class SrsHourGlass : virtual public ISrsCoroutineHandler
{
private:
SrsCoroutine* trd;
ISrsHourGlass* handler;
srs_utime_t _resolution;
// The ticks:
// key: the type of tick.
// key: the event of tick.
// value: the interval of tick.
std::map<int, srs_utime_t> ticks;
// The total elapsed time,
@ -79,10 +84,14 @@ public:
SrsHourGlass(ISrsHourGlass* h, srs_utime_t resolution);
virtual ~SrsHourGlass();
public:
// Add a pair of tick(type, interval).
// @param type the type of tick.
// Start the hourglass.
virtual srs_error_t start();
public:
// Add a pair of tick(event, interval).
// @param event the event of tick, default is 0.
// @param interval the interval in srs_utime_t of tick.
virtual srs_error_t tick(int type, srs_utime_t interval);
virtual srs_error_t tick(srs_utime_t interval);
virtual srs_error_t tick(int event, srs_utime_t interval);
public:
// Cycle the hourglass, which will sleep resolution every time.
// and call handler when ticked.

View file

@ -46,6 +46,9 @@ using namespace std;
#include <srs_protocol_amf0.hpp>
#include <srs_protocol_utility.hpp>
#include <srs_app_coworkers.hpp>
#ifdef SRS_AUTO_RTC
#include <srs_app_rtc_conn.hpp>
#endif
srs_error_t srs_api_response_jsonp(ISrsHttpResponseWriter* w, string callback, string data)
{
@ -780,6 +783,135 @@ srs_error_t SrsGoApiStreams::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessa
return srs_api_response(w, r, obj->dumps());
}
#ifdef SRS_AUTO_RTC
SrsGoApiSdp::SrsGoApiSdp(SrsRtcServer* rtc_svr)
{
rtc_server = rtc_svr;
}
SrsGoApiSdp::~SrsGoApiSdp()
{
}
// Request:
// POST /rtc/v1/play/
// {
// "sdp":"offer...", "streamurl":"webrtc://r.ossrs.net/live/livestream",
// "api":'http...", "clientip":"..."
// }
// Response:
// {"sdp":"answer...", "sid":"..."}
// @see https://github.com/rtcdn/rtcdn-draft
srs_error_t SrsGoApiSdp::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r)
{
srs_error_t err = srs_success;
SrsJsonObject* res = SrsJsonAny::object();
SrsAutoFree(SrsJsonObject, res);
if ((err = do_serve_http(w, r, res)) != srs_success) {
srs_warn("RTC error %s", srs_error_desc(err).c_str()); srs_freep(err);
return srs_api_response_code(w, r, SRS_CONSTS_HTTP_BadRequest);
}
return srs_api_response(w, r, res->dumps());
}
srs_error_t SrsGoApiSdp::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, SrsJsonObject* res)
{
srs_error_t err = srs_success;
// For each RTC session, we use short-term HTTP connection.
SrsHttpHeader* hdr = w->header();
hdr->set("Connection", "Close");
// Parse req, the request json object, from body.
SrsJsonObject* req = NULL;
if (true) {
string req_json;
if ((err = r->body_read_all(req_json)) != srs_success) {
return srs_error_wrap(err, "read body");
}
SrsJsonAny* json = SrsJsonAny::loads(req_json);
if (!json || !json->is_object()) {
return srs_error_wrap(err, "not json");
}
req = json->to_object();
}
// Fetch params from req object.
SrsJsonAny* prop = NULL;
if ((prop = req->ensure_property_string("sdp")) == NULL) {
return srs_error_wrap(err, "not sdp");
}
string remote_sdp_str = prop->to_str();
if ((prop = req->ensure_property_string("streamurl")) == NULL) {
return srs_error_wrap(err, "not streamurl");
}
string streamurl = prop->to_str();
string clientip;
if ((prop = req->ensure_property_string("clientip")) != NULL) {
clientip = prop->to_str();
}
string api;
if ((prop = req->ensure_property_string("api")) != NULL) {
api = prop->to_str();
}
// Parse app and stream from streamurl.
string app;
string stream_name;
if (true) {
string tcUrl;
srs_parse_rtmp_url(streamurl, tcUrl, stream_name);
int port;
string schema, host, vhost, param;
srs_discovery_tc_url(tcUrl, schema, host, vhost, app, stream_name, port, param);
}
srs_trace("RTC play %s, api=%s, clientip=%s, app=%s, stream=%s, offer=%dB",
streamurl.c_str(), api.c_str(), clientip.c_str(), app.c_str(), stream_name.c_str(), remote_sdp_str.length());
// TODO: FIXME: It seems remote_sdp doesn't represents the full SDP information.
SrsSdp remote_sdp;
if ((err = remote_sdp.decode(remote_sdp_str)) != srs_success) {
return srs_error_wrap(err, "decode sdp");
}
SrsRequest request;
request.app = app;
request.stream = stream_name;
SrsSdp local_sdp;
// TODO: FIXME: Maybe need a better name?
// TODO: FIXME: When server enabled, but vhost disabled, should report error.
SrsRtcSession* rtc_session = rtc_server->create_rtc_session(request, remote_sdp, local_sdp);
string local_sdp_str = "";
if ((err = local_sdp.encode(local_sdp_str)) != srs_success) {
return srs_error_wrap(err, "encode sdp");
}
res->set("code", SrsJsonAny::integer(ERROR_SUCCESS));
res->set("server", SrsJsonAny::integer(SrsStatistic::instance()->server_id()));
// TODO: add candidates in response json?
res->set("sdp", SrsJsonAny::str(local_sdp_str.c_str()));
res->set("sessionid", SrsJsonAny::str(rtc_session->id().c_str()));
srs_trace("RTC sid=%s, answer=%dB", rtc_session->id().c_str(), local_sdp_str.length());
return err;
}
#endif
SrsGoApiClients::SrsGoApiClients()
{
}
@ -1351,6 +1483,11 @@ srs_error_t SrsHttpApi::do_cycle()
// get a http message
if ((err = parser->parse_message(skt, &req)) != srs_success) {
// For HTTP timeout, we think it's ok.
if (srs_error_code(err) == ERROR_SOCKET_TIMEOUT) {
srs_freep(err);
return srs_error_wrap(srs_success, "http api timeout");
}
return srs_error_wrap(err, "parse message");
}

View file

@ -31,6 +31,8 @@ class ISrsHttpMessage;
class SrsHttpParser;
class SrsHttpHandler;
class SrsServer;
class SrsRtcServer;
class SrsJsonObject;
#include <srs_app_st.hpp>
#include <srs_app_conn.hpp>
@ -164,6 +166,21 @@ public:
virtual srs_error_t serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r);
};
#ifdef SRS_AUTO_RTC
class SrsGoApiSdp : public ISrsHttpHandler
{
private:
SrsRtcServer* rtc_server;
public:
SrsGoApiSdp(SrsRtcServer* rtc_svr);
virtual ~SrsGoApiSdp();
public:
virtual srs_error_t serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r);
private:
virtual srs_error_t do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, SrsJsonObject* res);
};
#endif
class SrsGoApiClients : public ISrsHttpHandler
{
public:

View file

@ -29,6 +29,7 @@
#include <sys/stat.h>
#include <fcntl.h>
#include <stdlib.h>
#include <unistd.h>
#include <sstream>
using namespace std;
@ -1132,8 +1133,8 @@ srs_error_t SrsHttpStreamServer::hijack(ISrsHttpMessage* request, ISrsHttpHandle
// trigger edge to fetch from origin.
bool vhost_is_edge = _srs_config->get_vhost_is_edge(r->vhost);
srs_trace("flv: source url=%s, is_edge=%d, source_id=%d[%d]",
r->get_stream_url().c_str(), vhost_is_edge, s->source_id(), s->source_id());
srs_trace("flv: source url=%s, is_edge=%d, source_id=[%d][%d]",
r->get_stream_url().c_str(), vhost_is_edge, ::getpid(), s->source_id());
return err;
}

View file

@ -102,6 +102,11 @@ void SrsServerAdapter::stop()
{
}
SrsServer* SrsServerAdapter::instance()
{
return srs;
}
SrsHybridServer::SrsHybridServer()
{
}
@ -181,5 +186,15 @@ void SrsHybridServer::stop()
}
}
SrsServerAdapter* SrsHybridServer::srs()
{
for (vector<ISrsHybridServer*>::iterator it = servers.begin(); it != servers.end(); ++it) {
if (dynamic_cast<SrsServerAdapter*>(*it)) {
return dynamic_cast<SrsServerAdapter*>(*it);
}
}
return NULL;
}
SrsHybridServer* _srs_hybrid = new SrsHybridServer();

View file

@ -57,6 +57,8 @@ public:
virtual srs_error_t initialize();
virtual srs_error_t run();
virtual void stop();
public:
virtual SrsServer* instance();
};
// The hybrid server manager.
@ -73,6 +75,8 @@ public:
virtual srs_error_t initialize();
virtual srs_error_t run();
virtual void stop();
public:
virtual SrsServerAdapter* srs();
};
extern SrsHybridServer* _srs_hybrid;

View file

@ -26,6 +26,7 @@
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <stdlib.h>
#include <signal.h>
#include <sys/types.h>
#include <sys/stat.h>
@ -59,6 +60,19 @@ srs_error_t ISrsUdpHandler::on_stfd_change(srs_netfd_t /*fd*/)
return srs_success;
}
ISrsUdpMuxHandler::ISrsUdpMuxHandler()
{
}
ISrsUdpMuxHandler::~ISrsUdpMuxHandler()
{
}
srs_error_t ISrsUdpMuxHandler::on_stfd_change(srs_netfd_t /*fd*/)
{
return srs_success;
}
ISrsTcpHandler::ISrsTcpHandler()
{
}
@ -207,3 +221,193 @@ srs_error_t SrsTcpListener::cycle()
return err;
}
SrsUdpMuxSocket::SrsUdpMuxSocket(srs_netfd_t fd)
{
nb_buf = SRS_UDP_MAX_PACKET_SIZE;
buf = new char[nb_buf];
nread = 0;
lfd = fd;
fromlen = 0;
}
SrsUdpMuxSocket::~SrsUdpMuxSocket()
{
srs_freepa(buf);
}
SrsUdpMuxSocket* SrsUdpMuxSocket::copy_sendonly()
{
SrsUdpMuxSocket* sendonly = new SrsUdpMuxSocket(lfd);
// Don't copy buffer
srs_freepa(sendonly->buf);
sendonly->nb_buf = 0;
sendonly->nread = 0;
sendonly->lfd = lfd;
sendonly->from = from;
sendonly->fromlen = fromlen;
sendonly->peer_ip = peer_ip;
sendonly->peer_port = peer_port;
return sendonly;
}
int SrsUdpMuxSocket::recvfrom(srs_utime_t timeout)
{
fromlen = sizeof(from);
nread = srs_recvfrom(lfd, buf, nb_buf, (sockaddr*)&from, &fromlen, timeout);
if (nread > 0) {
char address_string[64];
char port_string[16];
if (getnameinfo((sockaddr*)&from, fromlen,
(char*)&address_string, sizeof(address_string),
(char*)&port_string, sizeof(port_string),
NI_NUMERICHOST|NI_NUMERICSERV)) {
return -1;
}
peer_ip = std::string(address_string);
peer_port = atoi(port_string);
}
return nread;
}
srs_error_t SrsUdpMuxSocket::sendto(void* data, int size, srs_utime_t timeout)
{
srs_error_t err = srs_success;
int nb_write = srs_sendto(lfd, data, size, (sockaddr*)&from, fromlen, timeout);
if (nb_write <= 0) {
if (nb_write < 0 && errno == ETIME) {
return srs_error_new(ERROR_SOCKET_TIMEOUT, "sendto timeout %d ms", srsu2msi(timeout));
}
return srs_error_new(ERROR_SOCKET_WRITE, "sendto");
}
return err;
}
std::string SrsUdpMuxSocket::get_peer_id()
{
char id_buf[1024];
int len = snprintf(id_buf, sizeof(id_buf), "%s:%d", peer_ip.c_str(), peer_port);
return string(id_buf, len);
}
SrsUdpMuxListener::SrsUdpMuxListener(ISrsUdpMuxHandler* h, std::string i, int p)
{
handler = h;
ip = i;
port = p;
lfd = NULL;
nb_buf = SRS_UDP_MAX_PACKET_SIZE;
buf = new char[nb_buf];
trd = new SrsDummyCoroutine();
}
SrsUdpMuxListener::~SrsUdpMuxListener()
{
srs_freep(trd);
srs_close_stfd(lfd);
srs_freepa(buf);
}
int SrsUdpMuxListener::fd()
{
return srs_netfd_fileno(lfd);
}
srs_netfd_t SrsUdpMuxListener::stfd()
{
return lfd;
}
srs_error_t SrsUdpMuxListener::listen()
{
srs_error_t err = srs_success;
if ((err = srs_udp_listen(ip, port, &lfd)) != srs_success) {
return srs_error_wrap(err, "listen %s:%d", ip.c_str(), port);
}
set_socket_buffer();
srs_freep(trd);
trd = new SrsSTCoroutine("udp", this);
if ((err = trd->start()) != srs_success) {
return srs_error_wrap(err, "start thread");
}
return err;
}
void SrsUdpMuxListener::set_socket_buffer()
{
int sndbuf_size = 0;
socklen_t opt_len = sizeof(sndbuf_size);
getsockopt(fd(), SOL_SOCKET, SO_SNDBUF, (void*)&sndbuf_size, &opt_len);
srs_trace("default udp remux socket sndbuf=%d", sndbuf_size);
sndbuf_size = 1024*1024*10; // 10M
if (setsockopt(fd(), SOL_SOCKET, SO_SNDBUF, (void*)&sndbuf_size, sizeof(sndbuf_size)) < 0) {
srs_warn("set sock opt SO_SNDBUFFORCE failed");
}
opt_len = sizeof(sndbuf_size);
getsockopt(fd(), SOL_SOCKET, SO_SNDBUF, (void*)&sndbuf_size, &opt_len);
srs_trace("udp remux socket sndbuf=%d", sndbuf_size);
int rcvbuf_size = 0;
opt_len = sizeof(rcvbuf_size);
getsockopt(fd(), SOL_SOCKET, SO_RCVBUF, (void*)&rcvbuf_size, &opt_len);
srs_trace("default udp remux socket rcvbuf=%d", rcvbuf_size);
rcvbuf_size = 1024*1024*10; // 10M
if (setsockopt(fd(), SOL_SOCKET, SO_RCVBUF, (void*)&rcvbuf_size, sizeof(rcvbuf_size)) < 0) {
srs_warn("set sock opt SO_RCVBUFFORCE failed");
}
opt_len = sizeof(rcvbuf_size);
getsockopt(fd(), SOL_SOCKET, SO_RCVBUF, (void*)&rcvbuf_size, &opt_len);
srs_trace("udp remux socket rcvbuf=%d", rcvbuf_size);
}
srs_error_t SrsUdpMuxListener::cycle()
{
srs_error_t err = srs_success;
while (true) {
if ((err = trd->pull()) != srs_success) {
return srs_error_wrap(err, "udp listener");
}
SrsUdpMuxSocket udp_mux_skt(lfd);
if (udp_mux_skt.recvfrom(SRS_UTIME_NO_TIMEOUT) <= 0) {
srs_error("udp recv error");
// remux udp never return
continue;
}
if ((err = handler->on_udp_packet(&udp_mux_skt)) != srs_success) {
// remux udp never return
srs_error("udp packet handler error:%s", srs_error_desc(err).c_str());
continue;
}
if (SrsUdpPacketRecvCycleInterval > 0) {
srs_usleep(SrsUdpPacketRecvCycleInterval);
}
}
return err;
}

View file

@ -26,6 +26,8 @@
#include <srs_core.hpp>
#include <sys/socket.h>
#include <string>
#include <srs_app_st.hpp>
@ -33,6 +35,8 @@
struct sockaddr;
class SrsUdpMuxSocket;
// The udp packet handler.
class ISrsUdpHandler
{
@ -54,6 +58,17 @@ public:
virtual srs_error_t on_udp_packet(const sockaddr* from, const int fromlen, char* buf, int nb_buf) = 0;
};
// TODO: FIXME: Add comments?
class ISrsUdpMuxHandler
{
public:
ISrsUdpMuxHandler();
virtual ~ISrsUdpMuxHandler();
public:
virtual srs_error_t on_stfd_change(srs_netfd_t fd);
virtual srs_error_t on_udp_packet(SrsUdpMuxSocket* udp_mux_skt) = 0;
};
// The tcp connection handler.
class ISrsTcpHandler
{
@ -68,13 +83,13 @@ public:
// Bind udp port, start thread to recv packet and handler it.
class SrsUdpListener : public ISrsCoroutineHandler
{
private:
protected:
srs_netfd_t lfd;
SrsCoroutine* trd;
private:
protected:
char* buf;
int nb_buf;
private:
protected:
ISrsUdpHandler* handler;
std::string ip;
int port;
@ -113,4 +128,62 @@ public:
virtual srs_error_t cycle();
};
class SrsUdpMuxSocket
{
private:
char* buf;
int nb_buf;
int nread;
srs_netfd_t lfd;
sockaddr_storage from;
int fromlen;
std::string peer_ip;
int peer_port;
public:
SrsUdpMuxSocket(srs_netfd_t fd);
virtual ~SrsUdpMuxSocket();
int recvfrom(srs_utime_t timeout);
srs_error_t sendto(void* data, int size, srs_utime_t timeout);
char* data() { return buf; }
int size() { return nread; }
std::string get_peer_ip() const { return peer_ip; }
int get_peer_port() const { return peer_port; }
std::string get_peer_id();
public:
SrsUdpMuxSocket* copy_sendonly();
private:
// Don't allow copy, user copy_sendonly instead
SrsUdpMuxSocket(const SrsUdpMuxSocket& rhs);
SrsUdpMuxSocket& operator=(const SrsUdpMuxSocket& rhs);
};
class SrsUdpMuxListener : public ISrsCoroutineHandler
{
protected:
srs_netfd_t lfd;
SrsCoroutine* trd;
protected:
char* buf;
int nb_buf;
protected:
ISrsUdpMuxHandler* handler;
std::string ip;
int port;
public:
SrsUdpMuxListener(ISrsUdpMuxHandler* h, std::string i, int p);
virtual ~SrsUdpMuxListener();
public:
virtual int fd();
virtual srs_netfd_t stfd();
public:
virtual srs_error_t listen();
// Interface ISrsReusableThreadHandler.
public:
virtual srs_error_t cycle();
private:
void set_socket_buffer();
};
#endif

View file

@ -186,8 +186,8 @@ int SrsPithyPrint::enter_stage()
srs_assert(stage != NULL);
client_id = stage->nb_clients++;
srs_verbose("enter stage, stage_id=%d, client_id=%d, nb_clients=%d, time_ms=%d",
stage->stage_id, client_id, stage->nb_clients, stage->pithy_print_time_ms);
srs_verbose("enter stage, stage_id=%d, client_id=%d, nb_clients=%d",
stage->stage_id, client_id, stage->nb_clients);
return client_id;
}
@ -199,8 +199,8 @@ void SrsPithyPrint::leave_stage()
stage->nb_clients--;
srs_verbose("leave stage, stage_id=%d, client_id=%d, nb_clients=%d, time_ms=%d",
stage->stage_id, client_id, stage->nb_clients, stage->pithy_print_time_ms);
srs_verbose("leave stage, stage_id=%d, client_id=%d, nb_clients=%d",
stage->stage_id, client_id, stage->nb_clients);
}
void SrsPithyPrint::elapse()

View file

@ -0,0 +1,578 @@
/**
* The MIT License (MIT)
*
* Copyright (c) 2013-2020 Winlin
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#include <srs_app_rtc.hpp>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <stdlib.h>
#include <string.h>
#include <math.h>
#include <unistd.h>
#include <algorithm>
#include <sstream>
using namespace std;
#include <srs_kernel_buffer.hpp>
#include <srs_kernel_error.hpp>
#include <srs_kernel_codec.hpp>
#include <srs_kernel_flv.hpp>
#include <srs_kernel_rtp.hpp>
#include <srs_app_config.hpp>
#include <srs_app_source.hpp>
#include <srs_core_autofree.hpp>
#include <srs_app_pithy_print.hpp>
#include <srs_kernel_utility.hpp>
#include <srs_kernel_codec.hpp>
#include <srs_kernel_file.hpp>
#include <srs_app_utility.hpp>
#include <srs_app_http_hooks.hpp>
#include <srs_protocol_format.hpp>
#include <srs_rtmp_stack.hpp>
#include <openssl/rand.h>
#include <srs_app_audio_recode.hpp>
// TODO: Add this function into SrsRtpMux class.
srs_error_t aac_raw_append_adts_header(SrsSharedPtrMessage* shared_audio, SrsFormat* format, SrsBuffer** stream_ptr)
{
srs_error_t err = srs_success;
if (format->is_aac_sequence_header()) {
return err;
}
if (stream_ptr == NULL) {
return srs_error_new(ERROR_RTC_RTP_MUXER, "adts");
}
srs_verbose("audio samples=%d", format->audio->nb_samples);
if (format->audio->nb_samples != 1) {
return srs_error_new(ERROR_RTC_RTP_MUXER, "adts");
}
int nb_buf = format->audio->samples[0].size + 7;
char* buf = new char[nb_buf];
SrsBuffer* stream = new SrsBuffer(buf, nb_buf);
// TODO: Add comment.
stream->write_1bytes(0xFF);
stream->write_1bytes(0xF9);
stream->write_1bytes(((format->acodec->aac_object - 1) << 6) | ((format->acodec->aac_sample_rate & 0x0F) << 2) | ((format->acodec->aac_channels & 0x04) >> 2));
stream->write_1bytes(((format->acodec->aac_channels & 0x03) << 6) | ((nb_buf >> 11) & 0x03));
stream->write_1bytes((nb_buf >> 3) & 0xFF);
stream->write_1bytes(((nb_buf & 0x07) << 5) | 0x1F);
stream->write_1bytes(0xFC);
stream->write_bytes(format->audio->samples[0].bytes, format->audio->samples[0].size);
*stream_ptr = stream;
return err;
}
SrsRtpMuxer::SrsRtpMuxer()
{
sequence = 0;
discard_bframe = false;
}
SrsRtpMuxer::~SrsRtpMuxer()
{
}
srs_error_t SrsRtpMuxer::frame_to_packet(SrsSharedPtrMessage* shared_frame, SrsFormat* format)
{
srs_error_t err = srs_success;
if (format->is_avc_sequence_header()) {
sps.assign(format->vcodec->sequenceParameterSetNALUnit.data(), format->vcodec->sequenceParameterSetNALUnit.size());
pps.assign(format->vcodec->pictureParameterSetNALUnit.data(), format->vcodec->pictureParameterSetNALUnit.size());
// only collect SPS/PPS.
return err;
}
vector<SrsRtpSharedPacket*> rtp_packet_vec;
for (int i = 0; i < format->video->nb_samples; ++i) {
SrsSample sample = format->video->samples[i];
uint8_t header = sample.bytes[0];
uint8_t nal_type = header & kNalTypeMask;
// TODO: Use config to determine should check avc stream.
if (nal_type == SrsAvcNaluTypeNonIDR || nal_type == SrsAvcNaluTypeDataPartitionA || nal_type == SrsAvcNaluTypeIDR) {
SrsBuffer* stream = new SrsBuffer(sample.bytes, sample.size);
SrsAutoFree(SrsBuffer, stream);
// Skip nalu header.
stream->skip(1);
SrsBitBuffer bitstream(stream);
int32_t first_mb_in_slice = 0;
if ((err = srs_avc_nalu_read_uev(&bitstream, first_mb_in_slice)) != srs_success) {
return srs_error_wrap(err, "nalu read uev");
}
int32_t slice_type = 0;
if ((err = srs_avc_nalu_read_uev(&bitstream, slice_type)) != srs_success) {
return srs_error_wrap(err, "nalu read uev");
}
srs_verbose("nal_type=%d, slice type=%d", nal_type, slice_type);
if (slice_type == SrsAvcSliceTypeB || slice_type == SrsAvcSliceTypeB1) {
if (discard_bframe) {
continue;
}
}
}
if (sample.size <= max_payload_size) {
if ((err = packet_single_nalu(shared_frame, format, &sample, rtp_packet_vec)) != srs_success) {
return srs_error_wrap(err, "packet single nalu");
}
} else {
if ((err = packet_fu_a(shared_frame, format, &sample, rtp_packet_vec)) != srs_success) {
return srs_error_wrap(err, "packet fu-a");
}
}
}
if (! rtp_packet_vec.empty()) {
// At the end of the frame, set marker bit.
// One frame may have multi nals. Set the marker bit in the last nal end, no the end of the nal.
if ((err = rtp_packet_vec.back()->set_marker(true)) != srs_success) {
return srs_error_wrap(err, "set marker");
}
}
shared_frame->set_rtp_packets(rtp_packet_vec);
return err;
}
srs_error_t SrsRtpMuxer::packet_fu_a(SrsSharedPtrMessage* shared_frame, SrsFormat* format, SrsSample* sample, vector<SrsRtpSharedPacket*>& rtp_packet_vec)
{
srs_error_t err = srs_success;
char* p = sample->bytes + 1;
int nb_left = sample->size - 1;
uint8_t header = sample->bytes[0];
uint8_t nal_type = header & kNalTypeMask;
if (nal_type == SrsAvcNaluTypeIDR) {
if ((err = packet_stap_a(sps, pps, shared_frame, rtp_packet_vec)) != srs_success) {
return srs_error_wrap(err, "packet stap-a");
}
}
int num_of_packet = (sample->size - 1 + max_payload_size) / max_payload_size;
for (int i = 0; i < num_of_packet; ++i) {
char* buf = new char[kRtpPacketSize];
SrsBuffer* stream = new SrsBuffer(buf, kRtpPacketSize);
SrsAutoFree(SrsBuffer, stream);
int packet_size = min(nb_left, max_payload_size);
// v=2,p=0,x=0,cc=0
stream->write_1bytes(0x80);
// marker payloadtype
stream->write_1bytes(kH264PayloadType);
// sequence
stream->write_2bytes(sequence);
// timestamp
stream->write_4bytes(int32_t(shared_frame->timestamp * 90));
// ssrc
stream->write_4bytes(int32_t(kVideoSSRC));
// fu-indicate
uint8_t fu_indicate = kFuA;
fu_indicate |= (header & (~kNalTypeMask));
stream->write_1bytes(fu_indicate);
uint8_t fu_header = nal_type;
if (i == 0)
fu_header |= kStart;
if (i == num_of_packet - 1)
fu_header |= kEnd;
stream->write_1bytes(fu_header);
stream->write_bytes(p, packet_size);
p += packet_size;
nb_left -= packet_size;
srs_verbose("rtp fu-a nalu, size=%u, seq=%u, timestamp=%lu, ssrc=%u, payloadtype=%u",
sample->size, sequence, (shared_frame->timestamp * 90), kVideoSSRC, kH264PayloadType);
SrsRtpSharedPacket* rtp_shared_pkt = new SrsRtpSharedPacket();
rtp_shared_pkt->create((shared_frame->timestamp * 90), sequence++, kVideoSSRC, kH264PayloadType, stream->data(), stream->pos());
rtp_packet_vec.push_back(rtp_shared_pkt);
}
return err;
}
srs_error_t SrsRtpMuxer::packet_single_nalu(SrsSharedPtrMessage* shared_frame, SrsFormat* format, SrsSample* sample, vector<SrsRtpSharedPacket*>& rtp_packet_vec)
{
srs_error_t err = srs_success;
uint8_t header = sample->bytes[0];
uint8_t nal_type = header & kNalTypeMask;
char* buf = new char[kRtpPacketSize];
SrsBuffer* stream = new SrsBuffer(buf, kRtpPacketSize);
SrsAutoFree(SrsBuffer, stream);
if (nal_type == SrsAvcNaluTypeIDR) {
if ((err = packet_stap_a(sps, pps, shared_frame, rtp_packet_vec)) != srs_success) {
return srs_error_wrap(err, "packet stap-a");
}
}
// v=2,p=0,x=0,cc=0
stream->write_1bytes(0x80);
// marker payloadtype
stream->write_1bytes(kH264PayloadType);
// sequenct
stream->write_2bytes(sequence);
// timestamp
stream->write_4bytes(int32_t(shared_frame->timestamp * 90));
// ssrc
stream->write_4bytes(int32_t(kVideoSSRC));
stream->write_bytes(sample->bytes, sample->size);
srs_verbose("rtp single nalu, size=%u, seq=%u, timestamp=%lu, ssrc=%u, payloadtype=%u",
sample->size, sequence, (shared_frame->timestamp * 90), kVideoSSRC, kH264PayloadType);
SrsRtpSharedPacket* rtp_shared_pkt = new SrsRtpSharedPacket();
rtp_shared_pkt->create((shared_frame->timestamp * 90), sequence++, kVideoSSRC, kH264PayloadType, stream->data(), stream->pos());
rtp_packet_vec.push_back(rtp_shared_pkt);
return err;
}
srs_error_t SrsRtpMuxer::packet_stap_a(const string &sps, const string& pps, SrsSharedPtrMessage* shared_frame, vector<SrsRtpSharedPacket*>& rtp_packet_vec)
{
srs_error_t err = srs_success;
if (sps.empty() || pps.empty()) {
return srs_error_new(ERROR_RTC_RTP_MUXER, "sps/pps empty");
}
uint8_t header = sps[0];
uint8_t nal_type = header & kNalTypeMask;
char* buf = new char[kRtpPacketSize];
SrsBuffer* stream = new SrsBuffer(buf, kRtpPacketSize);
SrsAutoFree(SrsBuffer, stream);
// v=2,p=0,x=0,cc=0
stream->write_1bytes(0x80);
// marker payloadtype
stream->write_1bytes(kH264PayloadType);
// sequenct
stream->write_2bytes(sequence);
// timestamp
stream->write_4bytes(int32_t(shared_frame->timestamp * 90));
// ssrc
stream->write_4bytes(int32_t(kVideoSSRC));
// stap-a header
uint8_t stap_a_header = kStapA;
stap_a_header |= (nal_type & (~kNalTypeMask));
stream->write_1bytes(stap_a_header);
stream->write_2bytes(sps.size());
stream->write_bytes((char*)sps.data(), sps.size());
stream->write_2bytes(pps.size());
stream->write_bytes((char*)pps.data(), pps.size());
srs_verbose("rtp stap-a nalu, size=%u, seq=%u, timestamp=%lu, ssrc=%u, payloadtype=%u",
(sps.size() + pps.size()), sequence, (shared_frame->timestamp * 90), kVideoSSRC, kH264PayloadType);
SrsRtpSharedPacket* rtp_shared_pkt = new SrsRtpSharedPacket();
rtp_shared_pkt->create((shared_frame->timestamp * 90), sequence++, kVideoSSRC, kH264PayloadType, stream->data(), stream->pos());
rtp_packet_vec.push_back(rtp_shared_pkt);
return err;
}
SrsRtpOpusMuxer::SrsRtpOpusMuxer()
{
sequence = 0;
timestamp = 0;
recoder = NULL;
}
SrsRtpOpusMuxer::~SrsRtpOpusMuxer()
{
if (recoder) {
delete recoder;
recoder = NULL;
}
}
srs_error_t SrsRtpOpusMuxer::initialize()
{
srs_error_t err = srs_success;
recoder = new SrsAudioRecode(kChannel, kSamplerate);
if (!recoder) {
return srs_error_wrap(err, "SrsAacOpus init failed");
}
recoder->initialize();
return err;
}
srs_error_t SrsRtpOpusMuxer::frame_to_packet(SrsSharedPtrMessage* shared_audio, SrsFormat* format, SrsBuffer* stream)
{
srs_error_t err = srs_success;
vector<SrsRtpSharedPacket*> rtp_packet_vec;
char* data_ptr[kArrayLength];
static char data_array[kArrayLength][kArrayBuffer];
int elen[kArrayLength], number = 0;
data_ptr[0] = &data_array[0][0];
for (int i = 1; i < kArrayLength; i++) {
data_ptr[i] = data_array[i];
}
SrsSample pkt;
pkt.bytes = stream->data();
pkt.size = stream->pos();
if ((err = recoder->recode(&pkt, data_ptr, elen, number)) != srs_success) {
return srs_error_wrap(err, "recode error");
}
for (int i = 0; i < number; i++) {
SrsSample sample;
sample.size = elen[i];
sample.bytes = data_ptr[i];
packet_opus(shared_audio, &sample, rtp_packet_vec);
}
shared_audio->set_rtp_packets(rtp_packet_vec);
return err;
}
srs_error_t SrsRtpOpusMuxer::packet_opus(SrsSharedPtrMessage* shared_frame, SrsSample* sample, std::vector<SrsRtpSharedPacket*>& rtp_packet_vec)
{
srs_error_t err = srs_success;
char* buf = new char[kRtpPacketSize];
SrsBuffer* stream = new SrsBuffer(buf, kRtpPacketSize);
SrsAutoFree(SrsBuffer, stream);
// v=2,p=0,x=0,cc=0
stream->write_1bytes(0x80);
// marker payloadtype
stream->write_1bytes(kOpusPayloadType);
// sequenct
stream->write_2bytes(sequence);
// timestamp
stream->write_4bytes(int32_t(timestamp));
// TODO: FIXME: Why 960? Need Refactoring?
timestamp += 960;
// ssrc
stream->write_4bytes(int32_t(kAudioSSRC));
stream->write_bytes(sample->bytes, sample->size);
srs_verbose("sample=%s", srs_string_dumps_hex(sample->bytes, sample->size).c_str());
srs_verbose("opus, size=%u, seq=%u, timestamp=%lu, ssrc=%u, payloadtype=%u",
sample->size, sequence, timestamp, kAudioSSRC, kOpusPayloadType);
SrsRtpSharedPacket* rtp_shared_pkt = new SrsRtpSharedPacket();
rtp_shared_pkt->create(timestamp, sequence++, kAudioSSRC, kOpusPayloadType, stream->data(), stream->pos());
rtp_shared_pkt->set_marker(true);
rtp_packet_vec.push_back(rtp_shared_pkt);
return err;
}
SrsRtc::SrsRtc()
{
req = NULL;
hub = NULL;
enabled = false;
disposable = false;
last_update_time = 0;
discard_aac = false;
}
SrsRtc::~SrsRtc()
{
srs_freep(rtp_h264_muxer);
}
void SrsRtc::dispose()
{
if (enabled) {
on_unpublish();
}
}
// TODO: FIXME: Dead code?
srs_error_t SrsRtc::cycle()
{
srs_error_t err = srs_success;
return err;
}
srs_error_t SrsRtc::initialize(SrsOriginHub* h, SrsRequest* r)
{
srs_error_t err = srs_success;
hub = h;
req = r;
rtp_h264_muxer = new SrsRtpMuxer();
rtp_h264_muxer->discard_bframe = _srs_config->get_rtc_bframe_discard(req->vhost);
// TODO: FIXME: Support reload and log it.
discard_aac = _srs_config->get_rtc_aac_discard(req->vhost);
rtp_opus_muxer = new SrsRtpOpusMuxer();
if (rtp_opus_muxer) {
rtp_opus_muxer->initialize();
}
return err;
}
srs_error_t SrsRtc::on_publish()
{
srs_error_t err = srs_success;
// update the hls time, for hls_dispose.
last_update_time = srs_get_system_time();
// support multiple publish.
if (enabled) {
return err;
}
if (!_srs_config->get_rtc_enabled(req->vhost)) {
return err;
}
// if enabled, open the muxer.
enabled = true;
// ok, the hls can be dispose, or need to be dispose.
disposable = true;
return err;
}
void SrsRtc::on_unpublish()
{
// support multiple unpublish.
if (!enabled) {
return;
}
enabled = false;
}
srs_error_t SrsRtc::on_audio(SrsSharedPtrMessage* shared_audio, SrsFormat* format)
{
srs_error_t err = srs_success;
if (!enabled) {
return err;
}
// Ignore if no format->acodec, it means the codec is not parsed, or unknown codec.
// @issue https://github.com/ossrs/srs/issues/1506#issuecomment-562079474
if (!format->acodec) {
return err;
}
// update the hls time, for hls_dispose.
last_update_time = srs_get_system_time();
// ts support audio codec: aac/mp3
SrsAudioCodecId acodec = format->acodec->id;
if (acodec != SrsAudioCodecIdAAC && acodec != SrsAudioCodecIdMP3) {
return err;
}
// When drop aac audio packet, never transcode.
if (discard_aac && acodec == SrsAudioCodecIdAAC) {
return err;
}
// ignore sequence header
srs_assert(format->audio);
SrsBuffer* stream = NULL;
SrsAutoFree(SrsBuffer, stream);
if ((err = aac_raw_append_adts_header(shared_audio, format, &stream)) != srs_success) {
return srs_error_wrap(err, "aac append header");
}
if (stream) {
rtp_opus_muxer->frame_to_packet(shared_audio, format, stream);
}
return err;
}
srs_error_t SrsRtc::on_video(SrsSharedPtrMessage* shared_video, SrsFormat* format)
{
srs_error_t err = srs_success;
// TODO: FIXME: Maybe it should config on vhost level.
if (!enabled) {
return err;
}
// Ignore if no format->vcodec, it means the codec is not parsed, or unknown codec.
// @issue https://github.com/ossrs/srs/issues/1506#issuecomment-562079474
if (!format->vcodec) {
return err;
}
// update the hls time, for hls_dispose.
last_update_time = srs_get_system_time();
// ignore info frame,
// @see https://github.com/ossrs/srs/issues/288#issuecomment-69863909
srs_assert(format->video);
return rtp_h264_muxer->frame_to_packet(shared_video, format);
}

View file

@ -0,0 +1,128 @@
/**
* The MIT License (MIT)
*
* Copyright (c) 2013-2020 Winlin
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#ifndef SRS_APP_RTC_HPP
#define SRS_APP_RTC_HPP
#include <srs_core.hpp>
#include <string>
#include <vector>
#include <map>
class SrsFormat;
class SrsSample;
class SrsSharedPtrMessage;
class SrsRtpSharedPacket;
class SrsRequest;
class SrsOriginHub;
class SrsAudioRecode;
class SrsBuffer;
const int max_payload_size = 1200;
const int kRtpPacketSize = 1500;
const uint8_t kOpusPayloadType = 111;
const uint8_t kH264PayloadType = 102;
const uint8_t kNalTypeMask = 0x1F;
const uint8_t kStapA = 24;
const uint8_t kFuA = 28;
const uint8_t kStart = 0x80;
const uint8_t kEnd = 0x40;
const int kChannel = 2;
const int kSamplerate = 48000;
const int kArrayLength = 8;
const int kArrayBuffer = 4096;
// FIXME: ssrc can relate to source
const uint32_t kAudioSSRC = 3233846890;
const uint32_t kVideoSSRC = 3233846889;
// TODO: Define interface class like ISrsRtpMuxer to support SrsRtpOpusMuxer and so on.
class SrsRtpMuxer
{
private:
uint16_t sequence;
std::string sps;
std::string pps;
public:
bool discard_bframe;
public:
SrsRtpMuxer();
virtual ~SrsRtpMuxer();
public:
srs_error_t frame_to_packet(SrsSharedPtrMessage* shared_video, SrsFormat* format);
private:
srs_error_t packet_fu_a(SrsSharedPtrMessage* shared_frame, SrsFormat* format, SrsSample* sample, std::vector<SrsRtpSharedPacket*>& rtp_packet_vec);
srs_error_t packet_single_nalu(SrsSharedPtrMessage* shared_frame, SrsFormat* format, SrsSample* sample, std::vector<SrsRtpSharedPacket*>& rtp_packet_vec);
srs_error_t packet_stap_a(const std::string &sps, const std::string& pps, SrsSharedPtrMessage* shared_frame, std::vector<SrsRtpSharedPacket*>& rtp_packet_vec);
};
// TODO: FIXME: It's not a muxer, but a transcoder.
class SrsRtpOpusMuxer
{
private:
// TODO: FIXME: How to handle timestamp overflow?
uint32_t timestamp;
uint16_t sequence;
SrsAudioRecode* recoder;
public:
SrsRtpOpusMuxer();
virtual ~SrsRtpOpusMuxer();
virtual srs_error_t initialize();
public:
srs_error_t frame_to_packet(SrsSharedPtrMessage* shared_audio, SrsFormat* format, SrsBuffer* stream);
private:
srs_error_t packet_opus(SrsSharedPtrMessage* shared_frame, SrsSample* sample, std::vector<SrsRtpSharedPacket*>& rtp_packet_vec);
};
class SrsRtc
{
private:
SrsRequest* req;
bool enabled;
bool disposable;
bool discard_aac;
srs_utime_t last_update_time;
SrsRtpMuxer* rtp_h264_muxer;
SrsRtpOpusMuxer* rtp_opus_muxer;
SrsOriginHub* hub;
public:
SrsRtc();
virtual ~SrsRtc();
public:
virtual void dispose();
virtual srs_error_t cycle();
public:
virtual srs_error_t initialize(SrsOriginHub* h, SrsRequest* r);
virtual srs_error_t on_publish();
virtual void on_unpublish();
virtual srs_error_t on_audio(SrsSharedPtrMessage* shared_audio, SrsFormat* format);
virtual srs_error_t on_video(SrsSharedPtrMessage* shared_video, SrsFormat* format);
};
#endif

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,295 @@
/**
* The MIT License (MIT)
*
* Copyright (c) 2013-2020 Winlin
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#ifndef SRS_APP_RTC_CONN_HPP
#define SRS_APP_RTC_CONN_HPP
#include <srs_core.hpp>
#include <srs_app_listener.hpp>
#include <srs_service_st.hpp>
#include <srs_kernel_utility.hpp>
#include <srs_rtmp_stack.hpp>
#include <srs_app_hybrid.hpp>
#include <srs_app_hourglass.hpp>
#include <string>
#include <map>
#include <vector>
#include <openssl/ssl.h>
#include <srtp2/srtp.h>
class SrsUdpMuxSocket;
class SrsConsumer;
class SrsStunPacket;
class SrsRtcServer;
class SrsRtcSession;
class SrsSharedPtrMessage;
class SrsSource;
const uint8_t kSR = 200;
const uint8_t kRR = 201;
const uint8_t kSDES = 202;
const uint8_t kBye = 203;
const uint8_t kApp = 204;
// @see: https://tools.ietf.org/html/rfc4585#section-6.1
const uint8_t kRtpFb = 205;
const uint8_t kPsFb = 206;
// @see: https://tools.ietf.org/html/rfc4585#section-6.3
const uint8_t kPLI = 1;
const uint8_t kSLI = 2;
const uint8_t kRPSI = 3;
const uint8_t kAFB = 15;
const srs_utime_t kSrsRtcSessionStunTimeoutUs = 10*1000*1000LL;
class SrsCandidate
{
private:
public:
SrsCandidate();
virtual ~SrsCandidate();
static std::vector<std::string> get_candidate_ips();
};
class SrsSdpMediaInfo
{
private:
public:
SrsSdpMediaInfo();
virtual ~SrsSdpMediaInfo();
};
class SrsSdp
{
private:
std::string sdp;
int version;
std::string ice_ufrag;
std::string ice_pwd;
std::string fingerprint;
std::string setup;
std::vector<SrsSdpMediaInfo> media_infos;
public:
SrsSdp();
virtual ~SrsSdp();
srs_error_t decode(const std::string& sdp_str);
srs_error_t encode(std::string& sdp_str);
std::string get_ice_ufrag() const { return ice_ufrag; }
std::string get_ice_pwd() const { return ice_pwd; }
void set_ice_ufrag(const std::string& u) { ice_ufrag = u; }
void set_ice_pwd(const std::string& p) { ice_pwd = p; }
private:
srs_error_t parse_attr(const std::string& line);
};
enum SrsRtcSessionStateType
{
// TODO: FIXME: Should prefixed by enum name.
INIT = -1,
WAITING_STUN = 1,
DOING_DTLS_HANDSHAKE = 2,
ESTABLISHED = 3,
CLOSED = 4,
};
class SrsDtlsSession
{
private:
SrsRtcSession* rtc_session;
SSL* dtls;
BIO* bio_in;
BIO* bio_out;
std::string client_key;
std::string server_key;
srtp_t srtp_send;
srtp_t srtp_recv;
bool handshake_done;
public:
SrsDtlsSession(SrsRtcSession* s);
virtual ~SrsDtlsSession();
srs_error_t on_dtls(SrsUdpMuxSocket* udp_mux_skt);
srs_error_t on_dtls_handshake_done(SrsUdpMuxSocket* udp_mux_skt);
srs_error_t on_dtls_application_data(const char* data, const int len);
srs_error_t send_client_hello(SrsUdpMuxSocket* udp_mux_skt);
public:
srs_error_t protect_rtp(char* protected_buf, const char* ori_buf, int& nb_protected_buf);
srs_error_t unprotect_rtp(char* unprotected_buf, const char* ori_buf, int& nb_unprotected_buf);
srs_error_t protect_rtcp(char* protected_buf, const char* ori_buf, int& nb_protected_buf);
srs_error_t unprotect_rtcp(char* unprotected_buf, const char* ori_buf, int& nb_unprotected_buf);
private:
srs_error_t handshake(SrsUdpMuxSocket* udp_mux_skt);
private:
srs_error_t srtp_initialize();
srs_error_t srtp_send_init();
srs_error_t srtp_recv_init();
};
class SrsRtcSenderThread : public ISrsCoroutineHandler
{
protected:
SrsCoroutine* trd;
int _parent_cid;
private:
SrsRtcSession* rtc_session;
public:
SrsUdpMuxSocket* sendonly_ukt;
public:
SrsRtcSenderThread(SrsRtcSession* s, SrsUdpMuxSocket* u, int parent_cid);
virtual ~SrsRtcSenderThread();
public:
virtual int cid();
public:
virtual srs_error_t start();
virtual void stop();
virtual void stop_loop();
public:
virtual srs_error_t cycle();
public:
void update_sendonly_socket(SrsUdpMuxSocket* ukt);
private:
void send_and_free_messages(SrsSharedPtrMessage** msgs, int nb_msgs, SrsUdpMuxSocket* udp_mux_skt);
};
class SrsRtcSession
{
friend class SrsRtcSenderThread;
private:
SrsRtcServer* rtc_server;
SrsSdp remote_sdp;
SrsSdp local_sdp;
SrsRtcSessionStateType session_state;
SrsDtlsSession* dtls_session;
SrsRtcSenderThread* strd;
std::string username;
std::string peer_id;
srs_utime_t last_stun_time;
private:
// For each RTC session, we use a specified cid for debugging logs.
int cid;
public:
SrsRequest request;
SrsSource* source;
public:
SrsRtcSession(SrsRtcServer* rtc_svr, const SrsRequest& req, const std::string& un, int context_id);
virtual ~SrsRtcSession();
public:
SrsSdp* get_local_sdp() { return &local_sdp; }
void set_local_sdp(const SrsSdp& sdp) { local_sdp = sdp; }
SrsSdp* get_remote_sdp() { return &remote_sdp; }
void set_remote_sdp(const SrsSdp& sdp) { remote_sdp = sdp; }
SrsRtcSessionStateType get_session_state() { return session_state; }
void set_session_state(SrsRtcSessionStateType state) { session_state = state; }
std::string id() const { return peer_id + "_" + username; }
std::string get_peer_id() const { return peer_id; }
void set_peer_id(const std::string& id) { peer_id = id; }
void switch_to_context();
public:
srs_error_t on_stun(SrsUdpMuxSocket* udp_mux_skt, SrsStunPacket* stun_req);
srs_error_t on_dtls(SrsUdpMuxSocket* udp_mux_skt);
srs_error_t on_rtcp(SrsUdpMuxSocket* udp_mux_skt);
public:
srs_error_t send_client_hello(SrsUdpMuxSocket* udp_mux_skt);
srs_error_t on_connection_established(SrsUdpMuxSocket* udp_mux_skt);
srs_error_t start_play(SrsUdpMuxSocket* udp_mux_skt);
public:
bool is_stun_timeout() { return last_stun_time + kSrsRtcSessionStunTimeoutUs < srs_get_system_time(); }
private:
srs_error_t check_source();
private:
srs_error_t on_binding_request(SrsUdpMuxSocket* udp_mux_skt, SrsStunPacket* stun_req);
private:
srs_error_t on_rtcp_feedback(char* buf, int nb_buf, SrsUdpMuxSocket* udp_mux_skt);
srs_error_t on_rtcp_ps_feedback(char* buf, int nb_buf, SrsUdpMuxSocket* udp_mux_skt);
srs_error_t on_rtcp_receiver_report(char* buf, int nb_buf, SrsUdpMuxSocket* udp_mux_skt);
};
class SrsRtcServer : virtual public ISrsUdpMuxHandler, virtual public ISrsHourGlass
{
private:
SrsUdpMuxListener* listener;
SrsHourGlass* timer;
private:
std::map<std::string, SrsRtcSession*> map_username_session; // key: username(local_ufrag + ":" + remote_ufrag)
std::map<std::string, SrsRtcSession*> map_id_session; // key: peerip(ip + ":" + port)
public:
SrsRtcServer();
virtual ~SrsRtcServer();
public:
virtual srs_error_t initialize();
public:
// TODO: FIXME: Support gracefully quit.
// TODO: FIXME: Support reload.
virtual srs_error_t listen_udp();
virtual srs_error_t on_udp_packet(SrsUdpMuxSocket* udp_mux_skt);
public:
virtual srs_error_t listen_api();
SrsRtcSession* create_rtc_session(const SrsRequest& req, const SrsSdp& remote_sdp, SrsSdp& local_sdp);
bool insert_into_id_sessions(const std::string& peer_id, SrsRtcSession* rtc_session);
void check_and_clean_timeout_session();
private:
srs_error_t on_stun(SrsUdpMuxSocket* udp_mux_skt);
srs_error_t on_dtls(SrsUdpMuxSocket* udp_mux_skt);
srs_error_t on_rtp_or_rtcp(SrsUdpMuxSocket* udp_mux_skt);
private:
SrsRtcSession* find_rtc_session_by_username(const std::string& ufrag);
SrsRtcSession* find_rtc_session_by_peer_id(const std::string& peer_id);
// interface ISrsHourGlass
public:
virtual srs_error_t notify(int type, srs_utime_t interval, srs_utime_t tick);
};
// The RTC server adapter.
class RtcServerAdapter : public ISrsHybridServer
{
private:
SrsRtcServer* rtc;
public:
RtcServerAdapter();
virtual ~RtcServerAdapter();
public:
virtual srs_error_t initialize();
virtual srs_error_t run();
virtual void stop();
};
#endif

View file

@ -507,8 +507,8 @@ srs_error_t SrsRtmpConn::stream_service_cycle()
}
bool enabled_cache = _srs_config->get_gop_cache(req->vhost);
srs_trace("source url=%s, ip=%s, cache=%d, is_edge=%d, source_id=%d[%d]",
req->get_stream_url().c_str(), ip.c_str(), enabled_cache, info->edge, source->source_id(), source->source_id());
srs_trace("source url=%s, ip=%s, cache=%d, is_edge=%d, source_id=[%d][%d]",
req->get_stream_url().c_str(), ip.c_str(), enabled_cache, info->edge, ::getpid(), source->source_id());
source->set_cache(enabled_cache);
switch (info->type) {

View file

@ -1410,6 +1410,11 @@ srs_error_t SrsServer::accept_client(SrsListenerType type, srs_netfd_t stfd)
return err;
}
SrsHttpServeMux* SrsServer::api_server()
{
return http_api_mux;
}
srs_error_t SrsServer::fd2conn(SrsListenerType type, srs_netfd_t stfd, SrsConnection** pconn)
{
srs_error_t err = srs_success;

View file

@ -315,6 +315,8 @@ public:
// for instance RTMP connection to serve client.
// @param stfd, the client fd in st boxed, the underlayer fd.
virtual srs_error_t accept_client(SrsListenerType type, srs_netfd_t stfd);
// TODO: FIXME: Fetch from hybrid server manager.
virtual SrsHttpServeMux* api_server();
private:
virtual srs_error_t fd2conn(SrsListenerType type, srs_netfd_t stfd, SrsConnection** pconn);
// Interface IConnectionManager

View file

@ -31,6 +31,7 @@ using namespace std;
#include <srs_rtmp_stack.hpp>
#include <srs_protocol_amf0.hpp>
#include <srs_kernel_codec.hpp>
#include <srs_kernel_rtp.hpp>
#include <srs_app_hls.hpp>
#include <srs_app_forward.hpp>
#include <srs_app_config.hpp>
@ -49,6 +50,9 @@ using namespace std;
#include <srs_app_ng_exec.hpp>
#include <srs_app_dash.hpp>
#include <srs_protocol_format.hpp>
#ifdef SRS_AUTO_RTC
#include <srs_app_rtc.hpp>
#endif
#define CONST_MAX_JITTER_MS 250
#define CONST_MAX_JITTER_MS_NEG -250
@ -814,6 +818,57 @@ SrsSharedPtrMessage* SrsMixQueue::pop()
return msg;
}
#ifdef SRS_AUTO_RTC
SrsRtpPacketQueue::SrsRtpPacketQueue()
{
}
SrsRtpPacketQueue::~SrsRtpPacketQueue()
{
clear();
}
void SrsRtpPacketQueue::clear()
{
map<uint16_t, SrsRtpSharedPacket*>::iterator iter = pkt_queue.begin();
while (iter != pkt_queue.end()) {
srs_freep(iter->second);
pkt_queue.erase(iter++);
}
}
void SrsRtpPacketQueue::push(std::vector<SrsRtpSharedPacket*>& pkts)
{
for (int i = 0; i < (int)pkts.size(); ++i) {
insert(pkts[i]->sequence, pkts[i]);
}
}
void SrsRtpPacketQueue::insert(const uint16_t& sequence, SrsRtpSharedPacket* pkt)
{
pkt_queue.insert(make_pair(sequence, pkt->copy()));
if (pkt_queue.size() >= 3000) {
srs_freep(pkt_queue.begin()->second);
pkt_queue.erase(pkt_queue.begin());
}
}
SrsRtpSharedPacket* SrsRtpPacketQueue::find(const uint16_t& sequence)
{
if (pkt_queue.empty()) {
return NULL;
}
SrsRtpSharedPacket* pkt = NULL;
map<uint16_t, SrsRtpSharedPacket*>::iterator iter = pkt_queue.find(sequence);
if (iter != pkt_queue.end()) {
pkt = iter->second->copy();
}
return pkt;
}
#endif
SrsOriginHub::SrsOriginHub()
{
source = NULL;
@ -824,6 +879,9 @@ SrsOriginHub::SrsOriginHub()
dash = new SrsDash();
dvr = new SrsDvr();
encoder = new SrsEncoder();
#ifdef SRS_AUTO_RTC
rtc = new SrsRtc();
#endif
#ifdef SRS_AUTO_HDS
hds = new SrsHds();
#endif
@ -867,6 +925,12 @@ srs_error_t SrsOriginHub::initialize(SrsSource* s, SrsRequest* r)
if ((err = format->initialize()) != srs_success) {
return srs_error_wrap(err, "format initialize");
}
#ifdef SRS_AUTO_RTC
if ((err = rtc->initialize(this, req)) != srs_success) {
return srs_error_wrap(err, "rtc initialize");
}
#endif
if ((err = hls->initialize(this, req)) != srs_success) {
return srs_error_wrap(err, "hls initialize");
@ -965,6 +1029,14 @@ srs_error_t SrsOriginHub::on_audio(SrsSharedPtrMessage* shared_audio)
flv_sample_sizes[c->sound_size], flv_sound_types[c->sound_type],
srs_flv_srates[c->sound_rate]);
}
#ifdef SRS_AUTO_RTC
if ((err = rtc->on_audio(msg, format)) != srs_success) {
srs_warn("rtc: ignore audio error %s", srs_error_desc(err).c_str());
srs_error_reset(err);
rtc->on_unpublish();
}
#endif
if ((err = hls->on_audio(msg, format)) != srs_success) {
// apply the error strategy for hls.
@ -1058,8 +1130,23 @@ srs_error_t SrsOriginHub::on_video(SrsSharedPtrMessage* shared_video, bool is_se
if (format->vcodec && !format->vcodec->is_avc_codec_ok()) {
return err;
}
#ifdef SRS_AUTO_RTC
// Parse RTMP message to RTP packets, in FU-A if too large.
if ((err = rtc->on_video(msg, format)) != srs_success) {
// TODO: We should support more strategies.
srs_warn("rtc: ignore video error %s", srs_error_desc(err).c_str());
srs_error_reset(err);
rtc->on_unpublish();
}
// TODO: FIXME: Refactor to move to rtp?
// Save the RTP packets for find_rtp_packet() to rtx or restore it.
source->rtp_queue->push(msg->rtp_packets);
#endif
if ((err = hls->on_video(msg, format)) != srs_success) {
// TODO: We should support more strategies.
// apply the error strategy for hls.
// @see https://github.com/ossrs/srs/issues/264
std::string hls_error_strategy = _srs_config->get_hls_on_error(req->vhost);
@ -1125,7 +1212,13 @@ srs_error_t SrsOriginHub::on_publish()
if ((err = encoder->on_publish(req)) != srs_success) {
return srs_error_wrap(err, "encoder publish");
}
#ifdef SRS_AUTO_RTC
if ((err = rtc->on_publish()) != srs_success) {
return srs_error_wrap(err, "rtc publish");
}
#endif
if ((err = hls->on_publish()) != srs_success) {
return srs_error_wrap(err, "hls publish");
}
@ -1163,6 +1256,9 @@ void SrsOriginHub::on_unpublish()
destroy_forwarders();
encoder->on_unpublish();
#ifdef SRS_AUTO_RTC
rtc->on_unpublish();
#endif
hls->on_unpublish();
dash->on_unpublish();
dvr->on_unpublish();
@ -1714,6 +1810,8 @@ srs_error_t SrsSourceManager::fetch_or_create(SrsRequest* r, ISrsSourceHandler*
// should always not exists for create a source.
srs_assert (pool.find(stream_url) == pool.end());
srs_trace("new source, stream_url=%s", stream_url.c_str());
source = new SrsSource();
if ((err = source->initialize(r, h)) != srs_success) {
@ -1822,6 +1920,9 @@ SrsSource::SrsSource()
jitter_algorithm = SrsRtmpJitterAlgorithmOFF;
mix_correct = false;
mix_queue = new SrsMixQueue();
#ifdef SRS_AUTO_RTC
rtp_queue = new SrsRtpPacketQueue();
#endif
_can_publish = true;
_pre_source_id = _source_id = -1;
@ -1851,6 +1952,9 @@ SrsSource::~SrsSource()
srs_freep(hub);
srs_freep(meta);
srs_freep(mix_queue);
#ifdef SRS_AUTO_RTC
srs_freep(rtp_queue);
#endif
srs_freep(play_edge);
srs_freep(publish_edge);
@ -2177,6 +2281,11 @@ srs_error_t SrsSource::on_audio_imp(SrsSharedPtrMessage* msg)
}
}
// Copy to hub to all utilities.
if ((err = hub->on_audio(msg)) != srs_success) {
return srs_error_wrap(err, "consume audio");
}
// copy to all consumer
if (!drop_for_reduce) {
for (int i = 0; i < (int)consumers.size(); i++) {
@ -2187,11 +2296,6 @@ srs_error_t SrsSource::on_audio_imp(SrsSharedPtrMessage* msg)
}
}
// Copy to hub to all utilities.
if ((err = hub->on_audio(msg)) != srs_success) {
return srs_error_wrap(err, "consume audio");
}
// cache the sequence header of aac, or first packet of mp3.
// for example, the mp3 is used for hls to write the "right" audio codec.
// TODO: FIXME: to refine the stream info system.
@ -2256,7 +2360,7 @@ srs_error_t SrsSource::on_video(SrsCommonMessage* shared_video)
return srs_error_wrap(err, "create message");
}
// directly process the audio message.
// directly process the video message.
if (!mix_correct) {
return on_video_imp(&msg);
}
@ -2306,7 +2410,7 @@ srs_error_t SrsSource::on_video_imp(SrsSharedPtrMessage* msg)
if ((err = hub->on_video(msg, is_sequence_header)) != srs_success) {
return srs_error_wrap(err, "hub consume video");
}
// copy to all consumer
if (!drop_for_reduce) {
for (int i = 0; i < (int)consumers.size(); i++) {
@ -2608,3 +2712,9 @@ string SrsSource::get_curr_origin()
return play_edge->get_curr_origin();
}
#ifdef SRS_AUTO_RTC
SrsRtpSharedPacket* SrsSource::find_rtp_packet(const uint16_t& seq)
{
return rtp_queue->find(seq);
}
#endif

View file

@ -54,6 +54,7 @@ class SrsNgExec;
class SrsConnection;
class SrsMessageHeader;
class SrsHls;
class SrsRtc;
class SrsDvr;
class SrsDash;
class SrsEncoder;
@ -61,6 +62,7 @@ class SrsBuffer;
#ifdef SRS_AUTO_HDS
class SrsHds;
#endif
class SrsRtpSharedPacket;
// The time jitter algorithm:
// 1. full, to ensure stream start at zero, and ensure stream monotonically increasing.
@ -323,6 +325,31 @@ public:
virtual SrsSharedPtrMessage* pop();
};
#ifdef SRS_AUTO_RTC
// To find the RTP packet for RTX or restore.
class SrsRtpPacketQueue
{
private:
struct SeqComp
{
bool operator()(const uint16_t& l, const uint16_t& r) const
{
return ((int16_t)(r - l)) > 0;
}
};
private:
std::map<uint16_t, SrsRtpSharedPacket*, SeqComp> pkt_queue;
public:
SrsRtpPacketQueue();
virtual ~SrsRtpPacketQueue();
public:
void clear();
void push(std::vector<SrsRtpSharedPacket*>& pkts);
void insert(const uint16_t& sequence, SrsRtpSharedPacket* pkt);
SrsRtpSharedPacket* find(const uint16_t& sequence);
};
#endif
// The hub for origin is a collection of utilities for origin only,
// For example, DVR, HLS, Forward and Transcode are only available for origin,
// they are meanless for edge server.
@ -335,6 +362,10 @@ private:
private:
// The format, codec information.
SrsRtmpFormat* format;
#ifdef SRS_AUTO_RTC
// rtc handler
SrsRtc* rtc;
#endif
// hls handler.
SrsHls* hls;
// The DASH encoder.
@ -507,6 +538,10 @@ private:
bool mix_correct;
// The mix queue to implements the mix correct algorithm.
SrsMixQueue* mix_queue;
#ifdef SRS_AUTO_RTC
// rtp packet queue
SrsRtpPacketQueue* rtp_queue;
#endif
// For play, whether enabled atc.
// The atc(use absolute time and donot adjust time),
// directly use msg time and donot adjust if atc is true,
@ -595,6 +630,11 @@ public:
virtual void on_edge_proxy_unpublish();
public:
virtual std::string get_curr_origin();
public:
#ifdef SRS_AUTO_RTC
// Find rtp packet by sequence
SrsRtpSharedPacket* find_rtp_packet(const uint16_t& seq);
#endif
};
#endif

View file

@ -1193,3 +1193,55 @@ void srs_api_dump_summaries(SrsJsonObject* obj)
sys->set("conn_srs", SrsJsonAny::integer(nrs->nb_conn_srs));
}
string srs_string_dumps_hex(const std::string& str, const int& limit)
{
return srs_string_dumps_hex(str.c_str(), str.size(), limit);
}
string srs_string_dumps_hex(const char* buf, const int length, const int& limit)
{
string ret;
char tmp_buf[1024*16];
tmp_buf[0] = '\n';
int len = 1;
for (int i = 0; i < length && i < limit; ++i) {
int nb = snprintf(tmp_buf + len, sizeof(tmp_buf) - len - 2, "%02X ", (uint8_t)buf[i]);
if (nb <= 0)
break;
len += nb;
if (i % 48 == 47) {
tmp_buf[len++] = '\n';
ret.append(tmp_buf, len);
len = 0;
}
}
tmp_buf[len] = '\0';
ret.append(tmp_buf, len);
return ret;
}
string srs_getenv(string key)
{
string ekey = key;
if (srs_string_starts_with(key, "$")) {
ekey = key.substr(1);
}
if (ekey.empty()) {
return "";
}
char* value = ::getenv(ekey.c_str());
if (value) {
return value;
}
return "";
}

View file

@ -30,6 +30,7 @@
#include <string>
#include <sstream>
#include <limits.h>
#include <arpa/inet.h>
#include <sys/resource.h>
@ -649,5 +650,13 @@ extern bool srs_is_boolean(std::string str);
// Dump summaries for /api/v1/summaries.
extern void srs_api_dump_summaries(SrsJsonObject* obj);
// Dump string(str in length) to hex, it will process min(limit, length) chars.
extern std::string srs_string_dumps_hex(const std::string& str, const int& limit = INT_MAX);
extern std::string srs_string_dumps_hex(const char* str, const int length, const int& limit = INT_MAX);
// Get ENV variable, which may starts with $.
// srs_getenv("EIP") === srs_getenv("$EIP")
extern std::string srs_getenv(std::string key);
#endif

View file

@ -398,6 +398,24 @@ enum SrsAvcNaluType
};
std::string srs_avc_nalu2str(SrsAvcNaluType nalu_type);
/**
* Table 7-6 Name association to slice_type
* ISO_IEC_14496-10-AVC-2012.pdf, page 105.
*/
enum SrsAvcSliceType
{
SrsAvcSliceTypeP = 0,
SrsAvcSliceTypeB = 1,
SrsAvcSliceTypeI = 2,
SrsAvcSliceTypeSP = 3,
SrsAvcSliceTypeSI = 4,
SrsAvcSliceTypeP1 = 5,
SrsAvcSliceTypeB1 = 6,
SrsAvcSliceTypeI1 = 7,
SrsAvcSliceTypeSP1 = 8,
SrsAvcSliceTypeSI1 = 9,
};
/**
* the avc payload format, must be ibmf or annexb format.
* we guess by annexb first, then ibmf for the first time,

View file

@ -69,14 +69,14 @@ std::string SrsCplxError::description() {
if (desc.empty()) {
stringstream ss;
ss << "code=" << code;
SrsCplxError* next = this;
while (next) {
ss << " : " << next->msg;
next = next->wrapped;
}
ss << endl;
next = this;
while (next) {
ss << "thread [" << getpid() << "][" << next->cid << "]: "
@ -89,13 +89,29 @@ std::string SrsCplxError::description() {
ss << endl;
}
}
desc = ss.str();
}
return desc;
}
std::string SrsCplxError::summary() {
if (_summary.empty()) {
stringstream ss;
SrsCplxError* next = this;
while (next) {
ss << " : " << next->msg;
next = next->wrapped;
}
_summary = ss.str();
}
return _summary;
}
SrsCplxError* SrsCplxError::create(const char* func, const char* file, int line, int code, const char* fmt, ...) {
int rerrno = (int)errno;
@ -178,6 +194,11 @@ string SrsCplxError::description(SrsCplxError* err)
return err? err->description() : "Success";
}
string SrsCplxError::summary(SrsCplxError* err)
{
return err? err->summary() : "Success";
}
int SrsCplxError::error_code(SrsCplxError* err)
{
return err? err->code : ERROR_SUCCESS;

View file

@ -325,6 +325,28 @@
#define ERROR_BASE64_DECODE 4039
#define ERROR_HTTP_STREAM_EOF 4040
///////////////////////////////////////////////////////
// RTC protocol error.
///////////////////////////////////////////////////////
#define ERROR_RTC_PORT 5000
#define ERROR_RTP_PACKET_CREATE 5001
#define ERROR_OpenSslCreateSSL 5002
#define ERROR_OpenSslBIOReset 5003
#define ERROR_OpenSslBIOWrite 5004
#define ERROR_OpenSslBIONew 5005
#define ERROR_RTC_RTP 5006
#define ERROR_RTC_RTCP 5007
#define ERROR_RTC_STUN 5008
#define ERROR_RTC_DTLS 5009
#define ERROR_RTC_UDP 5010
#define ERROR_RTC_RTP_MUXER 5011
#define ERROR_RTC_SDP_DECODE 5012
#define ERROR_RTC_SRTP_INIT 5013
#define ERROR_RTC_SRTP_PROTECT 5014
#define ERROR_RTC_SRTP_UNPROTECT 5015
#define ERROR_RTC_RTCP_CHECK 5016
#define ERROR_RTC_SOURCE_CHECK 5017
///////////////////////////////////////////////////////
// HTTP API error.
///////////////////////////////////////////////////////
@ -364,18 +386,21 @@ private:
int rerrno;
std::string desc;
std::string _summary;
private:
SrsCplxError();
public:
virtual ~SrsCplxError();
private:
virtual std::string description();
virtual std::string summary();
public:
static SrsCplxError* create(const char* func, const char* file, int line, int code, const char* fmt, ...);
static SrsCplxError* wrap(const char* func, const char* file, int line, SrsCplxError* err, const char* fmt, ...);
static SrsCplxError* success();
static SrsCplxError* copy(SrsCplxError* from);
static std::string description(SrsCplxError* err);
static std::string summary(SrsCplxError* err);
static int error_code(SrsCplxError* err);
};
@ -385,6 +410,7 @@ public:
#define srs_error_wrap(err, fmt, ...) SrsCplxError::wrap(__FUNCTION__, __FILE__, __LINE__, err, fmt, ##__VA_ARGS__)
#define srs_error_copy(err) SrsCplxError::copy(err)
#define srs_error_desc(err) SrsCplxError::description(err)
#define srs_error_summary(err) SrsCplxError::summary(err)
#define srs_error_code(err) SrsCplxError::error_code(err)
#define srs_error_reset(err) srs_freep(err); err = srs_success

View file

@ -40,6 +40,9 @@ using namespace std;
#include <srs_kernel_utility.hpp>
#include <srs_core_mem_watch.hpp>
#include <srs_core_autofree.hpp>
#ifdef SRS_AUTO_RTC
#include <srs_kernel_rtp.hpp>
#endif
SrsMessageHeader::SrsMessageHeader()
{
@ -228,6 +231,12 @@ SrsSharedPtrMessage::~SrsSharedPtrMessage()
ptr->shared_count--;
}
}
#ifdef SRS_AUTO_RTC
for (int i = 0; i < (int)rtp_packets.size(); ++i) {
srs_freep(rtp_packets[i]);
}
#endif
}
srs_error_t SrsSharedPtrMessage::create(SrsCommonMessage* msg)
@ -345,10 +354,23 @@ SrsSharedPtrMessage* SrsSharedPtrMessage::copy()
copy->stream_id = stream_id;
copy->payload = ptr->payload;
copy->size = ptr->size;
#ifdef SRS_AUTO_RTC
for (int i = 0; i < (int)rtp_packets.size(); ++i) {
copy->rtp_packets.push_back(rtp_packets[i]->copy());
}
#endif
return copy;
}
#ifdef SRS_AUTO_RTC
void SrsSharedPtrMessage::set_rtp_packets(const std::vector<SrsRtpSharedPacket*>& pkts)
{
rtp_packets = pkts;
}
#endif
SrsFlvTransmuxer::SrsFlvTransmuxer()
{
writer = NULL;

View file

@ -27,6 +27,7 @@
#include <srs_core.hpp>
#include <string>
#include <vector>
// For srs-librtmp, @see https://github.com/ossrs/srs/issues/213
#ifndef _WIN32
@ -38,6 +39,7 @@ class ISrsWriter;
class ISrsReader;
class SrsFileReader;
class SrsPacket;
class SrsRtpSharedPacket;
#define SRS_FLV_TAG_HEADER_SIZE 11
#define SRS_FLV_PREVIOUS_TAG_SIZE 4
@ -285,6 +287,11 @@ public:
// @remark, not all message payload can be decoded to packet. for example,
// video/audio packet use raw bytes, no video/audio packet.
char* payload;
#ifdef SRS_AUTO_RTC
std::vector<SrsRtpSharedPacket*> rtp_packets;
#endif
private:
class SrsSharedPtrPayload
{
@ -339,6 +346,10 @@ public:
// copy current shared ptr message, use ref-count.
// @remark, assert object is created.
virtual SrsSharedPtrMessage* copy();
public:
#ifdef SRS_AUTO_RTC
virtual void set_rtp_packets(const std::vector<SrsRtpSharedPacket*>& pkts);
#endif
};
// Transmux RTMP packets to FLV stream.

View file

@ -0,0 +1,128 @@
/**
* The MIT License (MIT)
*
* Copyright (c) 2013-2020 Winlin
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#include <srs_kernel_rtp.hpp>
#include <fcntl.h>
#include <sstream>
using namespace std;
#include <srs_kernel_log.hpp>
#include <srs_kernel_error.hpp>
#include <srs_kernel_buffer.hpp>
#include <srs_kernel_utility.hpp>
SrsRtpSharedPacket::SrsRtpSharedPacketPayload::SrsRtpSharedPacketPayload()
{
payload = NULL;
size = 0;
shared_count = 0;
}
SrsRtpSharedPacket::SrsRtpSharedPacketPayload::~SrsRtpSharedPacketPayload()
{
srs_freepa(payload);
}
SrsRtpSharedPacket::SrsRtpSharedPacket()
{
payload_ptr = NULL;
payload = NULL;
size = 0;
timestamp = -1;
sequence = 0;
ssrc = 0;
payload_type = 0;
}
SrsRtpSharedPacket::~SrsRtpSharedPacket()
{
if (payload_ptr) {
if (payload_ptr->shared_count == 0) {
srs_freep(payload_ptr);
} else {
--payload_ptr->shared_count;
}
}
}
srs_error_t SrsRtpSharedPacket::create(int64_t t, uint16_t seq, uint32_t sc, uint16_t pt, char* p, int s)
{
srs_error_t err = srs_success;
if (size < 0) {
return srs_error_new(ERROR_RTP_PACKET_CREATE, "create packet size=%d", size);
}
srs_assert(!payload_ptr);
timestamp = t;
sequence = seq;
ssrc = sc;
payload_type = pt;
payload_ptr = new SrsRtpSharedPacketPayload();
payload_ptr->payload = p;
payload_ptr->size = s;
payload = payload_ptr->payload;
size = payload_ptr->size;
return err;
}
SrsRtpSharedPacket* SrsRtpSharedPacket::copy()
{
SrsRtpSharedPacket* copy = new SrsRtpSharedPacket();
copy->payload_ptr = payload_ptr;
payload_ptr->shared_count++;
copy->payload = payload;
copy->size = size;
copy->timestamp = timestamp;
copy->sequence = sequence;
copy->ssrc = ssrc;
copy->payload_type = payload_type;
return copy;
}
srs_error_t SrsRtpSharedPacket::set_marker(bool marker)
{
srs_error_t err = srs_success;
if (payload_ptr == NULL || payload_ptr->payload == NULL || payload_ptr->size < 1) {
return srs_error_new(ERROR_RTC_RTP_MUXER, "rtp payload incorrect");
}
if (marker) {
payload_ptr->payload[1] |= kMarker;
} else {
payload_ptr->payload[1] &= (~kMarker);
}
return err;
}

View file

@ -0,0 +1,67 @@
/**
* The MIT License (MIT)
*
* Copyright (c) 2013-2020 Winlin
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#ifndef SRS_KERNEL_RTP_HPP
#define SRS_KERNEL_RTP_HPP
#include <srs_core.hpp>
#include <string>
const uint8_t kMarker = 0x80;
class SrsRtpSharedPacket
{
private:
class SrsRtpSharedPacketPayload
{
public:
char* payload;
int size;
int shared_count;
public:
SrsRtpSharedPacketPayload();
virtual ~SrsRtpSharedPacketPayload();
};
private:
SrsRtpSharedPacketPayload* payload_ptr;
public:
char* payload;
int size;
public:
int64_t timestamp;
uint16_t sequence;
uint32_t ssrc;
uint16_t payload_type;
public:
SrsRtpSharedPacket();
virtual ~SrsRtpSharedPacket();
public:
srs_error_t create(int64_t t, uint16_t seq, uint32_t sc, uint16_t pt, char* p, int s);
SrsRtpSharedPacket* copy();
// interface to modify rtp header
public:
srs_error_t set_marker(bool marker);
};
#endif

View file

@ -50,6 +50,9 @@ using namespace std;
#include <srs_core_autofree.hpp>
#include <srs_kernel_file.hpp>
#include <srs_app_hybrid.hpp>
#ifdef SRS_AUTO_RTC
#include <srs_app_rtc_conn.hpp>
#endif
#ifdef SRS_AUTO_SRT
#include <srt_server.hpp>
@ -59,7 +62,6 @@ using namespace std;
srs_error_t run_directly_or_daemon();
srs_error_t run_hybrid_server();
void show_macro_features();
string srs_getenv(const char* name);
// @global log and context.
ISrsLog* _srs_log = new SrsFastLog();
@ -344,17 +346,6 @@ void show_macro_features()
#endif
}
string srs_getenv(const char* name)
{
char* cv = ::getenv(name);
if (cv) {
return cv;
}
return "";
}
// Detect docker by https://stackoverflow.com/a/41559867
bool _srs_in_docker = false;
srs_error_t srs_detect_docker()
@ -453,11 +444,17 @@ srs_error_t run_hybrid_server()
{
srs_error_t err = srs_success;
// Create servers and register them.
_srs_hybrid->register_server(new SrsServerAdapter());
#ifdef SRS_AUTO_SRT
_srs_hybrid->register_server(new SrtServerAdapter());
#endif
#ifdef SRS_AUTO_RTC
_srs_hybrid->register_server(new RtcServerAdapter());
#endif
// Do some system initialize.
if ((err = _srs_hybrid->initialize()) != srs_success) {
return srs_error_wrap(err, "hybrid initialize");

View file

@ -0,0 +1,240 @@
#include <srs_stun_stack.hpp>
using namespace std;
#include <openssl/dh.h>
#include <openssl/evp.h>
#include <openssl/hmac.h>
#include <openssl/ssl.h>
#include <srs_core_autofree.hpp>
#include <srs_kernel_buffer.hpp>
#include <srs_kernel_log.hpp>
#include <srs_kernel_utility.hpp>
static srs_error_t hmac_encode(const std::string& algo, const char* key, const int& key_length,
const char* input, const int input_length, char* output, unsigned int& output_length)
{
srs_error_t err = srs_success;
const EVP_MD* engine = NULL;
if (algo == "sha512") {
engine = EVP_sha512();
} else if(algo == "sha256") {
engine = EVP_sha256();
} else if(algo == "sha1") {
engine = EVP_sha1();
} else if(algo == "md5") {
engine = EVP_md5();
} else if(algo == "sha224") {
engine = EVP_sha224();
} else if(algo == "sha384") {
engine = EVP_sha384();
} else {
return srs_error_new(ERROR_RTC_STUN, "unknown algo=%s", algo.c_str());
}
HMAC_CTX* ctx = HMAC_CTX_new();
if (ctx == NULL) {
return srs_error_new(ERROR_RTC_STUN, "hmac init faied");
}
if (HMAC_Init_ex(ctx, key, key_length, engine, NULL) < 0) {
HMAC_CTX_free(ctx);
return srs_error_new(ERROR_RTC_STUN, "hmac init faied");
}
if (HMAC_Update(ctx, (const unsigned char*)input, input_length) < 0) {
HMAC_CTX_free(ctx);
return srs_error_new(ERROR_RTC_STUN, "hmac update faied");
}
if (HMAC_Final(ctx, (unsigned char*)output, &output_length) < 0) {
HMAC_CTX_free(ctx);
return srs_error_new(ERROR_RTC_STUN, "hmac final faied");
}
HMAC_CTX_free(ctx);
return err;
}
SrsStunPacket::SrsStunPacket()
{
message_type = 0;
local_ufrag = "";
remote_ufrag = "";
}
SrsStunPacket::~SrsStunPacket()
{
}
srs_error_t SrsStunPacket::decode(const char* buf, const int nb_buf)
{
srs_error_t err = srs_success;
SrsBuffer* stream = new SrsBuffer(const_cast<char*>(buf), nb_buf);
SrsAutoFree(SrsBuffer, stream);
if (stream->left() < 20) {
return srs_error_new(ERROR_RTC_STUN, "invalid stun packet, size=%d", stream->size());
}
message_type = stream->read_2bytes();
uint16_t message_len = stream->read_2bytes();
string magic_cookie = stream->read_string(4);
transcation_id = stream->read_string(12);
if (nb_buf != 20 + message_len) {
return srs_error_new(ERROR_RTC_STUN, "invalid stun packet, message_len=%d, nb_buf=%d", message_len, nb_buf);
}
while (stream->left() >= 4) {
uint16_t type = stream->read_2bytes();
uint16_t len = stream->read_2bytes();
if (stream->left() < len) {
return srs_error_new(ERROR_RTC_STUN, "invalid stun packet");
}
string val = stream->read_string(len);
// padding
if (len % 4 != 0) {
stream->read_string(4 - (len % 4));
}
switch (type) {
case Username: {
username = val;
size_t p = val.find(":");
if (p != string::npos) {
local_ufrag = val.substr(0, p);
remote_ufrag = val.substr(p + 1);
srs_verbose("stun packet local_ufrag=%s, remote_ufrag=%s", local_ufrag.c_str(), remote_ufrag.c_str());
}
break;
}
default: {
break;
}
}
}
return err;
}
srs_error_t SrsStunPacket::encode(const string& pwd, SrsBuffer* stream)
{
if (is_binding_response()) {
return encode_binding_response(pwd, stream);
}
return srs_error_new(ERROR_RTC_STUN, "unknown stun type=%d", get_message_type());
}
// FIXME: make this function easy to read
srs_error_t SrsStunPacket::encode_binding_response(const string& pwd, SrsBuffer* stream)
{
srs_error_t err = srs_success;
string property_username = encode_username();
string mapped_address = encode_mapped_address();
stream->write_2bytes(BindingResponse);
stream->write_2bytes(property_username.size() + mapped_address.size());
stream->write_4bytes(kStunMagicCookie);
stream->write_string(transcation_id);
stream->write_string(property_username);
stream->write_string(mapped_address);
stream->data()[2] = ((stream->pos() - 20 + 20 + 4) & 0x0000FF00) >> 8;
stream->data()[3] = ((stream->pos() - 20 + 20 + 4) & 0x000000FF);
char hmac_buf[20] = {0};
unsigned int hmac_buf_len = 0;
if ((err = hmac_encode("sha1", pwd.c_str(), pwd.size(), stream->data(), stream->pos(), hmac_buf, hmac_buf_len)) != srs_success) {
return srs_error_wrap(err, "hmac encode failed");
}
string hmac = encode_hmac(hmac_buf, hmac_buf_len);
stream->write_string(hmac);
stream->data()[2] = ((stream->pos() - 20 + 8) & 0x0000FF00) >> 8;
stream->data()[3] = ((stream->pos() - 20 + 8) & 0x000000FF);
uint32_t crc32 = srs_crc32_ieee(stream->data(), stream->pos(), 0) ^ 0x5354554E;
string fingerprint = encode_fingerprint(crc32);
stream->write_string(fingerprint);
stream->data()[2] = ((stream->pos() - 20) & 0x0000FF00) >> 8;
stream->data()[3] = ((stream->pos() - 20) & 0x000000FF);
return err;
}
string SrsStunPacket::encode_username()
{
char buf[1460];
SrsBuffer* stream = new SrsBuffer(buf, sizeof(buf));
SrsAutoFree(SrsBuffer, stream);
string username = remote_ufrag + ":" + local_ufrag;
stream->write_2bytes(Username);
stream->write_2bytes(username.size());
stream->write_string(username);
if (stream->pos() % 4 != 0) {
static char padding[4] = {0};
stream->write_bytes(padding, 4 - (stream->pos() % 4));
}
return string(stream->data(), stream->pos());
}
string SrsStunPacket::encode_mapped_address()
{
char buf[1460];
SrsBuffer* stream = new SrsBuffer(buf, sizeof(buf));
SrsAutoFree(SrsBuffer, stream);
stream->write_2bytes(XorMappedAddress);
stream->write_2bytes(8);
stream->write_1bytes(0); // ignore this bytes
stream->write_1bytes(1); // ipv4 family
stream->write_2bytes(mapped_port ^ (kStunMagicCookie >> 16));
stream->write_4bytes(mapped_address ^ kStunMagicCookie);
return string(stream->data(), stream->pos());
}
string SrsStunPacket::encode_hmac(char* hmac_buf, const int hmac_buf_len)
{
char buf[1460];
SrsBuffer* stream = new SrsBuffer(buf, sizeof(buf));
SrsAutoFree(SrsBuffer, stream);
stream->write_2bytes(MessageIntegrity);
stream->write_2bytes(hmac_buf_len);
stream->write_bytes(hmac_buf, hmac_buf_len);
return string(stream->data(), stream->pos());
}
string SrsStunPacket::encode_fingerprint(uint32_t crc32)
{
char buf[1460];
SrsBuffer* stream = new SrsBuffer(buf, sizeof(buf));
SrsAutoFree(SrsBuffer, stream);
stream->write_2bytes(Fingerprint);
stream->write_2bytes(4);
stream->write_4bytes(crc32);
return string(stream->data(), stream->pos());
}

View file

@ -0,0 +1,115 @@
/**
* The MIT License (MIT)
*
* Copyright (c) 2013-2020 Winlin
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#ifndef SRS_PROTOCOL_STUN_HPP
#define SRS_PROTOCOL_STUN_HPP
#include <string>
#include <srs_core.hpp>
#include <srs_kernel_error.hpp>
class SrsBuffer;
// @see: https://tools.ietf.org/html/rfc5389
// The magic cookie field MUST contain the fixed value 0x2112A442 in network byte order
const uint32_t kStunMagicCookie = 0x2112A442;
enum SrsStunMessageType
{
// see @ https://tools.ietf.org/html/rfc3489#section-11.1
BindingRequest = 0x0001,
BindingResponse = 0x0101,
BindingErrorResponse = 0x0111,
SharedSecretRequest = 0x0002,
SharedSecretResponse = 0x0102,
SharedSecretErrorResponse = 0x0112,
};
enum SrsStunMessageAttribute
{
// see @ https://tools.ietf.org/html/rfc3489#section-11.2
MappedAddress = 0x0001,
ResponseAddress = 0x0002,
ChangeRequest = 0x0003,
SourceAddress = 0x0004,
ChangedAddress = 0x0005,
Username = 0x0006,
Password = 0x0007,
MessageIntegrity = 0x0008,
ErrorCode = 0x0009,
UnknownAttributes = 0x000A,
ReflectedFrom = 0x000B,
// see @ https://tools.ietf.org/html/rfc5389#section-18.2
Realm = 0x0014,
Nonce = 0x0015,
XorMappedAddress = 0x0020,
Software = 0x8022,
AlternateServer = 0x8023,
Fingerprint = 0x8028,
};
class SrsStunPacket
{
private:
uint16_t message_type;
std::string username;
std::string local_ufrag;
std::string remote_ufrag;
std::string transcation_id;
uint32_t mapped_address;
uint16_t mapped_port;
public:
SrsStunPacket();
virtual ~SrsStunPacket();
bool is_binding_request() const { return message_type == BindingRequest; }
bool is_binding_response() const { return message_type == BindingResponse; }
uint16_t get_message_type() const { return message_type; }
std::string get_username() const { return username; }
std::string get_local_ufrag() const { return local_ufrag; }
std::string get_remote_ufrag() const { return remote_ufrag; }
std::string get_transcation_id() const { return transcation_id; }
uint32_t get_mapped_address() const { return mapped_address; }
uint16_t get_mapped_port() const { return mapped_port; }
void set_message_type(const uint16_t& m) { message_type = m; }
void set_local_ufrag(const std::string& u) { local_ufrag = u; }
void set_remote_ufrag(const std::string& u) { remote_ufrag = u; }
void set_transcation_id(const std::string& t) { transcation_id = t; }
void set_mapped_address(const uint32_t& addr) { mapped_address = addr; }
void set_mapped_port(const uint32_t& port) { mapped_port = port; }
srs_error_t decode(const char* buf, const int nb_buf);
srs_error_t encode(const std::string& pwd, SrsBuffer* stream);
private:
srs_error_t encode_binding_response(const std::string& pwd, SrsBuffer* stream);
std::string encode_username();
std::string encode_mapped_address();
std::string encode_hmac(char* hamc_buf, const int hmac_buf_len);
std::string encode_fingerprint(uint32_t crc32);
};
#endif

View file

@ -907,7 +907,9 @@ srs_error_t SrsHttpResponseWriter::send_header(char* data, int size)
}
// keep alive to make vlc happy.
hdr->set("Connection", "Keep-Alive");
if (hdr->get("Connection").empty()) {
hdr->set("Connection", "Keep-Alive");
}
// Filter the header before writing it.
if (hf && ((err = hf->filter(hdr)) != srs_success)) {

View file

@ -397,6 +397,16 @@ int srs_recvfrom(srs_netfd_t stfd, void *buf, int len, struct sockaddr *from, in
return st_recvfrom((st_netfd_t)stfd, buf, len, from, fromlen, (st_utime_t)timeout);
}
int srs_sendto(srs_netfd_t stfd, void *buf, int len, const struct sockaddr * to, int tolen, srs_utime_t timeout)
{
return st_sendto((st_netfd_t)stfd, buf, len, to, tolen, (st_utime_t)timeout);
}
int srs_sendmsg(srs_netfd_t stfd, const struct msghdr *msg, int flags, srs_utime_t timeout)
{
return st_sendmsg((st_netfd_t)stfd, msg, flags, (st_utime_t)timeout);
}
srs_netfd_t srs_accept(srs_netfd_t stfd, struct sockaddr *addr, int *addrlen, srs_utime_t timeout)
{
return (srs_netfd_t)st_accept((st_netfd_t)stfd, addr, addrlen, (st_utime_t)timeout);

View file

@ -88,6 +88,8 @@ extern srs_netfd_t srs_netfd_open_socket(int osfd);
extern srs_netfd_t srs_netfd_open(int osfd);
extern int srs_recvfrom(srs_netfd_t stfd, void *buf, int len, struct sockaddr *from, int *fromlen, srs_utime_t timeout);
extern int srs_sendto(srs_netfd_t stfd, void *buf, int len, const struct sockaddr *to, int tolen, srs_utime_t timeout);
extern int srs_sendmsg(srs_netfd_t stfd, const struct msghdr *msg, int flags, srs_utime_t timeout);
extern srs_netfd_t srs_accept(srs_netfd_t stfd, struct sockaddr *addr, int *addrlen, srs_utime_t timeout);