1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-02-13 20:01:56 +00:00

Refactor code to split implementation to cpp

This commit is contained in:
winlin 2020-05-02 09:48:04 +08:00
parent 6f2b78f16a
commit b3a0284db5
14 changed files with 363 additions and 110 deletions

View file

@ -46,14 +46,6 @@ SrsDtls::~SrsDtls()
SSL_CTX_free(dtls_ctx);
}
SrsDtls* SrsDtls::instance()
{
if (!_instance) {
_instance = new SrsDtls();
}
return _instance;
}
// The return value of verify_callback controls the strategy of the further verification process. If verify_callback
// returns 0, the verification process is immediately stopped with "verification failed" state. If SSL_VERIFY_PEER is
// set, a verification failure alert is sent to the peer and the TLS/SSL handshake is terminated. If verify_callback
@ -256,3 +248,22 @@ srs_error_t SrsDtls::init(SrsRequest* r)
return err;
}
SrsDtls* SrsDtls::instance()
{
if (!_instance) {
_instance = new SrsDtls();
}
return _instance;
}
SSL_CTX* SrsDtls::get_dtls_ctx()
{
return dtls_ctx;
}
std::string SrsDtls::get_fingerprint() const
{
return fingerprint;
}

View file

@ -46,9 +46,9 @@ public:
srs_error_t init(SrsRequest* r);
public:
static SrsDtls* instance();
SSL_CTX* get_dtls_ctx() { return dtls_ctx; }
SSL_CTX* get_dtls_ctx();
public:
std::string get_fingerprint() const { return fingerprint; }
std::string get_fingerprint() const;
};
#endif

View file

@ -261,23 +261,6 @@ SrsUdpMuxSocket::~SrsUdpMuxSocket()
srs_freepa(buf);
}
SrsUdpMuxSocket* SrsUdpMuxSocket::copy_sendonly()
{
SrsUdpMuxSocket* sendonly = new SrsUdpMuxSocket(handler, lfd);
// Don't copy buffer
srs_freepa(sendonly->buf);
sendonly->nb_buf = 0;
sendonly->nread = 0;
sendonly->lfd = lfd;
sendonly->from = from;
sendonly->fromlen = fromlen;
sendonly->peer_ip = peer_ip;
sendonly->peer_port = peer_port;
return sendonly;
}
int SrsUdpMuxSocket::recvfrom(srs_utime_t timeout)
{
fromlen = sizeof(from);
@ -341,6 +324,26 @@ socklen_t SrsUdpMuxSocket::peer_addrlen()
return (socklen_t)fromlen;
}
char* SrsUdpMuxSocket::data()
{
return buf;
}
int SrsUdpMuxSocket::size()
{
return nread;
}
std::string SrsUdpMuxSocket::get_peer_ip() const
{
return peer_ip;
}
int SrsUdpMuxSocket::get_peer_port() const
{
return peer_port;
}
std::string SrsUdpMuxSocket::get_peer_id()
{
char id_buf[1024];
@ -349,6 +352,28 @@ std::string SrsUdpMuxSocket::get_peer_id()
return string(id_buf, len);
}
SrsUdpMuxSocket* SrsUdpMuxSocket::copy_sendonly()
{
SrsUdpMuxSocket* sendonly = new SrsUdpMuxSocket(handler, lfd);
// Don't copy buffer
srs_freepa(sendonly->buf);
sendonly->nb_buf = 0;
sendonly->nread = 0;
sendonly->lfd = lfd;
sendonly->from = from;
sendonly->fromlen = fromlen;
sendonly->peer_ip = peer_ip;
sendonly->peer_port = peer_port;
return sendonly;
}
ISrsUdpSender* SrsUdpMuxSocket::sender()
{
return handler;
}
SrsUdpMuxListener::SrsUdpMuxListener(ISrsUdpMuxHandler* h, ISrsUdpSender* s, std::string i, int p)
{
handler = h;

View file

@ -163,26 +163,19 @@ private:
public:
SrsUdpMuxSocket(ISrsUdpSender* h, srs_netfd_t fd);
virtual ~SrsUdpMuxSocket();
public:
int recvfrom(srs_utime_t timeout);
srs_error_t sendto(void* data, int size, srs_utime_t timeout);
srs_netfd_t stfd();
sockaddr_in* peer_addr();
socklen_t peer_addrlen();
char* data() { return buf; }
int size() { return nread; }
std::string get_peer_ip() const { return peer_ip; }
int get_peer_port() const { return peer_port; }
char* data();
int size();
std::string get_peer_ip() const;
int get_peer_port() const;
std::string get_peer_id();
public:
SrsUdpMuxSocket* copy_sendonly();
ISrsUdpSender* sender() { return handler; };
private:
// Don't allow copy, user copy_sendonly instead
SrsUdpMuxSocket(const SrsUdpMuxSocket& rhs);
SrsUdpMuxSocket& operator=(const SrsUdpMuxSocket& rhs);
ISrsUdpSender* sender();
};
class SrsUdpMuxListener : public ISrsCoroutineHandler

View file

@ -3396,26 +3396,6 @@ srs_error_t SrsRtcServer::create_rtc_session(
return err;
}
SrsRtcSession* SrsRtcServer::find_rtc_session_by_peer_id(const string& peer_id)
{
map<string, SrsRtcSession*>::iterator iter = map_id_session.find(peer_id);
if (iter == map_id_session.end()) {
return NULL;
}
return iter->second;
}
SrsRtcSession* SrsRtcServer::find_rtc_session_by_username(const std::string& username)
{
map<string, SrsRtcSession*>::iterator iter = map_username_session.find(username);
if (iter == map_username_session.end()) {
return NULL;
}
return iter->second;
}
bool SrsRtcServer::insert_into_id_sessions(const string& peer_id, SrsRtcSession* rtc_session)
{
return map_id_session.insert(make_pair(peer_id, rtc_session)).second;
@ -3447,6 +3427,31 @@ void SrsRtcServer::check_and_clean_timeout_session()
}
}
int SrsRtcServer::nn_sessions()
{
return (int)map_username_session.size();
}
SrsRtcSession* SrsRtcServer::find_rtc_session_by_peer_id(const string& peer_id)
{
map<string, SrsRtcSession*>::iterator iter = map_id_session.find(peer_id);
if (iter == map_id_session.end()) {
return NULL;
}
return iter->second;
}
SrsRtcSession* SrsRtcServer::find_rtc_session_by_username(const std::string& username)
{
map<string, SrsRtcSession*>::iterator iter = map_username_session.find(username);
if (iter == map_username_session.end()) {
return NULL;
}
return iter->second;
}
srs_error_t SrsRtcServer::notify(int type, srs_utime_t interval, srs_utime_t tick)
{
check_and_clean_timeout_session();

View file

@ -444,7 +444,7 @@ public:
);
bool insert_into_id_sessions(const std::string& peer_id, SrsRtcSession* rtc_session);
void check_and_clean_timeout_session();
int nn_sessions() { return (int)map_username_session.size(); }
int nn_sessions();
private:
SrsRtcSession* find_rtc_session_by_username(const std::string& ufrag);
SrsRtcSession* find_rtc_session_by_peer_id(const std::string& peer_id);

View file

@ -150,6 +150,21 @@ SrsRtpRingBuffer::~SrsRtpRingBuffer()
srs_freepa(queue_);
}
uint16_t SrsRtpRingBuffer::low()
{
return low_;
}
uint16_t SrsRtpRingBuffer::high()
{
return high_;
}
void SrsRtpRingBuffer::advance_to(uint16_t seq)
{
low_ = seq;
}
void SrsRtpRingBuffer::set(uint16_t at, SrsRtpPacket2* pkt)
{
SrsRtpPacket2* p = queue_[at % capacity_];
@ -161,6 +176,11 @@ void SrsRtpRingBuffer::set(uint16_t at, SrsRtpPacket2* pkt)
queue_[at % capacity_] = pkt;
}
void SrsRtpRingBuffer::remove(uint16_t at)
{
set(at, NULL);
}
void SrsRtpRingBuffer::reset(uint16_t low, uint16_t high)
{
for (uint16_t s = low; s != high; ++s) {
@ -168,6 +188,16 @@ void SrsRtpRingBuffer::reset(uint16_t low, uint16_t high)
}
}
bool SrsRtpRingBuffer::overflow()
{
return high_ - low_ < capacity_;
}
bool SrsRtpRingBuffer::is_heavy()
{
return high_ - low_ >= capacity_ / 2;
}
uint16_t SrsRtpRingBuffer::next_start_of_frame()
{
if (low_ == high_) {
@ -245,6 +275,11 @@ void SrsRtpRingBuffer::update(uint16_t seq, bool startup, uint16_t& nack_low, ui
}
}
SrsRtpPacket2* SrsRtpRingBuffer::at(uint16_t seq)
{
return queue_[seq % capacity_];
}
SrsRtpQueue::SrsRtpQueue(size_t capacity, bool one_packet_per_frame)
{
nn_collected_frames = 0;
@ -396,6 +431,11 @@ void SrsRtpQueue::notify_nack_list_full()
queue_->advance_to(next + 1);
}
void SrsRtpQueue::request_keyframe()
{
request_key_frame_ = true;
}
uint32_t SrsRtpQueue::get_extended_highest_sequence()
{
return queue_->get_extended_highest_sequence();
@ -450,7 +490,9 @@ void SrsRtpQueue::collect_packet()
{
while (queue_->low() != queue_->high()) {
vector<SrsRtpPacket2*> frame;
for (uint16_t s = queue_->low(); s != queue_->high(); ++s) {
uint16_t s = queue_->low();
for (; s != queue_->high(); ++s) {
SrsRtpPacket2* pkt = queue_->at(s);
// In NACK, never collect frame.
@ -468,14 +510,17 @@ void SrsRtpQueue::collect_packet()
frame.push_back(pkt);
// Not the last packet, continue to process next one.
if (pkt->rtp_header.get_marker() || one_packet_per_frame_) {
if (!pkt->rtp_header.get_marker() && !one_packet_per_frame_) {
continue;
}
// Done, we got the last packet of frame.
nn_collected_frames++;
frames_.push_back(frame);
break;
}
if (queue_->low() != s) {
// Reset the range of packets to NULL in buffer.
queue_->reset(queue_->low(), s);

View file

@ -133,17 +133,17 @@ public:
virtual ~SrsRtpRingBuffer();
public:
// Move the position of buffer.
uint16_t low() { return low_; }
uint16_t high() { return high_; }
void advance_to(uint16_t seq) { low_ = seq; }
uint16_t low();
uint16_t high();
void advance_to(uint16_t seq);
// Free the packet at position.
void set(uint16_t at, SrsRtpPacket2* pkt);
void remove(uint16_t at) { set(at, NULL); }
void remove(uint16_t at);
// Directly reset range [low, high] to NULL.
void reset(uint16_t low, uint16_t high);
// Whether queue overflow or heavy(too many packets and need clear).
bool overflow() { return high_ - low_ < capacity_; }
bool is_heavy() { return high_ - low_ >= capacity_ / 2; }
bool overflow();
bool is_heavy();
// Get the next start packet of frame.
// @remark If not found, return the low_, which should never be the "next" one,
// because it MAY or NOT current start packet of frame but never be the next.
@ -156,7 +156,7 @@ public:
// Update the sequence, got the nack range by [low, high].
void update(uint16_t seq, bool startup, uint16_t& nack_low, uint16_t& nack_high);
// Get the packet by seq.
SrsRtpPacket2* at(uint16_t seq) { return queue_[seq % capacity_]; }
SrsRtpPacket2* at(uint16_t seq);
};
class SrsRtpQueue
@ -186,7 +186,7 @@ public:
bool should_request_key_frame();
void notify_drop_seq(uint16_t seq);
void notify_nack_list_full();
void request_keyframe() { request_key_frame_ = true; }
void request_keyframe();
public:
uint32_t get_extended_highest_sequence();
uint8_t get_fraction_lost();

View file

@ -457,6 +457,11 @@ SrsConsumer::~SrsConsumer()
#endif
}
void SrsConsumer::enable_pass_timestamp()
{
pass_timestamp = true;
}
void SrsConsumer::set_queue_size(srs_utime_t queue_size)
{
queue->set_queue_size(queue_size);
@ -2764,6 +2769,11 @@ void SrsSource::request_keyframe()
}
}
void SrsSource::set_rtc_publisher(SrsRtcPublisher* v)
{
rtc_publisher = v;
}
srs_error_t SrsSource::on_rtc_audio(SrsSharedPtrMessage* audio)
{
// TODO: FIXME: Merge with on_audio.

View file

@ -217,7 +217,7 @@ public:
virtual ~SrsConsumer();
public:
// Use pass timestamp mode.
void enable_pass_timestamp() { pass_timestamp = true; }
void enable_pass_timestamp();
// Set the size of queue.
virtual void set_queue_size(srs_utime_t queue_size);
// when source id changed, notice client to print.
@ -658,7 +658,7 @@ public:
// TODO: FIXME: Maybe we could cache the keyframe.
// TODO: FIXME: Maybe we should only response for the new clients.
void request_keyframe();
void set_rtc_publisher(SrsRtcPublisher* v) { rtc_publisher = v; }
void set_rtc_publisher(SrsRtcPublisher* v);
// When got RTC audio message, which is encoded in opus.
// TODO: FIXME: Merge with on_audio.
srs_error_t on_rtc_audio(SrsSharedPtrMessage* audio);

View file

@ -198,6 +198,71 @@ int SrsRtpHeader::nb_bytes()
return kRtpHeaderFixedSize + cc * 4 + (extension ? (extension_length + 1) * 4 : 0);
}
void SrsRtpHeader::set_marker(bool v)
{
marker = v;
}
bool SrsRtpHeader::get_marker() const
{
return marker;
}
void SrsRtpHeader::set_payload_type(uint8_t v)
{
payload_type = v;
}
uint8_t SrsRtpHeader::get_payload_type() const
{
return payload_type;
}
void SrsRtpHeader::set_sequence(uint16_t v)
{
sequence = v;
}
uint16_t SrsRtpHeader::get_sequence() const
{
return sequence;
}
void SrsRtpHeader::set_timestamp(int64_t v)
{
timestamp = (uint32_t)v;
}
int64_t SrsRtpHeader::get_timestamp() const
{
return timestamp;
}
void SrsRtpHeader::set_ssrc(uint32_t v)
{
ssrc = v;
}
uint32_t SrsRtpHeader::get_ssrc() const
{
return ssrc;
}
void SrsRtpHeader::set_padding(bool v)
{
padding = v;
}
void SrsRtpHeader::set_padding_length(uint8_t v)
{
padding_length = v;
}
uint8_t SrsRtpHeader::get_padding_length() const
{
return padding_length;
}
ISrsRtpPacketDecodeHandler::ISrsRtpPacketDecodeHandler()
{
}
@ -285,6 +350,17 @@ SrsRtpFUAPayload2* SrsRtpPacket2::reuse_fua()
return cache_fua;
}
void SrsRtpPacket2::set_decode_handler(ISrsRtpPacketDecodeHandler* h)
{
decode_handler = h;
}
void SrsRtpPacket2::set_original_bytes(char* buf, int nn_buf)
{
original_bytes = buf;
nn_original_bytes = nn_buf;
}
int SrsRtpPacket2::nb_bytes()
{
if (!cache_payload) {

View file

@ -78,19 +78,19 @@ public:
virtual srs_error_t encode(SrsBuffer* buf);
virtual int nb_bytes();
public:
inline void set_marker(bool v) { marker = v; }
bool get_marker() const { return marker; }
inline void set_payload_type(uint8_t v) { payload_type = v; }
uint8_t get_payload_type() const { return payload_type; }
inline void set_sequence(uint16_t v) { sequence = v; }
uint16_t get_sequence() const { return sequence; }
inline void set_timestamp(int64_t v) { timestamp = (uint32_t)v; }
int64_t get_timestamp() const { return timestamp; }
inline void set_ssrc(uint32_t v) { ssrc = v; }
uint32_t get_ssrc() const { return ssrc; }
inline void set_padding(bool v) { padding = v; }
inline void set_padding_length(uint8_t v) { padding_length = v; }
uint8_t get_padding_length() const { return padding_length; }
void set_marker(bool v);
bool get_marker() const;
void set_payload_type(uint8_t v);
uint8_t get_payload_type() const;
void set_sequence(uint16_t v);
uint16_t get_sequence() const;
void set_timestamp(int64_t v);
int64_t get_timestamp() const;
void set_ssrc(uint32_t v);
uint32_t get_ssrc() const;
void set_padding(bool v);
void set_padding_length(uint8_t v);
uint8_t get_padding_length() const;
};
class ISrsRtpPacketDecodeHandler
@ -150,9 +150,9 @@ public:
// Reuse the cached fua message as payload.
SrsRtpFUAPayload2* reuse_fua();
// Set the decode handler.
void set_decode_handler(ISrsRtpPacketDecodeHandler* h) { decode_handler = h; }
void set_decode_handler(ISrsRtpPacketDecodeHandler* h);
// Set the original bytes.
void set_original_bytes(char* buf, int nn_buf) { original_bytes = buf; nn_original_bytes = nn_buf; }
void set_original_bytes(char* buf, int nn_buf);
// interface ISrsEncoder
public:
virtual int nb_bytes();
@ -304,6 +304,7 @@ public:
SrsRtpH264Header& operator=(const SrsRtpH264Header& rhs);
};
// TODO: FIXME: Merge it with shared message.
class SrsRtpSharedPacket
{
private:

View file

@ -98,6 +98,96 @@ SrsStunPacket::~SrsStunPacket()
{
}
bool SrsStunPacket::is_binding_request() const
{
return message_type == BindingRequest;
}
bool SrsStunPacket::is_binding_response() const
{
return message_type == BindingResponse;
}
uint16_t SrsStunPacket::get_message_type() const
{
return message_type;
}
std::string SrsStunPacket::get_username() const
{
return username;
}
std::string SrsStunPacket::get_local_ufrag() const
{
return local_ufrag;
}
std::string SrsStunPacket::get_remote_ufrag() const
{
return remote_ufrag;
}
std::string SrsStunPacket::get_transcation_id() const
{
return transcation_id;
}
uint32_t SrsStunPacket::get_mapped_address() const
{
return mapped_address;
}
uint16_t SrsStunPacket::get_mapped_port() const
{
return mapped_port;
}
bool SrsStunPacket::get_ice_controlled() const
{
return ice_controlled;
}
bool SrsStunPacket::get_ice_controlling() const
{
return ice_controlling;
}
bool SrsStunPacket::get_use_candidate() const
{
return use_candidate;
}
void SrsStunPacket::set_message_type(const uint16_t& m)
{
message_type = m;
}
void SrsStunPacket::set_local_ufrag(const std::string& u)
{
local_ufrag = u;
}
void SrsStunPacket::set_remote_ufrag(const std::string& u)
{
remote_ufrag = u;
}
void SrsStunPacket::set_transcation_id(const std::string& t)
{
transcation_id = t;
}
void SrsStunPacket::set_mapped_address(const uint32_t& addr)
{
mapped_address = addr;
}
void SrsStunPacket::set_mapped_port(const uint32_t& port)
{
mapped_port = port;
}
srs_error_t SrsStunPacket::decode(const char* buf, const int nb_buf)
{
srs_error_t err = srs_success;

View file

@ -92,28 +92,25 @@ private:
public:
SrsStunPacket();
virtual ~SrsStunPacket();
bool is_binding_request() const { return message_type == BindingRequest; }
bool is_binding_response() const { return message_type == BindingResponse; }
uint16_t get_message_type() const { return message_type; }
std::string get_username() const { return username; }
std::string get_local_ufrag() const { return local_ufrag; }
std::string get_remote_ufrag() const { return remote_ufrag; }
std::string get_transcation_id() const { return transcation_id; }
uint32_t get_mapped_address() const { return mapped_address; }
uint16_t get_mapped_port() const { return mapped_port; }
bool get_ice_controlled() const { return ice_controlled; }
bool get_ice_controlling() const { return ice_controlling; }
bool get_use_candidate() const { return use_candidate; }
void set_message_type(const uint16_t& m) { message_type = m; }
void set_local_ufrag(const std::string& u) { local_ufrag = u; }
void set_remote_ufrag(const std::string& u) { remote_ufrag = u; }
void set_transcation_id(const std::string& t) { transcation_id = t; }
void set_mapped_address(const uint32_t& addr) { mapped_address = addr; }
void set_mapped_port(const uint32_t& port) { mapped_port = port; }
public:
bool is_binding_request() const;
bool is_binding_response() const;
uint16_t get_message_type() const;
std::string get_username() const;
std::string get_local_ufrag() const;
std::string get_remote_ufrag() const;
std::string get_transcation_id() const;
uint32_t get_mapped_address() const;
uint16_t get_mapped_port() const;
bool get_ice_controlled() const;
bool get_ice_controlling() const;
bool get_use_candidate() const;
void set_message_type(const uint16_t& m);
void set_local_ufrag(const std::string& u);
void set_remote_ufrag(const std::string& u);
void set_transcation_id(const std::string& t);
void set_mapped_address(const uint32_t& addr);
void set_mapped_port(const uint32_t& port);
srs_error_t decode(const char* buf, const int nb_buf);
srs_error_t encode(const std::string& pwd, SrsBuffer* stream);
private: