1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-02-15 04:42:04 +00:00

adjust code style, fix some bug, add rtc session timeout

This commit is contained in:
xiaozhihong 2020-03-13 00:24:56 +08:00
parent da72caf8b9
commit 6decdc7838
10 changed files with 508 additions and 251 deletions

View file

@ -29,6 +29,8 @@ using namespace std;
#include <srs_kernel_log.hpp>
#include <srtp2/srtp.h>
SrsDtls* SrsDtls::_instance = NULL;
SrsDtls::SrsDtls()
@ -50,6 +52,10 @@ SrsDtls* SrsDtls::instance()
void SrsDtls::init()
{
// srtp init first
srs_assert(srtp_init() == 0);
// init dtls context
EVP_PKEY* dtls_private_key = EVP_PKEY_new();
srs_assert(dtls_private_key);

View file

@ -60,15 +60,15 @@ srs_error_t ISrsUdpHandler::on_stfd_change(srs_netfd_t /*fd*/)
return srs_success;
}
ISrsUdpRemuxHandler::ISrsUdpRemuxHandler()
ISrsUdpMuxHandler::ISrsUdpMuxHandler()
{
}
ISrsUdpRemuxHandler::~ISrsUdpRemuxHandler()
ISrsUdpMuxHandler::~ISrsUdpMuxHandler()
{
}
srs_error_t ISrsUdpRemuxHandler::on_stfd_change(srs_netfd_t /*fd*/)
srs_error_t ISrsUdpMuxHandler::on_stfd_change(srs_netfd_t /*fd*/)
{
return srs_success;
}
@ -221,7 +221,7 @@ srs_error_t SrsTcpListener::cycle()
return err;
}
SrsUdpRemuxSocket::SrsUdpRemuxSocket(srs_netfd_t fd)
SrsUdpMuxSocket::SrsUdpMuxSocket(srs_netfd_t fd)
{
nb_buf = SRS_UDP_MAX_PACKET_SIZE;
buf = new char[nb_buf];
@ -232,12 +232,31 @@ SrsUdpRemuxSocket::SrsUdpRemuxSocket(srs_netfd_t fd)
fromlen = 0;
}
SrsUdpRemuxSocket::~SrsUdpRemuxSocket()
SrsUdpMuxSocket::~SrsUdpMuxSocket()
{
srs_freepa(buf);
}
int SrsUdpRemuxSocket::recvfrom(srs_utime_t timeout)
SrsUdpMuxSocket::SrsUdpMuxSocket(const SrsUdpMuxSocket& rhs)
{
operator=(rhs);
}
SrsUdpMuxSocket& SrsUdpMuxSocket::operator=(const SrsUdpMuxSocket& rhs)
{
buf = NULL;
nb_buf = 0;
nread = 0;
lfd = rhs.lfd;
from = rhs.from;
fromlen = rhs.fromlen;
peer_ip = rhs.peer_ip;
peer_port = rhs.peer_port;
return *this;
}
int SrsUdpMuxSocket::recvfrom(srs_utime_t timeout)
{
fromlen = sizeof(from);
nread = srs_recvfrom(lfd, buf, nb_buf, (sockaddr*)&from, &fromlen, timeout);
@ -259,12 +278,23 @@ int SrsUdpRemuxSocket::recvfrom(srs_utime_t timeout)
return nread;
}
int SrsUdpRemuxSocket::sendto(void* data, int size, srs_utime_t timeout)
int SrsUdpMuxSocket::sendto(void* data, int size, srs_utime_t timeout)
{
return srs_sendto(lfd, data, size, (sockaddr*)&from, fromlen, timeout);
}
std::string SrsUdpRemuxSocket::get_peer_id()
int SrsUdpMuxSocket::sendtov(struct iovec* iov, size_t iovlen, srs_utime_t timeout)
{
struct msghdr udphdr = {0};
udphdr.msg_name = &from;
udphdr.msg_namelen = fromlen;
udphdr.msg_iov = iov;
udphdr.msg_iovlen = iovlen;
return srs_sendmsg(lfd, &udphdr, 0, timeout);
}
std::string SrsUdpMuxSocket::get_peer_id()
{
char id_buf[1024];
int len = snprintf(id_buf, sizeof(id_buf), "%s:%d", peer_ip.c_str(), peer_port);
@ -272,7 +302,7 @@ std::string SrsUdpRemuxSocket::get_peer_id()
return string(id_buf, len);
}
SrsUdpRemuxListener::SrsUdpRemuxListener(ISrsUdpRemuxHandler* h, std::string i, int p)
SrsUdpMuxListener::SrsUdpMuxListener(ISrsUdpMuxHandler* h, std::string i, int p)
{
handler = h;
ip = i;
@ -285,30 +315,32 @@ SrsUdpRemuxListener::SrsUdpRemuxListener(ISrsUdpRemuxHandler* h, std::string i,
trd = new SrsDummyCoroutine();
}
SrsUdpRemuxListener::~SrsUdpRemuxListener()
SrsUdpMuxListener::~SrsUdpMuxListener()
{
srs_freep(trd);
srs_close_stfd(lfd);
srs_freepa(buf);
}
int SrsUdpRemuxListener::fd()
int SrsUdpMuxListener::fd()
{
return srs_netfd_fileno(lfd);
}
srs_netfd_t SrsUdpRemuxListener::stfd()
srs_netfd_t SrsUdpMuxListener::stfd()
{
return lfd;
}
srs_error_t SrsUdpRemuxListener::listen()
srs_error_t SrsUdpMuxListener::listen()
{
srs_error_t err = srs_success;
if ((err = srs_udp_listen(ip, port, &lfd)) != srs_success) {
return srs_error_wrap(err, "listen %s:%d", ip.c_str(), port);
}
set_socket_buffer();
srs_freep(trd);
trd = new SrsSTCoroutine("udp", this);
@ -319,7 +351,38 @@ srs_error_t SrsUdpRemuxListener::listen()
return err;
}
srs_error_t SrsUdpRemuxListener::cycle()
void SrsUdpMuxListener::set_socket_buffer()
{
int sndbuf_size = 0;
socklen_t opt_len = sizeof(sndbuf_size);
getsockopt(fd(), SOL_SOCKET, SO_SNDBUF, (void*)&sndbuf_size, &opt_len);
srs_trace("default udp remux socket sndbuf=%d", sndbuf_size);
sndbuf_size = 1024*1024*10; // 10M
if (setsockopt(fd(), SOL_SOCKET, SO_SNDBUF, (void*)&sndbuf_size, sizeof(sndbuf_size)) < 0) {
srs_warn("set sock opt SO_SNDBUFFORCE failed");
}
opt_len = sizeof(sndbuf_size);
getsockopt(fd(), SOL_SOCKET, SO_SNDBUF, (void*)&sndbuf_size, &opt_len);
srs_trace("udp remux socket sndbuf=%d", sndbuf_size);
int rcvbuf_size = 0;
opt_len = sizeof(rcvbuf_size);
getsockopt(fd(), SOL_SOCKET, SO_RCVBUF, (void*)&rcvbuf_size, &opt_len);
srs_trace("default udp remux socket rcvbuf=%d", rcvbuf_size);
rcvbuf_size = 1024*1024*10; // 10M
if (setsockopt(fd(), SOL_SOCKET, SO_RCVBUF, (void*)&rcvbuf_size, sizeof(rcvbuf_size)) < 0) {
srs_warn("set sock opt SO_RCVBUFFORCE failed");
}
opt_len = sizeof(rcvbuf_size);
getsockopt(fd(), SOL_SOCKET, SO_RCVBUF, (void*)&rcvbuf_size, &opt_len);
srs_trace("udp remux socket rcvbuf=%d", rcvbuf_size);
}
srs_error_t SrsUdpMuxListener::cycle()
{
srs_error_t err = srs_success;
@ -328,15 +391,15 @@ srs_error_t SrsUdpRemuxListener::cycle()
return srs_error_wrap(err, "udp listener");
}
SrsUdpRemuxSocket udp_remux_socket(lfd);
SrsUdpMuxSocket udp_mux_skt(lfd);
if (udp_remux_socket.recvfrom(SRS_UTIME_NO_TIMEOUT) <= 0) {
if (udp_mux_skt.recvfrom(SRS_UTIME_NO_TIMEOUT) <= 0) {
srs_error("udp recv error");
// remux udp never return
continue;
}
if ((err = handler->on_udp_packet(&udp_remux_socket)) != srs_success) {
if ((err = handler->on_udp_packet(&udp_mux_skt)) != srs_success) {
// remux udp never return
srs_error("udp packet handler error:%s", srs_error_desc(err).c_str());
continue;

View file

@ -35,7 +35,7 @@
struct sockaddr;
class SrsUdpRemuxSocket;
class SrsUdpMuxSocket;
// The udp packet handler.
class ISrsUdpHandler
@ -58,14 +58,14 @@ public:
virtual srs_error_t on_udp_packet(const sockaddr* from, const int fromlen, char* buf, int nb_buf) = 0;
};
class ISrsUdpRemuxHandler
class ISrsUdpMuxHandler
{
public:
ISrsUdpRemuxHandler();
virtual ~ISrsUdpRemuxHandler();
ISrsUdpMuxHandler();
virtual ~ISrsUdpMuxHandler();
public:
virtual srs_error_t on_stfd_change(srs_netfd_t fd);
virtual srs_error_t on_udp_packet(SrsUdpRemuxSocket* udp_remux_socket) = 0;
virtual srs_error_t on_udp_packet(SrsUdpMuxSocket* udp_mux_skt) = 0;
};
// The tcp connection handler.
@ -127,7 +127,7 @@ public:
virtual srs_error_t cycle();
};
class SrsUdpRemuxSocket
class SrsUdpMuxSocket
{
private:
char* buf;
@ -139,11 +139,15 @@ private:
std::string peer_ip;
int peer_port;
public:
SrsUdpRemuxSocket(srs_netfd_t fd);
virtual ~SrsUdpRemuxSocket();
SrsUdpMuxSocket(srs_netfd_t fd);
virtual ~SrsUdpMuxSocket();
SrsUdpMuxSocket(const SrsUdpMuxSocket& rhs);
SrsUdpMuxSocket& operator=(const SrsUdpMuxSocket& rhs);
int recvfrom(srs_utime_t timeout);
int sendto(void* data, int size, srs_utime_t timeout);
int sendtov(struct iovec* iov, size_t iovlen, srs_utime_t timeout);
char* data() { return buf; }
int size() { return nread; }
@ -152,7 +156,7 @@ public:
std::string get_peer_id();
};
class SrsUdpRemuxListener : public ISrsCoroutineHandler
class SrsUdpMuxListener : public ISrsCoroutineHandler
{
protected:
srs_netfd_t lfd;
@ -161,12 +165,12 @@ protected:
char* buf;
int nb_buf;
protected:
ISrsUdpRemuxHandler* handler;
ISrsUdpMuxHandler* handler;
std::string ip;
int port;
public:
SrsUdpRemuxListener(ISrsUdpRemuxHandler* h, std::string i, int p);
virtual ~SrsUdpRemuxListener();
SrsUdpMuxListener(ISrsUdpMuxHandler* h, std::string i, int p);
virtual ~SrsUdpMuxListener();
public:
virtual int fd();
virtual srs_netfd_t stfd();
@ -175,6 +179,8 @@ public:
// Interface ISrsReusableThreadHandler.
public:
virtual srs_error_t cycle();
private:
void set_socket_buffer();
};
#endif

View file

@ -65,6 +65,11 @@ static bool is_rtp_or_rtcp(const char* data, size_t len)
return (len >= 12 && (data[0] & 0xC0) == 0x80);
}
static bool is_rtcp(const char* data, size_t len)
{
return (len >=12) && (data[0] & 0x80) && (data[1] >= 200 && data[1] <= 209);
}
static string gen_random_str(int len)
{
static string random_table = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
@ -294,9 +299,22 @@ SrsDtlsSession::SrsDtlsSession(SrsRtcSession* s)
SrsDtlsSession::~SrsDtlsSession()
{
if (dtls) {
// this function will free bio_in and bio_out
SSL_free(dtls);
dtls = NULL;
}
if (srtp_send) {
srtp_dealloc(srtp_send);
}
if (srtp_recv) {
srtp_dealloc(srtp_recv);
}
}
srs_error_t SrsDtlsSession::handshake(SrsUdpRemuxSocket* udp_remux_socket)
srs_error_t SrsDtlsSession::handshake(SrsUdpMuxSocket* udp_mux_skt)
{
srs_error_t err = srs_success;
@ -308,7 +326,7 @@ srs_error_t SrsDtlsSession::handshake(SrsUdpRemuxSocket* udp_remux_socket)
int ssl_err = SSL_get_error(dtls, ret);
switch(ssl_err) {
case SSL_ERROR_NONE: {
err = on_dtls_handshake_done(udp_remux_socket);
err = on_dtls_handshake_done(udp_mux_skt);
}
break;
@ -327,25 +345,25 @@ srs_error_t SrsDtlsSession::handshake(SrsUdpRemuxSocket* udp_remux_socket)
if (out_bio_len) {
srs_trace("send dtls handshake data");
udp_remux_socket->sendto(out_bio_data, out_bio_len, 0);
udp_mux_skt->sendto(out_bio_data, out_bio_len, 0);
}
return err;
}
srs_error_t SrsDtlsSession::on_dtls(SrsUdpRemuxSocket* udp_remux_socket)
srs_error_t SrsDtlsSession::on_dtls(SrsUdpMuxSocket* udp_mux_skt)
{
srs_error_t err = srs_success;
if (! handshake_done) {
BIO_reset(bio_in);
BIO_reset(bio_out);
BIO_write(bio_in, udp_remux_socket->data(), udp_remux_socket->size());
BIO_write(bio_in, udp_mux_skt->data(), udp_mux_skt->size());
handshake(udp_remux_socket);
handshake(udp_mux_skt);
} else {
BIO_reset(bio_in);
BIO_reset(bio_out);
BIO_write(bio_in, udp_remux_socket->data(), udp_remux_socket->size());
BIO_write(bio_in, udp_mux_skt->data(), udp_mux_skt->size());
while (BIO_ctrl_pending(bio_in) > 0) {
char dtls_read_buf[8092];
@ -360,7 +378,7 @@ srs_error_t SrsDtlsSession::on_dtls(SrsUdpRemuxSocket* udp_remux_socket)
return err;
}
srs_error_t SrsDtlsSession::on_dtls_handshake_done(SrsUdpRemuxSocket* udp_remux_socket)
srs_error_t SrsDtlsSession::on_dtls_handshake_done(SrsUdpMuxSocket* udp_mux_skt)
{
srs_error_t err = srs_success;
srs_trace("dtls handshake done");
@ -371,7 +389,7 @@ srs_error_t SrsDtlsSession::on_dtls_handshake_done(SrsUdpRemuxSocket* udp_remux_
return srs_error_wrap(err, "srtp init failed");
}
rtc_session->on_connection_established(udp_remux_socket);
rtc_session->on_connection_established(udp_mux_skt);
return err;
}
@ -383,7 +401,7 @@ srs_error_t SrsDtlsSession::on_dtls_application_data(const char* buf, const int
return err;
}
void SrsDtlsSession::send_client_hello(SrsUdpRemuxSocket* udp_remux_socket)
void SrsDtlsSession::send_client_hello(SrsUdpMuxSocket* udp_mux_skt)
{
if (dtls == NULL) {
srs_trace("send client hello");
@ -396,7 +414,7 @@ void SrsDtlsSession::send_client_hello(SrsUdpRemuxSocket* udp_remux_socket)
SSL_set_bio(dtls, bio_in, bio_out);
handshake(udp_remux_socket);
handshake(udp_mux_skt);
}
}
@ -405,42 +423,36 @@ srs_error_t SrsDtlsSession::srtp_initialize()
srs_error_t err = srs_success;
unsigned char material[SRTP_MASTER_KEY_LEN * 2] = {0}; // client(SRTP_MASTER_KEY_KEY_LEN + SRTP_MASTER_KEY_SALT_LEN) + server
static string dtls_srtp_lable = "EXTRACTOR-dtls_srtp";
static const string dtls_srtp_lable = "EXTRACTOR-dtls_srtp";
if (! SSL_export_keying_material(dtls, material, sizeof(material), dtls_srtp_lable.c_str(), dtls_srtp_lable.size(), NULL, 0, 0)) {
return srs_error_wrap(err, "SSL_export_keying_material failed");
}
size_t offset = 0;
std::string sClientMasterKey(reinterpret_cast<char*>(material), SRTP_MASTER_KEY_KEY_LEN);
std::string client_master_key(reinterpret_cast<char*>(material), SRTP_MASTER_KEY_KEY_LEN);
offset += SRTP_MASTER_KEY_KEY_LEN;
std::string sServerMasterKey(reinterpret_cast<char*>(material + offset), SRTP_MASTER_KEY_KEY_LEN);
std::string server_master_key(reinterpret_cast<char*>(material + offset), SRTP_MASTER_KEY_KEY_LEN);
offset += SRTP_MASTER_KEY_KEY_LEN;
std::string sClientMasterSalt(reinterpret_cast<char*>(material + offset), SRTP_MASTER_KEY_SALT_LEN);
std::string client_master_salt(reinterpret_cast<char*>(material + offset), SRTP_MASTER_KEY_SALT_LEN);
offset += SRTP_MASTER_KEY_SALT_LEN;
std::string sServerMasterSalt(reinterpret_cast<char*>(material + offset), SRTP_MASTER_KEY_SALT_LEN);
std::string server_master_salt(reinterpret_cast<char*>(material + offset), SRTP_MASTER_KEY_SALT_LEN);
client_key = sClientMasterKey + sClientMasterSalt;
server_key = sServerMasterKey + sServerMasterSalt;
client_key = client_master_key + client_master_salt;
server_key = server_master_key + server_master_salt;
srs_trace("client_key size=%d, server_key=%d", client_key.size(), server_key.size());
if (srtp_init() != 0) {
return srs_error_wrap(err, "srtp init failed");
if (srtp_send_init() != srs_success) {
return srs_error_wrap(err, "srtp send init failed");
}
if (srtp_sender_side_init() != srs_success) {
return srs_error_wrap(err, "srtp sender size init failed");
}
if (srtp_receiver_side_init() != srs_success) {
return srs_error_wrap(err, "srtp receiver size init failed");
if (srtp_recv_init() != srs_success) {
return srs_error_wrap(err, "srtp recv init failed");
}
return err;
}
srs_error_t SrsDtlsSession::srtp_sender_side_init()
srs_error_t SrsDtlsSession::srtp_send_init()
{
srs_error_t err = srs_success;
@ -463,16 +475,16 @@ srs_error_t SrsDtlsSession::srtp_sender_side_init()
policy.key = key;
if (srtp_create(&srtp_send, &policy) != 0) {
delete [] key;
srs_freepa(key);
return srs_error_wrap(err, "srtp_create failed");
}
delete [] key;
srs_freepa(key);
return err;
}
srs_error_t SrsDtlsSession::srtp_receiver_side_init()
srs_error_t SrsDtlsSession::srtp_recv_init()
{
srs_error_t err = srs_success;
@ -495,50 +507,80 @@ srs_error_t SrsDtlsSession::srtp_receiver_side_init()
policy.key = key;
if (srtp_create(&srtp_recv, &policy) != 0) {
delete [] key;
srs_freepa(key);
return srs_error_wrap(err, "srtp_create failed");
}
delete [] key;
srs_freepa(key);
return err;
}
srs_error_t SrsDtlsSession::srtp_sender_protect(char* protected_buf, const char* ori_buf, int& nb_protected_buf)
srs_error_t SrsDtlsSession::protect_rtp(char* out_buf, const char* in_buf, int& nb_out_buf)
{
srs_error_t err = srs_success;
if (srtp_send) {
memcpy(protected_buf, ori_buf, nb_protected_buf);
if (srtp_protect(srtp_send, protected_buf, &nb_protected_buf) != 0) {
srs_error("srtp sender protect failed");
return srs_error_wrap(err, "srtp sender protect failed");
memcpy(out_buf, in_buf, nb_out_buf);
if (srtp_protect(srtp_send, out_buf, &nb_out_buf) != 0) {
return srs_error_wrap(err, "rtp protect failed");
}
return err;
}
return srs_error_wrap(err, "srtp sender protect failed");
return srs_error_wrap(err, "rtp protect failed");
}
srs_error_t SrsDtlsSession::srtp_receiver_unprotect(char* unprotected_buf, const char* ori_buf, int& nb_unprotected_buf)
srs_error_t SrsDtlsSession::unprotect_rtp(char* out_buf, const char* in_buf, int& nb_out_buf)
{
srs_error_t err = srs_success;
if (srtp_send) {
memcpy(unprotected_buf, ori_buf, nb_unprotected_buf);
if (srtp_unprotect(srtp_recv, unprotected_buf, &nb_unprotected_buf) != 0) {
srs_error("srtp receiver unprotect failed");
return srs_error_wrap(err, "srtp receiver unprotect failed");
if (srtp_recv) {
memcpy(out_buf, in_buf, nb_out_buf);
if (srtp_unprotect(srtp_recv, out_buf, &nb_out_buf) != 0) {
return srs_error_wrap(err, "rtp unprotect failed");
}
return err;
}
return srs_error_wrap(err, "srtp receiver unprotect failed");
return srs_error_wrap(err, "rtp unprotect failed");
}
SrsRtcSenderThread::SrsRtcSenderThread(SrsRtcSession* s, SrsUdpRemuxSocket* u, int parent_cid)
srs_error_t SrsDtlsSession::protect_rtcp(char* out_buf, const char* in_buf, int& nb_out_buf)
{
srs_error_t err = srs_success;
if (srtp_send) {
memcpy(out_buf, in_buf, nb_out_buf);
if (srtp_protect_rtcp(srtp_send, out_buf, &nb_out_buf) != 0) {
return srs_error_wrap(err, "rtcp protect failed");
}
return err;
}
return srs_error_wrap(err, "rtcp protect failed");
}
srs_error_t SrsDtlsSession::unprotect_rtcp(char* out_buf, const char* in_buf, int& nb_out_buf)
{
srs_error_t err = srs_success;
if (srtp_recv) {
memcpy(out_buf, in_buf, nb_out_buf);
if (srtp_unprotect_rtcp(srtp_recv, out_buf, &nb_out_buf) != 0) {
return srs_error_wrap(err, "rtcp unprotect failed");
}
return err;
}
return srs_error_wrap(err, "rtcp unprotect failed");
}
SrsRtcSenderThread::SrsRtcSenderThread(SrsRtcSession* s, SrsUdpMuxSocket* u, int parent_cid)
: ukt(NULL)
{
_parent_cid = parent_cid;
@ -563,10 +605,10 @@ srs_error_t SrsRtcSenderThread::start()
srs_error_t err = srs_success;
srs_freep(trd);
trd = new SrsSTCoroutine("recv", this, _parent_cid);
trd = new SrsSTCoroutine("rtc_sender", this, _parent_cid);
if ((err = trd->start()) != srs_success) {
return srs_error_wrap(err, "recv thread");
return srs_error_wrap(err, "rtc_sender");
}
return err;
@ -608,69 +650,91 @@ srs_error_t SrsRtcSenderThread::cycle()
SrsAutoFree(SrsConsumer, consumer);
while (true) {
if ((err = trd->pull()) != srs_success) {
return srs_error_wrap(err, "rtc sender thread");
}
SrsMessageArray msgs(SRS_PERF_MW_MSGS);
#ifdef SRS_PERF_QUEUE_COND_WAIT
consumer->wait(0, SRS_PERF_MW_SLEEP);
#endif
int msg_count = 0;
if (consumer->dump_packets(&msgs, msg_count) != srs_success) {
srs_trace("rtc pop no rtp packets");
continue;
}
srs_trace("rtc pop %d rtp packets", msg_count);
for (int i = 0; i < msg_count; i++) {
SrsSharedPtrMessage* msg = msgs.msgs[i];
for (int i = 0; i < msg->nb_rtp_fragments; ++i) {
SrsBuffer stream(msg->rtp_fragments[i].bytes + 2, 2);
uint16_t seq = stream.read_2bytes();
srs_trace("rtp fragment size=%d, seq=%u, payload=%s", msg->rtp_fragments[i].size, seq,
dump_string_hex(msg->rtp_fragments[i].bytes, msg->rtp_fragments[i].size, 1460).c_str());
if (rtc_session->dtls_session) {
char rtp_send_protected_buf[1500];
int rtp_send_protected_len = msg->rtp_fragments[i].size;
rtc_session->dtls_session->srtp_sender_protect(rtp_send_protected_buf, msg->rtp_fragments[i].bytes, rtp_send_protected_len);
ukt.sendto(rtp_send_protected_buf, rtp_send_protected_len, 0);
}
}
srs_freep(msg);
if (msg_count <= 0) {
#ifndef SRS_PERF_QUEUE_COND_WAIT
srs_usleep(mw_sleep);
#endif
// ignore when nothing got.
continue;
}
srs_usleep(16000);
send_and_free_messages(msgs.msgs, msg_count, &ukt);
}
}
void SrsRtcSenderThread::send_and_free_messages(SrsSharedPtrMessage** msgs, int nb_msgs, SrsUdpMuxSocket* udp_mux_skt)
{
for (int i = 0; i < nb_msgs; i++) {
SrsSharedPtrMessage* msg = msgs[i];
SrsRtcSession::SrsRtcSession(SrsServer* svr, SrsRtcServer* rtc_svr)
for (int i = 0; i < msg->nb_rtp_fragments; ++i) {
if (rtc_session->dtls_session) {
char protected_buf[kRtpPacketSize];
int nb_protected_buf = msg->rtp_fragments[i].size;
rtc_session->dtls_session->protect_rtp(protected_buf, msg->rtp_fragments[i].bytes, nb_protected_buf);
udp_mux_skt->sendto(protected_buf, nb_protected_buf, 0);
}
}
srs_freep(msg);
}
}
SrsRtcSession::SrsRtcSession(SrsServer* svr, SrsRtcServer* rtc_svr, const string& un)
{
server = svr;
rtc_server = rtc_svr;
session_state = INIT;
dtls_session = NULL;
strd = NULL;
username = un;
last_stun_time = srs_get_system_time();
}
SrsRtcSession::~SrsRtcSession()
{
srs_freep(dtls_session);
if (strd) {
strd->stop();
}
srs_freep(strd);
}
srs_error_t SrsRtcSession::on_stun(SrsUdpRemuxSocket* udp_remux_socket, SrsStunPacket* stun_req)
srs_error_t SrsRtcSession::on_stun(SrsUdpMuxSocket* udp_mux_skt, SrsStunPacket* stun_req)
{
srs_error_t err = srs_success;
if (stun_req->is_binding_request()) {
if (on_binding_request(udp_remux_socket, stun_req) != srs_success) {
if (on_binding_request(udp_mux_skt, stun_req) != srs_success) {
return srs_error_wrap(err, "stun binding request failed");
}
}
last_stun_time = srs_get_system_time();
return err;
}
srs_error_t SrsRtcSession::on_binding_request(SrsUdpRemuxSocket* udp_remux_socket, SrsStunPacket* stun_req)
srs_error_t SrsRtcSession::on_binding_request(SrsUdpMuxSocket* udp_mux_skt, SrsStunPacket* stun_req)
{
srs_error_t err = srs_success;
@ -684,77 +748,74 @@ srs_error_t SrsRtcSession::on_binding_request(SrsUdpRemuxSocket* udp_remux_socke
stun_binding_response.set_remote_ufrag(stun_req->get_local_ufrag());
stun_binding_response.set_transcation_id(stun_req->get_transcation_id());
// FIXME: inet_addr is deprecated, IPV6 support
stun_binding_response.set_mapped_address(be32toh(inet_addr(udp_remux_socket->get_peer_ip().c_str())));
stun_binding_response.set_mapped_port(udp_remux_socket->get_peer_port());
stun_binding_response.set_mapped_address(be32toh(inet_addr(udp_mux_skt->get_peer_ip().c_str())));
stun_binding_response.set_mapped_port(udp_mux_skt->get_peer_port());
if (stun_binding_response.encode(get_local_sdp()->get_ice_pwd(), stream) != srs_success) {
return srs_error_wrap(err, "stun binding response encode failed");
}
if (udp_remux_socket->sendto(stream->data(), stream->pos(), 0) <= 0) {
if (udp_mux_skt->sendto(stream->data(), stream->pos(), 0) <= 0) {
return srs_error_wrap(err, "stun binding response send failed");
}
if (get_session_state() == WAITING_STUN) {
set_session_state(DOING_DTLS_HANDSHAKE);
send_client_hello(udp_remux_socket);
send_client_hello(udp_mux_skt);
string peer_id = udp_remux_socket->get_peer_id();
peer_id = udp_mux_skt->get_peer_id();
rtc_server->insert_into_id_sessions(peer_id, this);
}
// TODO: dtls send client retry
return err;
}
srs_error_t SrsRtcSession::send_client_hello(SrsUdpRemuxSocket* udp_remux_socket)
srs_error_t SrsRtcSession::send_client_hello(SrsUdpMuxSocket* udp_mux_skt)
{
if (dtls_session == NULL) {
dtls_session = new SrsDtlsSession(this);
}
dtls_session->send_client_hello(udp_remux_socket);
dtls_session->send_client_hello(udp_mux_skt);
}
void SrsRtcSession::on_connection_established(SrsUdpRemuxSocket* udp_remux_socket)
void SrsRtcSession::on_connection_established(SrsUdpMuxSocket* udp_mux_skt)
{
start_play(udp_remux_socket);
srs_trace("rtc session=%s, connection established", id().c_str());
start_play(udp_mux_skt);
}
srs_error_t SrsRtcSession::start_play(SrsUdpRemuxSocket* udp_remux_socket)
srs_error_t SrsRtcSession::start_play(SrsUdpMuxSocket* udp_mux_skt)
{
srs_error_t err = srs_success;
strd = new SrsRtcSenderThread(this, udp_remux_socket, _srs_context->get_id());
srs_freep(strd);
strd = new SrsRtcSenderThread(this, udp_mux_skt, _srs_context->get_id());
strd->start();
return err;
}
srs_error_t SrsRtcSession::on_dtls(SrsUdpRemuxSocket* udp_remux_socket)
srs_error_t SrsRtcSession::on_dtls(SrsUdpMuxSocket* udp_mux_skt)
{
return dtls_session->on_dtls(udp_remux_socket);
return dtls_session->on_dtls(udp_mux_skt);
}
srs_error_t SrsRtcSession::on_rtp_or_rtcp(SrsUdpRemuxSocket* udp_remux_socket)
srs_error_t SrsRtcSession::on_rtp(SrsUdpMuxSocket* udp_mux_skt)
{
srs_error_t err = srs_success;
if (dtls_session == NULL) {
return srs_error_wrap(err, "recv unexpect rtp/rtcp packet before dtls done");
return srs_error_wrap(err, "recv unexpect rtp packet before dtls done");
}
uint8_t payload_type = udp_remux_socket->data()[1] & 0x7F;
char srtp_unprotect_buf[1460];
int nb_srtp_unprotect_buf = udp_remux_socket->size();
if (dtls_session->srtp_receiver_unprotect(srtp_unprotect_buf, udp_remux_socket->data(), nb_srtp_unprotect_buf) != srs_success) {
return srs_error_wrap(err, "srtp receiver unprotect failed, payload_type=%u", payload_type);
char unprotected_buf[1460];
int nb_unprotected_buf = udp_mux_skt->size();
if (dtls_session->unprotect_rtp(unprotected_buf, udp_mux_skt->data(), nb_unprotected_buf) != srs_success) {
return srs_error_wrap(err, "rtp unprotect failed");
}
//srs_trace("srtp unprotect success, %s", dump_string_hex(srtp_unprotect_buf, nb_srtp_unprotect_buf, nb_srtp_unprotect_buf).c_str());
SrsBuffer* stream = new SrsBuffer(srtp_unprotect_buf, nb_srtp_unprotect_buf);
// FIXME: use SrsRtpPacket
SrsBuffer* stream = new SrsBuffer(unprotected_buf, nb_unprotected_buf);
SrsAutoFree(SrsBuffer, stream);
uint8_t first = stream->read_1bytes();
uint8_t second = stream->read_1bytes();
@ -769,7 +830,7 @@ srs_error_t SrsRtcSession::on_rtp_or_rtcp(SrsUdpRemuxSocket* udp_remux_socket)
uint32_t timestamp = stream->read_4bytes();
uint32_t ssrc = stream->read_4bytes();
srs_trace("sequence=%u, timestamp=%u, ssrc=%u, padding=%d, ext=%d, cc=%u, marker=%d, payload_type=%u",
srs_verbose("sequence=%u, timestamp=%u, ssrc=%u, padding=%d, ext=%d, cc=%u, marker=%d, payload_type=%u",
sequence, timestamp, ssrc, padding, ext, cc, marker, payload_type);
for (uint8_t i = 0; i < cc; ++i) {
@ -781,70 +842,40 @@ srs_error_t SrsRtcSession::on_rtp_or_rtcp(SrsUdpRemuxSocket* udp_remux_socket)
uint16_t extern_profile = stream->read_2bytes();
uint16_t extern_length = stream->read_2bytes();
srs_trace("extern_profile=%u, extern_length=%u", extern_profile, extern_length);
srs_verbose("extern_profile=%u, extern_length=%u", extern_profile, extern_length);
stream->read_string(extern_length * 4);
}
if (payload_type == 102) {
static uint32_t pre_seq = 0;
uint32_t seq = sequence;
srs_assert(pre_seq == 0 || (pre_seq + 1 == seq));
return err;
}
pre_seq = seq;
static uint8_t start_code[4] = {0x00, 0x00, 0x00, 0x01};
static int fd = -1;
if (fd < 0) {
fd = open("rtc.264", O_CREAT|O_TRUNC|O_RDWR, 0664);
}
const uint8_t* p = (const uint8_t*)stream->data() + stream->pos();
int len = stream->left();
uint8_t header = p[0];
uint8_t nal_type = header & kNalTypeMask;
srs_trace("nal_type=%u, seq=%u, rtp payload, %s", nal_type, sequence, dump_string_hex(stream->data() + stream->pos(), stream->left(), stream->left()).c_str());
if (nal_type >=1 && nal_type <= 23) {
srs_trace("single nalu");
write(fd, start_code, sizeof(start_code));
write(fd, p, len);
} else if (nal_type == kFuA) {
srs_trace("FuA");
if (p[1] & 0x80) {
uint8_t nal_type = ((p[0] & (~kNalTypeMask)) | (p[1] & kNalTypeMask));
write(fd, start_code, sizeof(start_code));
write(fd, &nal_type, 1);
write(fd, p + 2, len - 2);
} else {
write(fd, p + 2, len - 2);
}
} else if (nal_type == kStapA) {
srs_trace("StapA");
int pos = 1;
while (pos < len) {
int nal_len = p[pos] << 8 | p[pos + 1];
srs_trace("nal_len=%d", nal_len);
write(fd, start_code, sizeof(start_code));
write(fd, p + pos + 2, nal_len);
pos += nal_len + 2;
}
srs_assert(pos == len);
} else {
srs_assert(false);
}
srs_error_t SrsRtcSession::on_rtcp(SrsUdpMuxSocket* udp_mux_skt)
{
srs_error_t err = srs_success;
if (dtls_session == NULL) {
return srs_error_wrap(err, "recv unexpect rtcp packet before dtls done");
}
// XXX:send h264 back to client, for debug
if (payload_type == 102) {
char rtp_send_protected_buf[1500];
int rtp_send_protected_len = nb_srtp_unprotect_buf;
SrsBuffer stream(srtp_unprotect_buf + 8, 4);
stream.write_4bytes(3233846889);
dtls_session->srtp_sender_protect(rtp_send_protected_buf, srtp_unprotect_buf, rtp_send_protected_len);
udp_remux_socket->sendto(rtp_send_protected_buf, rtp_send_protected_len, 0);
char unprotected_buf[1460];
int nb_unprotected_buf = udp_mux_skt->size();
if (dtls_session->unprotect_rtcp(unprotected_buf, udp_mux_skt->data(), nb_unprotected_buf) != srs_success) {
return srs_error_wrap(err, "rtcp unprotect failed");
}
// FIXME: use SrsRtpPacket
SrsBuffer* stream = new SrsBuffer(unprotected_buf, nb_unprotected_buf);
SrsAutoFree(SrsBuffer, stream);
uint8_t first = stream->read_1bytes();
uint8_t payload_type = stream->read_1bytes();
if (payload_type == kSR) {
} else if (payload_type == kRR) {
} else if (kSDES) {
} else if (kBye) {
} else if (kApp) {
} else {
return srs_error_wrap(err, "unknown rtcp type=%u", payload_type);
}
return err;
@ -857,25 +888,32 @@ SrsRtcServer::SrsRtcServer(SrsServer* svr)
SrsRtcServer::~SrsRtcServer()
{
rttrd->stop();
srs_freep(rttrd);
}
srs_error_t SrsRtcServer::initialize()
{
srs_error_t err = srs_success;
rttrd = new SrsRtcTimerThread(this, _srs_context->get_id());
if (rttrd->start() != srs_success) {
return srs_error_wrap(err, "rtc timer thread init failed");
}
return err;
}
srs_error_t SrsRtcServer::on_udp_packet(SrsUdpRemuxSocket* udp_remux_socket)
srs_error_t SrsRtcServer::on_udp_packet(SrsUdpMuxSocket* udp_mux_skt)
{
srs_error_t err = srs_success;
if (is_stun(udp_remux_socket->data(), udp_remux_socket->size())) {
return on_stun(udp_remux_socket);
} else if (is_dtls(udp_remux_socket->data(), udp_remux_socket->size())) {
return on_dtls(udp_remux_socket);
} else if (is_rtp_or_rtcp(udp_remux_socket->data(), udp_remux_socket->size())) {
return on_rtp_or_rtcp(udp_remux_socket);
if (is_stun(udp_mux_skt->data(), udp_mux_skt->size())) {
return on_stun(udp_mux_skt);
} else if (is_dtls(udp_mux_skt->data(), udp_mux_skt->size())) {
return on_dtls(udp_mux_skt);
} else if (is_rtp_or_rtcp(udp_mux_skt->data(), udp_mux_skt->size())) {
return on_rtp_or_rtcp(udp_mux_skt);
}
return srs_error_wrap(err, "unknown udp packet type");
@ -883,20 +921,20 @@ srs_error_t SrsRtcServer::on_udp_packet(SrsUdpRemuxSocket* udp_remux_socket)
SrsRtcSession* SrsRtcServer::create_rtc_session(const SrsSdp& remote_sdp, SrsSdp& local_sdp)
{
SrsRtcSession* session = new SrsRtcSession(server, this);
std::string local_pwd = gen_random_str(32);
std::string local_ufrag = "";
std::string username = "";
while (true) {
local_ufrag = gen_random_str(8);
std::string username = local_ufrag + ":" + remote_sdp.get_ice_ufrag();
bool ret = map_username_session.insert(make_pair(username, session)).second;
if (ret) {
username = local_ufrag + ":" + remote_sdp.get_ice_ufrag();
if (! map_username_session.count(username))
break;
}
}
SrsRtcSession* session = new SrsRtcSession(server, this, username);
map_username_session.insert(make_pair(username, session));
local_sdp.set_ice_ufrag(local_ufrag);
local_sdp.set_ice_pwd(local_pwd);
@ -918,14 +956,14 @@ SrsRtcSession* SrsRtcServer::find_rtc_session_by_peer_id(const string& peer_id)
return iter->second;
}
srs_error_t SrsRtcServer::on_stun(SrsUdpRemuxSocket* udp_remux_socket)
srs_error_t SrsRtcServer::on_stun(SrsUdpMuxSocket* udp_mux_skt)
{
srs_error_t err = srs_success;
srs_trace("recv stun packet from %s", udp_remux_socket->get_peer_id().c_str());
srs_trace("recv stun packet from %s", udp_mux_skt->get_peer_id().c_str());
SrsStunPacket stun_req;
if (stun_req.decode(udp_remux_socket->data(), udp_remux_socket->size()) != srs_success) {
if (stun_req.decode(udp_mux_skt->data(), udp_mux_skt->size()) != srs_success) {
return srs_error_wrap(err, "decode stun packet failed");
}
@ -935,37 +973,40 @@ srs_error_t SrsRtcServer::on_stun(SrsUdpRemuxSocket* udp_remux_socket)
return srs_error_wrap(err, "can not find rtc_session, stun username=%s", username.c_str());
}
return rtc_session->on_stun(udp_remux_socket, &stun_req);
return rtc_session->on_stun(udp_mux_skt, &stun_req);
}
srs_error_t SrsRtcServer::on_dtls(SrsUdpRemuxSocket* udp_remux_socket)
srs_error_t SrsRtcServer::on_dtls(SrsUdpMuxSocket* udp_mux_skt)
{
srs_error_t err = srs_success;
srs_trace("on dtls");
SrsRtcSession* rtc_session = find_rtc_session_by_peer_id(udp_remux_socket->get_peer_id());
SrsRtcSession* rtc_session = find_rtc_session_by_peer_id(udp_mux_skt->get_peer_id());
if (rtc_session == NULL) {
return srs_error_wrap(err, "can not find rtc session by peer_id=%s", udp_remux_socket->get_peer_id().c_str());
return srs_error_wrap(err, "can not find rtc session by peer_id=%s", udp_mux_skt->get_peer_id().c_str());
}
rtc_session->on_dtls(udp_remux_socket);
rtc_session->on_dtls(udp_mux_skt);
return err;
}
srs_error_t SrsRtcServer::on_rtp_or_rtcp(SrsUdpRemuxSocket* udp_remux_socket)
srs_error_t SrsRtcServer::on_rtp_or_rtcp(SrsUdpMuxSocket* udp_mux_skt)
{
srs_error_t err = srs_success;
srs_trace("on rtp/rtcp");
SrsRtcSession* rtc_session = find_rtc_session_by_peer_id(udp_remux_socket->get_peer_id());
SrsRtcSession* rtc_session = find_rtc_session_by_peer_id(udp_mux_skt->get_peer_id());
if (rtc_session == NULL) {
return srs_error_wrap(err, "can not find rtc session by peer_id=%s", udp_remux_socket->get_peer_id().c_str());
return srs_error_wrap(err, "can not find rtc session by peer_id=%s", udp_mux_skt->get_peer_id().c_str());
}
rtc_session->on_rtp_or_rtcp(udp_remux_socket);
if (is_rtcp(udp_mux_skt->data(), udp_mux_skt->size())) {
rtc_session->on_rtcp(udp_mux_skt);
} else {
rtc_session->on_rtp(udp_mux_skt);
}
return err;
}
@ -984,3 +1025,82 @@ bool SrsRtcServer::insert_into_id_sessions(const string& peer_id, SrsRtcSession*
{
return map_id_session.insert(make_pair(peer_id, rtc_session)).second;
}
void SrsRtcServer::check_and_clean_timeout_session()
{
map<string, SrsRtcSession*>::iterator iter = map_username_session.begin();
while (iter != map_username_session.end()) {
SrsRtcSession* session = iter->second;
if (session == NULL) {
map_username_session.erase(iter++);
continue;
}
if (session->is_stun_timeout()) {
srs_trace("rtc session=%s, stun timeout", session->id().c_str());
map_username_session.erase(iter++);
map_id_session.erase(session->get_peer_id());
delete session;
continue;
}
++iter;
}
}
SrsRtcTimerThread::SrsRtcTimerThread(SrsRtcServer* rtc_svr, int parent_cid)
{
_parent_cid = parent_cid;
trd = new SrsDummyCoroutine();
rtc_server = rtc_svr;
}
SrsRtcTimerThread::~SrsRtcTimerThread()
{
srs_freep(trd);
}
int SrsRtcTimerThread::cid()
{
return trd->cid();
}
srs_error_t SrsRtcTimerThread::start()
{
srs_error_t err = srs_success;
srs_freep(trd);
trd = new SrsSTCoroutine("rtc_timer", this, _parent_cid);
if ((err = trd->start()) != srs_success) {
return srs_error_wrap(err, "rtc_timer");
}
return err;
}
void SrsRtcTimerThread::stop()
{
trd->stop();
}
void SrsRtcTimerThread::stop_loop()
{
trd->interrupt();
}
srs_error_t SrsRtcTimerThread::cycle()
{
srs_error_t err = srs_success;
while (true) {
if ((err = trd->pull()) != srs_success) {
srs_trace("rtc_timer cycle failed");
return srs_error_wrap(err, "rtc timer thread");
}
srs_usleep(1*1000*1000LL);
rtc_server->check_and_clean_timeout_session();
}
}

View file

@ -27,6 +27,7 @@
#include <srs_core.hpp>
#include <srs_app_listener.hpp>
#include <srs_service_st.hpp>
#include <srs_kernel_utility.hpp>
#include <string>
#include <map>
@ -35,12 +36,21 @@
#include <openssl/ssl.h>
#include <srtp2/srtp.h>
class SrsUdpRemuxSocket;
class SrsUdpMuxSocket;
class SrsServer;
class SrsConsumer;
class SrsStunPacket;
class SrsRtcServer;
class SrsRtcSession;
class SrsSharedPtrMessage;
const uint8_t kSR = 200;
const uint8_t kRR = 201;
const uint8_t kSDES = 202;
const uint8_t kBye = 203;
const uint8_t kApp = 204;
const srs_utime_t kSrsRtcSessionStunTimeoutUs = 10*1000*1000LL;
class SrsCandidate
{
@ -116,19 +126,22 @@ public:
SrsDtlsSession(SrsRtcSession* s);
virtual ~SrsDtlsSession();
srs_error_t on_dtls(SrsUdpRemuxSocket* udp_remux_socket);
srs_error_t on_dtls_handshake_done(SrsUdpRemuxSocket* udp_remux_socket);
srs_error_t on_dtls(SrsUdpMuxSocket* udp_mux_skt);
srs_error_t on_dtls_handshake_done(SrsUdpMuxSocket* udp_mux_skt);
srs_error_t on_dtls_application_data(const char* data, const int len);
void send_client_hello(SrsUdpRemuxSocket* udp_remux_socket);
srs_error_t handshake(SrsUdpRemuxSocket* udp_remux_socket);
srs_error_t srtp_sender_protect(char* protected_buf, const char* ori_buf, int& nb_protected_buf);
srs_error_t srtp_receiver_unprotect(char* unprotected_buf, const char* ori_buf, int& nb_unprotected_buf);
void send_client_hello(SrsUdpMuxSocket* udp_mux_skt);
public:
srs_error_t protect_rtp(char* protected_buf, const char* ori_buf, int& nb_protected_buf);
srs_error_t unprotect_rtp(char* unprotected_buf, const char* ori_buf, int& nb_unprotected_buf);
srs_error_t protect_rtcp(char* protected_buf, const char* ori_buf, int& nb_protected_buf);
srs_error_t unprotect_rtcp(char* unprotected_buf, const char* ori_buf, int& nb_unprotected_buf);
private:
srs_error_t handshake(SrsUdpMuxSocket* udp_mux_skt);
private:
srs_error_t srtp_initialize();
srs_error_t srtp_sender_side_init();
srs_error_t srtp_receiver_side_init();
srs_error_t srtp_send_init();
srs_error_t srtp_recv_init();
};
class SrsRtcSenderThread : public ISrsCoroutineHandler
@ -138,11 +151,11 @@ protected:
int _parent_cid;
private:
SrsRtcSession* rtc_session;
SrsUdpRemuxSocket ukt;
SrsUdpMuxSocket ukt;
public:
// Constructor.
// @param tm The receive timeout in srs_utime_t.
SrsRtcSenderThread(SrsRtcSession* s, SrsUdpRemuxSocket* u, int parent_cid);
SrsRtcSenderThread(SrsRtcSession* s, SrsUdpMuxSocket* u, int parent_cid);
virtual ~SrsRtcSenderThread();
public:
virtual int cid();
@ -152,6 +165,8 @@ public:
virtual void stop_loop();
public:
virtual srs_error_t cycle();
private:
void send_and_free_messages(SrsSharedPtrMessage** msgs, int nb_msgs, SrsUdpMuxSocket* udp_mux_skt);
};
class SrsRtcSession
@ -165,39 +180,76 @@ private:
SrsRtcSessionStateType session_state;
SrsDtlsSession* dtls_session;
SrsRtcSenderThread* strd;
std::string username;
std::string peer_id;
srs_utime_t last_stun_time;
public:
std::string app;
std::string stream;
public:
SrsRtcSession(SrsServer* svr, SrsRtcServer* rtc_svr);
SrsRtcSession(SrsServer* svr, SrsRtcServer* rtc_svr, const std::string& un);
virtual ~SrsRtcSession();
public:
SrsSdp* get_local_sdp() { return &local_sdp; }
SrsSdp* get_remote_sdp() { return &remote_sdp; }
SrsRtcSessionStateType get_session_state() { return session_state; }
void set_local_sdp(const SrsSdp& sdp) { local_sdp = sdp; }
SrsSdp* get_remote_sdp() { return &remote_sdp; }
void set_remote_sdp(const SrsSdp& sdp) { remote_sdp = sdp; }
SrsRtcSessionStateType get_session_state() { return session_state; }
void set_session_state(SrsRtcSessionStateType state) { session_state = state; }
std::string id() const { return peer_id + "_" + username; }
void set_app_stream(const std::string& a, const std::string& s) { app = a; stream = s; }
std::string get_peer_id() const { return peer_id; }
void set_peer_id(const std::string& id) { peer_id = id; }
public:
srs_error_t on_stun(SrsUdpRemuxSocket* udp_remux_socket, SrsStunPacket* stun_req);
srs_error_t on_dtls(SrsUdpRemuxSocket* udp_remux_socket);
srs_error_t on_rtp_or_rtcp(SrsUdpRemuxSocket* udp_remux_socket);
srs_error_t on_stun(SrsUdpMuxSocket* udp_mux_skt, SrsStunPacket* stun_req);
srs_error_t on_dtls(SrsUdpMuxSocket* udp_mux_skt);
srs_error_t on_rtp(SrsUdpMuxSocket* udp_mux_skt);
srs_error_t on_rtcp(SrsUdpMuxSocket* udp_mux_skt);
public:
srs_error_t send_client_hello(SrsUdpRemuxSocket* udp_remux_socket);
void on_connection_established(SrsUdpRemuxSocket* udp_remux_socket);
srs_error_t start_play(SrsUdpRemuxSocket* udp_remux_socket);
srs_error_t send_client_hello(SrsUdpMuxSocket* udp_mux_skt);
void on_connection_established(SrsUdpMuxSocket* udp_mux_skt);
srs_error_t start_play(SrsUdpMuxSocket* udp_mux_skt);
public:
bool is_stun_timeout() { return last_stun_time + kSrsRtcSessionStunTimeoutUs < srs_get_system_time(); }
private:
srs_error_t on_binding_request(SrsUdpRemuxSocket* udp_remux_socket, SrsStunPacket* stun_req);
srs_error_t on_binding_request(SrsUdpMuxSocket* udp_mux_skt, SrsStunPacket* stun_req);
private:
srs_error_t do_playing(SrsConsumer* consumer, SrsUdpRemuxSocket* udp_remux_socket);
srs_error_t do_playing(SrsConsumer* consumer, SrsUdpMuxSocket* udp_mux_skt);
};
class SrsRtcServer : public ISrsUdpRemuxHandler
// XXX: is there any other timer thread?
class SrsRtcTimerThread : public ISrsCoroutineHandler
{
protected:
SrsCoroutine* trd;
int _parent_cid;
private:
SrsRtcServer* rtc_server;
public:
// Constructor.
// @param tm The receive timeout in srs_utime_t.
SrsRtcTimerThread(SrsRtcServer* rtc_svr, int parent_cid);
virtual ~SrsRtcTimerThread();
public:
virtual int cid();
public:
virtual srs_error_t start();
virtual void stop();
virtual void stop_loop();
public:
virtual srs_error_t cycle();
};
class SrsRtcServer : public ISrsUdpMuxHandler
{
private:
SrsServer* server;
SrsRtcTimerThread* rttrd;
private:
std::map<std::string, SrsRtcSession*> map_username_session; // key: username(local_ufrag + ":" + remote_ufrag)
std::map<std::string, SrsRtcSession*> map_id_session; // key: peerip(ip + ":" + port)
@ -207,14 +259,15 @@ public:
public:
virtual srs_error_t initialize();
virtual srs_error_t on_udp_packet(SrsUdpRemuxSocket* udp_remux_socket);
virtual srs_error_t on_udp_packet(SrsUdpMuxSocket* udp_mux_skt);
SrsRtcSession* create_rtc_session(const SrsSdp& remote_sdp, SrsSdp& local_sdp);
bool insert_into_id_sessions(const std::string& peer_id, SrsRtcSession* rtc_session);
void check_and_clean_timeout_session();
private:
srs_error_t on_stun(SrsUdpRemuxSocket* udp_remux_socket);
srs_error_t on_dtls(SrsUdpRemuxSocket* udp_remux_socket);
srs_error_t on_rtp_or_rtcp(SrsUdpRemuxSocket* udp_remux_socket);
srs_error_t on_stun(SrsUdpMuxSocket* udp_mux_skt);
srs_error_t on_dtls(SrsUdpMuxSocket* udp_mux_skt);
srs_error_t on_rtp_or_rtcp(SrsUdpMuxSocket* udp_mux_skt);
private:
SrsRtcSession* find_rtc_session_by_username(const std::string& ufrag);
SrsRtcSession* find_rtc_session_by_peer_id(const std::string& peer_id);

View file

@ -87,10 +87,12 @@ srs_error_t SrsRtpMuxer::frame_to_packet(SrsSharedPtrMessage* shared_frame, SrsF
packet_fu_a(shared_frame, format, &sample, rtp_packet_vec);
}
#if 0
srs_trace("nal size=%d, nal=%s", sample.size, dump_string_hex(sample.bytes, sample.size, sample.size).c_str());
for (int i = 0; i < shared_frame->nb_rtp_fragments; ++i) {
srs_trace("rtp=%s", dump_string_hex(shared_frame->rtp_fragments[i].bytes, shared_frame->rtp_fragments[i].size, kRtpPacketSize).c_str());
}
#endif
}
SrsSample* rtp_samples = new SrsSample[rtp_packet_vec.size()];
@ -134,7 +136,6 @@ srs_error_t SrsRtpMuxer::packet_fu_a(SrsSharedPtrMessage* shared_frame, SrsForma
stream->write_1bytes(kH264PayloadType);
}
// sequence
srs_trace("sequence=%u", sequence);
stream->write_2bytes(sequence++);
// timestamp
stream->write_4bytes(int32_t(shared_frame->timestamp * 90));
@ -186,7 +187,6 @@ srs_error_t SrsRtpMuxer::packet_single_nalu(SrsSharedPtrMessage* shared_frame, S
// marker payloadtype
stream->write_1bytes(kMarker | kH264PayloadType);
// sequenct
srs_trace("sequence=%u", sequence);
stream->write_2bytes(sequence++);
// timestamp
stream->write_4bytes(int32_t(shared_frame->timestamp * 90));
@ -220,7 +220,6 @@ srs_error_t SrsRtpMuxer::packet_stap_a(const string &sps, const string& pps, Srs
// marker payloadtype
stream->write_1bytes(kMarker | kH264PayloadType);
// sequenct
srs_trace("sequence=%u", sequence);
stream->write_2bytes(sequence++);
// timestamp
stream->write_4bytes(int32_t(shared_frame->timestamp * 90));

View file

@ -360,7 +360,7 @@ srs_error_t SrsRtcListener::listen(std::string i, int p)
port = p;
srs_freep(listener);
listener = new SrsUdpRemuxListener(rtc, ip, port);
listener = new SrsUdpMuxListener(rtc, ip, port);
if ((err = listener->listen()) != srs_success) {
return srs_error_wrap(err, "listen %s:%d", ip.c_str(), port);
@ -649,6 +649,10 @@ srs_error_t SrsServer::initialize(ISrsServerCycle* ch)
if ((err = http_server->initialize()) != srs_success) {
return srs_error_wrap(err, "http server initialize");
}
if ((err = rtc_server->initialize()) != srs_success) {
return srs_error_wrap(err, "rtc server initialize");
}
return err;
}

View file

@ -162,8 +162,8 @@ public:
class SrsRtcListener : public SrsListener
{
protected:
SrsUdpRemuxListener* listener;
ISrsUdpRemuxHandler* rtc;
SrsUdpMuxListener* listener;
ISrsUdpMuxHandler* rtc;
public:
SrsRtcListener(SrsServer* svr, SrsRtcServer* rtc_svr, SrsListenerType t);
virtual ~SrsRtcListener();

View file

@ -402,6 +402,11 @@ int srs_sendto(srs_netfd_t stfd, void *buf, int len, const struct sockaddr * to,
return st_sendto((st_netfd_t)stfd, buf, len, to, tolen, (st_utime_t)timeout);
}
int srs_sendmsg(srs_netfd_t stfd, const struct msghdr *msg, int flags, srs_utime_t timeout)
{
return st_sendmsg((st_netfd_t)stfd, msg, flags, (st_utime_t)timeout);
}
srs_netfd_t srs_accept(srs_netfd_t stfd, struct sockaddr *addr, int *addrlen, srs_utime_t timeout)
{
return (srs_netfd_t)st_accept((st_netfd_t)stfd, addr, addrlen, (st_utime_t)timeout);

View file

@ -89,6 +89,7 @@ extern srs_netfd_t srs_netfd_open(int osfd);
extern int srs_recvfrom(srs_netfd_t stfd, void *buf, int len, struct sockaddr *from, int *fromlen, srs_utime_t timeout);
extern int srs_sendto(srs_netfd_t stfd, void *buf, int len, const struct sockaddr *to, int tolen, srs_utime_t timeout);
extern int srs_sendmsg(srs_netfd_t stfd, const struct msghdr *msg, int flags, srs_utime_t timeout);
extern srs_netfd_t srs_accept(srs_netfd_t stfd, struct sockaddr *addr, int *addrlen, srs_utime_t timeout);